Edit

kc3-lang/brotli/python/tests/decompressor_test.py

Branch :

  • Show log

    Commit

  • Author : Evgenii Kliuchnikov
    Date : 2025-09-25 07:59:41
    Hash : 14068984
    Message : Build and test with PY2.7 PiperOrigin-RevId: 811352084

  • python/tests/decompressor_test.py
  • # Copyright 2016 The Brotli Authors. All rights reserved.
    #
    # Distributed under MIT license.
    # See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
    
    import functools
    import os
    import unittest
    
    import brotli
    
    from . import _test_utils
    
    
    def _get_original_name(test_data):
      return test_data.split('.compressed')[0]
    
    
    class TestDecompressor(_test_utils.TestCase):
    
      CHUNK_SIZE = 1
      MIN_OUTPUT_BUFFER_SIZE = 32768  # Actually, several bytes less.
    
      def setUp(self):
        # super().setUp()  # Requires Py3+
        self.decompressor = brotli.Decompressor()
    
      def tearDown(self):
        self.decompressor = None
        # super().tearDown()  # Requires Py3+
    
      def _check_decompression(self, test_data):
        # Verify decompression matches the original.
        temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
        original = _get_original_name(test_data)
        self.assert_files_match(temp_uncompressed, original)
    
      def _decompress(self, test_data):
        temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
        with open(temp_uncompressed, 'wb') as out_file:
          with open(test_data, 'rb') as in_file:
            read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
            for data in iter(read_chunk, b''):
              out_file.write(self.decompressor.process(data))
        self.assertTrue(self.decompressor.is_finished())
    
      def _decompress_with_limit(self, test_data):
        output_buffer_limit = 10922
        temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
        with open(temp_uncompressed, 'wb') as out_file:
          with open(test_data, 'rb') as in_file:
            chunk_iter = iter(functools.partial(in_file.read, 10 * 1024), b'')
            while not self.decompressor.is_finished():
              data = b''
              if self.decompressor.can_accept_more_data():
                data = next(chunk_iter, b'')
              decompressed_data = self.decompressor.process(
                  data, output_buffer_limit=output_buffer_limit
              )
              self.assertLessEqual(
                  len(decompressed_data), self.MIN_OUTPUT_BUFFER_SIZE
              )
              out_file.write(decompressed_data)
            self.assertIsNone(next(chunk_iter, None))
    
      def _test_decompress(self, test_data):
        self._decompress(test_data)
        self._check_decompression(test_data)
    
      def _test_decompress_with_limit(self, test_data):
        self._decompress_with_limit(test_data)
        self._check_decompression(test_data)
    
      def test_too_much_input(self):
        with open(
            os.path.join(_test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed'), 'rb'
        ) as in_file:
          compressed = in_file.read()
          self.decompressor.process(compressed[:-1], output_buffer_limit=10240)
          # the following assertion checks whether the test setup is correct
          self.assertFalse(self.decompressor.can_accept_more_data())
          with self.assertRaises(brotli.error):
            self.decompressor.process(compressed[-1:])
    
      def test_changing_limit(self):
        test_data = os.path.join(
            _test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed'
        )
        check_output = os.path.exists(test_data.replace('.compressed', ''))
        temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
        with open(temp_uncompressed, 'wb') as out_file:
          with open(test_data, 'rb') as in_file:
            compressed = in_file.read()
            uncompressed = self.decompressor.process(
                compressed[:-1], output_buffer_limit=10240
            )
            self.assertLessEqual(len(uncompressed), self.MIN_OUTPUT_BUFFER_SIZE)
            out_file.write(uncompressed)
            while not self.decompressor.can_accept_more_data():
              out_file.write(self.decompressor.process(b''))
            out_file.write(self.decompressor.process(compressed[-1:]))
        if check_output:
          self._check_decompression(test_data)
    
      def test_garbage_appended(self):
        with self.assertRaises(brotli.error):
          self.decompressor.process(brotli.compress(b'a') + b'a')
    
      def test_already_finished(self):
        self.decompressor.process(brotli.compress(b'a'))
        with self.assertRaises(brotli.error):
          self.decompressor.process(b'a')
    
    
    _test_utils.generate_test_methods(TestDecompressor, for_decompression=True)
    
    if __name__ == '__main__':
      unittest.main()