summaryrefslogblamecommitdiff
path: root/external/construct/lib/bitstream.py
blob: 3b51f66f9a855db2f6a2a6f379f0f22ebae020b7 (plain) (tree)
















































































                                                                               
import six
from construct.lib.binary import encode_bin, decode_bin

try:
    bytes
except NameError:
    bytes = str

class BitStreamReader(object):
    __slots__ = ["substream", "buffer", "total_size"]

    def __init__(self, substream):
        self.substream = substream
        self.total_size = 0
        self.buffer = six.b("")

    def close(self):
        if self.total_size % 8 != 0:
            raise ValueError("total size of read data must be a multiple of 8",
                self.total_size)

    def tell(self):
        return self.substream.tell()

    def seek(self, pos, whence = 0):
        self.buffer = six.b("")
        self.total_size = 0
        self.substream.seek(pos, whence)

    def read(self, count):
        if count < 0:
            raise ValueError("count cannot be negative")

        l = len(self.buffer)
        if count == 0:
            data = six.b("")
        elif count <= l:
            data = self.buffer[:count]
            self.buffer = self.buffer[count:]
        else:
            data = self.buffer
            count -= l
            count_bytes = count // 8
            if count & 7:
                count_bytes += 1
            buf = encode_bin(self.substream.read(count_bytes))
            data += buf[:count]
            self.buffer = buf[count:]
        self.total_size += len(data)
        return data

class BitStreamWriter(object):
    __slots__ = ["substream", "buffer", "pos"]

    def __init__(self, substream):
        self.substream = substream
        self.buffer = []
        self.pos = 0

    def close(self):
        self.flush()

    def flush(self):
        raw = decode_bin(six.b("").join(self.buffer))
        self.substream.write(raw)
        self.buffer = []
        self.pos = 0

    def tell(self):
        return self.substream.tell() + self.pos // 8

    def seek(self, pos, whence = 0):
        self.flush()
        self.substream.seek(pos, whence)

    def write(self, data):
        if not data:
            return
        if not isinstance(data, bytes):
            raise TypeError("data must be a string, not %r" % (type(data),))
        self.buffer.append(data)