Edit

kc3-lang/brotli/python/tests/decompressor_test.py

Branch :

  • Show log

    Commit

  • Author : Robert Obryk
    Date : 2024-09-18 15:25:06
    Hash : eb3a31e2
    Message : add max_length to Python streaming decompression

  • python/tests/decompressor_test.py
  • # Copyright 2016 The Brotli Authors. All rights reserved.
    #
    # Distributed under MIT license.
    # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
    
    import functools
    import os
    import unittest
    
    from . import _test_utils
    import brotli
    
    
    def _get_original_name(test_data):
        return test_data.split('.compressed')[0]
    
    
    class TestDecompressor(_test_utils.TestCase):
    
        CHUNK_SIZE = 1
    
        def setUp(self):
            self.decompressor = brotli.Decompressor()
    
        def tearDown(self):
            self.decompressor = None
    
        def _check_decompression(self, test_data):
            # Verify decompression matches the original.
            temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
            original = _get_original_name(test_data)
            self.assertFilesMatch(temp_uncompressed, original)
    
        def _decompress(self, test_data):
            temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
            with open(temp_uncompressed, 'wb') as out_file:
                with open(test_data, 'rb') as in_file:
                    read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
                    for data in iter(read_chunk, b''):
                        out_file.write(self.decompressor.process(data))
            self.assertTrue(self.decompressor.is_finished())
    
        def _decompress_with_limit(self, test_data, max_output_length):
            temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
            with open(temp_uncompressed, 'wb') as out_file:
                with open(test_data, 'rb') as in_file:
                    chunk_iter = iter(functools.partial(in_file.read, 10 * 1024), b'')
                    while not self.decompressor.is_finished():
                        data = b''
                        if self.decompressor.can_accept_more_data():
                            data = next(chunk_iter, b'')
                        decompressed_data = self.decompressor.process(data, max_output_length=max_output_length)
                        self.assertTrue(len(decompressed_data) <= max_output_length)
                        out_file.write(decompressed_data)
                    self.assertTrue(next(chunk_iter, None) == None)
    
        def _test_decompress(self, test_data):
            self._decompress(test_data)
            self._check_decompression(test_data)
    
        def _test_decompress_with_limit(self, test_data):
            self._decompress_with_limit(test_data, max_output_length=20)
            self._check_decompression(test_data)
    
        def test_too_much_input(self):
            with open(os.path.join(_test_utils.TESTDATA_DIR, "zerosukkanooa.compressed"), 'rb') as in_file:
                compressed = in_file.read()
                self.decompressor.process(compressed[:-1], max_output_length=1)
                # the following assertion checks whether the test setup is correct
                self.assertTrue(not self.decompressor.can_accept_more_data())
                with self.assertRaises(brotli.error):
                    self.decompressor.process(compressed[-1:])
    
        def test_changing_limit(self):
            test_data = os.path.join(_test_utils.TESTDATA_DIR, "zerosukkanooa.compressed")
            temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
            with open(temp_uncompressed, 'wb') as out_file:
                with open(test_data, 'rb') as in_file:
                    compressed = in_file.read()
                    uncompressed = self.decompressor.process(compressed[:-1], max_output_length=1)
                    self.assertTrue(len(uncompressed) <= 1)
                    out_file.write(uncompressed)
                    while not self.decompressor.can_accept_more_data():
                        out_file.write(self.decompressor.process(b''))
                    out_file.write(self.decompressor.process(compressed[-1:]))
            self._check_decompression(test_data)
    
        def test_garbage_appended(self):
            with self.assertRaises(brotli.error):
                self.decompressor.process(brotli.compress(b'a') + b'a')
    
        def test_already_finished(self):
            self.decompressor.process(brotli.compress(b'a'))
            with self.assertRaises(brotli.error):
                self.decompressor.process(b'a')
    
    
    _test_utils.generate_test_methods(TestDecompressor, for_decompression=True)
    
    if __name__ == '__main__':
        unittest.main()