summaryrefslogtreecommitdiff
path: root/external/construct/debug.py
blob: 3910cae4d900f21917c46d8b606642f8cee21216 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
Debugging utilities for constructs
"""
import sys
import traceback
import pdb
import inspect
from construct.core import Construct, Subconstruct
from construct.lib import HexString, Container, ListContainer


class Probe(Construct):
    """
    A probe: dumps the context, stack frames, and stream content to the screen
    to aid the debugging process.

    The probe consumes and produces no data: parsing and building only print
    the report, and the reported size is always 0.

    .. seealso:: :class:`Debugger`.

    :param name: the display name. if omitted, a name of the form
                 ``<unnamed N>`` is generated from a class-wide counter.
    :param show_stream: whether or not to show stream contents. default is True. the stream must be seekable.
    :param show_context: whether or not to show the context. default is True.
    :param show_stack: whether or not to show the upper stack frames. default is True.
    :param stream_lookahead: the number of bytes to dump when show_stream is set. default is 100.

    Example::

        Struct("foo",
            UBInt8("a"),
            Probe("between a and b"),
            UBInt8("b"),
        )
    """
    __slots__ = [
        "printname", "show_stream", "show_context", "show_stack",
        "stream_lookahead"
    ]
    # number of auto-named probes created so far; shared by all instances
    counter = 0

    def __init__(self, name = None, show_stream = True,
                 show_context = True, show_stack = True,
                 stream_lookahead = 100):
        # the base construct is initialised with None; the label shown in the
        # report is kept separately in ``printname``
        Construct.__init__(self, None)
        if name is None:
            Probe.counter += 1
            name = "<unnamed %d>" % (Probe.counter,)
        self.printname = name
        self.show_stream = show_stream
        self.show_context = show_context
        self.show_stack = show_stack
        self.stream_lookahead = stream_lookahead

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.printname)

    def _parse(self, stream, context):
        """Print the probe report; no value is returned."""
        self.printout(stream, context)

    def _build(self, obj, stream, context):
        """Print the probe report; ``obj`` is ignored and nothing is written."""
        self.printout(stream, context)

    def _sizeof(self, context):
        """A probe occupies no bytes in the stream."""
        return 0

    def printout(self, stream, context):
        """
        Collect the enabled pieces of information and print them.

        :param stream: the stream being parsed/built. only touched when
                       ``show_stream`` is set; it must then support
                       ``tell``, ``read`` and relative ``seek``.
        :param context: the current context, shown when ``show_context`` is set.
        """
        obj = Container()
        if self.show_stream:
            obj.stream_position = stream.tell()
            follows = stream.read(self.stream_lookahead)
            if not follows:
                obj.following_stream_data = "EOF reached"
            else:
                # rewind by exactly the number of bytes peeked so the probe
                # leaves the stream position untouched
                stream.seek(-len(follows), 1)
                obj.following_stream_data = HexString(follows)
            print("")

        if self.show_context:
            obj.context = context

        if self.show_stack:
            obj.stack = ListContainer()
            # only the frame objects are needed, so request 0 lines of source
            # context to avoid a source lookup per frame.
            # [1:-1] drops this method's own frame and the outermost frame.
            frames = [record[0] for record in inspect.stack(0)][1:-1]
            # inspect.stack() lists the innermost frame first; show outermost first
            for frame in reversed(frames):
                frame_locals = Container()
                frame_locals.__update__(frame.f_locals)
                obj.stack.append(frame_locals)

        print("=" * 80)
        print("Probe %s" % (self.printname,))
        print(obj)
        print("=" * 80)

class Debugger(Subconstruct):
    """
    A pdb-based debugger wrapper. If the wrapped subcon raises an exception,
    the traceback is printed and a post-mortem pdb session is opened on it,
    letting you inspect the error (and even fix it on-the-fly).

    While parsing, assigning ``self.retval`` from inside the debugger makes
    that value the parse result; if it is left untouched the original
    exception is re-raised. While building, the exception is discarded once
    the debugger session ends.

    :param subcon: the subcon to debug

    Example::

        Debugger(
            Enum(UBInt8("foo"),
                a = 1,
                b = 2,
                c = 3
            )
        )
    """
    __slots__ = ["retval"]

    def handle_exc(self, msg = None):
        """
        Report the exception currently being handled and open pdb on it.

        :param msg: optional extra hint printed just before the debugger starts.
        """
        banner = "=" * 80
        exc_info = sys.exc_info()
        print(banner)
        print("Debugging exception of %s:" % (self.subcon,))
        # the first formatted line is the generic traceback header; skip it
        formatted = traceback.format_exception(*exc_info)
        print("".join(formatted[1:]))
        if msg:
            print(msg)
        pdb.post_mortem(exc_info[2])
        print(banner)

    def _parse(self, stream, context):
        """Parse via the subcon, dropping into pdb if it raises."""
        try:
            return self.subcon._parse(stream, context)
        except Exception:
            # NotImplemented marks "no replacement value supplied by the user"
            self.retval = NotImplemented
            self.handle_exc(
                "(you can set the value of 'self.retval', "
                "which will be returned)"
            )
            if self.retval is not NotImplemented:
                return self.retval
            raise

    def _build(self, obj, stream, context):
        """Build via the subcon, dropping into pdb if it raises."""
        try:
            self.subcon._build(obj, stream, context)
        except Exception:
            # NOTE: the exception is not re-raised after the debugger exits
            self.handle_exc()