summaryrefslogtreecommitdiff
path: root/external/construct/lib/bitstream.py
blob: 3b51f66f9a855db2f6a2a6f379f0f22ebae020b7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import six
from construct.lib.binary import encode_bin, decode_bin

# Compatibility shim: on interpreters that have no builtin ``bytes`` name,
# alias it to ``str`` so the ``isinstance(data, bytes)`` check further down
# in this module still works.
try:
    bytes
except NameError:
    bytes = str

class BitStreamReader(object):
    """Read-side wrapper that serves data from ``substream`` in bit units.

    Whole bytes are pulled from ``substream`` and passed through
    ``encode_bin``; whatever was not requested is kept in ``buffer`` for
    the next call.  ``total_size`` counts how many elements have been
    handed out since construction or the last ``seek``.
    """
    __slots__ = ["substream", "buffer", "total_size"]

    def __init__(self, substream):
        """Wrap ``substream`` with an empty buffer and a zeroed counter."""
        self.buffer = six.b("")
        self.total_size = 0
        self.substream = substream

    def close(self):
        """Check that the amount of data read so far is byte-aligned.

        Raises ``ValueError`` when ``total_size`` is not a multiple of 8.
        The underlying stream itself is not closed here.
        """
        remainder = self.total_size % 8
        if remainder:
            raise ValueError("total size of read data must be a multiple of 8",
                self.total_size)

    def tell(self):
        """Return the position reported by the underlying stream."""
        return self.substream.tell()

    def seek(self, pos, whence=0):
        """Discard buffered data, reset the counter and seek the substream."""
        self.total_size = 0
        self.buffer = six.b("")
        self.substream.seek(pos, whence)

    def read(self, count):
        """Return up to ``count`` elements of encoded data.

        Served from the internal buffer when it holds enough; otherwise
        the buffer is drained and the shortfall is read from the
        substream in whole bytes.  Raises ``ValueError`` for a negative
        ``count``.
        """
        if count < 0:
            raise ValueError("count cannot be negative")

        available = len(self.buffer)
        if count == 0:
            chunk = six.b("")
        elif count > available:
            # Not enough buffered: fetch the missing part, rounded up to
            # whole bytes.
            # NOTE(review): this presumes encode_bin yields eight output
            # elements per input byte -- confirm against
            # construct.lib.binary.
            missing = count - available
            nbytes, partial = divmod(missing, 8)
            if partial:
                nbytes += 1
            fresh = encode_bin(self.substream.read(nbytes))
            chunk = self.buffer + fresh[:missing]
            # Keep the unused tail of the last byte for the next read.
            self.buffer = fresh[missing:]
        else:
            chunk, self.buffer = self.buffer[:count], self.buffer[count:]
        self.total_size += len(chunk)
        return chunk

class BitStreamWriter(object):
    """Write-side wrapper that buffers data and emits it on flush.

    Chunks passed to ``write`` are collected in ``buffer``.  ``flush``
    joins them, passes the result through ``decode_bin`` and writes it to
    ``substream``.  ``pos`` is the number of buffered elements not yet
    flushed; ``tell`` divides it by 8 to get a byte offset.
    """
    __slots__ = ["substream", "buffer", "pos"]

    def __init__(self, substream):
        """Wrap ``substream`` with an empty buffer."""
        self.substream = substream
        self.buffer = []
        self.pos = 0

    def close(self):
        """Flush any buffered data; the underlying stream is not closed."""
        self.flush()

    def flush(self):
        """Decode everything buffered, write it out and reset the buffer.

        NOTE(review): decode_bin is defined elsewhere; it presumably
        rejects input whose length is not a multiple of 8 -- confirm.
        """
        raw = decode_bin(six.b("").join(self.buffer))
        self.substream.write(raw)
        self.buffer = []
        self.pos = 0

    def tell(self):
        """Return the substream position plus the whole bytes buffered."""
        return self.substream.tell() + self.pos // 8

    def seek(self, pos, whence = 0):
        """Flush pending data, then seek the underlying stream."""
        self.flush()
        self.substream.seek(pos, whence)

    def write(self, data):
        """Buffer ``data`` until the next flush.

        Empty input is ignored.  Raises ``TypeError`` if ``data`` is
        non-empty and not a ``bytes`` instance.
        """
        if not data:
            return
        if not isinstance(data, bytes):
            raise TypeError("data must be a string, not %r" % (type(data),))
        self.buffer.append(data)
        # Track how much is pending so tell() accounts for buffered data;
        # previously pos was never advanced and tell() ignored the buffer.
        self.pos += len(data)