|
- #!/usr/bin/python2
- #
- # Copyright (C) 2015 The Android Open Source Project
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- """Unit testing payload_info.py."""
- from __future__ import print_function
- import StringIO
- import collections
- import mock
- import sys
- import unittest
- import payload_info
- import update_payload
- from contextlib import contextmanager
- from update_payload import update_metadata_pb2
- class FakePayloadError(Exception):
- """A generic error when using the FakePayload."""
- class FakeOption(object):
- """Fake options object for testing."""
- def __init__(self, **kwargs):
- self.list_ops = False
- self.stats = False
- self.signatures = False
- for key, val in kwargs.iteritems():
- setattr(self, key, val)
- if not hasattr(self, 'payload_file'):
- self.payload_file = None
- class FakeOp(object):
- """Fake manifest operation for testing."""
- def __init__(self, src_extents, dst_extents, op_type, **kwargs):
- self.src_extents = src_extents
- self.dst_extents = dst_extents
- self.type = op_type
- for key, val in kwargs.iteritems():
- setattr(self, key, val)
- def HasField(self, field):
- return hasattr(self, field)
- class FakePartition(object):
- """Fake PartitionUpdate field for testing."""
- def __init__(self, partition_name, operations):
- self.partition_name = partition_name
- self.operations = operations
- class FakeManifest(object):
- """Fake manifest for testing."""
- def __init__(self, major_version):
- FakeExtent = collections.namedtuple('FakeExtent',
- ['start_block', 'num_blocks'])
- self.install_operations = [FakeOp([],
- [FakeExtent(1, 1), FakeExtent(2, 2)],
- update_payload.common.OpType.REPLACE_BZ,
- dst_length=3*4096,
- data_offset=1,
- data_length=1)]
- self.kernel_install_operations = [FakeOp(
- [FakeExtent(1, 1)],
- [FakeExtent(x, x) for x in xrange(20)],
- update_payload.common.OpType.SOURCE_COPY,
- src_length=4096)]
- if major_version == payload_info.MAJOR_PAYLOAD_VERSION_BRILLO:
- self.partitions = [FakePartition('root', self.install_operations),
- FakePartition('kernel',
- self.kernel_install_operations)]
- self.install_operations = self.kernel_install_operations = []
- self.block_size = 4096
- self.minor_version = 4
- FakePartInfo = collections.namedtuple('FakePartInfo', ['size'])
- self.old_rootfs_info = FakePartInfo(1 * 4096)
- self.old_kernel_info = FakePartInfo(2 * 4096)
- self.new_rootfs_info = FakePartInfo(3 * 4096)
- self.new_kernel_info = FakePartInfo(4 * 4096)
- self.signatures_offset = None
- self.signatures_size = None
- def HasField(self, field_name):
- """Fake HasField method based on the python members."""
- return hasattr(self, field_name) and getattr(self, field_name) is not None
- class FakeHeader(object):
- """Fake payload header for testing."""
- def __init__(self, version, manifest_len, metadata_signature_len):
- self.version = version
- self.manifest_len = manifest_len
- self.metadata_signature_len = metadata_signature_len
- @property
- def size(self):
- return (20 if self.version == payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS
- else 24)
- class FakePayload(object):
- """Fake payload for testing."""
- def __init__(self, major_version):
- self._header = FakeHeader(major_version, 222, 0)
- self.header = None
- self._manifest = FakeManifest(major_version)
- self.manifest = None
- self._blobs = {}
- self._payload_signatures = update_metadata_pb2.Signatures()
- self._metadata_signatures = update_metadata_pb2.Signatures()
- def Init(self):
- """Fake Init that sets header and manifest.
- Failing to call Init() will not make header and manifest available to the
- test.
- """
- self.header = self._header
- self.manifest = self._manifest
- def ReadDataBlob(self, offset, length):
- """Return the blob that should be present at the offset location"""
- if not offset in self._blobs:
- raise FakePayloadError('Requested blob at unknown offset %d' % offset)
- blob = self._blobs[offset]
- if len(blob) != length:
- raise FakePayloadError('Read blob with the wrong length (expect: %d, '
- 'actual: %d)' % (len(blob), length))
- return blob
- @staticmethod
- def _AddSignatureToProto(proto, **kwargs):
- """Add a new Signature element to the passed proto."""
- new_signature = proto.signatures.add()
- for key, val in kwargs.iteritems():
- setattr(new_signature, key, val)
- def AddPayloadSignature(self, **kwargs):
- self._AddSignatureToProto(self._payload_signatures, **kwargs)
- blob = self._payload_signatures.SerializeToString()
- self._manifest.signatures_offset = 1234
- self._manifest.signatures_size = len(blob)
- self._blobs[self._manifest.signatures_offset] = blob
- def AddMetadataSignature(self, **kwargs):
- self._AddSignatureToProto(self._metadata_signatures, **kwargs)
- if self._header.metadata_signature_len:
- del self._blobs[-self._header.metadata_signature_len]
- blob = self._metadata_signatures.SerializeToString()
- self._header.metadata_signature_len = len(blob)
- self._blobs[-len(blob)] = blob
- class PayloadCommandTest(unittest.TestCase):
- """Test class for our PayloadCommand class."""
- @contextmanager
- def OutputCapturer(self):
- """A tool for capturing the sys.stdout"""
- stdout = sys.stdout
- try:
- sys.stdout = StringIO.StringIO()
- yield sys.stdout
- finally:
- sys.stdout = stdout
- def TestCommand(self, payload_cmd, payload, expected_out):
- """A tool for testing a payload command.
- It tests that a payload command which runs with a given payload produces a
- correct output.
- """
- with mock.patch.object(update_payload, 'Payload', return_value=payload), \
- self.OutputCapturer() as output:
- payload_cmd.Run()
- self.assertEquals(output.getvalue(), expected_out)
- def testDisplayValue(self):
- """Verify that DisplayValue prints what we expect."""
- with self.OutputCapturer() as output:
- payload_info.DisplayValue('key', 'value')
- self.assertEquals(output.getvalue(), 'key: value\n')
- def testRun(self):
- """Verify that Run parses and displays the payload like we expect."""
- payload_cmd = payload_info.PayloadCommand(FakeOption(action='show'))
- payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
- expected_out = """Payload version: 1
- Manifest length: 222
- Number of operations: 1
- Number of kernel ops: 1
- Block size: 4096
- Minor version: 4
- """
- self.TestCommand(payload_cmd, payload, expected_out)
- def testListOpsOnVersion1(self):
- """Verify that the --list_ops option gives the correct output."""
- payload_cmd = payload_info.PayloadCommand(
- FakeOption(list_ops=True, action='show'))
- payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
- expected_out = """Payload version: 1
- Manifest length: 222
- Number of operations: 1
- Number of kernel ops: 1
- Block size: 4096
- Minor version: 4
- Install operations:
- 0: REPLACE_BZ
- Data offset: 1
- Data length: 1
- Destination: 2 extents (3 blocks)
- (1,1) (2,2)
- Kernel install operations:
- 0: SOURCE_COPY
- Source: 1 extent (1 block)
- (1,1)
- Destination: 20 extents (190 blocks)
- (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
- (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
- """
- self.TestCommand(payload_cmd, payload, expected_out)
- def testListOpsOnVersion2(self):
- """Verify that the --list_ops option gives the correct output."""
- payload_cmd = payload_info.PayloadCommand(
- FakeOption(list_ops=True, action='show'))
- payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
- expected_out = """Payload version: 2
- Manifest length: 222
- Number of partitions: 2
- Number of "root" ops: 1
- Number of "kernel" ops: 1
- Block size: 4096
- Minor version: 4
- root install operations:
- 0: REPLACE_BZ
- Data offset: 1
- Data length: 1
- Destination: 2 extents (3 blocks)
- (1,1) (2,2)
- kernel install operations:
- 0: SOURCE_COPY
- Source: 1 extent (1 block)
- (1,1)
- Destination: 20 extents (190 blocks)
- (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
- (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
- """
- self.TestCommand(payload_cmd, payload, expected_out)
- def testStatsOnVersion1(self):
- """Verify that the --stats option works correctly."""
- payload_cmd = payload_info.PayloadCommand(
- FakeOption(stats=True, action='show'))
- payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
- expected_out = """Payload version: 1
- Manifest length: 222
- Number of operations: 1
- Number of kernel ops: 1
- Block size: 4096
- Minor version: 4
- Blocks read: 11
- Blocks written: 193
- Seeks when writing: 18
- """
- self.TestCommand(payload_cmd, payload, expected_out)
- def testStatsOnVersion2(self):
- """Verify that the --stats option works correctly on version 2."""
- payload_cmd = payload_info.PayloadCommand(
- FakeOption(stats=True, action='show'))
- payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
- expected_out = """Payload version: 2
- Manifest length: 222
- Number of partitions: 2
- Number of "root" ops: 1
- Number of "kernel" ops: 1
- Block size: 4096
- Minor version: 4
- Blocks read: 11
- Blocks written: 193
- Seeks when writing: 18
- """
- self.TestCommand(payload_cmd, payload, expected_out)
- def testEmptySignatures(self):
- """Verify that the --signatures option works with unsigned payloads."""
- payload_cmd = payload_info.PayloadCommand(
- FakeOption(action='show', signatures=True))
- payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
- expected_out = """Payload version: 1
- Manifest length: 222
- Number of operations: 1
- Number of kernel ops: 1
- Block size: 4096
- Minor version: 4
- No metadata signatures stored in the payload
- No payload signatures stored in the payload
- """
- self.TestCommand(payload_cmd, payload, expected_out)
- def testSignatures(self):
- """Verify that the --signatures option shows the present signatures."""
- payload_cmd = payload_info.PayloadCommand(
- FakeOption(action='show', signatures=True))
- payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
- payload.AddPayloadSignature(version=1,
- data='12345678abcdefgh\x00\x01\x02\x03')
- payload.AddPayloadSignature(data='I am a signature so access is yes.')
- payload.AddMetadataSignature(data='\x00\x0a\x0c')
- expected_out = """Payload version: 2
- Manifest length: 222
- Number of partitions: 2
- Number of "root" ops: 1
- Number of "kernel" ops: 1
- Block size: 4096
- Minor version: 4
- Metadata signatures blob: file_offset=246 (7 bytes)
- Metadata signatures: (1 entries)
- version=None, hex_data: (3 bytes)
- 00 0a 0c | ...
- Payload signatures blob: blob_offset=1234 (64 bytes)
- Payload signatures: (2 entries)
- version=1, hex_data: (20 bytes)
- 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh
- 00 01 02 03 | ....
- version=None, hex_data: (34 bytes)
- 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature
- 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye
- 73 2e | s.
- """
- self.TestCommand(payload_cmd, payload, expected_out)
- if __name__ == '__main__':
- unittest.main()
|