diff options
Diffstat (limited to 'external/construct/core.py')
-rw-r--r-- | external/construct/core.py | 1411 |
1 files changed, 1411 insertions, 0 deletions
diff --git a/external/construct/core.py b/external/construct/core.py new file mode 100644 index 0000000..e1800e0 --- /dev/null +++ b/external/construct/core.py @@ -0,0 +1,1411 @@ +from struct import Struct as Packer + +from construct.lib.py3compat import BytesIO, advance_iterator, bchr +from construct.lib import Container, ListContainer, LazyContainer +import sys +import six + +try: + bytes +except NameError: + bytes = str + +#=============================================================================== +# exceptions +#=============================================================================== +class ConstructError(Exception): + pass +class FieldError(ConstructError): + pass +class SizeofError(ConstructError): + pass +class AdaptationError(ConstructError): + pass +class ArrayError(ConstructError): + pass +class RangeError(ConstructError): + pass +class SwitchError(ConstructError): + pass +class SelectError(ConstructError): + pass +class TerminatorError(ConstructError): + pass +class OverwriteError(ValueError): + pass + +#=============================================================================== +# abstract constructs +#=============================================================================== +class Construct(object): + """ + The mother of all constructs. + + This object is generally not directly instantiated, and it does not + directly implement parsing and building, so it is largely only of interest + to subclass implementors. + + The external user API: + + * ``parse()`` + * ``parse_stream()`` + * ``build()`` + * ``build_stream()`` + * ``sizeof()`` + + Subclass authors should not override the external methods. Instead, + another API is available: + + * ``_parse()`` + * ``_build()`` + * ``_sizeof()`` + + There is also a flag API: + + * ``_set_flag()`` + * ``_clear_flag()`` + * ``_inherit_flags()`` + * ``_is_flag()`` + + And stateful copying: + + * ``__getstate__()`` + * ``__setstate__()`` + + Attributes and Inheritance + ========================== + + All constructs have a name and flags. The name is used for naming struct + members and context dictionaries. Note that the name can either be a + string, or None if the name is not needed. A single underscore ("_") is a + reserved name, and so are names starting with a less-than character ("<"). + The name should be descriptive, short, and valid as a Python identifier, + although these rules are not enforced. + + The flags specify additional behavioral information about this construct. + Flags are used by enclosing constructs to determine a proper course of + action. Flags are inherited by default, from inner subconstructs to outer + constructs. The enclosing construct may set new flags or clear existing + ones, as necessary. + + For example, if ``FLAG_COPY_CONTEXT`` is set, repeaters will pass a copy of + the context for each iteration, which is necessary for OnDemand parsing. + """ + + FLAG_COPY_CONTEXT = 0x0001 + FLAG_DYNAMIC = 0x0002 + FLAG_EMBED = 0x0004 + FLAG_NESTING = 0x0008 + + __slots__ = ["name", "conflags"] + def __init__(self, name, flags = 0): + if name is not None: + if not isinstance(name, six.string_types): + raise TypeError("name must be a string or None", name) + if name == "_" or name.startswith("<"): + raise ValueError("reserved name", name) + self.name = name + self.conflags = flags + + def __repr__(self): + return "%s(%r)" % (self.__class__.__name__, self.name) + + def _set_flag(self, flag): + """ + Set the given flag or flags. + + :param int flag: flag to set; may be OR'd combination of flags + """ + + self.conflags |= flag + + def _clear_flag(self, flag): + """ + Clear the given flag or flags. + + :param int flag: flag to clear; may be OR'd combination of flags + """ + + self.conflags &= ~flag + + def _inherit_flags(self, *subcons): + """ + Pull flags from subconstructs. + """ + + for sc in subcons: + self._set_flag(sc.conflags) + + def _is_flag(self, flag): + """ + Check whether a given flag is set. + + :param int flag: flag to check + """ + + return bool(self.conflags & flag) + + def __getstate__(self): + """ + Obtain a dictionary representing this construct's state. + """ + + attrs = {} + if hasattr(self, "__dict__"): + attrs.update(self.__dict__) + slots = [] + c = self.__class__ + while c is not None: + if hasattr(c, "__slots__"): + slots.extend(c.__slots__) + c = c.__base__ + for name in slots: + if hasattr(self, name): + attrs[name] = getattr(self, name) + return attrs + + def __setstate__(self, attrs): + """ + Set this construct's state to a given state. + """ + for name, value in attrs.items(): + setattr(self, name, value) + + def __copy__(self): + """returns a copy of this construct""" + self2 = object.__new__(self.__class__) + self2.__setstate__(self, self.__getstate__()) + return self2 + + def parse(self, data): + """ + Parse an in-memory buffer. + + Strings, buffers, memoryviews, and other complete buffers can be + parsed with this method. + """ + + return self.parse_stream(BytesIO(data)) + + def parse_stream(self, stream): + """ + Parse a stream. + + Files, pipes, sockets, and other streaming sources of data are handled + by this method. + """ + + return self._parse(stream, Container()) + + def _parse(self, stream, context): + """ + Override me in your subclass. + """ + + raise NotImplementedError() + + def build(self, obj): + """ + Build an object in memory. + """ + stream = BytesIO() + self.build_stream(obj, stream) + return stream.getvalue() + + def build_stream(self, obj, stream): + """ + Build an object directly into a stream. + """ + self._build(obj, stream, Container()) + + def _build(self, obj, stream, context): + """ + Override me in your subclass. + """ + + raise NotImplementedError() + + def sizeof(self, context=None): + """ + Calculate the size of this object, optionally using a context. + + Some constructs have no fixed size and can only know their size for a + given hunk of data; these constructs will raise an error if they are + not passed a context. + + :param context: contextual data + + :returns: int of the length of this construct + :raises SizeofError: the size could not be determined + """ + + if context is None: + context = Container() + try: + return self._sizeof(context) + except Exception: + raise SizeofError(sys.exc_info()[1]) + + def _sizeof(self, context): + """ + Override me in your subclass. + """ + + raise SizeofError("Raw Constructs have no size!") + +class Subconstruct(Construct): + """ + Abstract subconstruct (wraps an inner construct, inheriting its + name and flags). + + Subconstructs wrap an inner Construct, inheriting its name and flags. + + :param subcon: the construct to wrap + """ + + __slots__ = ["subcon"] + def __init__(self, subcon): + Construct.__init__(self, subcon.name, subcon.conflags) + self.subcon = subcon + def _parse(self, stream, context): + return self.subcon._parse(stream, context) + def _build(self, obj, stream, context): + self.subcon._build(obj, stream, context) + def _sizeof(self, context): + return self.subcon._sizeof(context) + +class Adapter(Subconstruct): + """ + Abstract adapter parent class. + + Adapters should implement ``_decode()`` and ``_encode()``. + + :param subcon: the construct to wrap + """ + + __slots__ = [] + def _parse(self, stream, context): + return self._decode(self.subcon._parse(stream, context), context) + def _build(self, obj, stream, context): + self.subcon._build(self._encode(obj, context), stream, context) + def _decode(self, obj, context): + raise NotImplementedError() + def _encode(self, obj, context): + raise NotImplementedError() + + +#=============================================================================== +# Fields +#=============================================================================== +def _read_stream(stream, length): + if length < 0: + raise ValueError("length must be >= 0", length) + data = stream.read(length) + if len(data) != length: + raise FieldError("expected %d, found %d" % (length, len(data))) + return data + +def _write_stream(stream, length, data): + if length < 0: + raise ValueError("length must be >= 0", length) + if len(data) != length: + raise FieldError("expected %d, found %d" % (length, len(data))) + stream.write(data) + +class StaticField(Construct): + """ + A fixed-size byte field. + + :param name: field name + :param length: number of bytes in the field + """ + + __slots__ = ["length"] + def __init__(self, name, length): + Construct.__init__(self, name) + self.length = length + def _parse(self, stream, context): + return _read_stream(stream, self.length) + def _build(self, obj, stream, context): + _write_stream(stream, self.length, bchr(obj) if isinstance(obj, int) else obj) + def _sizeof(self, context): + return self.length + +class FormatField(StaticField): + """ + A field that uses ``struct`` to pack and unpack data. + + See ``struct`` documentation for instructions on crafting format strings. + + :param name: name of the field + :param endianness: format endianness string; one of "<", ">", or "=" + :param format: a single format character + """ + + __slots__ = ["packer"] + def __init__(self, name, endianity, format): + if endianity not in (">", "<", "="): + raise ValueError("endianity must be be '=', '<', or '>'", + endianity) + if len(format) != 1: + raise ValueError("must specify one and only one format char") + self.packer = Packer(endianity + format) + StaticField.__init__(self, name, self.packer.size) + def __getstate__(self): + attrs = StaticField.__getstate__(self) + attrs["packer"] = attrs["packer"].format + return attrs + def __setstate__(self, attrs): + attrs["packer"] = Packer(attrs["packer"]) + return StaticField.__setstate__(self, attrs) + def _parse(self, stream, context): + try: + return self.packer.unpack(_read_stream(stream, self.length))[0] + except Exception: + raise FieldError(sys.exc_info()[1]) + def _build(self, obj, stream, context): + try: + _write_stream(stream, self.length, self.packer.pack(obj)) + except Exception: + raise FieldError(sys.exc_info()[1]) + +class MetaField(Construct): + r""" + A variable-length field. The length is obtained at runtime from a + function. + + :param name: name of the field + :param lengthfunc: callable that takes a context and returns length as an int + + Example:: + + >>> foo = Struct("foo", + ... Byte("length"), + ... MetaField("data", lambda ctx: ctx["length"]) + ... ) + >>> foo.parse("\x03ABC") + Container(data = 'ABC', length = 3) + >>> foo.parse("\x04ABCD") + Container(data = 'ABCD', length = 4) + """ + + __slots__ = ["lengthfunc"] + def __init__(self, name, lengthfunc): + Construct.__init__(self, name) + self.lengthfunc = lengthfunc + self._set_flag(self.FLAG_DYNAMIC) + def _parse(self, stream, context): + return _read_stream(stream, self.lengthfunc(context)) + def _build(self, obj, stream, context): + _write_stream(stream, self.lengthfunc(context), obj) + def _sizeof(self, context): + return self.lengthfunc(context) + + +#=============================================================================== +# arrays and repeaters +#=============================================================================== +class MetaArray(Subconstruct): + """ + An array (repeater) of a meta-count. The array will iterate exactly + ``countfunc()`` times. Will raise ArrayError if less elements are found. + + .. seealso:: + + The :func:`~construct.macros.Array` macro, :func:`Range` and :func:`RepeatUntil`. + + :param countfunc: a function that takes the context as a parameter and returns + the number of elements of the array (count) + :param subcon: the subcon to repeat ``countfunc()`` times + + Example:: + + MetaArray(lambda ctx: 5, UBInt8("foo")) + """ + __slots__ = ["countfunc"] + def __init__(self, countfunc, subcon): + Subconstruct.__init__(self, subcon) + self.countfunc = countfunc + self._clear_flag(self.FLAG_COPY_CONTEXT) + self._set_flag(self.FLAG_DYNAMIC) + def _parse(self, stream, context): + obj = ListContainer() + c = 0 + count = self.countfunc(context) + try: + if self.subcon.conflags & self.FLAG_COPY_CONTEXT: + while c < count: + obj.append(self.subcon._parse(stream, context.__copy__())) + c += 1 + else: + while c < count: + obj.append(self.subcon._parse(stream, context)) + c += 1 + except ConstructError: + raise ArrayError("expected %d, found %d" % (count, c), sys.exc_info()[1]) + return obj + def _build(self, obj, stream, context): + count = self.countfunc(context) + if len(obj) != count: + raise ArrayError("expected %d, found %d" % (count, len(obj))) + if self.subcon.conflags & self.FLAG_COPY_CONTEXT: + for subobj in obj: + self.subcon._build(subobj, stream, context.__copy__()) + else: + for subobj in obj: + self.subcon._build(subobj, stream, context) + def _sizeof(self, context): + return self.subcon._sizeof(context) * self.countfunc(context) + +class Range(Subconstruct): + r""" + A range-array. The subcon will iterate between ``mincount`` to ``maxcount`` + times. If less than ``mincount`` elements are found, raises RangeError. + + .. seealso:: + + The :func:`~construct.macros.GreedyRange` and + :func:`~construct.macros.OptionalGreedyRange` macros. + + The general-case repeater. Repeats the given unit for at least ``mincount`` + times, and up to ``maxcount`` times. If an exception occurs (EOF, validation + error), the repeater exits. If less than ``mincount`` units have been + successfully parsed, a RangeError is raised. + + .. note:: This object requires a seekable stream for parsing. + + :param mincount: the minimal count + :param maxcount: the maximal count + :param subcon: the subcon to repeat + + Example:: + + >>> c = Range(3, 7, UBInt8("foo")) + >>> c.parse("\x01\x02") + Traceback (most recent call last): + ... + construct.core.RangeError: expected 3..7, found 2 + >>> c.parse("\x01\x02\x03") + [1, 2, 3] + >>> c.parse("\x01\x02\x03\x04\x05\x06") + [1, 2, 3, 4, 5, 6] + >>> c.parse("\x01\x02\x03\x04\x05\x06\x07") + [1, 2, 3, 4, 5, 6, 7] + >>> c.parse("\x01\x02\x03\x04\x05\x06\x07\x08\x09") + [1, 2, 3, 4, 5, 6, 7] + >>> c.build([1,2]) + Traceback (most recent call last): + ... + construct.core.RangeError: expected 3..7, found 2 + >>> c.build([1,2,3,4]) + '\x01\x02\x03\x04' + >>> c.build([1,2,3,4,5,6,7,8]) + Traceback (most recent call last): + ... + construct.core.RangeError: expected 3..7, found 8 + """ + + __slots__ = ["mincount", "maxcout"] + def __init__(self, mincount, maxcout, subcon): + Subconstruct.__init__(self, subcon) + self.mincount = mincount + self.maxcout = maxcout + self._clear_flag(self.FLAG_COPY_CONTEXT) + self._set_flag(self.FLAG_DYNAMIC) + def _parse(self, stream, context): + obj = ListContainer() + c = 0 + try: + if self.subcon.conflags & self.FLAG_COPY_CONTEXT: + while c < self.maxcout: + pos = stream.tell() + obj.append(self.subcon._parse(stream, context.__copy__())) + c += 1 + else: + while c < self.maxcout: + pos = stream.tell() + obj.append(self.subcon._parse(stream, context)) + c += 1 + except ConstructError: + if c < self.mincount: + raise RangeError("expected %d to %d, found %d" % + (self.mincount, self.maxcout, c), sys.exc_info()[1]) + stream.seek(pos) + return obj + def _build(self, obj, stream, context): + if len(obj) < self.mincount or len(obj) > self.maxcout: + raise RangeError("expected %d to %d, found %d" % + (self.mincount, self.maxcout, len(obj))) + cnt = 0 + try: + if self.subcon.conflags & self.FLAG_COPY_CONTEXT: + for subobj in obj: + if isinstance(obj, bytes): + subobj = bchr(subobj) + self.subcon._build(subobj, stream, context.__copy__()) + cnt += 1 + else: + for subobj in obj: + if isinstance(obj, bytes): + subobj = bchr(subobj) + self.subcon._build(subobj, stream, context) + cnt += 1 + except ConstructError: + if cnt < self.mincount: + raise RangeError("expected %d to %d, found %d" % + (self.mincount, self.maxcout, len(obj)), sys.exc_info()[1]) + def _sizeof(self, context): + raise SizeofError("can't calculate size") + +class RepeatUntil(Subconstruct): + r""" + An array that repeats until the predicate indicates it to stop. Note that + the last element (which caused the repeat to exit) is included in the + return value. + + :param predicate: a predicate function that takes (obj, context) and returns + True if the stop-condition is met, or False to continue. + :param subcon: the subcon to repeat. + + Example:: + + # will read chars until '\x00' (inclusive) + RepeatUntil(lambda obj, ctx: obj == b"\x00", + Field("chars", 1) + ) + """ + __slots__ = ["predicate"] + def __init__(self, predicate, subcon): + Subconstruct.__init__(self, subcon) + self.predicate = predicate + self._clear_flag(self.FLAG_COPY_CONTEXT) + self._set_flag(self.FLAG_DYNAMIC) + def _parse(self, stream, context): + obj = [] + try: + if self.subcon.conflags & self.FLAG_COPY_CONTEXT: + while True: + subobj = self.subcon._parse(stream, context.__copy__()) + obj.append(subobj) + if self.predicate(subobj, context): + break + else: + while True: + subobj = self.subcon._parse(stream, context) + obj.append(subobj) + if self.predicate(subobj, context): + break + except ConstructError: + raise ArrayError("missing terminator", sys.exc_info()[1]) + return obj + def _build(self, obj, stream, context): + terminated = False + if self.subcon.conflags & self.FLAG_COPY_CONTEXT: + for subobj in obj: + self.subcon._build(subobj, stream, context.__copy__()) + if self.predicate(subobj, context): + terminated = True + break + else: + for subobj in obj: + #subobj = bchr(subobj) -- WTF is that for?! + self.subcon._build(subobj, stream, context.__copy__()) + if self.predicate(subobj, context): + terminated = True + break + if not terminated: + raise ArrayError("missing terminator") + def _sizeof(self, context): + raise SizeofError("can't calculate size") + + +#=============================================================================== +# structures and sequences +#=============================================================================== +class Struct(Construct): + """ + A sequence of named constructs, similar to structs in C. The elements are + parsed and built in the order they are defined. + + .. seealso:: The :func:`~construct.macros.Embedded` macro. + + :param name: the name of the structure + :param subcons: a sequence of subconstructs that make up this structure. + :param nested: a keyword-only argument that indicates whether this struct + creates a nested context. The default is True. This parameter is + considered "advanced usage", and may be removed in the future. + + Example:: + + Struct("foo", + UBInt8("first_element"), + UBInt16("second_element"), + Padding(2), + UBInt8("third_element"), + ) + """ + __slots__ = ["subcons", "nested", "allow_overwrite"] + def __init__(self, name, *subcons, **kw): + self.nested = kw.pop("nested", True) + self.allow_overwrite = kw.pop("allow_overwrite", False) + if kw: + raise TypeError("the only keyword argument accepted is 'nested'", kw) + Construct.__init__(self, name) + self.subcons = subcons + self._inherit_flags(*subcons) + self._clear_flag(self.FLAG_EMBED) + def _parse(self, stream, context): + if "<obj>" in context: + obj = context["<obj>"] + del context["<obj>"] + else: + obj = Container() + if self.nested: + context = Container(_ = context) + for sc in self.subcons: + if sc.conflags & self.FLAG_EMBED: + context["<obj>"] = obj + sc._parse(stream, context) + else: + subobj = sc._parse(stream, context) + if sc.name is not None: + if sc.name in obj and not self.allow_overwrite: + raise OverwriteError("%r would be overwritten but allow_overwrite is False" % (sc.name,)) + obj[sc.name] = subobj + context[sc.name] = subobj + return obj + def _build(self, obj, stream, context): + if "<unnested>" in context: + del context["<unnested>"] + elif self.nested: + context = Container(_ = context) + for sc in self.subcons: + if sc.conflags & self.FLAG_EMBED: + context["<unnested>"] = True + subobj = obj + elif sc.name is None: + subobj = None + else: + subobj = getattr(obj, sc.name) + context[sc.name] = subobj + sc._build(subobj, stream, context) + def _sizeof(self, context): + #if self.nested: + # context = Container(_ = context) + return sum(sc._sizeof(context) for sc in self.subcons) + +class Sequence(Struct): + """ + A sequence of unnamed constructs. The elements are parsed and built in the + order they are defined. + + .. seealso:: The :func:`~construct.macros.Embedded` macro. + + :param name: the name of the structure + :param subcons: a sequence of subconstructs that make up this structure. + :param nested: a keyword-only argument that indicates whether this struct + creates a nested context. The default is True. This parameter is + considered "advanced usage", and may be removed in the future. + + Example:: + + Sequence("foo", + UBInt8("first_element"), + UBInt16("second_element"), + Padding(2), + UBInt8("third_element"), + ) + """ + __slots__ = [] + def _parse(self, stream, context): + if "<obj>" in context: + obj = context["<obj>"] + del context["<obj>"] + else: + obj = ListContainer() + if self.nested: + context = Container(_ = context) + for sc in self.subcons: + if sc.conflags & self.FLAG_EMBED: + context["<obj>"] = obj + sc._parse(stream, context) + else: + subobj = sc._parse(stream, context) + if sc.name is not None: + obj.append(subobj) + context[sc.name] = subobj + return obj + def _build(self, obj, stream, context): + if "<unnested>" in context: + del context["<unnested>"] + elif self.nested: + context = Container(_ = context) + objiter = iter(obj) + for sc in self.subcons: + if sc.conflags & self.FLAG_EMBED: + context["<unnested>"] = True + subobj = objiter + elif sc.name is None: + subobj = None + else: + subobj = advance_iterator(objiter) + context[sc.name] = subobj + sc._build(subobj, stream, context) + +class Union(Construct): + """ + a set of overlapping fields (like unions in C). when parsing, + all fields read the same data; when building, only the first subcon + (called "master") is used. + + :param name: the name of the union + :param master: the master subcon, i.e., the subcon used for building and calculating the total size + :param subcons: additional subcons + + Example:: + + Union("what_are_four_bytes", + UBInt32("one_dword"), + Struct("two_words", UBInt16("first"), UBInt16("second")), + Struct("four_bytes", + UBInt8("a"), + UBInt8("b"), + UBInt8("c"), + UBInt8("d") + ), + ) + """ + __slots__ = ["parser", "builder"] + def __init__(self, name, master, *subcons, **kw): + Construct.__init__(self, name) + args = [Peek(sc) for sc in subcons] + args.append(MetaField(None, lambda ctx: master._sizeof(ctx))) + self.parser = Struct(name, Peek(master, perform_build = True), *args) + self.builder = Struct(name, master) + def _parse(self, stream, context): + return self.parser._parse(stream, context) + def _build(self, obj, stream, context): + return self.builder._build(obj, stream, context) + def _sizeof(self, context): + return self.builder._sizeof(context) + +#=============================================================================== +# conditional +#=============================================================================== +class Switch(Construct): + """ + A conditional branch. Switch will choose the case to follow based on + the return value of keyfunc. If no case is matched, and no default value + is given, SwitchError will be raised. + + .. seealso:: :func:`Pass`. + + :param name: the name of the construct + :param keyfunc: a function that takes the context and returns a key, which + will be used to choose the relevant case. + :param cases: a dictionary mapping keys to constructs. the keys can be any + values that may be returned by keyfunc. + :param default: a default value to use when the key is not found in the cases. + if not supplied, an exception will be raised when the key is not found. + You can use the builtin construct Pass for 'do-nothing'. + :param include_key: whether or not to include the key in the return value + of parsing. defualt is False. + + Example:: + + Struct("foo", + UBInt8("type"), + Switch("value", lambda ctx: ctx.type, { + 1 : UBInt8("spam"), + 2 : UBInt16("spam"), + 3 : UBInt32("spam"), + 4 : UBInt64("spam"), + } + ), + ) + """ + + class NoDefault(Construct): + def _parse(self, stream, context): + raise SwitchError("no default case defined") + def _build(self, obj, stream, context): + raise SwitchError("no default case defined") + def _sizeof(self, context): + raise SwitchError("no default case defined") + NoDefault = NoDefault("No default value specified") + + __slots__ = ["subcons", "keyfunc", "cases", "default", "include_key"] + + def __init__(self, name, keyfunc, cases, default = NoDefault, + include_key = False): + Construct.__init__(self, name) + self._inherit_flags(*cases.values()) + self.keyfunc = keyfunc + self.cases = cases + self.default = default + self.include_key = include_key + self._inherit_flags(*cases.values()) + self._set_flag(self.FLAG_DYNAMIC) + def _parse(self, stream, context): + key = self.keyfunc(context) + obj = self.cases.get(key, self.default)._parse(stream, context) + if self.include_key: + return key, obj + else: + return obj + def _build(self, obj, stream, context): + if self.include_key: + key, obj = obj + else: + key = self.keyfunc(context) + case = self.cases.get(key, self.default) + case._build(obj, stream, context) + def _sizeof(self, context): + case = self.cases.get(self.keyfunc(context), self.default) + return case._sizeof(context) + +class Select(Construct): + """ + Selects the first matching subconstruct. It will literally try each of + the subconstructs, until one matches. + + .. note:: Requires a seekable stream. + + :param name: the name of the construct + :param subcons: the subcons to try (order-sensitive) + :param include_name: a keyword only argument, indicating whether to include + the name of the selected subcon in the return value of parsing. default + is false. + + Example:: + + Select("foo", + UBInt64("large"), + UBInt32("medium"), + UBInt16("small"), + UBInt8("tiny"), + ) + """ + __slots__ = ["subcons", "include_name"] + def __init__(self, name, *subcons, **kw): + include_name = kw.pop("include_name", False) + if kw: + raise TypeError("the only keyword argument accepted " + "is 'include_name'", kw) + Construct.__init__(self, name) + self.subcons = subcons + self.include_name = include_name + self._inherit_flags(*subcons) + self._set_flag(self.FLAG_DYNAMIC) + def _parse(self, stream, context): + for sc in self.subcons: + pos = stream.tell() + context2 = context.__copy__() + try: + obj = sc._parse(stream, context2) + except ConstructError: + stream.seek(pos) + else: + context.__update__(context2) + if self.include_name: + return sc.name, obj + else: + return obj + raise SelectError("no subconstruct matched") + def _build(self, obj, stream, context): + if self.include_name: + name, obj = obj + for sc in self.subcons: + if sc.name == name: + sc._build(obj, stream, context) + return + else: + for sc in self.subcons: + stream2 = BytesIO() + context2 = context.__copy__() + try: + sc._build(obj, stream2, context2) + except Exception: + pass + else: + context.__update__(context2) + stream.write(stream2.getvalue()) + return + raise SelectError("no subconstruct matched", obj) + def _sizeof(self, context): + raise SizeofError("can't calculate size") + + +#=============================================================================== +# stream manipulation +#=============================================================================== +class Pointer(Subconstruct): + """ + Changes the stream position to a given offset, where the construction + should take place, and restores the stream position when finished. + + .. seealso:: + :func:`Anchor`, :func:`OnDemand` and the + :func:`~construct.macros.OnDemandPointer` macro. + + .. note:: Requires a seekable stream. + + :param offsetfunc: a function that takes the context and returns an absolute + stream position, where the construction would take place + :param subcon: the subcon to use at ``offsetfunc()`` + + Example:: + + Struct("foo", + UBInt32("spam_pointer"), + Pointer(lambda ctx: ctx.spam_pointer, + Array(5, UBInt8("spam")) + ) + ) + """ + __slots__ = ["offsetfunc"] + def __init__(self, offsetfunc, subcon): + Subconstruct.__init__(self, subcon) + self.offsetfunc = offsetfunc + def _parse(self, stream, context): + newpos = self.offsetfunc(context) + origpos = stream.tell() + stream.seek(newpos, 2 if newpos < 0 else 0) + obj = self.subcon._parse(stream, context) + stream.seek(origpos) + return obj + def _build(self, obj, stream, context): + newpos = self.offsetfunc(context) + origpos = stream.tell() + stream.seek(newpos, 2 if newpos < 0 else 0) + self.subcon._build(obj, stream, context) + stream.seek(origpos) + def _sizeof(self, context): + return 0 + +class Peek(Subconstruct): + """ + Peeks at the stream: parses without changing the stream position. + See also Union. If the end of the stream is reached when peeking, + returns None. + + .. note:: Requires a seekable stream. + + :param subcon: the subcon to peek at + :param perform_build: whether or not to perform building. by default this + parameter is set to False, meaning building is a no-op. + + Example:: + + Peek(UBInt8("foo")) + """ + __slots__ = ["perform_build"] + def __init__(self, subcon, perform_build = False): + Subconstruct.__init__(self, subcon) + self.perform_build = perform_build + def _parse(self, stream, context): + pos = stream.tell() + try: + return self.subcon._parse(stream, context) + except FieldError: + pass + finally: + stream.seek(pos) + def _build(self, obj, stream, context): + if self.perform_build: + self.subcon._build(obj, stream, context) + def _sizeof(self, context): + return 0 + +class OnDemand(Subconstruct): + """ + Allows for on-demand (lazy) parsing. When parsing, it will return a + LazyContainer that represents a pointer to the data, but does not actually + parses it from stream until it's "demanded". + By accessing the 'value' property of LazyContainers, you will demand the + data from the stream. The data will be parsed and cached for later use. + You can use the 'has_value' property to know whether the data has already + been demanded. + + .. seealso:: The :func:`~construct.macros.OnDemandPointer` macro. + + .. note:: Requires a seekable stream. + + :param subcon: the subcon to read/write on demand + :param advance_stream: whether or not to advance the stream position. by + default this is True, but if subcon is a pointer, this should be False. + :param force_build: whether or not to force build. If set to False, and the + LazyContainer has not been demaned, building is a no-op. + + Example:: + + OnDemand(Array(10000, UBInt8("foo")) + """ + __slots__ = ["advance_stream", "force_build"] + def __init__(self, subcon, advance_stream = True, force_build = True): + Subconstruct.__init__(self, subcon) + self.advance_stream = advance_stream + self.force_build = force_build + def _parse(self, stream, context): + obj = LazyContainer(self.subcon, stream, stream.tell(), context) + if self.advance_stream: + stream.seek(self.subcon._sizeof(context), 1) + return obj + def _build(self, obj, stream, context): + if not isinstance(obj, LazyContainer): + self.subcon._build(obj, stream, context) + elif self.force_build or obj.has_value: + self.subcon._build(obj.value, stream, context) + elif self.advance_stream: + stream.seek(self.subcon._sizeof(context), 1) + +class Buffered(Subconstruct): + """ + Creates an in-memory buffered stream, which can undergo encoding and + decoding prior to being passed on to the subconstruct. + + .. seealso:: The :func:`~construct.macros.Bitwise` macro. + + .. warning:: Do not use pointers inside ``Buffered``. + + :param subcon: the subcon which will operate on the buffer + :param encoder: a function that takes a string and returns an encoded + string (used after building) + :param decoder: a function that takes a string and returns a decoded + string (used before parsing) + :param resizer: a function that takes the size of the subcon and "adjusts" + or "resizes" it according to the encoding/decoding process. + + Example:: + + Buffered(BitField("foo", 16), + encoder = decode_bin, + decoder = encode_bin, + resizer = lambda size: size / 8, + ) + """ + __slots__ = ["encoder", "decoder", "resizer"] + def __init__(self, subcon, decoder, encoder, resizer): + Subconstruct.__init__(self, subcon) + self.encoder = encoder + self.decoder = decoder + self.resizer = resizer + def _parse(self, stream, context): + data = _read_stream(stream, self._sizeof(context)) + stream2 = BytesIO(self.decoder(data)) + return self.subcon._parse(stream2, context) + def _build(self, obj, stream, context): + size = self._sizeof(context) + stream2 = BytesIO() + self.subcon._build(obj, stream2, context) + data = self.encoder(stream2.getvalue()) + assert len(data) == size + _write_stream(stream, self._sizeof(context), data) + def _sizeof(self, context): + return self.resizer(self.subcon._sizeof(context)) + +class Restream(Subconstruct): + """ + Wraps the stream with a read-wrapper (for parsing) or a + write-wrapper (for building). The stream wrapper can buffer the data + internally, reading it from- or writing it to the underlying stream + as needed. For example, BitStreamReader reads whole bytes from the + underlying stream, but returns them as individual bits. + + .. seealso:: The :func:`~construct.macros.Bitwise` macro. + + When the parsing or building is done, the stream's close method + will be invoked. It can perform any finalization needed for the stream + wrapper, but it must not close the underlying stream. + + .. warning:: Do not use pointers inside ``Restream``. + + :param subcon: the subcon + :param stream_reader: the read-wrapper + :param stream_writer: the write wrapper + :param resizer: a function that takes the size of the subcon and "adjusts" + or "resizes" it according to the encoding/decoding process. + + Example:: + + Restream(BitField("foo", 16), + stream_reader = BitStreamReader, + stream_writer = BitStreamWriter, + resizer = lambda size: size / 8, + ) + """ + __slots__ = ["stream_reader", "stream_writer", "resizer"] + def __init__(self, subcon, stream_reader, stream_writer, resizer): + Subconstruct.__init__(self, subcon) + self.stream_reader = stream_reader + self.stream_writer = stream_writer + self.resizer = resizer + def _parse(self, stream, context): + stream2 = self.stream_reader(stream) + obj = self.subcon._parse(stream2, context) + stream2.close() + return obj + def _build(self, obj, stream, context): + stream2 = self.stream_writer(stream) + self.subcon._build(obj, stream2, context) + stream2.close() + def _sizeof(self, context): + return self.resizer(self.subcon._sizeof(context)) + + +#=============================================================================== +# miscellaneous +#=============================================================================== +class Reconfig(Subconstruct): + """ + Reconfigures a subconstruct. Reconfig can be used to change the name and + set and clear flags of the inner subcon. + + :param name: the new name + :param subcon: the subcon to reconfigure + :param setflags: the flags to set (default is 0) + :param clearflags: the flags to clear (default is 0) + + Example:: + + Reconfig("foo", UBInt8("bar")) + """ + __slots__ = [] + def __init__(self, name, subcon, setflags = 0, clearflags = 0): + Construct.__init__(self, name, subcon.conflags) + self.subcon = subcon + self._set_flag(setflags) + self._clear_flag(clearflags) + +class Anchor(Construct): + """ + Gets the *anchor* (stream position) at a point in a Construct. + + Anchors are useful for adjusting relative offsets to absolute positions, + or to measure sizes of Constructs. + + To get an absolute pointer, use an Anchor plus a relative offset. To get a + size, place two Anchors and measure their difference. + + :param name: the name of the anchor + + .. note:: + + Anchor Requires a seekable stream, or at least a tellable stream; it is + implemented using the ``tell()`` method of file-like objects. + + .. seealso:: :func:`Pointer` + """ + + __slots__ = [] + def _parse(self, stream, context): + return stream.tell() + def _build(self, obj, stream, context): + context[self.name] = stream.tell() + def _sizeof(self, context): + return 0 + +class Value(Construct): + """ + A computed value. + + :param name: the name of the value + :param func: a function that takes the context and return the computed value + + Example:: + + Struct("foo", + UBInt8("width"), + UBInt8("height"), + Value("total_pixels", lambda ctx: ctx.width * ctx.height), + ) + """ + __slots__ = ["func"] + def __init__(self, name, func): + Construct.__init__(self, name) + self.func = func + self._set_flag(self.FLAG_DYNAMIC) + def _parse(self, stream, context): + return self.func(context) + def _build(self, obj, stream, context): + context[self.name] = self.func(context) + def _sizeof(self, context): + return 0 + +#class Dynamic(Construct): +# """ +# Dynamically creates a construct and uses it for parsing and building. +# This allows you to create change the construction tree on the fly. +# Deprecated. +# +# Parameters: +# * name - the name of the construct +# * factoryfunc - a function that takes the context and returns a new +# construct object which will be used for parsing and building. +# +# Example: +# def factory(ctx): +# if ctx.bar == 8: +# return UBInt8("spam") +# if ctx.bar == 9: +# return String("spam", 9) +# +# Struct("foo", +# UBInt8("bar"), +# Dynamic("spam", factory), +# ) +# """ +# __slots__ = ["factoryfunc"] +# def __init__(self, name, factoryfunc): +# Construct.__init__(self, name, self.FLAG_COPY_CONTEXT) +# self.factoryfunc = factoryfunc +# self._set_flag(self.FLAG_DYNAMIC) +# def _parse(self, stream, context): +# return self.factoryfunc(context)._parse(stream, context) +# def _build(self, obj, stream, context): +# return self.factoryfunc(context)._build(obj, stream, context) +# def _sizeof(self, context): +# return self.factoryfunc(context)._sizeof(context) + +class LazyBound(Construct): + """ + Lazily bound construct, useful for constructs that need to make cyclic + references (linked-lists, expression trees, etc.). + + :param name: the name of the construct + :param bindfunc: the function (called without arguments) returning the bound construct + + Example:: + + foo = Struct("foo", + UBInt8("bar"), + LazyBound("next", lambda: foo), + ) + """ + __slots__ = ["bindfunc", "bound"] + def __init__(self, name, bindfunc): + Construct.__init__(self, name) + self.bound = None + self.bindfunc = bindfunc + def _parse(self, stream, context): + if self.bound is None: + self.bound = self.bindfunc() + return self.bound._parse(stream, context) + def _build(self, obj, stream, context): + if self.bound is None: + self.bound = self.bindfunc() + self.bound._build(obj, stream, context) + def _sizeof(self, context): + if self.bound is None: + self.bound = self.bindfunc() + return self.bound._sizeof(context) + +class Pass(Construct): + """ + A do-nothing construct, useful as the default case for Switch, or + to indicate Enums. + + .. seealso:: :func:`Switch` and the :func:`~construct.macros.Enum` macro. + + .. note:: This construct is a singleton. Do not try to instatiate it, as it will not work. + + Example:: + + Pass + """ + __slots__ = [] + def _parse(self, stream, context): + pass + def _build(self, obj, stream, context): + assert obj is None + def _sizeof(self, context): + return 0 + +Pass = Pass(None) +""" +A do-nothing construct, useful as the default case for Switch, or +to indicate Enums. + +.. seealso:: :func:`Switch` and the :func:`~construct.macros.Enum` macro. + +.. note:: This construct is a singleton. Do not try to instatiate it, as it will not work. + +Example:: + + Pass +""" + +class Terminator(Construct): + """ + Asserts the end of the stream has been reached at the point it's placed. + You can use this to ensure no more unparsed data follows. + + .. note:: + * This construct is only meaningful for parsing. For building, it's a no-op. + * This construct is a singleton. Do not try to instatiate it, as it will not work. + + Example:: + + Terminator + """ + __slots__ = [] + def _parse(self, stream, context): + if stream.read(1): + raise TerminatorError("expected end of stream") + def _build(self, obj, stream, context): + assert obj is None + def _sizeof(self, context): + return 0 + +Terminator = Terminator(None) +""" +Asserts the end of the stream has been reached at the point it's placed. +You can use this to ensure no more unparsed data follows. + +.. note:: + * This construct is only meaningful for parsing. For building, it's a no-op. + * This construct is a singleton. Do not try to instatiate it, as it will not work. + +Example:: + + Terminator +""" + +#======================================================================================================================= +# Extra +#======================================================================================================================= +class ULInt24(StaticField): + """ + A custom made construct for handling 3-byte types as used in ancient file formats. + A better implementation would be writing a more flexable version of FormatField, + rather then specifically implementing it for this case + """ + __slots__ = ["packer"] + def __init__(self, name): + self.packer = Packer("<BH") + StaticField.__init__(self, name, self.packer.size) + def __getstate__(self): + attrs = StaticField.__getstate__(self) + attrs["packer"] = attrs["packer"].format + return attrs + def __setstate__(self, attrs): + attrs["packer"] = Packer(attrs["packer"]) + return StaticField.__setstate__(self, attrs) + def _parse(self, stream, context): + try: + vals = self.packer.unpack(_read_stream(stream, self.length)) + return vals[0] + (vals[1] << 8) + except Exception: + ex = sys.exc_info()[1] + raise FieldError(ex) + def _build(self, obj, stream, context): + try: + vals = (obj%256, obj >> 8) + _write_stream(stream, self.length, self.packer.pack(vals)) + except Exception: + ex = sys.exc_info()[1] + raise FieldError(ex) + + + + |