summaryrefslogtreecommitdiff
path: root/external/construct/protocols/layer3/ipv4.py
blob: 82dfaa9d762f96259dcd095f5c8b2648f937e025 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
Internet Protocol version 4 (TCP/IP protocol stack)
"""
from construct import *
import six
from binascii import unhexlify

# Compatibility shim: if the interpreter has no ``bytes`` builtin, referencing
# the name raises NameError and we alias it to ``str``. Code in this module
# uses ``bytes is str`` to tell whether byte strings are plain ``str`` objects.
try:
    bytes
except NameError:
    bytes = str


class IpAddressAdapter(Adapter):
    """Adapter converting between dotted-decimal text and raw address bytes.

    Building takes a string such as ``"192.168.2.5"`` and produces one byte
    per dot-separated component; parsing performs the reverse conversion.
    """

    def _encode(self, obj, context):
        """Pack the dot-separated decimal string *obj* into a byte string."""
        parts = obj.split(".")
        if bytes is not str:
            # ``bytes`` is its own type: it can be built directly from an
            # iterable of integers (each one is range-checked by ``bytes``).
            return bytes(map(int, parts))
        # ``bytes`` is ``str``: assemble the result one character at a time.
        chars = []
        for part in parts:
            chars.append(chr(int(part)))
        return "".join(chars)

    def _decode(self, obj, context):
        """Render the raw byte string *obj* as dot-separated decimal text."""
        if bytes is str:
            # Iterating a ``str`` yields 1-char strings; map them to ints.
            values = (ord(ch) for ch in obj)
        else:
            # Iterating a real ``bytes`` object already yields integers.
            values = iter(obj)
        return ".".join("%d" % (value,) for value in values)

def IpAddress(name):
    """Return a 4-byte field called *name* wrapped in an IpAddressAdapter."""
    raw_field = Bytes(name, 4)
    return IpAddressAdapter(raw_field)

def ProtocolEnum(code):
    """Wrap the construct *code* in an Enum naming ICMP, TCP and UDP."""
    protocol_numbers = {
        "ICMP": 1,
        "TCP": 6,
        "UDP": 17,
    }
    return Enum(code, **protocol_numbers)

# Construct describing an IPv4 header. The field names below become keys of
# the parsed container, so they are kept exactly as they were (including the
# historical spellings "high_throuput" and "frame_offset").
ipv4_header = Struct("ip_header",
    EmbeddedBitStruct(
        # The version nibble must equal 4.
        Const(Nibble("version"), 4),
        # The nibble on the wire is header_length / 4, so parsing multiplies
        # it by 4. Building uses floor division so that the value handed back
        # to Nibble stays an int on Python 3, where `/` would yield a float.
        ExprAdapter(Nibble("header_length"),
            decoder = lambda obj, ctx: obj * 4,
            encoder = lambda obj, ctx: obj // 4
        ),
    ),
    # Type-of-service byte: 3 precedence bits, four flags, one padding bit.
    BitStruct("tos",
        Bits("precedence", 3),
        Flag("minimize_delay"),
        Flag("high_throuput"),
        Flag("high_reliability"),
        Flag("minimize_cost"),
        Padding(1),
    ),
    UBInt16("total_length"),
    # Derived value (not read from the stream): what remains of total_length
    # after subtracting the already-multiplied header_length.
    Value("payload_length", lambda ctx: ctx.total_length - ctx.header_length),
    UBInt16("identification"),
    # 16 bits in total: 1 padding bit + 2 flags, followed by a 13-bit offset.
    EmbeddedBitStruct(
        Struct("flags",
            Padding(1),
            Flag("dont_fragment"),
            Flag("more_fragments"),
        ),
        Bits("frame_offset", 13),
    ),
    UBInt8("ttl"),
    ProtocolEnum(UBInt8("protocol")),
    UBInt16("checksum"),
    IpAddress("source"),
    IpAddress("destination"),
    # The fields above occupy 20 bytes; anything beyond that in header_length
    # is captured as raw option bytes.
    Field("options", lambda ctx: ctx.header_length - 20),
)


if __name__ == "__main__":
    # Demo: parse a sample 20-byte header given as hex, show the resulting
    # container, then build it back into bytes and show that too.
    sample_hex = six.b("4500003ca0e3000080116185c0a80205d474a126")
    header = ipv4_header.parse(unhexlify(sample_hex))
    print(header)
    rebuilt = ipv4_header.build(header)
    print(repr(rebuilt))