summaryrefslogtreecommitdiff
path: root/external/construct/formats/filesystem/fat16.py
diff options
context:
space:
mode:
Diffstat (limited to 'external/construct/formats/filesystem/fat16.py')
-rw-r--r--external/construct/formats/filesystem/fat16.py226
1 files changed, 226 insertions, 0 deletions
diff --git a/external/construct/formats/filesystem/fat16.py b/external/construct/formats/filesystem/fat16.py
new file mode 100644
index 0000000..5d6caf1
--- /dev/null
+++ b/external/construct/formats/filesystem/fat16.py
@@ -0,0 +1,226 @@
+# fat.py; ad-hoc fat16 reader
+# by Bram Westerbaan <bram@westerbaan.name>
+#
+# references:
+# http://en.wikipedia.org/wiki/File_Allocation_Table
+# http://www.ecma-international.org/publications/standards/Ecma-107.htm
+#
+# example:
+# with open("/dev/sdc1") as file:
+# fs = FatFs(file)
+# for rootdir in fs:
+# print rootdir
+import numbers
+from io import BytesIO, BufferedReader
+from construct import Struct, Byte, Bytes, ULInt16, ULInt32, Enum, \
+ Array, Padding, Embed, Pass, BitStruct, Flag, Const
+
+
+def Fat16Header(name):
+ return Struct(name,
+ Bytes("jumpInstruction", 3),
+ Bytes("creatingSystemId", 8),
+ ULInt16("sectorSize"),
+ Byte("sectorsPerCluster"),
+ ULInt16("reservedSectorCount"),
+ Byte("fatCount"),
+ ULInt16("rootdirEntryCount"),
+ ULInt16("sectorCount_small"),
+ Byte("mediaId"),
+ ULInt16("sectorsPerFat"),
+ ULInt16("sectorsPerTrack"),
+ ULInt16("sideCount"),
+ ULInt32("hiddenSectorCount"),
+ ULInt32("sectorCount_large"),
+ Byte("physicalDriveNumber"),
+ Byte("currentHead"),
+ Byte("extendedBootSignature"),
+ Bytes("volumeId", 4),
+ Bytes("volumeLabel", 11),
+ Const(Bytes("fsType", 8), "FAT16 "),
+ Bytes("bootCode", 448),
+ Const(Bytes("bootSectorSignature", 2), "\x55\xaa"))
+
+def BootSector(name):
+ header = Fat16Header("header")
+ return Struct(name,
+ Embed(header),
+ Padding(lambda ctx: ctx.sectorSize - header.sizeof()))
+
+def FatEntry(name):
+ return Enum(ULInt16(name),
+ free_cluster = 0x0000,
+ bad_cluster = 0xfff7,
+ last_cluster = 0xffff,
+ _default_ = Pass)
+
+def DirEntry(name):
+ return Struct(name,
+ Bytes("name", 8),
+ Bytes("extension", 3),
+ BitStruct("attributes",
+ Flag("unused"),
+ Flag("device"),
+ Flag("archive"),
+ Flag("subDirectory"),
+ Flag("volumeLabel"),
+ Flag("system"),
+ Flag("hidden"),
+ Flag("readonly")),
+ # reserved
+ Padding(10),
+ ULInt16("timeRecorded"),
+ ULInt16("dateRecorded"),
+ ULInt16("firstCluster"),
+ ULInt32("fileSize"))
+
+def PreDataRegion(name):
+ rde = DirEntry("rootdirs")
+ fe = FatEntry("fats")
+ return Struct(name,
+ Embed(BootSector("bootSector")),
+ # the remaining reserved sectors
+ Padding(lambda ctx: (ctx.reservedSectorCount - 1)
+ * ctx.sectorSize),
+ # file allocation tables
+ Array(lambda ctx: (ctx.fatCount),
+ Array(lambda ctx: ctx.sectorsPerFat *
+ ctx.sectorSize / fe.sizeof(), fe)),
+ # root directories
+ Array(lambda ctx: (ctx.rootdirEntryCount*rde.sizeof())
+ / ctx.sectorSize, rde))
+
+class File(object):
+ def __init__(self, dirEntry, fs):
+ self.fs = fs
+ self.dirEntry = dirEntry
+
+ @classmethod
+ def fromDirEntry(cls, dirEntry, fs):
+ if dirEntry.name[0] in "\x00\xe5\x2e":
+ return None
+ a = dirEntry.attributes
+ #Long file name directory entry
+ if a.volumeLabel and a.system and a.hidden and a.readonly:
+ return None
+ if a.subDirectory:
+ return Directory(dirEntry, fs)
+ return File(dirEntry, fs)
+
+ @classmethod
+ def fromDirEntries(cls, dirEntries, fs):
+ return filter(None, [cls.fromDirEntry(de, fs)
+ for de in dirEntries])
+
+ def toStream(self, stream):
+ self.fs.fileToStream(self.dirEntry.firstCluster, stream)
+
+ @property
+ def name(self):
+ return "%s.%s" % (self.dirEntry.name.rstrip(),
+ self.dirEntry.extension)
+
+ def __str__(self):
+ return "&%s %s" % (self.dirEntry.firstCluster, self.name)
+
+class Directory(File):
+ def __init__(self, dirEntry, fs, children=None):
+ File.__init__(self, dirEntry, fs)
+ self.children = children
+ if not self.children:
+ self.children = File.fromDirEntries(\
+ self.fs.getDirEntries(\
+ self.dirEntry.firstCluster), fs)
+
+ @property
+ def name(self):
+ return self.dirEntry.name.rstrip()
+
+ def __str__(self):
+ return "&%s %s/" % (self.dirEntry.firstCluster, self.name)
+
+ def __getitem__(self, name):
+ for file in self.children:
+ if file.name == name:
+ return file
+
+ def __iter__(self):
+ return iter(self.children)
+
+class FatFs(Directory):
+ def __init__(self, stream):
+ self.stream = stream
+ self.pdr = PreDataRegion("pdr").parse_stream(stream)
+ Directory.__init__(self, dirEntry = None,
+ fs = self, children = File.fromDirEntries(
+ self.pdr.rootdirs, self))
+
+ def fileToStream(self, clidx, stream):
+ for clidx in self.getLinkedClusters(clidx):
+ self.clusterToStream(clidx, stream)
+
+ def clusterToStream(self, clidx, stream):
+ start, todo = self.getClusterSlice(clidx)
+ self.stream.seek(start)
+ while todo > 0:
+ read = self.stream.read(todo)
+ if not len(read):
+ print("failed to read %s bytes at %s" % (todo, self.stream.tell()))
+ raise EOFError()
+ todo -= len(read)
+ stream.write(read)
+
+ def getClusterSlice(self, clidx):
+ startSector = self.pdr.reservedSectorCount \
+ + self.pdr.fatCount * self.pdr.sectorsPerFat \
+ + (self.pdr.rootdirEntryCount * 32) \
+ / self.pdr.sectorSize \
+ + (clidx-2) * self.pdr.sectorsPerCluster
+ start = startSector * self.pdr.sectorSize
+ length = self.pdr.sectorSize * self.pdr.sectorsPerCluster
+ return (start, length)
+
+ def getLinkedClusters(self, clidx):
+ res = []
+ while clidx != "last_cluster":
+ if not isinstance(clidx, numbers.Real):
+ print(clidx)
+ assert False
+ assert 2 <= clidx <= 0xffef
+ res.append(clidx)
+ clidx = self.getNextCluster(clidx)
+ assert clidx not in res
+ return res
+
+ def getNextCluster(self, clidx):
+ ress = set([fat[clidx] for fat in self.pdr.fats])
+ if len(ress)==1:
+ return ress.pop()
+ print("inconsistencie between FATs: %s points to" % clidx)
+ for i,fat in enumerate(self.pdr.fats):
+ print("\t%s according to fat #%s" % (fat[clidx], i))
+ res = ress.pop()
+ print ("assuming %s" % res)
+ return res
+
+ def getDirEntries(self, clidx):
+ try:
+ for de in self._getDirEntries(clidx):
+ yield de
+ except IOError:
+ print("failed to read directory entries at %s" % clidx)
+
+ def _getDirEntries(self, clidx):
+ de = DirEntry("dirEntry")
+ with BytesIO() as mem:
+ self.fileToStream(clidx, mem)
+ mem.seek(0)
+ with BufferedReader(mem) as reader:
+ while reader.peek(1):
+ yield de.parse_stream(reader)
+ def __str__(self):
+ return "/"
+
+ @property
+ def name(self):
+ return ""