|
- #!/usr/bin/env python3
- #
- # Copyright (C) 2015 The Android Open Source Project
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- """Tests for the adb program itself.
- This differs from things in test_device.py in that there is no API for these
- things. Most of these tests involve specific error messages or the help text.
- """
- import contextlib
- import os
- import random
- import select
- import socket
- import struct
- import subprocess
- import sys
- import threading
- import time
- import unittest
- import warnings
@contextlib.contextmanager
def fake_adbd(protocol=socket.AF_INET, port=0):
    """Creates a fake ADB daemon that just replies with a CNXN packet.

    A listening TCP socket is bound on the loopback address and serviced by a
    background thread. The first time a client sends data that does not start
    with b"OPEN", the thread answers with a single CNXN packet; later data
    from the same client is ignored. A client that sends b"OPEN..." or closes
    its end is shut down and dropped.

    Args:
        protocol: Address family of the listening socket. socket.AF_INET
            binds to 127.0.0.1; any other value binds to ::1.
        port: TCP port to bind. 0 lets the OS pick a free port.

    Yields:
        A (port, writesock) tuple: the port actually bound, and one end of a
        socket pair whose other end is watched by the server thread. As soon
        as that other end becomes readable (data written to writesock, or
        writesock closed) the thread closes every socket it owns and exits.

    Raises:
        OSError: If the listening socket cannot be created or bound, e.g.
            when the requested address family is unavailable on this host.
    """
    serversock = socket.socket(protocol, socket.SOCK_STREAM)
    try:
        serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if protocol == socket.AF_INET:
            serversock.bind(("127.0.0.1", port))
        else:
            serversock.bind(("::1", port))
        serversock.listen(1)

        # A socket pair that is used to signal the thread that it should
        # terminate.
        readsock, writesock = socket.socketpair()
    except BaseException:
        # Callers treat a bind failure as "protocol unavailable" and carry on,
        # so do not leave the half-configured listener open behind us.
        serversock.close()
        raise

    def _adb_packet(command: bytes, arg0: int, arg1: int, data: bytes) -> bytes:
        """Builds an ADB message: six 32-bit header words followed by data."""
        # The ADB wire format is little-endian; spell that out rather than
        # relying on the byte order of the host running the test.
        bin_command = struct.unpack("<I", command)[0]
        # The fifth word (the payload checksum slot) is left as 0; the sixth
        # is the command with all bits inverted.
        header = struct.pack("<6I", bin_command, arg0, arg1, len(data), 0,
                             bin_command ^ 0xffffffff)
        return header + data

    def _handle(sock):
        """Services the listener and its clients until told to stop."""
        rlist = [readsock, sock]
        # Clients that have already been sent their CNXN packet.
        cnxn_sent = set()
        try:
            while True:
                read_ready, _, _ = select.select(rlist, [], [])
                for ready in read_ready:
                    if ready is readsock:
                        # Closure pipe: the finally clause closes everything.
                        return
                    if ready is sock:
                        # Server socket
                        conn, _ = ready.accept()
                        rlist.append(conn)
                        continue
                    # Client socket
                    data = ready.recv(1024)
                    if not data or data.startswith(b"OPEN"):
                        cnxn_sent.discard(ready)
                        ready.shutdown(socket.SHUT_RDWR)
                        ready.close()
                        rlist.remove(ready)
                        continue
                    if ready in cnxn_sent:
                        continue
                    cnxn_sent.add(ready)
                    ready.sendall(_adb_packet(b"CNXN", 0x01000001, 1024 * 1024,
                                              b"device::ro.product.name=fakeadb"))
        finally:
            # Runs on the normal shutdown path and also if a socket call
            # raised, so accepted connections are never leaked.
            for f in rlist:
                f.close()

    port = serversock.getsockname()[1]
    server_thread = threading.Thread(target=_handle, args=(serversock,))
    server_thread.start()

    try:
        yield port, writesock
    finally:
        # Closing our end makes readsock readable (EOF), which stops the
        # server thread.
        writesock.close()
        server_thread.join()
@contextlib.contextmanager
def adb_connect(unittest, serial):
    """Context manager for an ADB connection.

    Runs `adb connect <serial>` on entry and checks that adb reports success;
    runs `adb disconnect <serial>` on exit.

    Args:
        unittest: The running test case; its assertEqual() is used to verify
            the output of `adb connect`.
        serial: The serial to connect to, as passed to `adb connect`.
    """
    connect_output = subprocess.check_output(["adb", "connect", serial])
    expected = "connected to {}".format(serial).encode("utf8")
    unittest.assertEqual(connect_output.strip(), expected)

    try:
        yield
    finally:
        # Best-effort disconnection: the exit status is not checked and
        # whatever adb prints is captured and thrown away.
        subprocess.run(["adb", "disconnect", serial],
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE)
@contextlib.contextmanager
def adb_server():
    """Context manager for an ADB server.

    Kills any server already listening on the non-default port 5038, then
    launches `adb fork-server server` there. The child is handed the write
    end of a pipe via --reply-fd, and this function blocks until the child
    writes its greeting to that pipe.

    Yields:
        The port the freshly started server is listening on.

    Raises:
        AssertionError: If the greeting read from the server is not b"OK\\n".
        subprocess.CalledProcessError: If `adb kill-server` exits non-zero.
    """
    port = 5038
    # Kill any existing server on this non-default port.
    subprocess.check_output(["adb", "-P", str(port), "kill-server"],
                            stderr=subprocess.STDOUT)
    read_pipe, write_pipe = os.pipe()
    try:
        try:
            if sys.platform == "win32":
                import msvcrt
                # On Windows the child is given the OS handle, not the fd.
                write_handle = msvcrt.get_osfhandle(write_pipe)
                os.set_handle_inheritable(write_handle, True)
                reply_fd = str(write_handle)
            else:
                os.set_inheritable(write_pipe, True)
                reply_fd = str(write_pipe)
            proc = subprocess.Popen(["adb", "-L", "tcp:localhost:{}".format(port),
                                     "fork-server", "server",
                                     "--reply-fd", reply_fd], close_fds=False)
        finally:
            # Drop our copy of the write end whether or not the launch
            # succeeded: it would otherwise leak, and while we hold it the
            # read below can never see EOF if the child dies early.
            os.close(write_pipe)

        try:
            greeting = os.read(read_pipe, 1024)
            # Raised explicitly (rather than with `assert`) so the check
            # still runs under `python -O`.
            if greeting != b"OK\n":
                raise AssertionError(repr(greeting))
            yield port
        finally:
            proc.terminate()
            proc.wait()
    finally:
        # The read end used to be left open for the life of the process.
        os.close(read_pipe)
class CommandlineTest(unittest.TestCase):
    """Tests for the ADB commandline."""

    def test_help(self):
        """`adb help` must exit successfully and print something."""
        help_text = subprocess.check_output(["adb", "help"],
                                            stderr=subprocess.STDOUT)
        self.assertGreater(len(help_text), 0)

    def test_version(self):
        """The first line of `adb version` must carry an x.y.z version."""
        output_lines = subprocess.check_output(["adb", "version"]).splitlines()
        self.assertRegex(
            output_lines[0],
            rb"^Android Debug Bridge version \d+\.\d+\.\d+$")

        if len(output_lines) != 2:
            return
        # Newer versions of ADB have a second line of output for the
        # version that includes a specific revision (git SHA).
        self.assertRegex(output_lines[1], rb"^Revision [0-9a-f]{12}-android$")

    def test_tcpip_error_messages(self):
        """`adb tcpip` must reject a missing or non-numeric port."""
        cases = (
            ([], b"requires an argument"),
            (["foo"], b"invalid port"),
        )
        for extra_args, expected_message in cases:
            # stderr is folded into stdout so the message is found wherever
            # adb chooses to print it.
            result = subprocess.run(["adb", "tcpip"] + extra_args,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            self.assertEqual(1, result.returncode)
            self.assertIn(expected_message, result.stdout)
class ServerTest(unittest.TestCase):
    """Tests for the ADB server."""

    @staticmethod
    def _read_pipe_and_set_event(pipe, event):
        """Blocks reading `pipe` until EOF, then sets `event`."""
        pipe.read()
        event.set()

    def test_handle_inheritance(self):
        """Test that launch_server() does not inherit handles.

        launch_server() should not let the adb server inherit
        stdin/stdout/stderr handles, which can cause callers of adb.exe to
        hang. This test also runs fine on unix even though the impetus is an
        issue unique to Windows.
        """
        # This test takes 5 seconds to run on Windows: if there is no adb
        # server running on the port used below, adb kill-server tries to
        # make a TCP connection to a closed port and that takes 1 second on
        # Windows; adb start-server does the same TCP connection which takes
        # another second, and it waits 3 seconds after starting the server.

        # The adb client is started with redirected stdin/stdout/stderr to
        # check whether it passes those redirections on to the adb server it
        # starts. The server runs on a non-default port so it cannot conflict
        # with a pre-existing adb server that may already be set up with adb
        # TCP/emulator connections. If there is a pre-existing adb server,
        # this also tests whether multiple instances of the adb server
        # conflict on adb.log.
        port = 5038
        kill_server_cmd = ["adb", "-P", str(port), "kill-server"]

        # Kill any existing server on this non-default port.
        subprocess.check_output(kill_server_cmd, stderr=subprocess.STDOUT)

        try:
            # We get warnings for unclosed files for the subprocess's pipes,
            # and it's somewhat cumbersome to close them, so just ignore this.
            warnings.simplefilter("ignore", ResourceWarning)

            # Run the adb client and have it start the adb server.
            proc = subprocess.Popen(["adb", "-P", str(port), "start-server"],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)

            # One watcher thread per output pipe; each sets its event once
            # the pipe reports EOF.
            watchers = []
            for pipe, failure_message in ((proc.stdout, "adb stdout not closed"),
                                          (proc.stderr, "adb stderr not closed")):
                closed_event = threading.Event()
                watcher = threading.Thread(
                    target=ServerTest._read_pipe_and_set_event,
                    args=(pipe, closed_event))
                watcher.start()
                watchers.append((watcher, closed_event, failure_message))

            # Wait for the adb client to finish. Once that has occurred, if
            # stdin/stderr/stdout are still open, they must be open in the
            # adb server.
            proc.wait()

            # Try to write to stdin, which we expect is closed. If it is
            # closed, the write raises an IOError. If no IOError is raised,
            # stdin must still be open in the adb server: the adb client is
            # probably letting the adb server inherit stdin, which would be
            # wrong.
            with self.assertRaises(IOError):
                proc.stdin.write(b"x")
                proc.stdin.flush()

            # Wait a few seconds for stdout/stderr to be closed (in the
            # success case, this won't wait at all). A timeout means
            # stdout/stderr were not closed and must still be open in the adb
            # server, suggesting that the adb client is letting the adb
            # server inherit them, which would be wrong.
            for _, closed_event, failure_message in watchers:
                self.assertTrue(closed_event.wait(5), failure_message)
            for watcher, _, _ in watchers:
                watcher.join()
        finally:
            # If we started a server, kill it.
            subprocess.check_output(kill_server_cmd, stderr=subprocess.STDOUT)
class EmulatorTest(unittest.TestCase):
    """Tests for the emulator connection."""

    def _reset_socket_on_close(self, sock):
        """Use SO_LINGER to cause TCP RST segment to be sent on socket close."""
        # The linger structure is two shorts on Windows, but two ints on Unix.
        if os.name == "nt":
            linger_format = "hh"
        else:
            linger_format = "ii"
        # (l_onoff, l_linger): lingering enabled with a zero timeout.
        wanted = (1, 0)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                        struct.pack(linger_format, *wanted))

        # Read the option back to verify that the structure was laid out
        # the way the OS expects.
        raw_linger = sock.getsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 16)
        self.assertEqual(wanted, struct.unpack_from(linger_format, raw_linger))

    def test_emu_kill(self):
        """Ensure that adb emu kill works.

        Bug: https://code.google.com/p/android/issues/detail?id=21021
        """
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        with contextlib.closing(listener):
            # Use SO_REUSEADDR so subsequent runs of the test can grab the
            # port even if it is in TIME_WAIT.
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(("127.0.0.1", 0))
            listener.listen(4)
            console_port = listener.getsockname()[1]

            # Now that listening has started, start adb emu kill, telling it
            # to connect to our mock emulator.
            proc = subprocess.Popen(
                ["adb", "-s", "emulator-{}".format(console_port), "emu", "kill"],
                stderr=subprocess.STDOUT)

            accepted, _ = listener.accept()
            with contextlib.closing(accepted) as conn:
                # If WSAECONNABORTED (10053) is raised by any socket calls,
                # then adb probably isn't reading the data that we sent it.
                conn.sendall(b"Android Console: type 'help' for a list "
                             b"of commands\r\n")
                conn.sendall(b"OK\r\n")

                with contextlib.closing(conn.makefile()) as reader:
                    command = reader.readline()
                    if command.startswith("auth"):
                        # Ignore the first auth line.
                        command = reader.readline()
                    self.assertEqual("kill\n", command)
                    self.assertEqual("quit\n", reader.readline())

                conn.sendall(b"OK: killing emulator, bye bye\r\n")

                # Use SO_LINGER to send TCP RST segment to test whether adb
                # ignores WSAECONNRESET on Windows. This happens with the
                # real emulator because it just calls exit() without closing
                # the socket or calling shutdown(SD_SEND). At process
                # termination, Windows sends a TCP RST segment for every
                # open socket that shutdown(SD_SEND) wasn't used on.
                self._reset_socket_on_close(conn)

            # Wait for adb to finish, so we can check return code.
            proc.communicate()

            # If this fails, adb probably isn't ignoring WSAECONNRESET when
            # reading the response from the adb emu kill command (on
            # Windows).
            self.assertEqual(0, proc.returncode)

    def test_emulator_connect(self):
        """Ensure that the emulator can connect.

        Bug: http://b/78991667
        """
        with adb_server() as server_port, fake_adbd() as (port, _):
            serial = "emulator-{}".format(port - 1)
            adb_for_serial = ["adb", "-P", str(server_port), "-s", serial]

            # Ensure that the emulator is not there.
            try:
                subprocess.check_output(adb_for_serial + ["get-state"],
                                        stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as err:
                not_found = "error: device '{}' not found".format(serial)
                self.assertEqual(err.output.strip(), not_found.encode("utf8"))
            else:
                self.fail("Device should not be available")

            # Let the ADB server know that the emulator has started. The
            # request is the command prefixed by its length as four hex
            # digits.
            notifier = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(notifier) as sock:
                sock.connect(("localhost", server_port))
                command = "host:emulator:{}".format(port).encode("utf8")
                sock.sendall(b"%04x%s" % (len(command), command))

            # Ensure the emulator is there.
            subprocess.check_call(adb_for_serial + ["wait-for-device"])
            state = subprocess.check_output(adb_for_serial + ["get-state"])
            self.assertEqual(state.strip(), b"device")
class ConnectionTest(unittest.TestCase):
    """Tests for adb connect."""

    def test_connect_ipv4_ipv6(self):
        """Ensure that `adb connect localhost:1234` will try both IPv4 and IPv6.

        Bug: http://b/30313466
        """
        for protocol in (socket.AF_INET, socket.AF_INET6):
            with contextlib.ExitStack() as stack:
                # Only the daemon setup is guarded. Guarding the whole body
                # would also swallow unrelated OSErrors (for example a
                # missing adb binary) and misreport them as missing IPv6.
                try:
                    port, _ = stack.enter_context(fake_adbd(protocol=protocol))
                except socket.error:
                    if protocol != socket.AF_INET6:
                        # An IPv4 loopback failure is a real error.
                        raise
                    print("IPv6 not available, skipping")
                    continue

                serial = "localhost:{}".format(port)
                with adb_connect(self, serial):
                    pass

    def test_already_connected(self):
        """Ensure that an already-connected device stays connected."""
        with fake_adbd() as (port, _):
            serial = "localhost:{}".format(port)
            with adb_connect(self, serial):
                # b/31250450: this always returns 0 but probably shouldn't.
                output = subprocess.check_output(["adb", "connect", serial])
                self.assertEqual(
                    output.strip(),
                    "already connected to {}".format(serial).encode("utf8"))

    @unittest.skip("Currently failing b/123247844")
    def test_reconnect(self):
        """Ensure that a disconnected device reconnects."""
        with fake_adbd() as (port, _):
            serial = "localhost:{}".format(port)
            get_state = ["adb", "-s", serial, "get-state"]
            with adb_connect(self, serial):
                # Wait a bit to give adb some time to connect.
                time.sleep(0.25)

                output = subprocess.check_output(get_state)
                self.assertEqual(output.strip(), b"device")

                # This will fail.
                proc = subprocess.Popen(["adb", "-s", serial, "shell", "true"],
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT)
                output, _ = proc.communicate()
                self.assertEqual(output.strip(), b"error: closed")

                subprocess.check_call(["adb", "-s", serial, "wait-for-device"])

                output = subprocess.check_output(get_state)
                self.assertEqual(output.strip(), b"device")

                # Once we explicitly kick a device, it won't attempt to
                # reconnect.
                output = subprocess.check_output(["adb", "disconnect", serial])
                self.assertEqual(
                    output.strip(),
                    "disconnected {}".format(serial).encode("utf8"))
                try:
                    subprocess.check_output(get_state, stderr=subprocess.STDOUT)
                except subprocess.CalledProcessError as err:
                    self.assertEqual(
                        err.output.strip(),
                        "error: device '{}' not found".format(serial).encode("utf8"))
                else:
                    self.fail("Device should not be available")
class DisconnectionTest(unittest.TestCase):
    """Tests for adb disconnect."""

    def test_disconnect(self):
        """Ensure that `adb disconnect` takes effect immediately."""

        def _devices(port):
            """Returns the rows of `adb devices` as lists of tab-split fields."""
            listing = subprocess.check_output(["adb", "-P", str(port), "devices"])
            rows = listing.decode("utf8").strip().splitlines()
            # The first line of the output is skipped; only the lines after
            # it are parsed.
            return [row.split("\t") for row in rows[1:]]

        with adb_server() as server_port, fake_adbd() as (port, sock):
            device_name = "localhost:{}".format(port)
            adb = ["adb", "-P", str(server_port)]

            connect_output = subprocess.check_output(adb + ["connect", device_name])
            self.assertEqual(connect_output.strip(),
                             "connected to {}".format(device_name).encode("utf8"))

            self.assertEqual(_devices(server_port), [[device_name, "device"]])

            # Send a deliberately malformed packet to make the device go
            # offline.
            sock.sendall(struct.pack("IIIIII", 0, 0, 0, 0, 0, 0))

            # Wait a bit.
            time.sleep(0.1)

            self.assertEqual(_devices(server_port), [[device_name, "offline"]])

            # Disconnect the device.
            subprocess.check_output(adb + ["disconnect", device_name])

            # Wait a bit.
            time.sleep(0.1)

            self.assertEqual(_devices(server_port), [])
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class PowerTest(unittest.TestCase):
    """Tests for the adb server's handling of Windows power events."""

    def test_resume_usb_kick(self):
        """Resuming from sleep/hibernate should kick USB devices."""
        try:
            usb_serial = subprocess.check_output(["adb", "-d", "get-serialno"]).strip()
        except subprocess.CalledProcessError:
            # If there are multiple USB devices, we don't have a way to check
            # whether the selected device is USB.
            raise unittest.SkipTest('requires single USB device')

        try:
            selected_serial = subprocess.check_output(["adb", "get-serialno"]).strip()
        except subprocess.CalledProcessError:
            # Did you forget to select a device with $ANDROID_SERIAL?
            raise unittest.SkipTest('requires $ANDROID_SERIAL set to a USB device')

        # Test only works with USB devices because adb
        # _power_notification_thread does not kick non-USB devices on resume
        # event.
        if selected_serial != usb_serial:
            raise unittest.SkipTest('requires USB device')

        # Run an adb shell command in the background that takes a while to
        # complete.
        shell_proc = subprocess.Popen(['adb', 'shell', 'sleep', '5'])

        # Wait for startup of adb server's _power_notification_thread.
        time.sleep(0.1)

        # Simulate resuming from sleep/hibernation by sending Windows message.
        import ctypes
        from ctypes import wintypes
        HWND_BROADCAST = 0xffff
        WM_POWERBROADCAST = 0x218
        PBT_APMRESUMEAUTOMATIC = 0x12

        post_message = ctypes.windll.user32.PostMessageW
        post_message.restype = wintypes.BOOL
        post_message.argtypes = (wintypes.HWND, wintypes.UINT,
                                 wintypes.WPARAM, wintypes.LPARAM)
        posted = post_message(HWND_BROADCAST, WM_POWERBROADCAST,
                              PBT_APMRESUMEAUTOMATIC, 0)
        if not posted:
            raise ctypes.WinError()

        # Wait for connection to adb shell to be broken by
        # _power_notification_thread detecting the Windows message.
        wait_began = time.time()
        shell_proc.wait()
        wait_ended = time.time()

        # If the power event was detected, the adb shell command should be
        # broken very quickly.
        self.assertLess(wait_ended - wait_began, 2)
def main():
    """Main entrypoint: seed the RNG with a fixed value and run the tests."""
    # A fixed seed keeps any use of `random` reproducible between runs.
    random.seed(0)
    unittest.main(verbosity=3)


# Run the suite only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|