payload_info_unittest.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. #!/usr/bin/python2
  2. #
  3. # Copyright (C) 2015 The Android Open Source Project
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #
  17. """Unit testing payload_info.py."""
  18. from __future__ import print_function
  19. import StringIO
  20. import collections
  21. import mock
  22. import sys
  23. import unittest
  24. import payload_info
  25. import update_payload
  26. from contextlib import contextmanager
  27. from update_payload import update_metadata_pb2
  28. class FakePayloadError(Exception):
  29. """A generic error when using the FakePayload."""
  30. class FakeOption(object):
  31. """Fake options object for testing."""
  32. def __init__(self, **kwargs):
  33. self.list_ops = False
  34. self.stats = False
  35. self.signatures = False
  36. for key, val in kwargs.iteritems():
  37. setattr(self, key, val)
  38. if not hasattr(self, 'payload_file'):
  39. self.payload_file = None
  40. class FakeOp(object):
  41. """Fake manifest operation for testing."""
  42. def __init__(self, src_extents, dst_extents, op_type, **kwargs):
  43. self.src_extents = src_extents
  44. self.dst_extents = dst_extents
  45. self.type = op_type
  46. for key, val in kwargs.iteritems():
  47. setattr(self, key, val)
  48. def HasField(self, field):
  49. return hasattr(self, field)
  50. class FakePartition(object):
  51. """Fake PartitionUpdate field for testing."""
  52. def __init__(self, partition_name, operations):
  53. self.partition_name = partition_name
  54. self.operations = operations
  55. class FakeManifest(object):
  56. """Fake manifest for testing."""
  57. def __init__(self, major_version):
  58. FakeExtent = collections.namedtuple('FakeExtent',
  59. ['start_block', 'num_blocks'])
  60. self.install_operations = [FakeOp([],
  61. [FakeExtent(1, 1), FakeExtent(2, 2)],
  62. update_payload.common.OpType.REPLACE_BZ,
  63. dst_length=3*4096,
  64. data_offset=1,
  65. data_length=1)]
  66. self.kernel_install_operations = [FakeOp(
  67. [FakeExtent(1, 1)],
  68. [FakeExtent(x, x) for x in xrange(20)],
  69. update_payload.common.OpType.SOURCE_COPY,
  70. src_length=4096)]
  71. if major_version == payload_info.MAJOR_PAYLOAD_VERSION_BRILLO:
  72. self.partitions = [FakePartition('root', self.install_operations),
  73. FakePartition('kernel',
  74. self.kernel_install_operations)]
  75. self.install_operations = self.kernel_install_operations = []
  76. self.block_size = 4096
  77. self.minor_version = 4
  78. FakePartInfo = collections.namedtuple('FakePartInfo', ['size'])
  79. self.old_rootfs_info = FakePartInfo(1 * 4096)
  80. self.old_kernel_info = FakePartInfo(2 * 4096)
  81. self.new_rootfs_info = FakePartInfo(3 * 4096)
  82. self.new_kernel_info = FakePartInfo(4 * 4096)
  83. self.signatures_offset = None
  84. self.signatures_size = None
  85. def HasField(self, field_name):
  86. """Fake HasField method based on the python members."""
  87. return hasattr(self, field_name) and getattr(self, field_name) is not None
  88. class FakeHeader(object):
  89. """Fake payload header for testing."""
  90. def __init__(self, version, manifest_len, metadata_signature_len):
  91. self.version = version
  92. self.manifest_len = manifest_len
  93. self.metadata_signature_len = metadata_signature_len
  94. @property
  95. def size(self):
  96. return (20 if self.version == payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS
  97. else 24)
  98. class FakePayload(object):
  99. """Fake payload for testing."""
  100. def __init__(self, major_version):
  101. self._header = FakeHeader(major_version, 222, 0)
  102. self.header = None
  103. self._manifest = FakeManifest(major_version)
  104. self.manifest = None
  105. self._blobs = {}
  106. self._payload_signatures = update_metadata_pb2.Signatures()
  107. self._metadata_signatures = update_metadata_pb2.Signatures()
  108. def Init(self):
  109. """Fake Init that sets header and manifest.
  110. Failing to call Init() will not make header and manifest available to the
  111. test.
  112. """
  113. self.header = self._header
  114. self.manifest = self._manifest
  115. def ReadDataBlob(self, offset, length):
  116. """Return the blob that should be present at the offset location"""
  117. if not offset in self._blobs:
  118. raise FakePayloadError('Requested blob at unknown offset %d' % offset)
  119. blob = self._blobs[offset]
  120. if len(blob) != length:
  121. raise FakePayloadError('Read blob with the wrong length (expect: %d, '
  122. 'actual: %d)' % (len(blob), length))
  123. return blob
  124. @staticmethod
  125. def _AddSignatureToProto(proto, **kwargs):
  126. """Add a new Signature element to the passed proto."""
  127. new_signature = proto.signatures.add()
  128. for key, val in kwargs.iteritems():
  129. setattr(new_signature, key, val)
  130. def AddPayloadSignature(self, **kwargs):
  131. self._AddSignatureToProto(self._payload_signatures, **kwargs)
  132. blob = self._payload_signatures.SerializeToString()
  133. self._manifest.signatures_offset = 1234
  134. self._manifest.signatures_size = len(blob)
  135. self._blobs[self._manifest.signatures_offset] = blob
  136. def AddMetadataSignature(self, **kwargs):
  137. self._AddSignatureToProto(self._metadata_signatures, **kwargs)
  138. if self._header.metadata_signature_len:
  139. del self._blobs[-self._header.metadata_signature_len]
  140. blob = self._metadata_signatures.SerializeToString()
  141. self._header.metadata_signature_len = len(blob)
  142. self._blobs[-len(blob)] = blob
  143. class PayloadCommandTest(unittest.TestCase):
  144. """Test class for our PayloadCommand class."""
  145. @contextmanager
  146. def OutputCapturer(self):
  147. """A tool for capturing the sys.stdout"""
  148. stdout = sys.stdout
  149. try:
  150. sys.stdout = StringIO.StringIO()
  151. yield sys.stdout
  152. finally:
  153. sys.stdout = stdout
  154. def TestCommand(self, payload_cmd, payload, expected_out):
  155. """A tool for testing a payload command.
  156. It tests that a payload command which runs with a given payload produces a
  157. correct output.
  158. """
  159. with mock.patch.object(update_payload, 'Payload', return_value=payload), \
  160. self.OutputCapturer() as output:
  161. payload_cmd.Run()
  162. self.assertEquals(output.getvalue(), expected_out)
  163. def testDisplayValue(self):
  164. """Verify that DisplayValue prints what we expect."""
  165. with self.OutputCapturer() as output:
  166. payload_info.DisplayValue('key', 'value')
  167. self.assertEquals(output.getvalue(), 'key: value\n')
  168. def testRun(self):
  169. """Verify that Run parses and displays the payload like we expect."""
  170. payload_cmd = payload_info.PayloadCommand(FakeOption(action='show'))
  171. payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
  172. expected_out = """Payload version: 1
  173. Manifest length: 222
  174. Number of operations: 1
  175. Number of kernel ops: 1
  176. Block size: 4096
  177. Minor version: 4
  178. """
  179. self.TestCommand(payload_cmd, payload, expected_out)
  180. def testListOpsOnVersion1(self):
  181. """Verify that the --list_ops option gives the correct output."""
  182. payload_cmd = payload_info.PayloadCommand(
  183. FakeOption(list_ops=True, action='show'))
  184. payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
  185. expected_out = """Payload version: 1
  186. Manifest length: 222
  187. Number of operations: 1
  188. Number of kernel ops: 1
  189. Block size: 4096
  190. Minor version: 4
  191. Install operations:
  192. 0: REPLACE_BZ
  193. Data offset: 1
  194. Data length: 1
  195. Destination: 2 extents (3 blocks)
  196. (1,1) (2,2)
  197. Kernel install operations:
  198. 0: SOURCE_COPY
  199. Source: 1 extent (1 block)
  200. (1,1)
  201. Destination: 20 extents (190 blocks)
  202. (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
  203. (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
  204. """
  205. self.TestCommand(payload_cmd, payload, expected_out)
  206. def testListOpsOnVersion2(self):
  207. """Verify that the --list_ops option gives the correct output."""
  208. payload_cmd = payload_info.PayloadCommand(
  209. FakeOption(list_ops=True, action='show'))
  210. payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
  211. expected_out = """Payload version: 2
  212. Manifest length: 222
  213. Number of partitions: 2
  214. Number of "root" ops: 1
  215. Number of "kernel" ops: 1
  216. Block size: 4096
  217. Minor version: 4
  218. root install operations:
  219. 0: REPLACE_BZ
  220. Data offset: 1
  221. Data length: 1
  222. Destination: 2 extents (3 blocks)
  223. (1,1) (2,2)
  224. kernel install operations:
  225. 0: SOURCE_COPY
  226. Source: 1 extent (1 block)
  227. (1,1)
  228. Destination: 20 extents (190 blocks)
  229. (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
  230. (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
  231. """
  232. self.TestCommand(payload_cmd, payload, expected_out)
  233. def testStatsOnVersion1(self):
  234. """Verify that the --stats option works correctly."""
  235. payload_cmd = payload_info.PayloadCommand(
  236. FakeOption(stats=True, action='show'))
  237. payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
  238. expected_out = """Payload version: 1
  239. Manifest length: 222
  240. Number of operations: 1
  241. Number of kernel ops: 1
  242. Block size: 4096
  243. Minor version: 4
  244. Blocks read: 11
  245. Blocks written: 193
  246. Seeks when writing: 18
  247. """
  248. self.TestCommand(payload_cmd, payload, expected_out)
  249. def testStatsOnVersion2(self):
  250. """Verify that the --stats option works correctly on version 2."""
  251. payload_cmd = payload_info.PayloadCommand(
  252. FakeOption(stats=True, action='show'))
  253. payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
  254. expected_out = """Payload version: 2
  255. Manifest length: 222
  256. Number of partitions: 2
  257. Number of "root" ops: 1
  258. Number of "kernel" ops: 1
  259. Block size: 4096
  260. Minor version: 4
  261. Blocks read: 11
  262. Blocks written: 193
  263. Seeks when writing: 18
  264. """
  265. self.TestCommand(payload_cmd, payload, expected_out)
  266. def testEmptySignatures(self):
  267. """Verify that the --signatures option works with unsigned payloads."""
  268. payload_cmd = payload_info.PayloadCommand(
  269. FakeOption(action='show', signatures=True))
  270. payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
  271. expected_out = """Payload version: 1
  272. Manifest length: 222
  273. Number of operations: 1
  274. Number of kernel ops: 1
  275. Block size: 4096
  276. Minor version: 4
  277. No metadata signatures stored in the payload
  278. No payload signatures stored in the payload
  279. """
  280. self.TestCommand(payload_cmd, payload, expected_out)
  281. def testSignatures(self):
  282. """Verify that the --signatures option shows the present signatures."""
  283. payload_cmd = payload_info.PayloadCommand(
  284. FakeOption(action='show', signatures=True))
  285. payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
  286. payload.AddPayloadSignature(version=1,
  287. data='12345678abcdefgh\x00\x01\x02\x03')
  288. payload.AddPayloadSignature(data='I am a signature so access is yes.')
  289. payload.AddMetadataSignature(data='\x00\x0a\x0c')
  290. expected_out = """Payload version: 2
  291. Manifest length: 222
  292. Number of partitions: 2
  293. Number of "root" ops: 1
  294. Number of "kernel" ops: 1
  295. Block size: 4096
  296. Minor version: 4
  297. Metadata signatures blob: file_offset=246 (7 bytes)
  298. Metadata signatures: (1 entries)
  299. version=None, hex_data: (3 bytes)
  300. 00 0a 0c | ...
  301. Payload signatures blob: blob_offset=1234 (64 bytes)
  302. Payload signatures: (2 entries)
  303. version=1, hex_data: (20 bytes)
  304. 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh
  305. 00 01 02 03 | ....
  306. version=None, hex_data: (34 bytes)
  307. 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature
  308. 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye
  309. 73 2e | s.
  310. """
  311. self.TestCommand(payload_cmd, payload, expected_out)
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()