Edit

kc3-lang/brotli/python/bro.py

Branch :

  • Show log

    Commit

  • Author : Thomas Fischbacher
    Date : 2023-07-19 05:43:51
    Hash : acc26565
    Message : Small Python modernization of Brotli code. PiperOrigin-RevId: 549289787

  • python/bro.py
  • #! /usr/bin/env python
    """Compression/decompression utility using the Brotli algorithm."""
    
    # Note: Python2 has been deprecated long ago, but some projects out in
    # the wide world may still use it nevertheless. This should not
    # deprive them from being able to run Brotli.
    from __future__ import print_function
    
    import argparse
    import os
    import platform
    import sys
    
    import brotli
    
    
    # default values of encoder parameters
    _DEFAULT_PARAMS = {
        'mode': brotli.MODE_GENERIC,
        'quality': 11,
        'lgwin': 22,
        'lgblock': 0,
    }
    
    
    def get_binary_stdio(stream):
        """Return the specified stdin/stdout/stderr stream.
    
        If the stdio stream requested (i.e. sys.(stdin|stdout|stderr))
        has been replaced with a stream object that does not have a `.buffer`
        attribute, this will return the original stdio stream's buffer, i.e.
        `sys.__(stdin|stdout|stderr)__.buffer`.
    
        Args:
          stream: One of 'stdin', 'stdout', 'stderr'.
    
        Returns:
          The stream, as a 'raw' buffer object (i.e. io.BufferedIOBase subclass
          instance such as io.Bufferedreader/io.BufferedWriter), suitable for
          reading/writing binary data from/to it.
        """
        if stream == 'stdin': stdio = sys.stdin
        elif stream == 'stdout': stdio = sys.stdout
        elif stream == 'stderr': stdio = sys.stderr
        else:
            raise ValueError('invalid stream name: %s' % (stream,))
        if sys.version_info[0] < 3:
            if sys.platform == 'win32':
                # set I/O stream binary flag on python2.x (Windows)
                runtime = platform.python_implementation()
                if runtime == 'PyPy':
                    # the msvcrt trick doesn't work in pypy, so use fdopen().
                    mode = 'rb' if stream == 'stdin' else 'wb'
                    stdio = os.fdopen(stdio.fileno(), mode, 0)
                else:
                    # this works with CPython -- untested on other implementations
                    import msvcrt
                    msvcrt.setmode(stdio.fileno(), os.O_BINARY)
            return stdio
        else:
            try:
                return stdio.buffer
            except AttributeError:
                # The Python reference explains
                # (-> https://docs.python.org/3/library/sys.html#sys.stdin)
                # that the `.buffer` attribute might not exist, since
                # the standard streams might have been replaced by something else
                # (such as an `io.StringIO()` - perhaps via
                # `contextlib.redirect_stdout()`).
                # We fall back to the original stdio in these cases.
                if stream == 'stdin': return sys.__stdin__.buffer
                if stream == 'stdout': return sys.__stdout__.buffer
                if stream == 'stderr': return sys.__stderr__.buffer
                assert False, 'Impossible Situation.'
    
    
    def main(args=None):
    
        parser = argparse.ArgumentParser(
            prog=os.path.basename(__file__), description=__doc__)
        parser.add_argument(
            '--version', action='version', version=brotli.version)
        parser.add_argument(
            '-i',
            '--input',
            metavar='FILE',
            type=str,
            dest='infile',
            help='Input file',
            default=None)
        parser.add_argument(
            '-o',
            '--output',
            metavar='FILE',
            type=str,
            dest='outfile',
            help='Output file',
            default=None)
        parser.add_argument(
            '-f',
            '--force',
            action='store_true',
            help='Overwrite existing output file',
            default=False)
        parser.add_argument(
            '-d',
            '--decompress',
            action='store_true',
            help='Decompress input file',
            default=False)
        params = parser.add_argument_group('optional encoder parameters')
        params.add_argument(
            '-m',
            '--mode',
            metavar='MODE',
            type=int,
            choices=[0, 1, 2],
            help='The compression mode can be 0 for generic input, '
            '1 for UTF-8 encoded text, or 2 for WOFF 2.0 font data. '
            'Defaults to 0.')
        params.add_argument(
            '-q',
            '--quality',
            metavar='QUALITY',
            type=int,
            choices=list(range(0, 12)),
            help='Controls the compression-speed vs compression-density '
            'tradeoff. The higher the quality, the slower the '
            'compression. Range is 0 to 11. Defaults to 11.')
        params.add_argument(
            '--lgwin',
            metavar='LGWIN',
            type=int,
            choices=list(range(10, 25)),
            help='Base 2 logarithm of the sliding window size. Range is '
            '10 to 24. Defaults to 22.')
        params.add_argument(
            '--lgblock',
            metavar='LGBLOCK',
            type=int,
            choices=[0] + list(range(16, 25)),
            help='Base 2 logarithm of the maximum input block size. '
            'Range is 16 to 24. If set to 0, the value will be set based '
            'on the quality. Defaults to 0.')
        # set default values using global _DEFAULT_PARAMS dictionary
        parser.set_defaults(**_DEFAULT_PARAMS)
    
        options = parser.parse_args(args=args)
    
        if options.infile:
            try:
                with open(options.infile, 'rb') as infile:
                    data = infile.read()
            except OSError:
                parser.error('Could not read --infile: %s' % (infile,))
        else:
            if sys.stdin.isatty():
                # interactive console, just quit
                parser.error('No input (called from interactive terminal).')
            infile = get_binary_stdio('stdin')
            data = infile.read()
    
        if options.outfile:
            # Caution! If `options.outfile` is a broken symlink, will try to
            # redirect the write according to symlink.
            if os.path.exists(options.outfile) and not options.force:
                parser.error(('Target --outfile=%s already exists, '
                              'but --force was not requested.') % (outfile,))
            outfile = open(options.outfile, 'wb')
            did_open_outfile = True
        else:
            outfile = get_binary_stdio('stdout')
            did_open_outfile = False
        try:
            try:
                if options.decompress:
                    data = brotli.decompress(data)
                else:
                    data = brotli.compress(
                        data,
                        mode=options.mode,
                        quality=options.quality,
                        lgwin=options.lgwin,
                        lgblock=options.lgblock)
                outfile.write(data)
            finally:
                if did_open_outfile: outfile.close()
        except brotli.error as e:
            parser.exit(1,
                        'bro: error: %s: %s' % (e, options.infile or '{stdin}'))
    
    
    if __name__ == '__main__':
        main()