modules
Top-level package for TheForensicator.
app
¶
Main module.
EWFImage
¶
Object that reads and parses the content of an EWF file
Source code in theforensicator/app.py
class EWFImage(object):
"""Object that reads the content of the EWF file and parses the content"""
def __init__(self, filename: str) -> None:
"""Initialize the object with default values and the given filename
Args:
filename: The filename of the file to parse
"""
self.filename = filename
self.handle = None
self.verbose = True
self.ntfs_partitions = []
self.mft_dump_location = None
self.out_file_location = None
def __enter__(self) -> None:
"""Open a handle on EWF files and read the content. Called when we enter
a `with` block
"""
try:
import pyewf
self.handle = pyewf.handle()
self.handle.open(pyewf.glob(self.filename))
except ModuleNotFoundError:
print("[!]\tCould not load pyewf, using python implementation...")
from .ewf import Ewf
self.handle = Ewf(self.filename)
return self
def _read_int(self, offset: int) -> int:
"""Reads an Integer at the given offset
Args:
offset: Where we want to read
Returns:
The value that has been read (as a int)
Raises:
ValueError: If offset is out of bounds
"""
curr_off = self.handle.get_offset()
buf = self.handle.read_buffer_at_offset(UINT32, offset)
self.handle.seek(curr_off)
return unpack("<I", buf)[0]
def _read_bytes(self, offset: int, nb_bytes: int) -> bytes:
"""Reads some bytes at the given offset
Args:
offset: Where we want to read
nb_bytes: Number of bytes to read
Returns:
The bytes that have been read
Raises:
ValueError: If offset is out of bounds
"""
curr_off = self.handle.get_offset()
buf = self.handle.read_buffer_at_offset(nb_bytes, offset)
self.handle.seek(curr_off)
return buf
def _is_mbr_partition(self) -> bool:
"""Check if the beginning of the disk matches a MBR magic number
Returns:
True if it is a MBR partition table
Raises:
ValueError: If the disk size is 0
"""
return self._read_int(0) == MBR_MAGIC
def _get_partitions(self):
"""Parses the partition table"""
self.mbr = MBR(self)
self.mbr_partitions = self.mbr.mbr_partitions
self.gpt = GPT(self)
self.gpt_partitions = self.gpt.gpt_partitions
def _read_sector(self, nb_sector: int) -> bytes:
"""Read the given sector
Args:
nb_sector: Index of the sector to read
Returns:
The content of the sector
Raises:
ValueError: If we try to read out of bounds
"""
return self._read_bytes(nb_sector * 512, SECTOR_SIZE)
def _read_int_at_sector_offset(self, nb_sector: int, offset: int):
"""Read an int at a given offset in the given sector
Args:
nb_sector: Index of the sector to read
offset: The offset where we want to read within the sector
Returns:
The int we wanted to read
Raises:
ValueError: If we try to read out of bounds
"""
return self._read_int((nb_sector * 512) + offset)
def _find_ntfs_partitions(self):
"""Retrieve all the NTFS partitions (_get_partitions needs to be called
before this function)
"""
for partition in self.gpt_partitions:
magic = self._read_int_at_sector_offset(partition["first_lba"], 0)
if magic == NTFS_MAGIC:
self.ntfs_partitions.append(NTFS(self, partition))
def read_ewf(self):
"""Read the EWF file, and parse the partition tables"""
if not self._is_mbr_partition():
print("[!] No MBR partition found, exiting...")
exit(-1)
print("[+] MBR partition found.")
self._get_partitions()
self._find_ntfs_partitions()
def analyze_ntfs(self, resolve_mft_file: str, clear_cache):
"""Analyze the NTFS partitions to extract the wanted files
Args:
resolve_mft_file: Output file of resolved MFT in JSON format
"""
for (part_idx, partition) in enumerate(self.ntfs_partitions):
partition.analyze_ntfs_header(part_idx, resolve_mft_file, clear_cache)
def dump_file(self, filenames: list, dump_dir: str):
for partition in self.ntfs_partitions:
partition.dump_file(filenames, dump_dir)
def __exit__(self, exception_type, exception_value, exception_traceback):
"""Close and clean everything. Called when we exit a `with` block."""
pass
__enter__(self)
special
¶
Open a handle on EWF files and read the content. Called when we enter a `with` block.
Source code in theforensicator/app.py
def __enter__(self) -> None:
"""Open a handle on EWF files and read the content. Called when we enter
a `with` block
"""
try:
import pyewf
self.handle = pyewf.handle()
self.handle.open(pyewf.glob(self.filename))
except ModuleNotFoundError:
print("[!]\tCould not load pyewf, using python implementation...")
from .ewf import Ewf
self.handle = Ewf(self.filename)
return self
__exit__(self, exception_type, exception_value, exception_traceback)
special
¶
Close and clean everything. Called when we exit a `with` block.
Source code in theforensicator/app.py
def __exit__(self, exception_type, exception_value, exception_traceback):
"""Close and clean everything. Called when we exit a `with` block."""
pass
__init__(self, filename)
special
¶
Initialize the object with default values and the given filename
Parameters:
Name | Type | Description | Default
---|---|---|---
filename | str | The filename of the file to parse | required
Source code in theforensicator/app.py
def __init__(self, filename: str) -> None:
"""Initialize the object with default values and the given filename
Args:
filename: The filename of the file to parse
"""
self.filename = filename
self.handle = None
self.verbose = True
self.ntfs_partitions = []
self.mft_dump_location = None
self.out_file_location = None
analyze_ntfs(self, resolve_mft_file, clear_cache)
¶
Analyze the NTFS partitions to extract the wanted files
Parameters:
Name | Type | Description | Default
---|---|---|---
resolve_mft_file | str | Output file of resolved MFT in JSON format | required
Source code in theforensicator/app.py
def analyze_ntfs(self, resolve_mft_file: str, clear_cache):
"""Analyze the NTFS partitions to extract the wanted files
Args:
resolve_mft_file: Output file of resolved MFT in JSON format
"""
for (part_idx, partition) in enumerate(self.ntfs_partitions):
partition.analyze_ntfs_header(part_idx, resolve_mft_file, clear_cache)
read_ewf(self)
¶
Read the EWF file, and parse the partition tables
Source code in theforensicator/app.py
def read_ewf(self):
"""Read the EWF file, and parse the partition tables"""
if not self._is_mbr_partition():
print("[!] No MBR partition found, exiting...")
exit(-1)
print("[+] MBR partition found.")
self._get_partitions()
self._find_ntfs_partitions()
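Putting the methods above together, here is a minimal usage sketch of `EWFImage`, mirroring how `cli.cmd` drives the class. The image path, output file name, and dumped file are illustrative placeholders:

```python
from theforensicator.app import EWFImage

# Illustrative paths: adjust the evidence file and output locations as needed.
with EWFImage("evidence.E01") as ewf:
    ewf.read_ewf()                                # parse MBR/GPT and locate NTFS partitions
    ewf.analyze_ntfs("resolved_mft.json", False)  # resolve MFT entries, keeping any cached dump
    ewf.dump_file(["C:\\Windows\\System32\\cmd.exe"], "./dumps")
```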
cli
¶
Console script for theforensicator.
cmd(ewf_file, dump_dir=None, resolve_mft_file=None, dmp_file=None, clear_cache=None, extract_artefacts=False)
¶
Parses an EWF file and dumps interesting files found in the Windows file system
Parameters:
Name | Type | Description | Default
---|---|---|---
ewf_file | str | File that will be analysed (*.E01) | required
dump_dir | str | Directory location to store dumped data (default location is current execution directory) | None
resolve_mft_file | str | Output file where to store MFT files / directories in JSON format. | None
dmp_file | str | Filename to dump from the disk (ex: "C:\Windows\System32\cmd.exe") | None
extract_artefacts | bool | Automatically extract artefacts (Registry hives, ...) to dump_dir | False
Source code in theforensicator/cli.py
def cmd(ewf_file: str, dump_dir: str = None, resolve_mft_file: str = None, dmp_file: str = None, clear_cache: str = None, extract_artefacts: bool = False):
"""Parses a EWF file and dump interesting files found in the windows file
system
Args:
ewf_file: File that will be analysed (*.E01)
dump_dir: Directory location to store dumped data (default location is current execution directory)
resolve_mft_file: Output file where to store MFT files / directories in JSON format.
dmp_file: Filename to dump from the disk (ex: "C:\\Windows\\System32\\cmd.exe")
extract_artefacts: Automatically extract artefacts (Registry hives, ...) to dump_dir
"""
with EWFImage(ewf_file) as ewf:
ewf.read_ewf()
if dmp_file:
if type(dmp_file) is not str:
print("[?] --dump-file is empty, you must enter a valid filename.")
exit()
ewf.analyze_ntfs(resolve_mft_file, clear_cache)
if dmp_file:
ewf.dump_file([dmp_file], dump_dir)
if extract_artefacts:
artefact_files = glob.glob(f"{dirname(__file__)}/artefacts/*.yaml")
for artefact in artefact_files:
with open(artefact, "r") as _artefact:
data = yaml.safe_load(_artefact.read())
if data is None:
continue
if dump_dir and type(dump_dir) is str:
out_dir = normpath(f"{dump_dir}/{data['dirname']}")
else:
out_dir = normpath(f"./{data['dirname']}")
if not exists(out_dir):
Path(out_dir).mkdir(parents=True, exist_ok=True)
ewf.dump_file(data["files"], normpath(out_dir))
_artefact.close()
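Called directly from Python rather than through the console script, a hedged example of invoking `cmd` might look like this; all paths are placeholders:

```python
from theforensicator.cli import cmd

# Placeholder paths used for illustration only.
cmd(
    "evidence.E01",
    dump_dir="./dumps",
    resolve_mft_file="resolved_mft.json",
    dmp_file="C:\\Windows\\System32\\cmd.exe",
    extract_artefacts=True,
)
```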
ewf
special
¶
file_parsing
¶
Minimal EWF "driver" in pure Python, by Laurent Clevy (@lorenzo2472).
Reference document: https://github.com/libyal/libewf/blob/master/documentation/Expert%20Witness%20Compression%20Format%20%28EWF%29.asciidoc
Tested with FTK Imager 4.3 and ewfacquire.
Ewf
¶
Source code in theforensicator/ewf/file_parsing.py
class Ewf:
S_HEADER = Struct("<8sBHH")
NT_HEADER = namedtuple("header", "signature one segment_num zero")
assert S_HEADER.size == 13
S_SECTION = Struct("<16sQQ40sL")
NT_SECTION = namedtuple("section", "stype next_offset size padding checksum")
assert S_SECTION.size == 76
S_DISK = Struct("<LLLLL20s45s5sL")
assert S_DISK.size == 94
NT_DISK = namedtuple(
"disk",
"one chunk_count sectors_per_chunk bytes_per_sector sector_count reserved padding signature checksum",
)
S_VOLUME = Struct("<LLLLL")
NT_VOLUME = namedtuple(
"volume", "reserved chunk_count sectors_per_chunk bytes_per_sector sector_count"
)
S_TABLE_HEADER = Struct("<L4sQ4sL")
assert S_TABLE_HEADER.size == 24
NT_TABLE_HEADER = namedtuple("table_header", "entry_count pad1 base pad2 checksum")
S_DIGEST = Struct("<16s20s40sL")
assert S_DIGEST.size == 80
NT_DIGEST = namedtuple("digest", "md5 sha1 padding checksum")
S_HASH = Struct("<16s16sL")
assert S_HASH.size == 36
NT_HASH = namedtuple("digest", "md5 unknown checksum")
S_DATA = Struct("<B3sLLLQLLLB3sL4sLB3sL4s16s963s5sL")
assert S_DATA.size == 1052
NT_DATA = namedtuple(
"data",
"media_type unk1 chunk_count sectors_per_chunk bytes_per_sector sector_count cylinders heads sectors media_flags unk2 PALM_volume unk3 smart_logs compr_level unk4 errors unk5 guid unk6 signature checksum",
)
SECTION_HEADER = b"header\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SECTION_HEADER2 = b"header2\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SECTION_DATA = b"data\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SECTION_DISK = b"disk\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SECTION_VOLUME = b"volume\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SECTION_SECTORS = b"sectors\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SECTION_TABLE = b"table\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SECTION_TABLE2 = b"table2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SECTION_DIGEST = b"digest\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
SECTION_HASH = b"hash\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
EVF_SIGNATURE = b"EVF\t\r\n\xff\x00"
def __init__(self, filename, checksums=False, verbose=0):
self.chunks = dict() # list of chunks pointers per segment
self.uncompressed = (
dict()
) # keep track of uncompressed chunks by storing their offset in the segment
if PurePath(filename).suffix == ".E01":
filenames = sorted(
Path(filename).parent.glob(Path(filename).name[:-2] + "??")
)
# print( filenames )
self.current_segment = None # for seek()
self.current_chunk_num = 0
self.ptr_in_current_chunk = 0
self.current_chunk_data = None
self.total_chunk_count = 0
self.checksums = checksums
self.verbose = verbose
# data per segment
self.filedesc = dict()
self.filename = dict()
self.hashes = dict() # to store md5 and sha1
self.end_of_sectors = (
dict()
) # to known how many bytes to read for last compressed chunk of the segment
# self.sectors_offset = dict()
for filename in filenames:
self.parse_segment(filename)
self.chunk_range = dict()
start_chunk = 0
self.last_sector_in_last_chunk = (
self.total_chunk_count * self.sectors_per_chunk
) - self.sector_count
# print('self.last_sector_in_last_chunk %x' % self.last_sector_in_last_chunk)
for i in range(1, self.last_segment + 1):
end_chunk = start_chunk + len(self.chunks[i]) - 1
self.chunk_range[i] = (
start_chunk,
end_chunk,
) # determine chunk number range per segment
start_chunk = end_chunk + 1
self.seek(0) # init "file" pointer to 0
else:
print("unsupported format")
sys.exit()
def parse_header(self, section_nt):
header_data = self.filedesc[self.last_segment].read(section_nt.size)
self.header_string = decompress(header_data)
# FTK imager : b'1\nmain\nc\tn\ta\te\tt\tav\tov\tm\tu\tp\tr\n \t \tuntitled\t \t \tADI4.3.0.18\tWin 201x\t2020 9 23 10 11 36\t2020 9 23 10 11 36\t0\tf\n'
# Ewfacquire : b'1\r\nmain\r\nc\tn\ta\te\tt\tav\tov\tm\tu\tp\r\n\t\t\t\t\t20180403\tLinux\t2020 2 6 15 4 33\t2020 2 6 15 4 33\t0\r\n\r\n'
if self.verbose > 1:
print(self.header_string)
def parse_tables(self, section_nt):
data = self.filedesc[self.last_segment].read(section_nt.size)
table_header_nt = Ewf.NT_TABLE_HEADER(*Ewf.S_TABLE_HEADER.unpack_from(data, 0))
if self.verbose > 1:
print(table_header_nt)
# print('%x %x' % (Ewf.S_SECTION.size+Ewf.S_TABLE_HEADER.size+table_header_nt.entry_count*4, section_nt.size ) )
offset = Ewf.S_TABLE_HEADER.size
for i in range(table_header_nt.entry_count):
ptr = (
Struct("<L").unpack_from(data, offset + i * 4)[0] & 0x7FFFFFFF
) # most significant bit is compression status
ptr += table_header_nt.base
if (
Struct("<L").unpack_from(data, offset + i * 4)[0] & 0x80000000 == 0
): # most chunks are compressed (bit is set), so we store uncompressed ptrs only
self.uncompressed[self.last_segment].add(ptr)
self.chunks[self.last_segment].add(ptr)
if self.checksums:
end_of_table = Ewf.S_TABLE_HEADER.size + table_header_nt.entry_count * 4
if (
adler32(data[Ewf.S_TABLE_HEADER.size : end_of_table])
!= Struct("<L").unpack_from(data, end_of_table)[0]
):
print("checksum error (table)")
def parse_part(self, section_nt, file):
if section_nt.stype == Ewf.SECTION_HEADER:
self.parse_header(section_nt)
elif section_nt.stype == Ewf.SECTION_HEADER2:
data = file.read(section_nt.size)
# print( decompress( data ).decode('utf16') )
elif section_nt.stype == Ewf.SECTION_VOLUME:
data = file.read(section_nt.size)
volume_nt = Ewf.NT_VOLUME(*Ewf.S_VOLUME.unpack_from(data, 0))
if self.verbose > 1:
print(volume_nt)
self.chunk_count = volume_nt.chunk_count
self.sectors_per_chunk = volume_nt.sectors_per_chunk
self.bytes_per_sector = volume_nt.bytes_per_sector
self.sector_count = volume_nt.sector_count
self.chunk_size = (
volume_nt.sectors_per_chunk * volume_nt.bytes_per_sector
) # constant
elif section_nt.stype == Ewf.SECTION_DISK:
data = file.read(section_nt.size)
# print(hexlify(data))
disk_nt = Ewf.NT_DISK(*Ewf.S_DISK.unpack_from(data, 0))
self.chunk_count = disk_nt.chunk_count
self.sectors_per_chunk = disk_nt.sectors_per_chunk
self.bytes_per_sector = disk_nt.bytes_per_sector
self.sector_count = disk_nt.sector_count
self.chunk_size = (
disk_nt.sectors_per_chunk * disk_nt.bytes_per_sector
) # constant
if self.verbose > 1:
print(disk_nt)
elif section_nt.stype == Ewf.SECTION_SECTORS:
# self.sectors_offset[ self.last_segment ] = section_offset #will be used by next table/table2 section
# print('self.sectors_offset[ self.last_segment ] %x' % self.sectors_offset[ self.last_segment ])
self.end_of_sectors[self.last_segment] = (
file.tell() - Ewf.S_SECTION.size + section_nt.size
) # end of 'sectors' section, for last 'sectors' section
elif (
section_nt.stype == Ewf.SECTION_TABLE
or section_nt.stype == Ewf.SECTION_TABLE2
):
self.parse_tables(section_nt)
elif section_nt.stype == Ewf.SECTION_DIGEST:
data = file.read(section_nt.size)
digest_nt = Ewf.NT_DIGEST(*Ewf.S_DIGEST.unpack_from(data, 0))
self.hashes["md5"] = digest_nt.md5
self.hashes["sha1"] = digest_nt.sha1
# print( digest_nt )
elif section_nt.stype == Ewf.SECTION_HASH:
data = file.read(section_nt.size)
hash_nt = Ewf.NT_HASH(*Ewf.S_HASH.unpack_from(data, 0))
self.hashes["md5"] = hash_nt.md5
# print( hash_nt )
elif section_nt.stype == Ewf.SECTION_DATA:
data = file.read(section_nt.size)
def parse_segment(self, filename):
if self.verbose > 0:
print(filename)
file = open(filename, "rb")
# parse EVF header
data = file.read(Ewf.S_HEADER.size)
header_nt = Ewf.NT_HEADER(*Ewf.S_HEADER.unpack_from(data, 0))
assert (
header_nt.one == 1
and header_nt.zero == 0
and header_nt.signature == Ewf.EVF_SIGNATURE
)
self.chunks[header_nt.segment_num] = set()
self.uncompressed[header_nt.segment_num] = set()
self.last_segment = header_nt.segment_num
self.filedesc[header_nt.segment_num] = file
self.filename[header_nt.segment_num] = filename
if self.verbose > 0:
print(header_nt)
data = file.read(Ewf.S_SECTION.size)
section_nt = Ewf.NT_SECTION(*Ewf.S_SECTION.unpack_from(data, 0))
if self.verbose > 0:
print(
"0x%08x: type:%8s next:%x size:%x"
% (
file.tell(),
section_nt.stype,
section_nt.next_offset,
section_nt.size,
)
)
if self.checksums:
computed_sum = adler32(data[:-4])
if section_nt.checksum != computed_sum:
print(
"checksum file:%08x != computed:%08x"
% (section_nt.checksum, computed_sum)
)
previous_next = 0
if section_nt.stype == Ewf.SECTION_HEADER:
self.parse_header(section_nt)
elif section_nt.stype == Ewf.SECTION_DATA:
data = file.read(section_nt.size)
while previous_next != section_nt.next_offset:
file.seek(section_nt.next_offset)
section_offset = file.tell()
previous_next = section_nt.next_offset
data = file.read(Ewf.S_SECTION.size)
section_nt = Ewf.NT_SECTION(*Ewf.S_SECTION.unpack_from(data, 0))
if self.verbose > 0:
print(
"0x%08x: type:%8s next:%x size:%x"
% (
section_offset,
section_nt.stype,
section_nt.next_offset,
section_nt.size,
)
)
if self.checksums:
computed_sum = adler32(data[:-4])
if section_nt.checksum != computed_sum:
print(
"checksum file:%08x != computed:%08x"
% (section_nt.checksum, computed_sum)
)
self.parse_part(section_nt, file)
self.chunks[header_nt.segment_num] = array.array(
"L", sorted(self.chunks[header_nt.segment_num])
) # convert the set in array
self.total_chunk_count += len(self.chunks[header_nt.segment_num])
def display_properties(self):
print(
"chunk_count:0x%x, sectors_per_chunk:0x%x, bytes_per_sector:0x%x, sector_count:0x%x"
% (
self.chunk_count,
self.sectors_per_chunk,
self.bytes_per_sector,
self.sector_count,
)
)
# print('last_segment: %d' % self.last_segment)
if "sha1" in self.hashes:
print("sha1: %s" % (hexlify(self.hashes["sha1"])))
print("md5: %s" % (hexlify(self.hashes["md5"])))
if self.verbose > 0:
for segment in range(1, self.last_segment + 1):
print("segment #%d, filename: %s" % (segment, self.filename[segment]))
print(
" chunks count: %d (including uncompressed:%d, %.2f%%)"
% (
len(self.chunks[segment]),
len(self.uncompressed[segment]),
len(self.uncompressed[segment])
* 100
/ len(self.chunks[segment]),
)
)
print(
" data offsets: first:0x%x last:0x%x"
% (self.chunks[segment][0], self.chunks[segment][-1])
)
print(" absolute chunk number ranges", self.chunk_range[segment])
print(" end_of_sectors: 0x%x" % self.end_of_sectors[segment])
def compute_offset(self, offset): # offset in bytes, multiple of 512
if offset > self.sector_count * self.bytes_per_sector or offset < 0:
raise ValueError("Offset out of bounds")
return
num_chunk = offset // self.chunk_size
# print('num_chunk %d' % num_chunk)
if num_chunk >= self.total_chunk_count:
print("error num_chunk >= self.chunk_count")
return
# locate the segment
segment = 1
while (
self.chunk_range[segment][0] > num_chunk
or num_chunk > self.chunk_range[segment][1]
and segment < self.last_segment
):
segment += 1
# locate the chunk
chunk_num_in_segment = (
num_chunk - self.chunk_range[segment][0]
) # relative chunk number (in segment), instead of absolute (in dump)
return (
segment,
chunk_num_in_segment,
offset % self.chunk_size,
) # return segment, index in self.chunks[ segment ] and ptr in chunk
def seek(self, offset):
segment, num_chunk_in_segment, ptr_in_chunk = self.compute_offset(offset)
if (
self.current_chunk_num != num_chunk_in_segment
or self.current_segment != segment
): # read new chunk if needed
self.current_chunk_data = self.read_chunk(segment, num_chunk_in_segment)
self.current_chunk_num = num_chunk_in_segment
self.current_segment = segment
self.ptr_in_current_chunk = ptr_in_chunk
# allow to iterate chunk number inside segment and over different segments
def next_chunk_num(self, segment, relative_chunk_num):
if relative_chunk_num + 1 < len(
self.chunks[segment]
): # not the last chunk of the segment
return segment, relative_chunk_num + 1
else:
if segment + 1 <= self.last_segment: # must go to next segment
return segment + 1, 0
else:
print(
"next_chunk_num error: segment %d, relative_chunk_num %d"
% (segment, relative_chunk_num)
)
def tell(self):
chunks = 0
for seg in range(1, self.current_segment):
chunks += len(self.chunks[seg]) # count chunks in segment < current_segment
chunks += self.current_chunk_num # chunks from start of current segment
offset = chunks * self.chunk_size + self.ptr_in_current_chunk
return offset
def get_offset(self):
return self.tell()
def read(self, size): # emulate read() in a file system
data = b""
# print('%d %d' % (self.current_segment, self.current_chunk_num))
if self.current_chunk_data is None: # no chunk in cache yet
self.current_chunk_data = self.read_chunk(
self.current_segment, self.current_chunk_num
)
self.ptr_in_current_chunk = 0
while size > 0:
if (
self.chunk_size - self.ptr_in_current_chunk >= size
): # last read in current chunk
data += self.current_chunk_data[
self.ptr_in_current_chunk : self.ptr_in_current_chunk + size
]
self.ptr_in_current_chunk = self.ptr_in_current_chunk + size
size = 0
else: # will need to read another chunk
data += self.current_chunk_data[
self.ptr_in_current_chunk :
] # read end of current chunk
size -= self.chunk_size - self.ptr_in_current_chunk
self.ptr_in_current_chunk = self.chunk_size
if self.current_segment < self.last_segment or (
self.current_segment == self.last_segment
and self.current_chunk_num + 1
< len(self.chunks[self.current_segment])
): # next chunk does exist
self.current_segment, self.current_chunk_num = self.next_chunk_num(
self.current_segment, self.current_chunk_num
)
self.current_chunk_data = self.read_chunk(
self.current_segment, self.current_chunk_num
) # read next chunk
self.ptr_in_current_chunk = 0
else:
# print('short read: self.current_segment %d, self.current_chunk_num %d' % (self.current_segment, self.current_chunk_num) )
return data
return data
def read_buffer_at_offset(self, nb_bytes: int, offset: int):
self.seek(offset)
return self.read(nb_bytes)
def read_chunk(self, segment, chunk): # number of chunk in segment
# print('segment %d, chunk %d' % (segment, chunk))
if chunk >= len(self.chunks[segment]) or chunk < 0:
print("read_chunk: chunk number. segment %d chunk %d" % (segment, chunk))
raise IndexError
start_offset = self.chunks[segment][chunk]
# seek
self.filedesc[segment].seek(start_offset) # seek in file segment
# read
if start_offset in self.uncompressed[segment]:
data = self.filedesc[segment].read(self.chunk_size) # without adler32
else:
if start_offset == self.chunks[segment][-1]: # last chunk in segment
end_offset = self.end_of_sectors[segment]
else:
end_offset = self.chunks[segment][chunk + 1]
# print('start_offset %x end_offset %x ' % (start_offset, end_offset ) )
compressed = self.filedesc[segment].read(
end_offset - start_offset
) # compressed data includes adler32
data = decompress(compressed)
"""if segment==3 and chunk==5026:
printHex(data)"""
return data
def compute_image_hash(self, md): # accessing chunk directly
for segment in range(1, self.last_segment + 1):
for chunk in range(len(self.chunks[segment])):
data = self.read_chunk(segment, chunk)
md.update(data)
return md.digest()
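A short sketch of driving the pure-Python reader directly, assuming a placeholder image named `evidence.E01`. It reads the first sector through the chunk cache and recomputes the image MD5 for comparison with the hash stored in the `digest`/`hash` sections:

```python
import hashlib
from binascii import hexlify

from theforensicator.ewf.file_parsing import Ewf

ewf = Ewf("evidence.E01", checksums=True, verbose=1)  # placeholder filename
ewf.display_properties()

# Read the first sector (the MBR) through the chunk cache.
mbr = ewf.read_buffer_at_offset(512, 0)

# Recompute the MD5 of the whole image and compare it with the stored hash.
computed = ewf.compute_image_hash(hashlib.md5())
print(hexlify(computed), hexlify(ewf.hashes["md5"]))
```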
NT_DATA (tuple)
¶
data(media_type, unk1, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, cylinders, heads, sectors, media_flags, unk2, PALM_volume, unk3, smart_logs, compr_level, unk4, errors, unk5, guid, unk6, signature, checksum)
__getnewargs__(self)
special
¶
Return self as a plain tuple. Used by copy and pickle.
Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, media_type, unk1, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, cylinders, heads, sectors, media_flags, unk2, PALM_volume, unk3, smart_logs, compr_level, unk4, errors, unk5, guid, unk6, signature, checksum)
special
staticmethod
¶
Create new instance of data(media_type, unk1, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, cylinders, heads, sectors, media_flags, unk2, PALM_volume, unk3, smart_logs, compr_level, unk4, errors, unk5, guid, unk6, signature, checksum)
__repr__(self)
special
¶
Return a nicely formatted representation string
Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
NT_DIGEST (tuple)
¶
digest(md5, sha1, padding, checksum)
__getnewargs__(self)
special
¶
Return self as a plain tuple. Used by copy and pickle.
Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, md5, sha1, padding, checksum)
special
staticmethod
¶
Create new instance of digest(md5, sha1, padding, checksum)
__repr__(self)
special
¶
Return a nicely formatted representation string
Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
NT_DISK (tuple)
¶
disk(one, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, reserved, padding, signature, checksum)
__getnewargs__(self)
special
¶
Return self as a plain tuple. Used by copy and pickle.
Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, one, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, reserved, padding, signature, checksum)
special
staticmethod
¶
Create new instance of disk(one, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, reserved, padding, signature, checksum)
__repr__(self)
special
¶
Return a nicely formatted representation string
Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
NT_HASH (tuple)
¶
digest(md5, unknown, checksum)
__getnewargs__(self)
special
¶
Return self as a plain tuple. Used by copy and pickle.
Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, md5, unknown, checksum)
special
staticmethod
¶
Create new instance of digest(md5, unknown, checksum)
__repr__(self)
special
¶
Return a nicely formatted representation string
Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
NT_HEADER (tuple)
¶
header(signature, one, segment_num, zero)
__getnewargs__(self)
special
¶
Return self as a plain tuple. Used by copy and pickle.
Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, signature, one, segment_num, zero)
special
staticmethod
¶
Create new instance of header(signature, one, segment_num, zero)
__repr__(self)
special
¶
Return a nicely formatted representation string
Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
NT_SECTION (tuple)
¶
section(stype, next_offset, size, padding, checksum)
__getnewargs__(self)
special
¶
Return self as a plain tuple. Used by copy and pickle.
Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, stype, next_offset, size, padding, checksum)
special
staticmethod
¶
Create new instance of section(stype, next_offset, size, padding, checksum)
__repr__(self)
special
¶
Return a nicely formatted representation string
Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
NT_TABLE_HEADER (tuple)
¶
table_header(entry_count, pad1, base, pad2, checksum)
__getnewargs__(self)
special
¶
Return self as a plain tuple. Used by copy and pickle.
Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, entry_count, pad1, base, pad2, checksum)
special
staticmethod
¶
Create new instance of table_header(entry_count, pad1, base, pad2, checksum)
__repr__(self)
special
¶
Return a nicely formatted representation string
Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
NT_VOLUME (tuple)
¶
volume(reserved, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count)
__getnewargs__(self)
special
¶
Return self as a plain tuple. Used by copy and pickle.
Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, reserved, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count)
special
staticmethod
¶
Create new instance of volume(reserved, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count)
__repr__(self)
special
¶
Return a nicely formatted representation string
Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
fs
special
¶
gpt
¶
Parser for GPT
GPT
¶
GPT Partition Table parser
Source code in theforensicator/fs/gpt.py
class GPT(object):
"""MBR Partition Table parser"""
GPT_HEADER_SIGNATURE = 0x5452415020494645
def __init__(self, ewf_image: "theforensicator.app.EWFImage") -> None:
"""Initialize the MBR object
Args:
ewf_image: The EWFImage object used as a base
"""
self.handle = ewf_image.handle
self.verbose = ewf_image.verbose
self.gpt = {}
self._read_gpt()
if self.verbose:
self._print_gpt_info()
def _read_gpt(self):
"""Reads the GPT partition table"""
self.gpt_header = self.read_gpt_header()
self.gpt_partitions = self.read_gpt_partitions(lba_size=512)
def read_gpt_header(self) -> bytes:
offset = self.handle.get_offset()
self.handle.seek(512)
gpt_header = self.handle.read(512)
self.gpt["signature"] = unpack_from("<Q", gpt_header, offset=0)[0]
if self.gpt["signature"] != GPT.GPT_HEADER_SIGNATURE:
print("[!] Failed to read GPT header, wrong signature %#x found." % self.mbr["signature"])
exit(-1)
self.gpt["revision"] = unpack_from("<I", gpt_header, offset=8)[0]
self.gpt["header_size"] = unpack_from("<I", gpt_header, offset=12)[0]
self.gpt["header_crc32"] = unpack_from("<I", gpt_header, offset=16)[0]
self.gpt["reserved1"] = unpack_from("<I", gpt_header, offset=20)[0]
self.gpt["my_lba"] = unpack_from("<Q", gpt_header, offset=24)[0]
self.gpt["alternate_lba"] = unpack_from("<Q", gpt_header, offset=32)[0]
self.gpt["first_usable_lba"] = unpack_from("<Q", gpt_header, offset=40)[0]
self.gpt["last_usable_lba"] = unpack_from("<Q", gpt_header, offset=48)[0]
self.gpt["disk_guid"] = "%08X-%04X-%04X-%04X-%s" % (
unpack_from("<I", gpt_header, offset=56)[0],
unpack_from("<H", gpt_header, offset=60)[0],
unpack_from("<H", gpt_header, offset=62)[0],
unpack_from("<H", gpt_header, offset=64)[0],
unpack_from("<8s", gpt_header, offset=66)[0].hex().upper()
)
self.gpt["partition_entry_lba"] = unpack_from("<Q", gpt_header, offset=72)[0]
self.gpt["num_partition_entries"] = unpack_from("<I", gpt_header, offset=80)[0]
self.gpt["sizeof_partition_entry"] = unpack_from("<I", gpt_header, offset=84)[0]
self.gpt["partition_entry_array_crc32"] = unpack_from("<I", gpt_header, offset=88)[0]
self.handle.seek(offset)
return gpt_header
def read_gpt_partitions(self, lba_size=512):
offset = self.handle.get_offset()
partition_entry_lba = self.gpt["partition_entry_lba"]
self.handle.seek(partition_entry_lba * lba_size)
gpt_partitions = []
for entry_idx in range(self.gpt["num_partition_entries"]):
entry = self.handle.read(self.gpt["sizeof_partition_entry"])
partition_entry = {}
partition_entry["partition_type_guid"] = "%08X-%04X-%04X-%04X-%s" % (
unpack_from("<I", entry, offset=0)[0],
unpack_from("<H", entry, offset=4)[0],
unpack_from("<H", entry, offset=6)[0],
unpack_from(">H", entry, offset=8)[0],
unpack_from("<6s", entry, offset=10)[0].hex().upper()
)
partition_entry["unique_partition_guid"] = "%08X-%04X-%04X-%04X-%s" % (
unpack_from("<I", entry, offset=16)[0],
unpack_from("<H", entry, offset=20)[0],
unpack_from("<H", entry, offset=22)[0],
unpack_from(">H", entry, offset=24)[0],
unpack_from("<6s", entry, offset=26)[0].hex().upper()
)
partition_entry["first_lba"] = unpack_from("<Q", entry, offset=32)[0]
partition_entry["last_lba"] = unpack_from("<Q", entry, offset=40)[0]
# Determine last entry
if not partition_entry["first_lba"] and not partition_entry["last_lba"]:
break
gpt_partitions.append(partition_entry)
self.handle.seek(offset)
return gpt_partitions
def _print_gpt_info(self):
"""Prints the informations from the GPT partition table"""
print("GPT INFOS")
print("=" * 89)
print(" Index Type" + ' '*30 + " Offset Start (Sectors) Length (Sectors)")
print("------- ----" + '-'*30 + " ------------------------ ------------------")
for (i, partition) in enumerate(self.gpt_partitions):
print(("%7d %-34s" + " %24d %18d") % (
i,
PARTITION_TYPE_GUID[partition["partition_type_guid"]],
partition["first_lba"],
(partition["last_lba"] - partition["first_lba"] + 1)
))
print("=" * 89)
__init__(self, ewf_image)
special
¶
Initialize the GPT object
Parameters:
Name | Type | Description | Default
---|---|---|---
ewf_image | theforensicator.app.EWFImage | The EWFImage object used as a base | required
Source code in theforensicator/fs/gpt.py
def __init__(self, ewf_image: "theforensicator.app.EWFImage") -> None:
"""Initialize the MBR object
Args:
ewf_image: The EWFImage object used as a base
"""
self.handle = ewf_image.handle
self.verbose = ewf_image.verbose
self.gpt = {}
self._read_gpt()
if self.verbose:
self._print_gpt_info()
mbr
¶
Parser for MBR
MBR
¶
MBR Partition Table parser
Source code in theforensicator/fs/mbr.py
class MBR(object):
"""MBR Partition Table parser"""
MSDOS_MBR_SIGNATURE = 0xaa55
EFI_PMBR_OSTYPE_EFI = 0xEF
EFI_PMBR_OSTYPE_EFI_GPT = 0xEE
def __init__(self, ewf_image: "theforensicator.app.EWFImage"):
"""Initialize the MBR object
Args:
ewf_image: The EWFImage object used as a base
"""
self.handle = ewf_image.handle
self.verbose = ewf_image.verbose
self.mbr = {
"partition_records" : []
}
self._read_mbr()
# not very useful
if self.verbose:
pass
#self._print_mbr_info()
def _read_mbr(self):
"""Reads the MBR partition table"""
self.mbr_header = self.read_mbr_header()
self.mbr_partitions = self.mbr["partition_records"]
def read_mbr_header(self):
offset = self.handle.get_offset()
mbr_header = self.handle.read(512)
# https://elixir.bootlin.com/linux/latest/source/block/partitions/efi.h
self.mbr["signature"] = unpack_from("<H", mbr_header, offset=510)[0]
if self.mbr["signature"] != MBR.MSDOS_MBR_SIGNATURE:
print("[!] Failed to read MBR header, wrong signature %#x found." % self.mbr["signature"])
exit(-1)
self.mbr["boot_code"] = unpack_from("<440s", mbr_header, offset=0)[0]
self.mbr["unique_mbr_signature"] = unpack_from("<I", mbr_header, offset=440)[0]
self.mbr["unknown"] = unpack_from("<H", mbr_header, offset=444)[0]
for pt_record_nb in range(4):
partition_record = {}
partition_record["boot_indicator"] = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16))[0]
partition_record["start_head"] = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 1)[0]
partition_record["start_sector"] = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 2)[0]
partition_record["start_track"] = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 3)[0]
partition_record["os_type"] = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 4)[0]
partition_record["end_head"] = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 5)[0]
partition_record["end_sector"] = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 6)[0]
partition_record["end_track"] = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 7)[0]
partition_record["starting_lba"] = unpack_from("<I", mbr_header, offset=446 + (pt_record_nb * 16) + 8)[0]
partition_record["size_in_lba"] = unpack_from("<I", mbr_header, offset=446 + (pt_record_nb * 16) + 12)[0]
self.mbr["partition_records"].append(partition_record)
self.handle.seek(offset)
def _print_mbr_info(self):
"""Prints the informations from the MBR partition table"""
print("=" * 0x40)
print("MBR INFOS")
for (i, partition) in enumerate(self.mbr_partitions):
print("=" * 0x40)
print("Partition record %d" % i)
print("=" * 0x40)
for key in partition.keys():
print("\t%-16s : 0x%X" % (key, partition[key]))
print("=" * 0x40)
__init__(self, ewf_image)
special
¶
Initialize the MBR object
Parameters:
Name | Type | Description | Default
---|---|---|---
ewf_image | theforensicator.app.EWFImage | The EWFImage object used as a base | required
Source code in theforensicator/fs/mbr.py
def __init__(self, ewf_image: "theforensicator.app.EWFImage"):
"""Initialize the MBR object
Args:
ewf_image: The EWFImage object used as a base
"""
self.handle = ewf_image.handle
self.verbose = ewf_image.verbose
self.mbr = {
"partition_records" : []
}
self._read_mbr()
# not very useful
if self.verbose:
pass
#self._print_mbr_info()
ntfs
¶
Parser for NTFS
MFT
¶
MFT class
Source code in theforensicator/fs/ntfs.py
class MFT(object):
"""MFT class"""
def __init__(self, header: bytes, ntfs: "NTFS", verbose: bool) -> None:
"""Initialize the MFT class
Args:
header: Header of the MFT
ntfs: The NTFS partition object this entry belongs to
verbose: Enable verbose output
"""
self._mft_fields = [
"magic",
"usa_ofs",
"usa_count",
"lsn",
"sequence_number",
"link_count",
"attrs_offset",
"flags",
"bytes_in_use",
"bytes_allocated",
"base_mft_record",
"next_attr_instance",
"reserved",
"mft_record_number",
"record",
]
self._attr_r_fields = [
"type",
"length",
"non_resident",
"name_length",
"name_offset",
"flags",
"instance",
"value_length",
"value_offset",
"flags",
"reserved",
]
self._attr_nr_fields = [
"type",
"length",
"non_resident",
"name_length",
"name_offset",
"flags",
"instance",
"lowest_vcn",
"highest_vcn",
"mapping_pairs_offset",
"compression_unit",
"reserved",
"allocated_size",
"data_size",
"initialized_size",
"compressed_size",
]
self.raw = header
self.ntfs = ntfs
self.verbose = verbose
# mft header fields with their values
self.mft_parsed = {}
self.is_valid_entry = True
self.record = {"is_directory": False, "files": []}
def _get_datetime(self, windows_time: int) -> dict:
"""Convert windows time to datetime
Args:
windows_time: Time to convert
Returns:
Time in a dict
"""
seconds = windows_time / 10000000
epoch = seconds - 11644473600
if epoch < 0:
epoch = 0
dt = datetime.datetime(2000, 1, 1, 0, 0, 0).fromtimestamp(epoch)
return {"timestamp": epoch, "date": f"{dt.ctime()}"}
"""Attribute type : (0x10) STANDARD_INFORMATION.
"""
def _standard_info_decode(self, attribute: bytes):
"""Decode STANDARD_INFORMATION attribute
Args:
attribute: Raw attribute to decode
Returns:
The parsed attribute
"""
# not complete but at this time we don't need more
si_info = {}
si_info["creation_time"] = self._get_datetime(
unpack_from("<Q", attribute, offset=0x0)[0]
)
si_info["last_data_change_time"] = self._get_datetime(
unpack_from("<Q", attribute, offset=0x8)[0]
)
si_info["last_mft_change_time"] = self._get_datetime(
unpack_from("<Q", attribute, offset=0x10)[0]
)
si_info["last_access_time"] = self._get_datetime(
unpack_from("<Q", attribute, offset=0x18)[0]
)
si_info["file_attributes"] = unpack_from("<I", attribute, offset=0x20)[0]
if self.verbose:
print(
"-> Created : %s\n-> Last data change : %s\n-> Last MFT change : %s\n-> Last access : %s\n-> Flags : %d"
% (
si_info["creation_time"]["date"],
si_info["last_data_change_time"]["date"],
si_info["last_mft_change_time"]["date"],
si_info["last_access_time"]["date"],
si_info["file_attributes"],
)
)
return si_info
"""Attribute type : (0x20) ATTR_LIST_ENTRY.
"""
def _attribute_list_decode(self, attribute: bytes) -> dict:
"""Decode ATTR_LIST_ENTRY attribute
Args:
attribute: Raw attribute to decode
Returns:
The parsed attribute
"""
attr_list = {}
attr_list["type"] = unpack_from("<I", attribute, offset=0)[0]
attr_list["length"] = unpack_from("<H", attribute, offset=4)[0]
attr_list["name_length"] = unpack_from("<B", attribute, offset=6)[0]
attr_list["name_offset"] = unpack_from("<B", attribute, offset=7)[0]
attr_list["lowest_vcn"] = unpack_from("<Q", attribute, offset=8)[0]
attr_list["mft_reference"] = unpack_from("<Q", attribute, offset=16)[0]
attr_list["instance"] = unpack_from("<H", attribute, offset=24)[0]
attr_list["name"] = unpack_from(
f"<{attr_list['name_length'] * 2}s", attribute, offset=26
)[0]
return attr_list
"""Attribute type : (0x30) FILE_NAME_ATTR
A file can be an archive; the flags field tells us whether that is the case.
"""
def _file_name_decode(self, attribute: bytes) -> dict:
"""Decode FILE_NAME_ATTR attribute
Args:
attribute: Raw attribute to decode
Returns:
The parsed attribute
"""
_file_name = {}
# for now there's no check on the sequence number, maybe later
# it's used to know whether the file is allocated, deleted or orphan
# https://usermanual.wiki/Pdf/WpNtOrphanFilesEnUs.1012197800.pdf
parent_dir = unpack_from("<Q", attribute, offset=0x0)[0]
_file_name["parent_directory"] = parent_dir & 0xFFFFFFFFFFFF
_file_name["seq_num"] = parent_dir >> 0x30
_file_name["creation_time"] = self._get_datetime(
unpack_from("<Q", attribute, offset=0x8)[0]
)
_file_name["last_data_change_time"] = self._get_datetime(
unpack_from("<Q", attribute, offset=0x10)[0]
)
_file_name["last_mft_change_time"] = self._get_datetime(
unpack_from("<Q", attribute, offset=0x18)[0]
)
_file_name["last_access_time"] = self._get_datetime(
unpack_from("<Q", attribute, offset=0x20)[0]
)
_file_name["allocated_size"] = unpack_from("<Q", attribute, offset=0x28)[0]
_file_name["data_size"] = unpack_from("<Q", attribute, offset=0x30)[0]
_file_name["file_attributes"] = unpack_from("<I", attribute, offset=0x38)[0]
# some are missing because not useful at this time
_file_name["file_name_length"] = unpack_from("<B", attribute, offset=0x40)[0]
_file_name["file_name_type"] = unpack_from("<B", attribute, offset=0x41)[0]
_file_name["file_name"] = unpack_from(
f"<{_file_name['file_name_length'] * 2}s", attribute, offset=0x42
)[0].decode("utf-16")
self.record["dates"] = {
"creation_time": _file_name["creation_time"],
"last_data_change_time": _file_name["last_data_change_time"],
"last_mft_change_time": _file_name["last_mft_change_time"],
"last_access_time": _file_name["last_access_time"],
}
if _file_name["file_attributes"] & DIRECTORY == DIRECTORY:
self.record["is_directory"] = True
self.record["directory_name"] = _file_name["file_name"]
self.record["parent_directory"] = _file_name["parent_directory"]
self.record["seq_num"] = _file_name["seq_num"]
else:
if (_file_name["file_attributes"] & ARCHIVE) == ARCHIVE:
_file_name["is_archive"] = True
if (_file_name["file_attributes"] & COMPRESSED) == COMPRESSED:
pass
# print("COMPRESSED FILE FOUND !")
# exit()
self.record["files"].append(_file_name)
if self.verbose:
print("Filename record")
print(
"-> Parent directory : %d"
% (_file_name["parent_directory"] & 0xFFFFFFFFFFFF)
)
print("-> Name : %s" % (_file_name["file_name"]))
print("-> Creation : %s" % (_file_name["creation_time"]["date"]))
print(
"-> Last data change : %s"
% (_file_name["last_data_change_time"]["date"])
)
print(
"-> Last MFT change : %s" % (_file_name["last_mft_change_time"]["date"])
)
print("-> Last access : %s" % (_file_name["last_access_time"]["date"]))
print("-> Size : %d" % _file_name["data_size"])
print("-> Flags : %#x" % _file_name["file_attributes"])
return _file_name
"""Attribute type : (0x40) OBJECT_ID_ATTR
"""
def _object_id_decode(self, attribute: bytes) -> dict:
"""Decode OBJECT_ID_ATTR attribute
Args:
attribute: Raw attribute to decode
Returns:
The parsed attribute
"""
_object_id = {}
_object_id["data1"] = unpack_from("<I", attribute, offset=0x0)[0]
_object_id["data2"] = unpack_from("<H", attribute, offset=0x4)[0]
_object_id["data3"] = unpack_from("<H", attribute, offset=0x6)[0]
_object_id["data4"] = unpack_from("<8s", attribute, offset=0x8)[0]
# guid = "%08x-%04x-%04x-%s" % (
# _object_id["data1"],
# _object_id["data2"],
# _object_id["data3"],
# _object_id["data4"].hex(),
# )
# print("GUID : %s" % guid)
return _object_id
"""Attribute type : (0x60) VOLUME_NAME
"""
def _volume_name_decode(self, attribute: bytes):
# _volume_name = {}
pass
"""Attribute type : (0x80) DATA
"""
def _data_runs_decode(self, dataruns: bytes) -> list:
"""Decode DATA attribute
Args:
dataruns: dataruns list
Returns:
Data of the MFT entry
"""
current_datarun = dataruns
run_header = unpack_from("<B", current_datarun, offset=0)[0]
data = []
size_lcn_nb = run_header & 0xF
size_lcn_offset = run_header >> 4
lcn_length = unpack("<Q", current_datarun[1 : 1 + size_lcn_nb].ljust(8, b"\0"))[
0
]
lcn_offset = unpack(
"<Q",
current_datarun[1 + size_lcn_nb : 1 + size_lcn_nb + size_lcn_offset].ljust(
8, b"\0"
),
)[0]
if lcn_length == 0x0:
print("ERROR SPARSE FILE !")
exit()
# used for relative offset
prev_offset = lcn_offset
data.append({"lcn_length": lcn_length, "lcn_offset": lcn_offset})
current_datarun = current_datarun[1 + size_lcn_nb + size_lcn_offset :]
# potential next datarun
run_header = unpack_from("<B", current_datarun, offset=0)[0]
# if we enter in the loop, it means that the file is
# fragmented or sparsed (empty VCN between clusters)
while (run_header != 0x0) and ((run_header & 0xF) < 0x4):
size_lcn_nb = run_header & 0xF
size_lcn_offset = run_header >> 4
# print(current_datarun)
lcn_length = unpack(
"<Q", current_datarun[1 : 1 + size_lcn_nb].ljust(8, b"\0")
)[0]
if lcn_length != 0x0:
lcn_offset = unpack(
"<Q",
current_datarun[
1 + size_lcn_nb : 1 + size_lcn_nb + size_lcn_offset
].ljust(8, b"\0"),
)[0]
# if it's a sparse file we continue to the next
# run because we don't care of this data.
if lcn_length == 0x0:
pass
# print("sparse file")
else:
# if not sparsed we add data
# if signed bit
if (lcn_offset >> 23) & 1 == 1:
lcn_offset = (
int(bin(lcn_offset)[2:].rjust(32, "1"), 2) % -0x100000000
)
data.append(
{"lcn_length": lcn_length, "lcn_offset": (prev_offset + lcn_offset)}
)
prev_offset = prev_offset + lcn_offset
current_datarun = current_datarun[1 + size_lcn_nb + size_lcn_offset :]
run_header = unpack_from("<B", current_datarun, offset=0)[0]
return data
def _analyze_attribute(self, attr_parsed: dict, raw_attr: bytes):
"""Analyze and decode an attribute
Args:
attr_parsed: parsed attribute dict
raw_attr: raw bytes attribute
"""
if attr_parsed["non_resident"]:
attribute = b""
else:
attribute = raw_attr[attr_parsed["value_offset"] :]
if attr_parsed["type"] == AT_STANDARD_INFORMATION:
si_info = self._standard_info_decode(
raw_attr[attr_parsed["value_offset"] :]
)
# not checked
if attr_parsed["type"] == AT_ATTRIBUTE_LIST:
if attr_parsed["non_resident"] == 0:
attr_list = self._attribute_list_decode(attribute)
else:
# TO FIX
# we can fall in this case if there's not enough place
# for data runs. (see: https://flatcap.github.io/linux-ntfs/ntfs/attributes/attribute_list.html)
# print(attr_parsed)
# print(raw_attr[attr_parsed['mapping_pairs_offset']:])
# print("Non-resident attribute list")
# exit()
pass
if attr_parsed["type"] == AT_FILE_NAME:
_file_name = self._file_name_decode(attribute)
if attr_parsed["type"] == AT_OBJECT_ID:
object_id = self._object_id_decode(attribute)
if attr_parsed["type"] == AT_VOLUME_NAME:
volume_name = self._volume_name_decode(attribute)
if attr_parsed["type"] == AT_DATA:
if attr_parsed["name"] != "":
pass
# print("AT_DATA ERROR")
# exit()
"""
If the attribute is resident, then data is stored in the attribute
"""
if attr_parsed["non_resident"] == 0:
data = raw_attr[
attr_parsed["value_offset"] : attr_parsed["value_offset"]
+ attr_parsed["value_length"]
]
self.record["raw_data"] = True
self.record["data"] = {
"size": attr_parsed["value_length"],
"raw_data": data.hex(),
}
"""
If the attribute is non-resident, then data is stored somewhere in memory,
we can know location based on dataruns stored at the end of the attribute.
"""
if attr_parsed["non_resident"] == 1:
"""
data_run structure :
- header : constructed of 1 byte (ex: 0x21)
-> header & 0xf = size of number of clusters
-> header >> 4 = offset to starting cluster number (LCN)
"""
if attr_parsed["allocated_size"] > 0:
# `mapping_pairs_offset` is the offset from attribute start of dataruns
data = self._data_runs_decode(
raw_attr[attr_parsed["mapping_pairs_offset"] :]
)
self.record["raw_data"] = False
self.record["data"] = {
"size": attr_parsed["data_size"],
"init_size": attr_parsed["initialized_size"],
"raw_data": data,
}
"""This function will parse attribute header of an MFT entry.
"""
def parse_attr_header(self):
"""Parse an attribute
"""
attrs_offset = self.mft_parsed["attrs_offset"]
# offset must be aligned on 8 bytes
if attrs_offset % 8:
print("Attribute misalignment")
self.attributes = []
while attrs_offset < 1024:
attr_parsed = {}
# used to know if it's a resident (b0) or non-resident (b1) attribute
attr_record = self.raw[attrs_offset:]
if unpack_from("<I", attr_record, offset=0)[0] == 0xFFFFFFFF:
# print("[?] Attributes end")
break
if unpack_from(ATTR_RECORD_T, attr_record)[2]:
buf = unpack_from(ATTR_RECORD_NON_RESIDENT, attr_record)
for (field, value) in zip(self._attr_nr_fields, buf):
attr_parsed.update({field: value})
else:
buf = unpack_from(ATTR_RECORD_RESIDENT, attr_record)
for (field, value) in zip(self._attr_r_fields, buf):
attr_parsed.update({field: value})
# if an attribute has a name
if attr_parsed["name_length"] > 0:
record_name = attr_record[
attr_parsed["name_offset"] : attr_parsed["name_offset"]
+ (attr_parsed["name_length"] * 2)
]
attr_parsed["name"] = record_name.decode("utf-16").encode()
else:
attr_parsed["name"] = ""
# analyze attribute type
self._analyze_attribute(attr_parsed, attr_record)
self.attributes.append(attr_parsed)
attrs_offset += attr_parsed["length"]
# maybe use this to avoid some of the calculations above
self.record["nb_record"] = self.mft_parsed["mft_record_number"]
"""Parse MFT header
"""
def parse_mft_header(self):
for (field, value) in zip(
self._mft_fields, unpack_from(MFT_RECORD_T, self.raw)
):
self.mft_parsed.update({field: value})
# check if it's a valid MFT entry
if self.mft_parsed["magic"] != 0x454C4946:
self.is_valid_entry = False
# print("Bad magic :", hex(self.mft_parsed["magic"]))
# print("Entry number : %d" % self.mft_parsed["mft_record_number"])
# print("[!] Bad MFT entry")
# exit()
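The run-list decoding in `_data_runs_decode` is easiest to follow with a worked example. A hypothetical run list whose first header byte is 0x21 (one cluster-count byte, two LCN-offset bytes) decodes as follows; later runs with relative, possibly signed offsets are handled by the loop in the method above:

```python
from struct import unpack

# Hypothetical run list: header 0x21 -> 1 cluster-count byte, 2 LCN-offset bytes;
# 0x18 clusters starting at LCN 0x5634; the trailing 0x00 byte ends the list.
runs = bytes.fromhex("211834560000")

header = runs[0]
size_len, size_off = header & 0xF, header >> 4
length = unpack("<Q", runs[1:1 + size_len].ljust(8, b"\0"))[0]
offset = unpack("<Q", runs[1 + size_len:1 + size_len + size_off].ljust(8, b"\0"))[0]
print(hex(length), hex(offset))  # 0x18 clusters at LCN 0x5634
```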
__init__(self, header, ntfs, verbose)
special
¶
Initialize the MFT class
Parameters:
Name | Type | Description | Default
---|---|---|---
header | bytes | Header of the MFT | required
ntfs | NTFS | The NTFS partition object this entry belongs to | required
verbose | bool | Enable verbose output | required
Source code in theforensicator/fs/ntfs.py
def __init__(self, header: bytes, ntfs: "NTFS", verbose: bool) -> None:
"""Initialize the MFT class
Args:
header: Header of the MFT
ntfs: The NTFS partition object this entry belongs to
verbose: Enable verbose output
"""
self._mft_fields = [
"magic",
"usa_ofs",
"usa_count",
"lsn",
"sequence_number",
"link_count",
"attrs_offset",
"flags",
"bytes_in_use",
"bytes_allocated",
"base_mft_record",
"next_attr_instance",
"reserved",
"mft_record_number",
"record",
]
self._attr_r_fields = [
"type",
"length",
"non_resident",
"name_length",
"name_offset",
"flags",
"instance",
"value_length",
"value_offset",
"flags",
"reserved",
]
self._attr_nr_fields = [
"type",
"length",
"non_resident",
"name_length",
"name_offset",
"flags",
"instance",
"lowest_vcn",
"highest_vcn",
"mapping_pairs_offset",
"compression_unit",
"reserved",
"allocated_size",
"data_size",
"initialized_size",
"compressed_size",
]
self.raw = header
self.ntfs = ntfs
self.verbose = verbose
# mft header fields with their values
self.mft_parsed = {}
self.is_valid_entry = True
self.record = {"is_directory": False, "files": []}
parse_attr_header(self)
¶
Parse an attribute
Source code in theforensicator/fs/ntfs.py
def parse_attr_header(self):
"""Parse an attribute
"""
attrs_offset = self.mft_parsed["attrs_offset"]
# offset must be aligned on 8 bytes
if attrs_offset % 8:
print("Attribute misalignment")
self.attributes = []
while attrs_offset < 1024:
attr_parsed = {}
# used to know if it's a resident (b0) or non-resident (b1) attribute
attr_record = self.raw[attrs_offset:]
if unpack_from("<I", attr_record, offset=0)[0] == 0xFFFFFFFF:
# print("[?] Attributes end")
break
if unpack_from(ATTR_RECORD_T, attr_record)[2]:
buf = unpack_from(ATTR_RECORD_NON_RESIDENT, attr_record)
for (field, value) in zip(self._attr_nr_fields, buf):
attr_parsed.update({field: value})
else:
buf = unpack_from(ATTR_RECORD_RESIDENT, attr_record)
for (field, value) in zip(self._attr_r_fields, buf):
attr_parsed.update({field: value})
# if an attribute has a name
if attr_parsed["name_length"] > 0:
record_name = attr_record[
attr_parsed["name_offset"] : attr_parsed["name_offset"]
+ (attr_parsed["name_length"] * 2)
]
attr_parsed["name"] = record_name.decode("utf-16").encode()
else:
attr_parsed["name"] = ""
# analyze attribute type
self._analyze_attribute(attr_parsed, attr_record)
self.attributes.append(attr_parsed)
attrs_offset += attr_parsed["length"]
# maybe use this to avoid some of the calculations above
self.record["nb_record"] = self.mft_parsed["mft_record_number"]
NTFS
¶
NTFS class
Source code in theforensicator/fs/ntfs.py
class NTFS(object):
"""NTFS class"""
def __init__(self, ewf_image: "theforensicator.app.EWFImage", partition) -> None:
"""Initializes the NTFS object
Args:
ewf_image: EWF object we are based on
partition: Partition we will parse
"""
self.ewf_image = ewf_image
self.handle = self.ewf_image.handle
self.verbose = self.ewf_image.verbose
self.partition = partition
self._start = self.partition["first_lba"]
self._end = self.partition["last_lba"]
self.is_mft_dump = None
self.dump_mft = None
self.handle.seek(self._start * SECTOR_SIZE)
self.ntfs_header = NTFSHeader(
self._read_nsectors(0, SECTOR_NB(SECTOR_SIZE))
).ntfs_header
self.cluster_block_size = (
self.ntfs_header["bytes_per_sector"]
* self.ntfs_header["sectors_per_cluster"]
)
print("[+] NTFS partition at sector %#x" % (self._start))
if self.verbose:
pass
#self._pretty_print()
self.mft = {}
def _pretty_print(self):
"""Prints additionnal informations about the partition"""
for header_name in self.ntfs_header.keys():
if isinstance(self.ntfs_header[header_name], (bytes, str)):
print("\t%-18s : %s" % (header_name, self.ntfs_header[header_name]))
else:
print("\t%-20s : %#x" % (header_name, self.ntfs_header[header_name]))
print("=" * 0x40)
def _read(self, offset: int, nb_bytes: int) -> bytes:
"""Reads data at a given offset
Args:
offset: Where we want to read
nb_bytes: Number of bytes we want to read
Returns:
The bytes we have read
"""
curr_off = self.handle.get_offset()
self.handle.seek(self._start * SECTOR_SIZE + offset)
buf = self.handle.read(nb_bytes)
self.handle.seek(curr_off)
return buf
def _read_sector(self, sector_idx: int) -> bytes:
"""Reads the given sector
Args:
sector_idx: Index of the sector we want to read
Returns:
The bytes we have read
"""
return self._read(sector_idx * SECTOR_SIZE, SECTOR_SIZE)
def _read_nsectors(self, sector_idx: int, nb_sector: int) -> bytes:
"""Reads the given sectors
Args:
sector_idx: Index of the first sector we want to read
nb_sector: Number of sectors we want to read
Returns:
The bytes we have read
"""
return self._read(sector_idx * SECTOR_SIZE, nb_sector * SECTOR_SIZE)
def _read_cluster(self, cluster_idx: int) -> bytes:
"""Reads a cluster
Args:
cluster_idx: Index of the cluster we want to read
Returns:
The bytes we have read
"""
return self._read(
cluster_idx * self.cluster_block_size, self.cluster_block_size
)
def _read_cluster_nbytes(self, cluster_idx: int, nb_bytes: int) -> bytes:
"""Reads some bytes from a cluster
Args:
cluster_idx: Index of the cluster we want to read
nb_bytes: Number of bytes to read
Returns:
The bytes we have read
"""
return self._read(cluster_idx * self.cluster_block_size, nb_bytes)
def _read_mft_entry(self, mft_entry_idx: int):
"""Reads a MFT entry
Args:
mft_entry_idx: Index of the mft entry we want to read
Returns:
The bytes we have read
"""
return self._read(
(self.mft_start * self.cluster_block_size)
+ (MFT_ENTRY_SIZE * mft_entry_idx),
MFT_ENTRY_SIZE,
)
def read_mft_entry(self, mft_entry_idx: int, verbose=False) -> "MFT":
"""Reads a MFT entry
Args:
mft_entry_idx: Index of the mft entry we want to read
verbose: How much logs we want
Returns:
The parsed MFT entry (an MFT object)
"""
mft_entry_raw = self._read_mft_entry(mft_entry_idx)
mft_entry = MFT(mft_entry_raw, self, verbose)
return mft_entry
def load_mft_dump(self, dump_file: str):
"""Load a MFT dump
Args:
dump_file: Path of the dump
"""
with open(dump_file, "r") as dmp_file:
self.dump_mft = json.loads(dmp_file.read())
dmp_file.close()
def analyze_ntfs_header(self, partition_idx: str, resolve_mft_file: str, clear_cache):
"""Analyze the NTFS header
Args:
partition_idx: Index of the partition, used in the cache and output filenames
resolve_mft_file: Where the resolved MFT in JSON format will be stored
clear_cache: If set, delete any existing MFT dump cache before analyzing
"""
self.partition_idx = partition_idx
mft_dump_filepath = f"MFT{partition_idx}.dump"
if clear_cache:
if isfile(mft_dump_filepath):
unlink(mft_dump_filepath)
print("[+] Cache cleared.")
self.mft_start = self.ntfs_header["mft_lcn"]
print("[+] Loading and analyzing MFT ...")
if not isfile(mft_dump_filepath):
self.is_mft_dump = False
self.analyze_mft(mft_dump_filepath)
else:
print("[+] Found %s, loading cache file." % (mft_dump_filepath))
self.is_mft_dump = True
self.load_mft_dump(mft_dump_filepath)
print("[+] Cache file loaded.")
print("[+] MFT loaded ...")
self.resolve_mft(resolve_mft_file)
def _get_dump_mft_entry(self, idx: int):
"""Get the parsed record of the given MFT entry from the dump
Args:
idx: Index of the MFT entry to fetch
"""
if self.is_mft_dump:
try:
return self.dump_mft["mft"][str(idx)]
except KeyError:
return None
try:
return self.dump_mft["mft"][idx]
except KeyError:
return None
def _resolve_path(self, mft_entry) -> list:
"""Resolve the path of the given mft entry
Args:
mft_entry: MFT entry to resolve
Returns:
The list of the possible paths of the MFT entry
"""
paths = []
# if it's a directory
if mft_entry["is_directory"]:
path = ""
parent_dir = mft_entry["parent_directory"]
path += mft_entry["directory_name"]
while parent_dir != FILE_root:
next_entry = self._get_dump_mft_entry(parent_dir)
if next_entry is None:
break
if next_entry["is_directory"]:
parent_dir = next_entry["parent_directory"]
path = f'{next_entry["directory_name"]}\\{path}'
else:
return [{"type": "ORPHAN_DIRECTORY", "directory_name": path}]
path = "C:\\" + path
paths.append({"type": "DIRECTORY", "directory_name": path})
else:
for file in mft_entry["files"]:
path = ""
parent_dir = file["parent_directory"]
path += file["file_name"]
is_valid = True
while parent_dir != FILE_root:
next_entry = self._get_dump_mft_entry(parent_dir)
if next_entry is None:
is_valid = False
break
if next_entry["is_directory"]:
parent_dir = next_entry["parent_directory"]
path = f'{next_entry["directory_name"]}\\{path}'
else:
return [{"type": "ORPHAN_FILE", "file_name": path}]
if not is_valid:
continue
path = "C:\\" + path
paths.append({"type": "FILE", "file_name": path})
return paths
def resolve_mft(self, json_outfile: str):
"""Resolve the MFT paths and save it to outfile
Args:
json_outfile: Where to save the output
"""
self.resolved_mft = {}
print("[+] Resolving paths from MFT ...")
for entry_idx in self.dump_mft["mft"].keys():
entry = self._get_dump_mft_entry(entry_idx)
path_infos = self._resolve_path(entry)
if path_infos:
obj_type = path_infos[0]["type"]
if obj_type in ["DIRECTORY", "ORPHAN_DIRECTORY"]:
self.resolved_mft[int(entry_idx)] = {
"type": obj_type,
"info": path_infos,
"dates": entry["dates"],
}
if obj_type in ["FILE", "ORPHAN_FILE"]:
# case not handled in AT_DATA attribute
data = None
if "data" in entry:
data = entry["data"]
else:
# need to fix this issue
pass
self.resolved_mft[int(entry_idx)] = {
"type": obj_type,
"info": path_infos,
"dates": entry["dates"],
"data": data,
}
print("[+] MFT paths resolved ...")
if json_outfile and type(json_outfile) is str:
with open(f"{json_outfile}.{self.partition_idx}", "w") as dmp:
dmp.write(json.dumps(self.resolved_mft))
dmp.close()
print("[+] %s successfully written." % (json_outfile))
def analyze_mft(self, out_file: str):
"""Analyze the MFT
Args:
out_file: Where to store the output
"""
print("[?] Analyzing MFT")
mft_entry_nb = -1
while True:
mft_entry_nb += 1
mft_file = self.read_mft_entry(mft_entry_nb, verbose=False)
# print(f"Reading MFT entry {mft_entry_nb}")
if mft_file.raw[0:4] == b"\x00" * 4:
continue
mft_file.parse_mft_header()
if not mft_file.is_valid_entry:
break
mft_file.parse_attr_header()
self.mft[mft_entry_nb] = mft_file.record
self.dump_mft = {
"disk_filename": self.ewf_image.filename,
"total_entries": mft_entry_nb,
"mft": self.mft,
}
with open(out_file, "w") as dmp_file:
dmp_file.write(json.dumps(self.dump_mft))
dmp_file.close()
def _dump_data(self, lcn_dict: dict) -> bytes:
"""Rebuild a file's raw content from its data attribute: hex-encoded data is decoded directly, otherwise the listed LCN runs are read cluster by cluster and truncated to init_size"""
raw_data = lcn_dict["raw_data"]
buf = b""
if lcn_dict["size"] == 0 and len(raw_data) == 0:
return b""
if type(raw_data) is str:
return bytes.fromhex(raw_data)
for lcn in raw_data:
for idx in range(lcn["lcn_length"]):
buf += self._read_cluster(lcn["lcn_offset"] + idx)
return buf[: lcn_dict["init_size"]]
def write_to_file(self, dump_dir, filename: str, data: bytes):
"""Write dumped data to <dump_dir>/dump_<sanitized filename>, or to the current directory if no dump_dir is given"""
if dump_dir and type(dump_dir) is str:
out_filename = normpath(dump_dir + "/dump_" + filename.replace('\\', '_').replace(':', ''))
else:
out_filename = "./dump_" + filename.replace('\\', '_').replace(':', '')
with open(out_filename, "wb") as f:
f.write(data)
f.close()
print("[?] %s successfully dumped to %s." % (filename, out_filename))
def dump_file(self, filenames: list, dump_dir: str):
"""Dump files whose resolved paths match the given filename patterns
Args:
filenames: Filename patterns (regexes, matched case-insensitively) of the files to dump
dump_dir: Directory where the dumped files are written
"""
files_list_match = '(?:%s)' % '|'.join(filenames)
for key in self.resolved_mft:
obj_type = self.resolved_mft[key]["type"]
if obj_type not in ["FILE", "ORPHAN_FILE"]:
continue
info = self.resolved_mft[key]["info"]
for file in info:
if re.match(files_list_match, file["file_name"], flags=re.IGNORECASE):
data = self.resolved_mft[key]["data"]
if data:
self.write_to_file(
dump_dir,
file["file_name"],
self._dump_data(data)
)
def _analyze_registry(self):
print("[?] Analyzing registries")
def _analyze_winsec(self):
print("[?] Analyzing Windows Security")
__init__(self, ewf_image, partition)
special
¶
Initializes the NTFS object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ewf_image | theforensicator.app.EWFImage | EWF object we are based on | required |
partition | | Partition we will parse | required |
Source code in theforensicator/fs/ntfs.py
def __init__(self, ewf_image: "theforensicator.app.EWFImage", partition) -> None:
"""Initializes the NTFS object
Args:
ewf_image: EWF object we are based on
partition: Partition we will parse
"""
self.ewf_image = ewf_image
self.handle = self.ewf_image.handle
self.verbose = self.ewf_image.verbose
self.partition = partition
self._start = self.partition["first_lba"]
self._end = self.partition["last_lba"]
self.is_mft_dump = None
self.dump_mft = None
self.handle.seek(self._start * SECTOR_SIZE)
self.ntfs_header = NTFSHeader(
self._read_nsectors(0, SECTOR_NB(SECTOR_SIZE))
).ntfs_header
self.cluster_block_size = (
self.ntfs_header["bytes_per_sector"]
* self.ntfs_header["sectors_per_cluster"]
)
print("[+] NTFS partition at sector %#x" % (self._start))
if self.verbose:
pass
#self._pretty_print()
self.mft = {}
analyze_mft(self, out_file)
¶
Analyze the MFT
Parameters:
Name | Type | Description | Default |
---|---|---|---|
out_file | str | Where to store the output | required |
Source code in theforensicator/fs/ntfs.py
def analyze_mft(self, out_file: str):
"""Analyze the MFT
Args:
out_file: Where to store the output
"""
print("[?] Analyzing MFT")
mft_entry_nb = -1
while True:
mft_entry_nb += 1
mft_file = self.read_mft_entry(mft_entry_nb, verbose=False)
# print(f"Reading MFT entry {mft_entry_nb}")
if mft_file.raw[0:4] == b"\x00" * 4:
continue
mft_file.parse_mft_header()
if not mft_file.is_valid_entry:
break
mft_file.parse_attr_header()
self.mft[mft_entry_nb] = mft_file.record
self.dump_mft = {
"disk_filename": self.ewf_image.filename,
"total_entries": mft_entry_nb,
"mft": self.mft,
}
with open(out_file, "w") as dmp_file:
dmp_file.write(json.dumps(self.dump_mft))
dmp_file.close()
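The cache written here is plain JSON, so it can be inspected independently of the tool. A sketch, assuming a dump for partition 0 already exists in the working directory:
import json

with open("MFT0.dump") as fp:
    dump = json.load(fp)

print(dump["disk_filename"], dump["total_entries"])
entry_zero = dump["mft"]["0"]  # the record dict populated while parsing MFT entry 0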
analyze_ntfs_header(self, partition_idx, resolve_mft_file, clear_cache)
¶
Analyze the NTFS header
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partition_idx | str | Index of the partition, used in the cache and output filenames | required |
resolve_mft_file | str | Where the resolved MFT in JSON format will be stored | required |
clear_cache | | If set, delete any existing MFT dump cache before analyzing | required |
Source code in theforensicator/fs/ntfs.py
def analyze_ntfs_header(self, partition_idx: str, resolve_mft_file: str, clear_cache):
"""Analyze the NTFS header
Args:
partition_idx: Index of the partition, used in the cache and output filenames
resolve_mft_file: Where the resolved MFT in JSON format will be stored
clear_cache: If set, delete any existing MFT dump cache before analyzing
"""
self.partition_idx = partition_idx
mft_dump_filepath = f"MFT{partition_idx}.dump"
if clear_cache:
if isfile(mft_dump_filepath):
unlink(mft_dump_filepath)
print("[+] Cache cleared.")
self.mft_start = self.ntfs_header["mft_lcn"]
print("[+] Loading and analyzing MFT ...")
if not isfile(mft_dump_filepath):
self.is_mft_dump = False
self.analyze_mft(mft_dump_filepath)
else:
print("[+] Found %s, loading cache file." % (mft_dump_filepath))
self.is_mft_dump = True
self.load_mft_dump(mft_dump_filepath)
print("[+] Cache file loaded.")
print("[+] MFT loaded ...")
self.resolve_mft(resolve_mft_file)
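Note that the cache name encodes only the partition index, so a stale MFT<idx>.dump left over from a previous image in the same working directory would be reused silently unless clear_cache is passed or the file is removed. A small check, with a placeholder index of 0:
from os.path import isfile

if isfile("MFT0.dump"):
    print("cache present: analyze_ntfs_header(0, ...) will reuse it unless clear_cache is set")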
dump_file(self, filenames, dump_dir)
¶
Dump files whose resolved paths match the given filename patterns
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filenames | list | Filename patterns (regexes, matched case-insensitively) of the files to dump | required |
dump_dir | str | Directory where the dumped files are written | required |
Source code in theforensicator/fs/ntfs.py
def dump_file(self, filenames: list, dump_dir: str):
"""Dump files whose resolved paths match the given filename patterns
Args:
filenames: Filename patterns (regexes, matched case-insensitively) of the files to dump
dump_dir: Directory where the dumped files are written
"""
files_list_match = '(?:%s)' % '|'.join(filenames)
for key in self.resolved_mft:
obj_type = self.resolved_mft[key]["type"]
if obj_type not in ["FILE", "ORPHAN_FILE"]:
continue
info = self.resolved_mft[key]["info"]
for file in info:
if re.match(files_list_match, file["file_name"], flags=re.IGNORECASE):
data = self.resolved_mft[key]["data"]
if data:
self.write_to_file(
dump_dir,
file["file_name"],
self._dump_data(data)
)
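The filenames argument is joined into a single case-insensitive alternation and each entry is applied with re.match, i.e. anchored at the start of the resolved path, so backslashes must be escaped as in any regex. A standalone sketch of that matching logic with hypothetical patterns and paths:
import re

filenames = [r"C:\\Windows\\System32\\config\\(SAM|SYSTEM)", r".*\\ntds\.dit"]
files_list_match = "(?:%s)" % "|".join(filenames)

for path in [r"C:\Windows\System32\config\SAM", r"C:\Windows\NTDS\ntds.dit"]:
    if re.match(files_list_match, path, flags=re.IGNORECASE):
        print("would dump:", path)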
load_mft_dump(self, dump_file)
¶
Load a MFT dump
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dump_file | str | Path of the dump | required |
Source code in theforensicator/fs/ntfs.py
def load_mft_dump(self, dump_file: str):
"""Load a MFT dump
Args:
dump_file: Path of the dump
"""
with open(dump_file, "r") as dmp_file:
self.dump_mft = json.loads(dmp_file.read())
dmp_file.close()
read_mft_entry(self, mft_entry_idx, verbose=False)
¶
Reads a MFT entry
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mft_entry_idx | int | Index of the mft entry we want to read | required |
verbose | | How much logs we want | False |
Returns:
Type | Description |
---|---|
MFT | The parsed MFT entry |
Source code in theforensicator/fs/ntfs.py
def read_mft_entry(self, mft_entry_idx: int, verbose=False) -> "MFT":
"""Reads a MFT entry
Args:
mft_entry_idx: Index of the mft entry we want to read
verbose: How much logs we want
Returns:
The parsed MFT entry (an MFT object)
"""
mft_entry_raw = self._read_mft_entry(mft_entry_idx)
mft_entry = MFT(mft_entry_raw, self, verbose)
return mft_entry
resolve_mft(self, json_outfile)
¶
Resolve the MFT paths and save it to outfile
Parameters:
Name | Type | Description | Default |
---|---|---|---|
json_outfile | str | Where to save the output | required |
Source code in theforensicator/fs/ntfs.py
def resolve_mft(self, json_outfile: str):
"""Resolve the MFT paths and save it to outfile
Args:
json_outfile: Where to save the output
"""
self.resolved_mft = {}
print("[+] Resolving paths from MFT ...")
for entry_idx in self.dump_mft["mft"].keys():
entry = self._get_dump_mft_entry(entry_idx)
path_infos = self._resolve_path(entry)
if path_infos:
obj_type = path_infos[0]["type"]
if obj_type in ["DIRECTORY", "ORPHAN_DIRECTORY"]:
self.resolved_mft[int(entry_idx)] = {
"type": obj_type,
"info": path_infos,
"dates": entry["dates"],
}
if obj_type in ["FILE", "ORPHAN_FILE"]:
# case not handled in AT_DATA attribute
data = None
if "data" in entry:
data = entry["data"]
else:
# need to fix this issue
pass
self.resolved_mft[int(entry_idx)] = {
"type": obj_type,
"info": path_infos,
"dates": entry["dates"],
"data": data,
}
print("[+] MFT paths resolved ...")
if json_outfile and type(json_outfile) is str:
with open(f"{json_outfile}.{self.partition_idx}", "w") as dmp:
dmp.write(json.dumps(self.resolved_mft))
dmp.close()
print("[+] %s successfully written." % (json_outfile))
NTFSHeader
¶
NTFS Header
Source code in theforensicator/fs/ntfs.py
class NTFSHeader(object):
"""NTFS Header"""
def __init__(self, header: bytes) -> None:
"""Initialize the NTFSHeader class
Args:
header: Bytes of the header
"""
self._fields = [
"jump",
"oem_id",
"bytes_per_sector",
"sectors_per_cluster",
"reserved_sectors",
"fats",
"root_entries",
"sectors",
"media_type",
"sectors_per_fat",
"sectors_per_track",
"heads",
"hidden_sectors",
"large_sectors",
"unused",
"number_of_sectors",
"mft_lcn",
"mftmirr_lcn",
"clusters_per_mft_record",
"reserved0",
"clusters_per_index_record",
"reserved1",
"volume_serial_number",
"checksum",
"bootstrap",
"end_of_sector_marker",
]
self.header = header
self.ntfs_header = {}
for (field, value) in zip(
self._fields, unpack(NTFS_BOOT_SECTOR_T, self.header)
):
if field == "bootstrap":
self.ntfs_header[field] = value.hex()
else:
self.ntfs_header.update({field: value})
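The constructor is a plain struct-unpack-into-dict mapping; NTFS_BOOT_SECTOR_T is the format string defined elsewhere in the module, so the sketch below uses a made-up three-field layout purely to show the zip-based mapping and the bytes-to-hex conversion (which the real code applies only to the bootstrap field):
from struct import unpack

fields = ["jump", "oem_id", "bytes_per_sector"]
fmt = "<3s8sH"  # made-up layout: 3-byte jump, 8-byte OEM id, 16-bit bytes_per_sector
raw = b"\xeb\x52\x90" + b"NTFS    " + (512).to_bytes(2, "little")

header = {}
for field, value in zip(fields, unpack(fmt, raw)):
    header[field] = value.hex() if isinstance(value, bytes) else value
print(header)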
__init__(self, header)
special
¶
Initialize the NTFSHeader class
Parameters:
Name | Type | Description | Default |
---|---|---|---|
header | bytes | Bytes of the header | required |
Source code in theforensicator/fs/ntfs.py
def __init__(self, header: bytes) -> None:
"""Initialize the NTFSHeader class
Args:
header: Bytes of the header
"""
self._fields = [
"jump",
"oem_id",
"bytes_per_sector",
"sectors_per_cluster",
"reserved_sectors",
"fats",
"root_entries",
"sectors",
"media_type",
"sectors_per_fat",
"sectors_per_track",
"heads",
"hidden_sectors",
"large_sectors",
"unused",
"number_of_sectors",
"mft_lcn",
"mftmirr_lcn",
"clusters_per_mft_record",
"reserved0",
"clusters_per_index_record",
"reserved1",
"volume_serial_number",
"checksum",
"bootstrap",
"end_of_sector_marker",
]
self.header = header
self.ntfs_header = {}
for (field, value) in zip(
self._fields, unpack(NTFS_BOOT_SECTOR_T, self.header)
):
if field == "bootstrap":
self.ntfs_header[field] = value.hex()
else:
self.ntfs_header.update({field: value})
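The header fields the NTFS class actually consumes are bytes_per_sector and sectors_per_cluster (combined into cluster_block_size) and mft_lcn (the first logical cluster of the $MFT). A small sketch of that arithmetic with typical placeholder values; the real numbers come from NTFSHeader(...).ntfs_header, offsets are relative to the partition start, and 1024 is only the usual value of the module's MFT_ENTRY_SIZE constant:
bytes_per_sector = 512
sectors_per_cluster = 8
mft_lcn = 0xC0000        # hypothetical logical cluster number of the $MFT
MFT_ENTRY_SIZE = 1024    # usual MFT record size; the module defines its own constant

cluster_block_size = bytes_per_sector * sectors_per_cluster   # 4096 bytes
mft_byte_offset = mft_lcn * cluster_block_size                # start of the $MFT within the partition
entry_42_offset = mft_byte_offset + 42 * MFT_ENTRY_SIZE       # what _read_mft_entry(42) reads
print(hex(cluster_block_size), hex(mft_byte_offset), hex(entry_42_offset))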