
modules

Top-level package for TheForensicator.

app

Main module.

EWFImage

Object that reads the content of an EWF file and parses its content

Source code in theforensicator/app.py
class EWFImage(object):
    """Object that reads the content of the EWF file and parses the content"""

    def __init__(self, filename: str) -> None:
        """Initialize the object with default values and the given filename

        Args:
            filename: The filename of the file to parse
        """
        self.filename = filename
        self.handle = None
        self.verbose = True
        self.ntfs_partitions = []
        self.mft_dump_location = None
        self.out_file_location = None

    def __enter__(self) -> "EWFImage":
        """Open a handle on EWF files and read the content. Called when we enter
        a `with` block
        """

        try:
            import pyewf

            self.handle = pyewf.handle()
            self.handle.open(pyewf.glob(self.filename))
        except ModuleNotFoundError:
            print("[!]\tCould not load pyewf, using python implementation...")
            from .ewf import Ewf

            self.handle = Ewf(self.filename)

        return self

    def _read_int(self, offset: int) -> int:
        """Reads an Integer at the given offset

        Args:
            offset: Where we want to read

        Returns:
            The value that has been read (as an int)

        Raises:
            ValueError: If offset is out of bounds
        """
        curr_off = self.handle.get_offset()
        buf = self.handle.read_buffer_at_offset(UINT32, offset)
        self.handle.seek(curr_off)
        return unpack("<I", buf)[0]

    def _read_bytes(self, offset: int, nb_bytes: int) -> bytes:
        """Reads some bytes at the given offset

        Args:
            offset: Where we want to read

        Returns:
            The bytes that has been read

        Raises:
            ValueError: If offset is out of bounds
        """
        curr_off = self.handle.get_offset()
        buf = self.handle.read_buffer_at_offset(nb_bytes, offset)
        self.handle.seek(curr_off)
        return buf

    def _is_mbr_partition(self) -> bool:
        """Check if the beginning of the disk matches a MBR magic number

        Returns:
            True if it is a MBR partition table

        Raises:
            ValueError: If the disk size is 0
        """
        return self._read_int(0) == MBR_MAGIC

    def _get_partitions(self):
        """Parses the partition table"""
        self.mbr = MBR(self)
        self.mbr_partitions = self.mbr.mbr_partitions

        self.gpt = GPT(self)
        self.gpt_partitions = self.gpt.gpt_partitions

    def _read_sector(self, nb_sector: int) -> bytes:
        """Read the given sector

        Args:
            nb_sector: Index of the sector to read

        Returns:
            The content of the sector

        Raises:
            ValueError: If we try to read out of bounds
        """
        return self._read_bytes(nb_sector * 512, SECTOR_SIZE)

    def _read_int_at_sector_offset(self, nb_sector: int, offset: int):
        """Read an int at a given offset in the given sector

        Args:
            nb_sector: Index of the sector to read
            offset: The offset where we want to read within the sector

        Returns:
            The int we wanted to read

        Raises:
            ValueError: If we try to read out of bounds
        """
        return self._read_int((nb_sector * 512) + offset)

    def _find_ntfs_partitions(self):
        """Retrieve all the NTFS partitions (_get_partitions needs to be called
        before this function)
        """
        for partition in self.gpt_partitions:
            magic = self._read_int_at_sector_offset(partition["first_lba"], 0)
            if magic == NTFS_MAGIC:
                self.ntfs_partitions.append(NTFS(self, partition))

    def read_ewf(self):
        """Read the EWF file, and parse the partition tables"""

        if not self._is_mbr_partition():
            print("[!] No MBR partition found, exiting...")
            exit(-1)

        print("[+] MBR partition found.")

        self._get_partitions()
        self._find_ntfs_partitions()

    def analyze_ntfs(self, resolve_mft_file: str, clear_cache):
        """Analyze the NTFS partitions to extract the wanted files

        Args:
            resolve_mft_file: Output file of resolved MFT in JSON format
        """
        for (part_idx, partition) in enumerate(self.ntfs_partitions):
            partition.analyze_ntfs_header(part_idx, resolve_mft_file, clear_cache)

    def dump_file(self, filenames: list, dump_dir: str):
        for partition in self.ntfs_partitions:
            partition.dump_file(filenames, dump_dir)

    def __exit__(self, exception_type, exception_value, exception_traceback):
        """Close and clean everything. Called when we exit a `with` block."""
        pass
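
A minimal usage sketch (the image path is hypothetical; the flow mirrors the CLI in theforensicator/cli.py shown below):

from theforensicator.app import EWFImage

# EWFImage is designed to be used as a context manager (__enter__ opens the handle).
with EWFImage("evidence.E01") as ewf:
    ewf.read_ewf()                      # parse MBR/GPT and collect NTFS partitions
    ewf.analyze_ntfs("mft.json", None)  # resolve the MFT of each NTFS partition
    ewf.dump_file(["C:\\Windows\\System32\\cmd.exe"], "./dump")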

__enter__(self) special

Open a handle on EWF files and read the content. Called when we enter a with block

Source code in theforensicator/app.py
def __enter__(self) -> "EWFImage":
    """Open a handle on EWF files and read the content. Called when we enter
    a `with` block
    """

    try:
        import pyewf

        self.handle = pyewf.handle()
        self.handle.open(pyewf.glob(self.filename))
    except ModuleNotFoundError:
        print("[!]\tCould not load pyewf, using python implementation...")
        from .ewf import Ewf

        self.handle = Ewf(self.filename)

    return self

__exit__(self, exception_type, exception_value, exception_traceback) special

Close and clean everything. Called when we exit a with block.

Source code in theforensicator/app.py
def __exit__(self, exception_type, exception_value, exception_traceback):
    """Close and clean everything. Called when we exit a `with` block."""
    pass

__init__(self, filename) special

Initialize the object with default values and the given filename

Parameters:

Name      Type  Description                        Default
filename  str   The filename of the file to parse  required
Source code in theforensicator/app.py
def __init__(self, filename: str) -> None:
    """Initialize the object with default values and the given filename

    Args:
        filename: The filename of the file to parse
    """
    self.filename = filename
    self.handle = None
    self.verbose = True
    self.ntfs_partitions = []
    self.mft_dump_location = None
    self.out_file_location = None

analyze_ntfs(self, resolve_mft_file, clear_cache)

Analyze the NTFS partitions to extract the wanted files

Parameters:

Name              Type  Description                                 Default
resolve_mft_file  str   Output file of resolved MFT in JSON format  required
Source code in theforensicator/app.py
def analyze_ntfs(self, resolve_mft_file: str, clear_cache):
    """Analyze the NTFS partitions to extract the wanted files

    Args:
        resolve_mft_file: Output file of resolved MFT in JSON format
    """
    for (part_idx, partition) in enumerate(self.ntfs_partitions):
        partition.analyze_ntfs_header(part_idx, resolve_mft_file, clear_cache)

read_ewf(self)

Read the EWF file, and parse the partition tables

Source code in theforensicator/app.py
def read_ewf(self):
    """Read the EWF file, and parse the partition tables"""

    if not self._is_mbr_partition():
        print("[!] No MBR partition found, exiting...")
        exit(-1)

    print("[+] MBR partition found.")

    self._get_partitions()
    self._find_ntfs_partitions()

cli

Console script for theforensicator.

cmd(ewf_file, dump_dir=None, resolve_mft_file=None, dmp_file=None, clear_cache=None, extract_artefacts=False)

Parses an EWF file and dumps interesting files found in the Windows file system

Parameters:

Name               Type  Description                                                                  Default
ewf_file           str   File that will be analysed (*.E01)                                           required
dump_dir           str   Directory where dumped data is stored (defaults to the current directory)   None
resolve_mft_file   str   Output file where resolved MFT files/directories are stored in JSON format  None
dmp_file           str   Filename to dump from the disk (ex: "C:\Windows\System32\cmd.exe")          None
extract_artefacts  bool  Automatically extract artefacts (Registry hives, ...) to dump_dir           False
Source code in theforensicator/cli.py
def cmd(ewf_file: str, dump_dir: str = None, resolve_mft_file: str = None, dmp_file: str = None, clear_cache: str = None, extract_artefacts: bool = False):
    """Parses a EWF file and dump interesting files found in the windows file
    system

    Args:
        ewf_file: File that will be analysed (*.E01)
        dump_dir: Directory location to store dumped data (default location is current execution directory)
        resolve_mft_file: Output file where to store MFT files / directories in JSON format.
        dmp_file: Filename to dump from the disk (ex: "C:\\Windows\\System32\\cmd.exe")
        extract_artefacts: Automatically extract artefacts (Registry hives, ...) to dump_dir
    """
    with EWFImage(ewf_file) as ewf:
        ewf.read_ewf()

        if dmp_file:
            if type(dmp_file) is not str:
                print("[?] --dump-file is empty, you must enter a valid filename.")
                exit()

        ewf.analyze_ntfs(resolve_mft_file, clear_cache)

        if dmp_file:
            ewf.dump_file([dmp_file], dump_dir)

        if extract_artefacts:
            artefact_files = glob.glob(f"{dirname(__file__)}/artefacts/*.yaml")

            for artefact in artefact_files:
                with open(artefact, "r") as _artefact:
                    data = yaml.safe_load(_artefact.read())

                    if data is None:
                        continue

                    if dump_dir and type(dump_dir) is str:
                        out_dir = normpath(f"{dump_dir}/{data['dirname']}")
                    else:
                        out_dir = normpath(f"./{data['dirname']}")

                    if not exists(out_dir):
                        Path(out_dir).mkdir(parents=True, exist_ok=True)

                    ewf.dump_file(data["files"], normpath(out_dir))
                    _artefact.close()
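
A hypothetical invocation of cmd (file and directory names are illustrative):

from theforensicator.cli import cmd

# Dump one file from the image and write the resolved MFT as JSON.
cmd(
    "evidence.E01",
    dump_dir="./dump",
    resolve_mft_file="mft.json",
    dmp_file="C:\\Windows\\System32\\cmd.exe",
)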

ewf special

file_parsing

Minimal EWF "driver" in pure Python, by Laurent Clevy (@lorenzo2472).

Reference document: https://github.com/libyal/libewf/blob/master/documentation/Expert%20Witness%20Compression%20Format%20%28EWF%29.asciidoc

Tested with FTK Imager 4.3 and ewfacquire.

Ewf

Source code in theforensicator/ewf/file_parsing.py
class Ewf:
    S_HEADER = Struct("<8sBHH")
    NT_HEADER = namedtuple("header", "signature one segment_num zero")
    assert S_HEADER.size == 13

    S_SECTION = Struct("<16sQQ40sL")
    NT_SECTION = namedtuple("section", "stype next_offset size padding checksum")
    assert S_SECTION.size == 76

    S_DISK = Struct("<LLLLL20s45s5sL")
    assert S_DISK.size == 94
    NT_DISK = namedtuple(
        "disk",
        "one chunk_count sectors_per_chunk bytes_per_sector sector_count reserved padding signature checksum",
    )

    S_VOLUME = Struct("<LLLLL")
    NT_VOLUME = namedtuple(
        "volume", "reserved chunk_count sectors_per_chunk bytes_per_sector sector_count"
    )

    S_TABLE_HEADER = Struct("<L4sQ4sL")
    assert S_TABLE_HEADER.size == 24
    NT_TABLE_HEADER = namedtuple("table_header", "entry_count pad1 base pad2 checksum")

    S_DIGEST = Struct("<16s20s40sL")
    assert S_DIGEST.size == 80
    NT_DIGEST = namedtuple("digest", "md5 sha1 padding checksum")

    S_HASH = Struct("<16s16sL")
    assert S_HASH.size == 36
    NT_HASH = namedtuple("digest", "md5 unknown checksum")

    S_DATA = Struct("<B3sLLLQLLLB3sL4sLB3sL4s16s963s5sL")
    assert S_DATA.size == 1052
    NT_DATA = namedtuple(
        "data",
        "media_type unk1 chunk_count sectors_per_chunk bytes_per_sector sector_count cylinders heads sectors media_flags unk2 PALM_volume unk3 smart_logs compr_level unk4 errors unk5 guid unk6 signature checksum",
    )

    SECTION_HEADER = b"header\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    SECTION_HEADER2 = b"header2\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    SECTION_DATA = b"data\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    SECTION_DISK = b"disk\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    SECTION_VOLUME = b"volume\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    SECTION_SECTORS = b"sectors\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    SECTION_TABLE = b"table\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    SECTION_TABLE2 = b"table2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    SECTION_DIGEST = b"digest\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    SECTION_HASH = b"hash\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    EVF_SIGNATURE = b"EVF\t\r\n\xff\x00"

    def __init__(self, filename, checksums=False, verbose=0):
        self.chunks = dict()  # list of chunks pointers per segment
        self.uncompressed = (
            dict()
        )  # keep track of uncompressed chunks by storing their offset in the segment

        if PurePath(filename).suffix == ".E01":
            filenames = sorted(
                Path(filename).parent.glob(Path(filename).name[:-2] + "??")
            )
            # print( filenames )
            self.current_segment = None  # for seek()
            self.current_chunk_num = 0
            self.ptr_in_current_chunk = 0
            self.current_chunk_data = None
            self.total_chunk_count = 0
            self.checksums = checksums
            self.verbose = verbose

            # data per segment
            self.filedesc = dict()
            self.filename = dict()
            self.hashes = dict()  # to store md5 and sha1
            self.end_of_sectors = (
                dict()
            )  # to know how many bytes to read for the last compressed chunk of the segment
            # self.sectors_offset = dict()
            for filename in filenames:
                self.parse_segment(filename)

            self.chunk_range = dict()
            start_chunk = 0
            self.last_sector_in_last_chunk = (
                self.total_chunk_count * self.sectors_per_chunk
            ) - self.sector_count
            # print('self.last_sector_in_last_chunk %x' % self.last_sector_in_last_chunk)
            for i in range(1, self.last_segment + 1):
                end_chunk = start_chunk + len(self.chunks[i]) - 1
                self.chunk_range[i] = (
                    start_chunk,
                    end_chunk,
                )  # determine chunk number range per segment
                start_chunk = end_chunk + 1

            self.seek(0)  # init "file" pointer to 0

        else:
            print("unsupported format")
            sys.exit()

    def parse_header(self, section_nt):
        header_data = self.filedesc[self.last_segment].read(section_nt.size)
        self.header_string = decompress(header_data)
        # FTK imager : b'1\nmain\nc\tn\ta\te\tt\tav\tov\tm\tu\tp\tr\n \t \tuntitled\t \t \tADI4.3.0.18\tWin 201x\t2020 9 23 10 11 36\t2020 9 23 10 11 36\t0\tf\n'
        # Ewfacquire : b'1\r\nmain\r\nc\tn\ta\te\tt\tav\tov\tm\tu\tp\r\n\t\t\t\t\t20180403\tLinux\t2020 2 6 15 4 33\t2020 2 6 15 4 33\t0\r\n\r\n'
        if self.verbose > 1:
            print(self.header_string)

    def parse_tables(self, section_nt):
        data = self.filedesc[self.last_segment].read(section_nt.size)
        table_header_nt = Ewf.NT_TABLE_HEADER(*Ewf.S_TABLE_HEADER.unpack_from(data, 0))
        if self.verbose > 1:
            print(table_header_nt)
        # print('%x %x' % (Ewf.S_SECTION.size+Ewf.S_TABLE_HEADER.size+table_header_nt.entry_count*4, section_nt.size ) )
        offset = Ewf.S_TABLE_HEADER.size
        for i in range(table_header_nt.entry_count):
            ptr = (
                Struct("<L").unpack_from(data, offset + i * 4)[0] & 0x7FFFFFFF
            )  # most significant bit is compression status
            ptr += table_header_nt.base
            if (
                Struct("<L").unpack_from(data, offset + i * 4)[0] & 0x80000000 == 0
            ):  # most chunks are compressed (bit is set), so we only store uncompressed ptrs
                self.uncompressed[self.last_segment].add(ptr)
            self.chunks[self.last_segment].add(ptr)

        if self.checksums:
            end_of_table = Ewf.S_TABLE_HEADER.size + table_header_nt.entry_count * 4
            if (
                adler32(data[Ewf.S_TABLE_HEADER.size : end_of_table])
                != Struct("<L").unpack_from(data, end_of_table)[0]
            ):
                print("checksum error (table)")

    def parse_part(self, section_nt, file):
        if section_nt.stype == Ewf.SECTION_HEADER:
            self.parse_header(section_nt)
        elif section_nt.stype == Ewf.SECTION_HEADER2:
            data = file.read(section_nt.size)
            # print( decompress( data ).decode('utf16') )
        elif section_nt.stype == Ewf.SECTION_VOLUME:
            data = file.read(section_nt.size)
            volume_nt = Ewf.NT_VOLUME(*Ewf.S_VOLUME.unpack_from(data, 0))
            if self.verbose > 1:
                print(volume_nt)
            self.chunk_count = volume_nt.chunk_count
            self.sectors_per_chunk = volume_nt.sectors_per_chunk
            self.bytes_per_sector = volume_nt.bytes_per_sector
            self.sector_count = volume_nt.sector_count
            self.chunk_size = (
                volume_nt.sectors_per_chunk * volume_nt.bytes_per_sector
            )  # constant
        elif section_nt.stype == Ewf.SECTION_DISK:
            data = file.read(section_nt.size)
            # print(hexlify(data))
            disk_nt = Ewf.NT_DISK(*Ewf.S_DISK.unpack_from(data, 0))
            self.chunk_count = disk_nt.chunk_count
            self.sectors_per_chunk = disk_nt.sectors_per_chunk
            self.bytes_per_sector = disk_nt.bytes_per_sector
            self.sector_count = disk_nt.sector_count
            self.chunk_size = (
                disk_nt.sectors_per_chunk * disk_nt.bytes_per_sector
            )  # constant
            if self.verbose > 1:
                print(disk_nt)
        elif section_nt.stype == Ewf.SECTION_SECTORS:
            # self.sectors_offset[ self.last_segment ] = section_offset #will be used by next table/table2 section
            # print('self.sectors_offset[ self.last_segment ] %x' % self.sectors_offset[ self.last_segment ])
            self.end_of_sectors[self.last_segment] = (
                file.tell() - Ewf.S_SECTION.size + section_nt.size
            )  # end of 'sectors' section, for last 'sectors' section
        elif (
            section_nt.stype == Ewf.SECTION_TABLE
            or section_nt.stype == Ewf.SECTION_TABLE2
        ):
            self.parse_tables(section_nt)
        elif section_nt.stype == Ewf.SECTION_DIGEST:
            data = file.read(section_nt.size)
            digest_nt = Ewf.NT_DIGEST(*Ewf.S_DIGEST.unpack_from(data, 0))
            self.hashes["md5"] = digest_nt.md5
            self.hashes["sha1"] = digest_nt.sha1
            # print( digest_nt )
        elif section_nt.stype == Ewf.SECTION_HASH:
            data = file.read(section_nt.size)
            hash_nt = Ewf.NT_HASH(*Ewf.S_HASH.unpack_from(data, 0))
            self.hashes["md5"] = hash_nt.md5
            # print( hash_nt )
        elif section_nt.stype == Ewf.SECTION_DATA:
            data = file.read(section_nt.size)

    def parse_segment(self, filename):
        if self.verbose > 0:
            print(filename)
        file = open(filename, "rb")
        # parse EVF header
        data = file.read(Ewf.S_HEADER.size)
        header_nt = Ewf.NT_HEADER(*Ewf.S_HEADER.unpack_from(data, 0))
        assert (
            header_nt.one == 1
            and header_nt.zero == 0
            and header_nt.signature == Ewf.EVF_SIGNATURE
        )
        self.chunks[header_nt.segment_num] = set()
        self.uncompressed[header_nt.segment_num] = set()
        self.last_segment = header_nt.segment_num
        self.filedesc[header_nt.segment_num] = file
        self.filename[header_nt.segment_num] = filename
        if self.verbose > 0:
            print(header_nt)

        data = file.read(Ewf.S_SECTION.size)
        section_nt = Ewf.NT_SECTION(*Ewf.S_SECTION.unpack_from(data, 0))
        if self.verbose > 0:
            print(
                "0x%08x: type:%8s next:%x size:%x"
                % (
                    file.tell(),
                    section_nt.stype,
                    section_nt.next_offset,
                    section_nt.size,
                )
            )
        if self.checksums:
            computed_sum = adler32(data[:-4])
            if section_nt.checksum != computed_sum:
                print(
                    "checksum file:%08x != computed:%08x"
                    % (section_nt.checksum, computed_sum)
                )

        previous_next = 0
        if section_nt.stype == Ewf.SECTION_HEADER:
            self.parse_header(section_nt)
        elif section_nt.stype == Ewf.SECTION_DATA:
            data = file.read(section_nt.size)

        while previous_next != section_nt.next_offset:
            file.seek(section_nt.next_offset)
            section_offset = file.tell()
            previous_next = section_nt.next_offset
            data = file.read(Ewf.S_SECTION.size)
            section_nt = Ewf.NT_SECTION(*Ewf.S_SECTION.unpack_from(data, 0))
            if self.verbose > 0:
                print(
                    "0x%08x: type:%8s next:%x size:%x"
                    % (
                        section_offset,
                        section_nt.stype,
                        section_nt.next_offset,
                        section_nt.size,
                    )
                )
            if self.checksums:
                computed_sum = adler32(data[:-4])
                if section_nt.checksum != computed_sum:
                    print(
                        "checksum file:%08x != computed:%08x"
                        % (section_nt.checksum, computed_sum)
                    )

            self.parse_part(section_nt, file)

        self.chunks[header_nt.segment_num] = array.array(
            "L", sorted(self.chunks[header_nt.segment_num])
        )  # convert the set in array
        self.total_chunk_count += len(self.chunks[header_nt.segment_num])

    def display_properties(self):
        print(
            "chunk_count:0x%x, sectors_per_chunk:0x%x, bytes_per_sector:0x%x, sector_count:0x%x"
            % (
                self.chunk_count,
                self.sectors_per_chunk,
                self.bytes_per_sector,
                self.sector_count,
            )
        )
        # print('last_segment: %d' % self.last_segment)
        if "sha1" in self.hashes:
            print("sha1: %s" % (hexlify(self.hashes["sha1"])))
        print("md5: %s" % (hexlify(self.hashes["md5"])))
        if self.verbose > 0:
            for segment in range(1, self.last_segment + 1):
                print("segment #%d, filename: %s" % (segment, self.filename[segment]))
                print(
                    "  chunks count: %d (including uncompressed:%d, %.2f%%)"
                    % (
                        len(self.chunks[segment]),
                        len(self.uncompressed[segment]),
                        len(self.uncompressed[segment])
                        * 100
                        / len(self.chunks[segment]),
                    )
                )
                print(
                    "  data offsets: first:0x%x last:0x%x"
                    % (self.chunks[segment][0], self.chunks[segment][-1])
                )
                print("  absolute chunk number ranges", self.chunk_range[segment])
                print("  end_of_sectors: 0x%x" % self.end_of_sectors[segment])

    def compute_offset(self, offset):  # offset in bytes, multiple of 512
        if offset > self.sector_count * self.bytes_per_sector or offset < 0:
            raise ValueError("Offset out of bounds")

        num_chunk = offset // self.chunk_size
        # print('num_chunk %d' % num_chunk)
        if num_chunk >= self.total_chunk_count:
            print("error num_chunk >= self.chunk_count")
            return

        # locate the segment
        segment = 1
        while (
            self.chunk_range[segment][0] > num_chunk
            or num_chunk > self.chunk_range[segment][1]
            and segment < self.last_segment
        ):
            segment += 1
        # locate the chunk
        chunk_num_in_segment = (
            num_chunk - self.chunk_range[segment][0]
        )  # relative chunk number (in segment), instead of absolute (in dump)
        return (
            segment,
            chunk_num_in_segment,
            offset % self.chunk_size,
        )  # return segment, index in self.chunks[ segment ] and ptr in chunk

    def seek(self, offset):
        segment, num_chunk_in_segment, ptr_in_chunk = self.compute_offset(offset)
        if (
            self.current_chunk_num != num_chunk_in_segment
            or self.current_segment != segment
        ):  # read new chunk if needed
            self.current_chunk_data = self.read_chunk(segment, num_chunk_in_segment)
            self.current_chunk_num = num_chunk_in_segment
            self.current_segment = segment
        self.ptr_in_current_chunk = ptr_in_chunk

    # allow to iterate chunk number inside segment and over different segments
    def next_chunk_num(self, segment, relative_chunk_num):
        if relative_chunk_num + 1 < len(
            self.chunks[segment]
        ):  # not the last chunk of the segment
            return segment, relative_chunk_num + 1
        else:
            if segment + 1 <= self.last_segment:  # must go to next segment
                return segment + 1, 0
            else:
                print(
                    "next_chunk_num error: segment %d, relative_chunk_num %d"
                    % (segment, relative_chunk_num)
                )

    def tell(self):
        chunks = 0
        for seg in range(1, self.current_segment):
            chunks += len(self.chunks[seg])  # count chunks in segment < current_segment
        chunks += self.current_chunk_num  # chunks from start of current segment
        offset = chunks * self.chunk_size + self.ptr_in_current_chunk
        return offset

    def get_offset(self):
        return self.tell()

    def read(self, size):  # emulate read() in a file system
        data = b""
        # print('%d %d' % (self.current_segment, self.current_chunk_num))
        if self.current_chunk_data is None:  # no chunk in cache yet
            self.current_chunk_data = self.read_chunk(
                self.current_segment, self.current_chunk_num
            )
            self.ptr_in_current_chunk = 0
        while size > 0:
            if (
                self.chunk_size - self.ptr_in_current_chunk >= size
            ):  # last read in current chunk
                data += self.current_chunk_data[
                    self.ptr_in_current_chunk : self.ptr_in_current_chunk + size
                ]
                self.ptr_in_current_chunk = self.ptr_in_current_chunk + size
                size = 0
            else:  # will need to read another chunk
                data += self.current_chunk_data[
                    self.ptr_in_current_chunk :
                ]  # read end of current chunk
                size -= self.chunk_size - self.ptr_in_current_chunk
                self.ptr_in_current_chunk = self.chunk_size
                if self.current_segment < self.last_segment or (
                    self.current_segment == self.last_segment
                    and self.current_chunk_num + 1
                    < len(self.chunks[self.current_segment])
                ):  # next chunk does exist
                    self.current_segment, self.current_chunk_num = self.next_chunk_num(
                        self.current_segment, self.current_chunk_num
                    )
                    self.current_chunk_data = self.read_chunk(
                        self.current_segment, self.current_chunk_num
                    )  # read next chunk
                    self.ptr_in_current_chunk = 0
                else:
                    # print('short read: self.current_segment %d, self.current_chunk_num %d' % (self.current_segment, self.current_chunk_num) )
                    return data
        return data

    def read_buffer_at_offset(self, nb_bytes: int, offset: int):
        self.seek(offset)
        return self.read(nb_bytes)

    def read_chunk(self, segment, chunk):  # number of chunk in segment
        # print('segment %d, chunk %d' % (segment, chunk))
        if chunk >= len(self.chunks[segment]) or chunk < 0:
            print("read_chunk: chunk number. segment %d chunk %d" % (segment, chunk))
            raise IndexError
        start_offset = self.chunks[segment][chunk]

        # seek
        self.filedesc[segment].seek(start_offset)  # seek in file segment

        # read
        if start_offset in self.uncompressed[segment]:
            data = self.filedesc[segment].read(self.chunk_size)  # without adler32
        else:
            if start_offset == self.chunks[segment][-1]:  # last chunk in segment
                end_offset = self.end_of_sectors[segment]
            else:
                end_offset = self.chunks[segment][chunk + 1]
            # print('start_offset %x end_offset %x ' % (start_offset, end_offset ) )
            compressed = self.filedesc[segment].read(
                end_offset - start_offset
            )  # compressed data includes adler32
            data = decompress(compressed)
        """if segment==3 and chunk==5026:
      printHex(data)"""
        return data

    def compute_image_hash(self, md):  # accessing chunk directly
        for segment in range(1, self.last_segment + 1):
            for chunk in range(len(self.chunks[segment])):
                data = self.read_chunk(segment, chunk)
                md.update(data)
        return md.digest()
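
A short sketch of driving the pure-Python reader directly (the image file name is hypothetical):

from hashlib import md5
from theforensicator.ewf.file_parsing import Ewf

ewf = Ewf("evidence.E01", checksums=True, verbose=1)
ewf.display_properties()

# Read the first sector (the MBR) through the chunk cache.
mbr_sector = ewf.read_buffer_at_offset(512, 0)

# Recompute the MD5 of the whole image to compare with the stored hash.
print(ewf.compute_image_hash(md5()).hex())
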
NT_DATA (tuple)

data(media_type, unk1, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, cylinders, heads, sectors, media_flags, unk2, PALM_volume, unk3, smart_logs, compr_level, unk4, errors, unk5, guid, unk6, signature, checksum)

__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, media_type, unk1, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, cylinders, heads, sectors, media_flags, unk2, PALM_volume, unk3, smart_logs, compr_level, unk4, errors, unk5, guid, unk6, signature, checksum) special staticmethod

Create new instance of data(media_type, unk1, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, cylinders, heads, sectors, media_flags, unk2, PALM_volume, unk3, smart_logs, compr_level, unk4, errors, unk5, guid, unk6, signature, checksum)

__repr__(self) special

Return a nicely formatted representation string

Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
NT_DIGEST (tuple)

digest(md5, sha1, padding, checksum)

__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, md5, sha1, padding, checksum) special staticmethod

Create new instance of digest(md5, sha1, padding, checksum)

__repr__(self) special

Return a nicely formatted representation string

Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
NT_DISK (tuple)

disk(one, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, reserved, padding, signature, checksum)

__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, one, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, reserved, padding, signature, checksum) special staticmethod

Create new instance of disk(one, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count, reserved, padding, signature, checksum)

__repr__(self) special

Return a nicely formatted representation string

Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
NT_HASH (tuple)

digest(md5, unknown, checksum)

__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, md5, unknown, checksum) special staticmethod

Create new instance of digest(md5, unknown, checksum)

__repr__(self) special

Return a nicely formatted representation string

Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
NT_HEADER (tuple)

header(signature, one, segment_num, zero)

__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, signature, one, segment_num, zero) special staticmethod

Create new instance of header(signature, one, segment_num, zero)

__repr__(self) special

Return a nicely formatted representation string

Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
NT_SECTION (tuple)

section(stype, next_offset, size, padding, checksum)

__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, stype, next_offset, size, padding, checksum) special staticmethod

Create new instance of section(stype, next_offset, size, padding, checksum)

__repr__(self) special

Return a nicely formatted representation string

Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
NT_TABLE_HEADER (tuple)

table_header(entry_count, pad1, base, pad2, checksum)

__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, entry_count, pad1, base, pad2, checksum) special staticmethod

Create new instance of table_header(entry_count, pad1, base, pad2, checksum)

__repr__(self) special

Return a nicely formatted representation string

Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
NT_VOLUME (tuple)

volume(reserved, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count)

__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in theforensicator/ewf/file_parsing.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, reserved, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count) special staticmethod

Create new instance of volume(reserved, chunk_count, sectors_per_chunk, bytes_per_sector, sector_count)

__repr__(self) special

Return a nicely formatted representation string

Source code in theforensicator/ewf/file_parsing.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

fs special

gpt

Parser for GPT

GPT

GPT Partition Table parser

Source code in theforensicator/fs/gpt.py
class GPT(object):
    """MBR Partition Table parser"""

    GPT_HEADER_SIGNATURE = 0x5452415020494645

    def __init__(self, ewf_image: "theforensicator.app.EWFImage") -> None:
        """Initialize the MBR object

        Args:
            ewf_image: The EWFImage object used as a base
        """
        self.handle = ewf_image.handle
        self.verbose = ewf_image.verbose
        self.gpt = {}

        self._read_gpt()

        if self.verbose:
            self._print_gpt_info()

    def _read_gpt(self):
        """Reads the GPT partition table"""
        self.gpt_header = self.read_gpt_header()
        self.gpt_partitions = self.read_gpt_partitions(lba_size=512)

    def read_gpt_header(self) -> bytes:
        offset = self.handle.get_offset()
        self.handle.seek(512)

        gpt_header = self.handle.read(512)

        self.gpt["signature"] = unpack_from("<Q", gpt_header, offset=0)[0]

        if self.gpt["signature"] != GPT.GPT_HEADER_SIGNATURE:
            print("[!] Failed to read GPT header, wrong signature %#x found." % self.mbr["signature"])
            exit(-1)

        self.gpt["revision"] = unpack_from("<I", gpt_header, offset=8)[0]
        self.gpt["header_size"] = unpack_from("<I", gpt_header, offset=12)[0]
        self.gpt["header_crc32"] = unpack_from("<I", gpt_header, offset=16)[0]
        self.gpt["reserved1"] = unpack_from("<I", gpt_header, offset=20)[0]
        self.gpt["my_lba"] = unpack_from("<Q", gpt_header, offset=24)[0]
        self.gpt["alternate_lba"] = unpack_from("<Q", gpt_header, offset=32)[0]
        self.gpt["first_usable_lba"] = unpack_from("<Q", gpt_header, offset=40)[0]
        self.gpt["last_usable_lba"] = unpack_from("<Q", gpt_header, offset=48)[0]
        self.gpt["disk_guid"] = "%08X-%04X-%04X-%04X-%s" % (
            unpack_from("<I", gpt_header, offset=56)[0],
            unpack_from("<H", gpt_header, offset=60)[0],
            unpack_from("<H", gpt_header, offset=62)[0],
            unpack_from("<H", gpt_header, offset=64)[0],
            unpack_from("<8s", gpt_header, offset=66)[0].hex().upper()
        )
        self.gpt["partition_entry_lba"] = unpack_from("<Q", gpt_header, offset=72)[0]
        self.gpt["num_partition_entries"] = unpack_from("<I", gpt_header, offset=80)[0]
        self.gpt["sizeof_partition_entry"] = unpack_from("<I", gpt_header, offset=84)[0]
        self.gpt["partition_entry_array_crc32"] = unpack_from("<I", gpt_header, offset=88)[0]

        self.handle.seek(offset)
        return gpt_header

    def read_gpt_partitions(self, lba_size=512):
        offset = self.handle.get_offset()

        partition_entry_lba = self.gpt["partition_entry_lba"]
        self.handle.seek(partition_entry_lba * lba_size)

        gpt_partitions = []

        for entry_idx in range(self.gpt["num_partition_entries"]):
            entry = self.handle.read(self.gpt["sizeof_partition_entry"])

            partition_entry = {}
            partition_entry["partition_type_guid"] = "%08X-%04X-%04X-%04X-%s" % (
                unpack_from("<I", entry, offset=0)[0],
                unpack_from("<H", entry, offset=4)[0],
                unpack_from("<H", entry, offset=6)[0],
                unpack_from(">H", entry, offset=8)[0],
                unpack_from("<6s", entry, offset=10)[0].hex().upper()
            )
            partition_entry["unique_partition_guid"] = "%08X-%04X-%04X-%04X-%s" % (
                unpack_from("<I", entry, offset=16)[0],
                unpack_from("<H", entry, offset=20)[0],
                unpack_from("<H", entry, offset=22)[0],
                unpack_from(">H", entry, offset=24)[0],
                unpack_from("<6s", entry, offset=26)[0].hex().upper()
            )
            partition_entry["first_lba"] = unpack_from("<Q", entry, offset=32)[0]
            partition_entry["last_lba"] = unpack_from("<Q", entry, offset=40)[0]

            # Determine last entry
            if not partition_entry["first_lba"] and not partition_entry["last_lba"]:
                break

            gpt_partitions.append(partition_entry)

        self.handle.seek(offset)
        return gpt_partitions

    def _print_gpt_info(self):
        """Prints the informations from the GPT partition table"""
        print("GPT INFOS")
        print("=" * 89)
        print("  Index  Type" + ' '*30 + "    Offset Start (Sectors)    Length (Sectors)")
        print("-------  ----" + '-'*30 + "  ------------------------  ------------------")

        for (i, partition) in enumerate(self.gpt_partitions):
            print(("%7d  %-34s" + "  %24d  %18d") % (
                i, 
                PARTITION_TYPE_GUID[partition["partition_type_guid"]],
                partition["first_lba"],
                (partition["last_lba"] - partition["first_lba"] + 1)
            ))

        print("=" * 89)
__init__(self, ewf_image) special

Initialize the GPT object

Parameters:

Name       Type                          Description                         Default
ewf_image  theforensicator.app.EWFImage  The EWFImage object used as a base  required
Source code in theforensicator/fs/gpt.py
def __init__(self, ewf_image: "theforensicator.app.EWFImage") -> None:
    """Initialize the MBR object

    Args:
        ewf_image: The EWFImage object used as a base
    """
    self.handle = ewf_image.handle
    self.verbose = ewf_image.verbose
    self.gpt = {}

    self._read_gpt()

    if self.verbose:
        self._print_gpt_info()

mbr

Parser for MBR

MBR

MBR Partition Table parser

Source code in theforensicator/fs/mbr.py
class MBR(object):
    """MBR Partition Table parser"""

    MSDOS_MBR_SIGNATURE = 0xaa55
    EFI_PMBR_OSTYPE_EFI = 0xEF
    EFI_PMBR_OSTYPE_EFI_GPT = 0xEE

    def __init__(self, ewf_image: "theforensicator.app.EWFImage"):
        """Initialize the MBR object

        Args:
            ewf_image: The EWFImage object used as a base
        """
        self.handle = ewf_image.handle
        self.verbose = ewf_image.verbose
        self.mbr = {
            "partition_records" : []
        }

        self._read_mbr()

        # not very useful
        if self.verbose:
            pass
            #self._print_mbr_info()

    def _read_mbr(self):
        """Reads the MBR partition table"""
        self.mbr_header = self.read_mbr_header()
        self.mbr_partitions = self.mbr["partition_records"]

    def read_mbr_header(self):
        offset = self.handle.get_offset()
        mbr_header = self.handle.read(512)

        # https://elixir.bootlin.com/linux/latest/source/block/partitions/efi.h

        self.mbr["signature"] = unpack_from("<H", mbr_header, offset=510)[0]

        if self.mbr["signature"] != MBR.MSDOS_MBR_SIGNATURE:
            print("[!] Failed to read MBR header, wrong signature %#x found." % self.mbr["signature"])
            exit(-1)

        self.mbr["boot_code"] = unpack_from("<440s", mbr_header, offset=0)[0]
        self.mbr["unique_mbr_signature"] = unpack_from("<I", mbr_header, offset=440)[0]
        self.mbr["unknown"] = unpack_from("<H", mbr_header, offset=444)[0]

        for pt_record_nb in range(4):
            partition_record = {}

            partition_record["boot_indicator"] = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16))[0]
            partition_record["start_head"]  = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 1)[0]
            partition_record["start_sector"]  = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 2)[0]
            partition_record["start_track"]  = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 3)[0]
            partition_record["os_type"]  = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 4)[0]
            partition_record["end_head"]  = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 5)[0]
            partition_record["end_sector"]  = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 6)[0]
            partition_record["end_track"]  = unpack_from("<B", mbr_header, offset=446 + (pt_record_nb * 16) + 7)[0]
            partition_record["starting_lba"]  = unpack_from("<I", mbr_header, offset=446 + (pt_record_nb * 16) + 8)[0]
            partition_record["size_in_lba"]  = unpack_from("<I", mbr_header, offset=446 + (pt_record_nb * 16) + 12)[0]

            self.mbr["partition_records"].append(partition_record)

        self.handle.seek(offset)

    def _print_mbr_info(self):
        """Prints the informations from the MBR partition table"""
        print("=" * 0x40)
        print("MBR INFOS")

        for (i, partition) in enumerate(self.mbr_partitions):
            print("=" * 0x40)
            print("Partition record %d" % i)
            print("=" * 0x40)

            for key in partition.keys():
                print("\t%-16s : 0x%X" % (key, partition[key]))

        print("=" * 0x40)
__init__(self, ewf_image) special

Initialize the MBR object

Parameters:

Name       Type                          Description                         Default
ewf_image  theforensicator.app.EWFImage  The EWFImage object used as a base  required
Source code in theforensicator/fs/mbr.py
def __init__(self, ewf_image: "theforensicator.app.EWFImage"):
    """Initialize the MBR object

    Args:
        ewf_image: The EWFImage object used as a base
    """
    self.handle = ewf_image.handle
    self.verbose = ewf_image.verbose
    self.mbr = {
        "partition_records" : []
    }

    self._read_mbr()

    # not very useful
    if self.verbose:
        pass
        #self._print_mbr_info()

ntfs

Parser for NTFS

MFT

MFT class

Source code in theforensicator/fs/ntfs.py
class MFT(object):
    """MFT class"""

    def __init__(self, header: bytes, ntfs: "NTFS", verbose: bool) -> None:
        """Initialize the MFT class

        Args:
            header: Raw bytes of the MFT record
            ntfs: The NTFS object this MFT record belongs to
            verbose: Verbosity flag
        """
        self._mft_fields = [
            "magic",
            "usa_ofs",
            "usa_count",
            "lsn",
            "sequence_number",
            "link_count",
            "attrs_offset",
            "flags",
            "bytes_in_use",
            "bytes_allocated",
            "base_mft_record",
            "next_attr_instance",
            "reserved",
            "mft_record_number",
            "record",
        ]

        self._attr_r_fields = [
            "type",
            "length",
            "non_resident",
            "name_length",
            "name_offset",
            "flags",
            "instance",
            "value_length",
            "value_offset",
            "flags",
            "reserved",
        ]

        self._attr_nr_fields = [
            "type",
            "length",
            "non_resident",
            "name_length",
            "name_offset",
            "flags",
            "instance",
            "lowest_vcn",
            "highest_vcn",
            "mapping_pairs_offset",
            "compression_unit",
            "reserved",
            "allocated_size",
            "data_size",
            "initialized_size",
            "compressed_size",
        ]

        self.raw = header
        self.ntfs = ntfs
        self.verbose = verbose

        # mft header fields with their values
        self.mft_parsed = {}
        self.is_valid_entry = True
        self.record = {"is_directory": False, "files": []}

    def _get_datetime(self, windows_time: int) -> dict:
        """Convert windows time to datetime

        Args:
            windows_time: Time to convert

        Returns:
            Time in a dict
        """
        seconds = windows_time / 10000000
        epoch = seconds - 11644473600

        if epoch < 0:
            epoch = 0

        dt = datetime.datetime(2000, 1, 1, 0, 0, 0).fromtimestamp(epoch)
        return {"timestamp": epoch, "date": f"{dt.ctime()}"}

    """Attribute type : (0x10) STANDARD_INFORMATION.
    """

    def _standard_info_decode(self, attribute: bytes):
        """Decode STANDARD_INFORMATION attribute

        Args:
            attribute: Raw attribute to decode

        Returns:
            The parsed attribute
        """
        # not complete but at this time we don't need more
        si_info = {}

        si_info["creation_time"] = self._get_datetime(
            unpack_from("<Q", attribute, offset=0x0)[0]
        )
        si_info["last_data_change_time"] = self._get_datetime(
            unpack_from("<Q", attribute, offset=0x8)[0]
        )
        si_info["last_mft_change_time"] = self._get_datetime(
            unpack_from("<Q", attribute, offset=0x10)[0]
        )
        si_info["last_access_time"] = self._get_datetime(
            unpack_from("<Q", attribute, offset=0x18)[0]
        )
        si_info["file_attributes"] = unpack_from("<I", attribute, offset=0x20)[0]

        if self.verbose:
            print(
                "-> Created : %s\n-> Last data change : %s\n-> Last MFT change : %s\n-> Last access : %s\n-> Flags : %d"
                % (
                    si_info["creation_time"]["date"],
                    si_info["last_data_change_time"]["date"],
                    si_info["last_mft_change_time"]["date"],
                    si_info["last_access_time"]["date"],
                    si_info["file_attributes"],
                )
            )

        return si_info

    """Attribute type : (0x20) ATTR_LIST_ENTRY.
    """

    def _attribute_list_decode(self, attribute: bytes) -> dict:
        """Decode ATTR_LIST_ENTRY attribute

        Args:
            attribute: Raw attribute to decode

        Returns:
            The parsed attribute
        """
        attr_list = {}

        attr_list["type"] = unpack_from("<I", attribute, offset=0)[0]
        attr_list["length"] = unpack_from("<H", attribute, offset=4)[0]
        attr_list["name_length"] = unpack_from("<B", attribute, offset=6)[0]
        attr_list["name_offset"] = unpack_from("<B", attribute, offset=7)[0]
        attr_list["lowest_vcn"] = unpack_from("<Q", attribute, offset=8)[0]
        attr_list["mft_reference"] = unpack_from("<Q", attribute, offset=16)[0]
        attr_list["instance"] = unpack_from("<H", attribute, offset=24)[0]
        attr_list["name"] = unpack_from(
            f"<{attr_list['name_length'] * 2}s", attribute, offset=26
        )[0]

        return attr_list

    """Attribute type : (0x30) FILE_NAME_ATTR
    A file can be an archive, the flags field tells us if it's the case.
    """

    def _file_name_decode(self, attribute: bytes) -> dict:
        """Decode FILE_NAME_ATTR attribute

        Args:
            attribute: Raw attribute to decode

        Returns:
            The parsed attribute
        """
        _file_name = {}

        # for now there's no check on sequence number, maybe after
        # it's used to know whether the file is allocated, deleted or orphaned
        # https://usermanual.wiki/Pdf/WpNtOrphanFilesEnUs.1012197800.pdf
        parent_dir = unpack_from("<Q", attribute, offset=0x0)[0]
        _file_name["parent_directory"] = parent_dir & 0xFFFFFFFFFFFF
        _file_name["seq_num"] = parent_dir >> 0x30
        _file_name["creation_time"] = self._get_datetime(
            unpack_from("<Q", attribute, offset=0x8)[0]
        )
        _file_name["last_data_change_time"] = self._get_datetime(
            unpack_from("<Q", attribute, offset=0x10)[0]
        )
        _file_name["last_mft_change_time"] = self._get_datetime(
            unpack_from("<Q", attribute, offset=0x18)[0]
        )
        _file_name["last_access_time"] = self._get_datetime(
            unpack_from("<Q", attribute, offset=0x20)[0]
        )
        _file_name["allocated_size"] = unpack_from("<Q", attribute, offset=0x28)[0]
        _file_name["data_size"] = unpack_from("<Q", attribute, offset=0x30)[0]
        _file_name["file_attributes"] = unpack_from("<I", attribute, offset=0x38)[0]
        # some are missing because not useful at this time
        _file_name["file_name_length"] = unpack_from("<B", attribute, offset=0x40)[0]
        _file_name["file_name_type"] = unpack_from("<B", attribute, offset=0x41)[0]
        _file_name["file_name"] = unpack_from(
            f"<{_file_name['file_name_length'] * 2}s", attribute, offset=0x42
        )[0].decode("utf-16")

        self.record["dates"] = {
            "creation_time": _file_name["creation_time"],
            "last_data_change_time": _file_name["last_data_change_time"],
            "last_mft_change_time": _file_name["last_mft_change_time"],
            "last_access_time": _file_name["last_access_time"],
        }

        if _file_name["file_attributes"] & DIRECTORY == DIRECTORY:
            self.record["is_directory"] = True
            self.record["directory_name"] = _file_name["file_name"]
            self.record["parent_directory"] = _file_name["parent_directory"]
            self.record["seq_num"] = _file_name["seq_num"]

        else:
            if (_file_name["file_attributes"] & ARCHIVE) == ARCHIVE:
                _file_name["is_archive"] = True

            if (_file_name["file_attributes"] & COMPRESSED) == COMPRESSED:
                pass
                # print("COMPRESSED FILE FOUND !")
                # exit()

            self.record["files"].append(_file_name)

        if self.verbose:
            print("Filename record")
            print(
                "-> Parent directory : %d"
                % (_file_name["parent_directory"] & 0xFFFFFFFFFFFF)
            )
            print("-> Name : %s" % (_file_name["file_name"]))
            print("-> Creation : %s" % (_file_name["creation_time"]["date"]))
            print(
                "-> Last data change : %s"
                % (_file_name["last_data_change_time"]["date"])
            )
            print(
                "-> Last MFT change : %s" % (_file_name["last_mft_change_time"]["date"])
            )
            print("-> Last access : %s" % (_file_name["last_access_time"]["date"]))
            print("-> Size : %d" % _file_name["data_size"])
            print("-> Flags : %#x" % _file_name["file_attributes"])

        return _file_name

    """Attribute type : (0x40) OBJECT_ID_ATTR
    """

    def _object_id_decode(self, attribute: bytes) -> dict:
        """Decode OBJECT_ID_ATTR attribute

        Args:
            attribute: Raw attribute to decode

        Returns:
            The parsed attribute
        """
        _object_id = {}

        _object_id["data1"] = unpack_from("<I", attribute, offset=0x0)[0]
        _object_id["data2"] = unpack_from("<H", attribute, offset=0x4)[0]
        _object_id["data3"] = unpack_from("<H", attribute, offset=0x6)[0]
        _object_id["data4"] = unpack_from("<8s", attribute, offset=0x8)[0]

        #         guid = "%08x-%04x-%04x-%s" % (
        #             _object_id["data1"],
        #             _object_id["data2"],
        #             _object_id["data3"],
        #             _object_id["data4"].hex(),
        #         )

        # print("GUID : %s" % guid)

        return _object_id

    """Attribute type : (0x60) VOLUME_NAME
    """

    def _volume_name_decode(self, attribute: bytes):
        # _volume_name = {}
        pass

    """Attribute type : (0x80) DATA
    """

    def _data_runs_decode(self, dataruns: bytes) -> list:
        """Decode DATA attribute

        Args:
            dataruns: dataruns list

        Returns:
            Data of the MFT entry
        """
        current_datarun = dataruns
        run_header = unpack_from("<B", current_datarun, offset=0)[0]

        data = []

        size_lcn_nb = run_header & 0xF
        size_lcn_offset = run_header >> 4

        lcn_length = unpack("<Q", current_datarun[1 : 1 + size_lcn_nb].ljust(8, b"\0"))[
            0
        ]
        lcn_offset = unpack(
            "<Q",
            current_datarun[1 + size_lcn_nb : 1 + size_lcn_nb + size_lcn_offset].ljust(
                8, b"\0"
            ),
        )[0]

        if lcn_length == 0x0:
            print("ERROR SPARSE FILE !")
            exit()

        # used for relative offset
        prev_offset = lcn_offset

        data.append({"lcn_length": lcn_length, "lcn_offset": lcn_offset})

        current_datarun = current_datarun[1 + size_lcn_nb + size_lcn_offset :]

        # potential next datarun
        run_header = unpack_from("<B", current_datarun, offset=0)[0]

        # if we enter in the loop, it means that the file is
        # fragmented or sparsed (empty VCN between clusters)
        while (run_header != 0x0) and ((run_header & 0xF) < 0x4):
            size_lcn_nb = run_header & 0xF
            size_lcn_offset = run_header >> 4

            # print(current_datarun)

            lcn_length = unpack(
                "<Q", current_datarun[1 : 1 + size_lcn_nb].ljust(8, b"\0")
            )[0]

            if lcn_length != 0x0:
                lcn_offset = unpack(
                    "<Q",
                    current_datarun[
                        1 + size_lcn_nb : 1 + size_lcn_nb + size_lcn_offset
                    ].ljust(8, b"\0"),
                )[0]

            # if it's a sparse run there is no on-disk data to read,
            # so we skip it and move on to the next run
            if lcn_length == 0x0:
                pass
                # print("sparse file")
            else:
                # if not sparsed we add data

                # the run offset is a signed value of size_lcn_offset bytes:
                # sign-extend it so backward jumps on disk are handled
                if (lcn_offset >> (size_lcn_offset * 8 - 1)) & 1 == 1:
                    lcn_offset -= 1 << (size_lcn_offset * 8)

                data.append(
                    {"lcn_length": lcn_length, "lcn_offset": (prev_offset + lcn_offset)}
                )

            prev_offset = prev_offset + lcn_offset
            current_datarun = current_datarun[1 + size_lcn_nb + size_lcn_offset :]
            run_header = unpack_from("<B", current_datarun, offset=0)[0]

        return data

    def _analyze_attribute(self, attr_parsed: dict, raw_attr: bytes):
        """Analyze and decode an attribute

        Args:
            attr_parsed: parsed attribute dict
            raw_attr: raw bytes attribute
        """
        if attr_parsed["non_resident"]:
            attribute = b""
        else:
            attribute = raw_attr[attr_parsed["value_offset"] :]

        if attr_parsed["type"] == AT_STANDARD_INFORMATION:
            si_info = self._standard_info_decode(
                raw_attr[attr_parsed["value_offset"] :]
            )

        # not checked
        if attr_parsed["type"] == AT_ATTRIBUTE_LIST:
            if attr_parsed["non_resident"] == 0:
                attr_list = self._attribute_list_decode(attribute)
            else:
                # TO FIX
                # we can fall into this case when the attribute list is non-resident,
                # i.e. there is not enough room left in the record for it
                # (see: https://flatcap.github.io/linux-ntfs/ntfs/attributes/attribute_list.html)
                # print(attr_parsed)
                # print(raw_attr[attr_parsed['mapping_pairs_offset']:])
                # print("Non-resident attribute list")
                # exit()
                pass

        if attr_parsed["type"] == AT_FILE_NAME:
            _file_name = self._file_name_decode(attribute)

        if attr_parsed["type"] == AT_OBJECT_ID:
            object_id = self._object_id_decode(attribute)

        if attr_parsed["type"] == AT_VOLUME_NAME:
            volume_name = self._volume_name_decode(attribute)

        if attr_parsed["type"] == AT_DATA:
            if attr_parsed["name"] != "":
                pass
                # print("AT_DATA ERROR")
                # exit()

            """
            If the attribute is resident, then data is stored in the attribute
            """
            if attr_parsed["non_resident"] == 0:
                data = raw_attr[
                    attr_parsed["value_offset"] : attr_parsed["value_offset"]
                    + attr_parsed["value_length"]
                ]
                self.record["raw_data"] = True
                self.record["data"] = {
                    "size": attr_parsed["value_length"],
                    "raw_data": data.hex(),
                }

            """
            If the attribute is non-resident, then data is stored somewhere in memory,
            we can know location based on dataruns stored at the end of the attribute.
            """
            if attr_parsed["non_resident"] == 1:
                """
                data_run structure :
                    - header : constructed of 1 byte (ex: 0x21)
                        -> header & 0xf = size of number of clusters
                        -> header >> 4  = offset to starting cluster number (LCN)
                """

                if attr_parsed["allocated_size"] > 0:
                    # `mapping_pairs_offset` is the offset from attribute start of dataruns
                    data = self._data_runs_decode(
                        raw_attr[attr_parsed["mapping_pairs_offset"] :]
                    )

                    self.record["raw_data"] = False
                    self.record["data"] = {
                        "size": attr_parsed["data_size"],
                        "init_size": attr_parsed["initialized_size"],
                        "raw_data": data,
                    }

    """This function will parse attribute header of an MFT entry.
    """

    def parse_attr_header(self):
        """Parse an attribute
        """
        attrs_offset = self.mft_parsed["attrs_offset"]

        # offset must be aligned on 8 bytes
        if attrs_offset % 8:
            print("Attribute misalignment")

        self.attributes = []

        while attrs_offset < 1024:
            attr_parsed = {}

            # used to know if it's a resident (b0) or non-resident (b1) attribute
            attr_record = self.raw[attrs_offset:]

            if unpack_from("<I", attr_record, offset=0)[0] == 0xFFFFFFFF:
                # print("[?] Attributes end")
                break

            if unpack_from(ATTR_RECORD_T, attr_record)[2]:
                buf = unpack_from(ATTR_RECORD_NON_RESIDENT, attr_record)
                for (field, value) in zip(self._attr_nr_fields, buf):
                    attr_parsed.update({field: value})
            else:
                buf = unpack_from(ATTR_RECORD_RESIDENT, attr_record)
                for (field, value) in zip(self._attr_r_fields, buf):
                    attr_parsed.update({field: value})

            # if an attribute has a name
            if attr_parsed["name_length"] > 0:
                record_name = attr_record[
                    attr_parsed["name_offset"] : attr_parsed["name_offset"]
                    + (attr_parsed["name_length"] * 2)
                ]
                attr_parsed["name"] = record_name.decode("utf-16").encode()
            else:
                attr_parsed["name"] = ""

            # analyze attribute type
            self._analyze_attribute(attr_parsed, attr_record)

            self.attributes.append(attr_parsed)

            attrs_offset += attr_parsed["length"]

        # maybe use this to avoid some of the calculations above
        self.record["nb_record"] = self.mft_parsed["mft_record_number"]

    """Parse MFT header
    """

    def parse_mft_header(self):
        """Parse the MFT record header fields and check the record magic"""
        for (field, value) in zip(
            self._mft_fields, unpack_from(MFT_RECORD_T, self.raw)
        ):
            self.mft_parsed.update({field: value})

        # check if it's a valid MFT entry
        if self.mft_parsed["magic"] != 0x454C4946:
            self.is_valid_entry = False
            # print("Bad magic :", hex(self.mft_parsed["magic"]))
            # print("Entry number : %d" % self.mft_parsed["mft_record_number"])
            # print("[!] Bad MFT entry")
            # exit()
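
The data-run decoding above is easier to follow with a concrete run list. The sketch below decodes a single hypothetical run the same way _data_runs_decode does: the low nibble of the header byte gives the size of the cluster-count field, the high nibble the size of the starting-LCN field (the run bytes are made up for illustration).

from struct import unpack

# Hypothetical run list: header 0x21, 1-byte count, 2-byte LCN, then the 0x00 terminator
run = bytes([0x21, 0x18, 0x34, 0x56, 0x00])

header = run[0]
len_size, off_size = header & 0xF, header >> 4

cluster_count = unpack("<Q", run[1:1 + len_size].ljust(8, b"\0"))[0]
start_lcn = unpack("<Q", run[1 + len_size:1 + len_size + off_size].ljust(8, b"\0"))[0]

print(cluster_count, hex(start_lcn))  # 24 0x5634 -> 24 clusters starting at LCN 0x5634
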
__init__(self, header, ntfs, verbose) special

Initialize the MFT class

Parameters:

Name     Type   Description        Default
header   bytes  Header of the MFT  required
ntfs     NTFS   NTFS               required
verbose  bool   verbose            required
Source code in theforensicator/fs/ntfs.py
def __init__(self, header: bytes, ntfs: "NTFS", verbose: bool) -> None:
    """Initialize the MFT class

    Args:
        header: Header of the MFT
        ntfs: NTFS
        verbose: verbose
    """
    self._mft_fields = [
        "magic",
        "usa_ofs",
        "usa_count",
        "lsn",
        "sequence_number",
        "link_count",
        "attrs_offset",
        "flags",
        "bytes_in_use",
        "bytes_allocated",
        "base_mft_record",
        "next_attr_instance",
        "reserved",
        "mft_record_number",
        "record",
    ]

    self._attr_r_fields = [
        "type",
        "length",
        "non_resident",
        "name_length",
        "name_offset",
        "flags",
        "instance",
        "value_length",
        "value_offset",
        "flags",
        "reserved",
    ]

    self._attr_nr_fields = [
        "type",
        "length",
        "non_resident",
        "name_length",
        "name_offset",
        "flags",
        "instance",
        "lowest_vcn",
        "highest_vcn",
        "mapping_pairs_offset",
        "compression_unit",
        "reserved",
        "allocated_size",
        "data_size",
        "initialized_size",
        "compressed_size",
    ]

    self.raw = header
    self.ntfs = ntfs
    self.verbose = verbose

    # mft header fields with their values
    self.mft_parsed = {}
    self.is_valid_entry = True
    self.record = {"is_directory": False, "files": []}
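
The magic field listed first in _mft_fields is what parse_mft_header later compares against 0x454C4946; that constant is simply the ASCII signature "FILE" read as a little-endian 32-bit integer, as this quick check shows.

from struct import unpack_from

# "FILE" interpreted as a little-endian unsigned 32-bit integer
magic = unpack_from("<I", b"FILE")[0]
print(hex(magic))  # 0x454c4946
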
parse_attr_header(self)

Parse the attributes of the MFT record

Source code in theforensicator/fs/ntfs.py
def parse_attr_header(self):
    """Parse an attribute
    """
    attrs_offset = self.mft_parsed["attrs_offset"]

    # offset must be aligned on 8 bytes
    if attrs_offset % 8:
        print("Attribute misalignment")

    self.attributes = []

    while attrs_offset < 1024:
        attr_parsed = {}

        # used to know if it's a resident (b0) or non-resident (b1) attribute
        attr_record = self.raw[attrs_offset:]

        if unpack_from("<I", attr_record, offset=0)[0] == 0xFFFFFFFF:
            # print("[?] Attributes end")
            break

        if unpack_from(ATTR_RECORD_T, attr_record)[2]:
            buf = unpack_from(ATTR_RECORD_NON_RESIDENT, attr_record)
            for (field, value) in zip(self._attr_nr_fields, buf):
                attr_parsed.update({field: value})
        else:
            buf = unpack_from(ATTR_RECORD_RESIDENT, attr_record)
            for (field, value) in zip(self._attr_r_fields, buf):
                attr_parsed.update({field: value})

        # if an attribute has a name
        if attr_parsed["name_length"] > 0:
            record_name = attr_record[
                attr_parsed["name_offset"] : attr_parsed["name_offset"]
                + (attr_parsed["name_length"] * 2)
            ]
            attr_parsed["name"] = record_name.decode("utf-16").encode()
        else:
            attr_parsed["name"] = ""

        # analyze attribute type
        self._analyze_attribute(attr_parsed, attr_record)

        self.attributes.append(attr_parsed)

        attrs_offset += attr_parsed["length"]

    # maybe use this to avoid some of the calculations above
    self.record["nb_record"] = self.mft_parsed["mft_record_number"]

NTFS

NTFS class

Source code in theforensicator/fs/ntfs.py
class NTFS(object):
    """NTFS class"""

    def __init__(self, ewf_image: "theforensicator.app.EWFImage", partition) -> None:
        """Initializes the NTFS object

        Args:
            ewf_image: EWF object we are based on
            partition: Partition we will parse
        """
        self.ewf_image = ewf_image

        self.handle = self.ewf_image.handle
        self.verbose = self.ewf_image.verbose
        self.partition = partition
        self._start = self.partition["first_lba"]
        self._end = self.partition["last_lba"]

        self.is_mft_dump = None
        self.dump_mft = None

        self.handle.seek(self._start * SECTOR_SIZE)
        self.ntfs_header = NTFSHeader(
            self._read_nsectors(0, SECTOR_NB(SECTOR_SIZE))
        ).ntfs_header
        self.cluster_block_size = (
            self.ntfs_header["bytes_per_sector"]
            * self.ntfs_header["sectors_per_cluster"]
        )

        print("[+] NTFS partition at sector %#x" % (self._start))

        if self.verbose:
            pass
            #self._pretty_print()

        self.mft = {}

    def _pretty_print(self):
        """Prints additional information about the partition"""

        for header_name in self.ntfs_header.keys():
            if isinstance(self.ntfs_header[header_name], (bytes, str)):
                print("\t%-18s : %s" % (header_name, self.ntfs_header[header_name]))
            else:
                print("\t%-20s : %#x" % (header_name, self.ntfs_header[header_name]))

        print("=" * 0x40)

    def _read(self, offset: int, nb_bytes: int) -> bytes:
        """Reads data at a given offset

        Args:
            offset: Where we want to read
            nb_bytes: Number of bytes we want to read

        Returns:
            The bytes we have read
        """
        curr_off = self.handle.get_offset()
        self.handle.seek(self._start * SECTOR_SIZE + offset)
        buf = self.handle.read(nb_bytes)
        self.handle.seek(curr_off)
        return buf

    def _read_sector(self, sector_idx: int) -> bytes:
        """Reads the given sector

        Args:
            sector_idx: Index of the sector we want to read

        Returns:
            The bytes we have read
        """
        return self._read(sector_idx * SECTOR_SIZE, SECTOR_SIZE)

    def _read_nsectors(self, sector_idx: int, nb_sector: int) -> bytes:
        """Reads the given sectors

        Args:
            sector_idx: Index of the first sector we want to read
            nb_sector: Number of sectors we want to read

        Returns:
            The bytes we have read
        """
        return self._read(sector_idx * SECTOR_SIZE, nb_sector * SECTOR_SIZE)

    def _read_cluster(self, cluster_idx: int) -> bytes:
        """Reads a cluster

        Args:
            cluster_idx: Index of the cluster we want to read

        Returns:
            The bytes we have read
        """
        return self._read(
            cluster_idx * self.cluster_block_size, self.cluster_block_size
        )

    def _read_cluster_nbytes(self, cluster_idx: int, nb_bytes: int) -> bytes:
        """Reads some bytes from a cluster

        Args:
            cluster_idx: Index of the cluster we want to read
            nb_bytes: Number of bytes to read

        Returns:
            The bytes we have read
        """
        return self._read(cluster_idx * self.cluster_block_size, nb_bytes)

    def _read_mft_entry(self, mft_entry_idx: int):
        """Reads a MFT entry

        Args:
            mft_entry_idx: Index of the mft entry we want to read

        Returns:
            The bytes we have read
        """
        return self._read(
            (self.mft_start * self.cluster_block_size)
            + (MFT_ENTRY_SIZE * mft_entry_idx),
            MFT_ENTRY_SIZE,
        )

    def read_mft_entry(self, mft_entry_idx: int, verbose=False) -> "MFT":
        """Reads and parses an MFT entry

        Args:
            mft_entry_idx: Index of the MFT entry we want to read
            verbose: How much logs we want

        Returns:
            The parsed MFT entry object
        """
        mft_entry_raw = self._read_mft_entry(mft_entry_idx)
        mft_entry = MFT(mft_entry_raw, self, verbose)
        return mft_entry

    def load_mft_dump(self, dump_file: str):
        """Load a MFT dump

        Args:
            dump_file: Path of the dump
        """
        with open(dump_file, "r") as dmp_file:
            self.dump_mft = json.loads(dmp_file.read())
            dmp_file.close()

    def analyze_ntfs_header(self, partition_idx: str, resolve_mft_file: str, clear_cache):
        """Analyze the NTFS header and load (or build) the MFT dump

        Args:
            partition_idx: Index of the partition, used to name the MFT cache file
            resolve_mft_file: Where the resolved MFT in JSON format will be stored
            clear_cache: If set, delete any existing MFT cache file before parsing
        """
        self.partition_idx = partition_idx
        mft_dump_filepath = f"MFT{partition_idx}.dump"

        if clear_cache:
            if isfile(mft_dump_filepath):
                unlink(mft_dump_filepath)
                print("[+] Cache cleared.")

        self.mft_start = self.ntfs_header["mft_lcn"]

        print("[+] Loading and analyzing MFT ...")

        if not isfile(mft_dump_filepath):
            self.is_mft_dump = False
            self.analyze_mft(mft_dump_filepath)
        else:
            print("[+] Found %s, loading cache file." % (mft_dump_filepath))
            self.is_mft_dump = True
            self.load_mft_dump(mft_dump_filepath)
            print("[+] Cache file loaded.")

        print("[+] MFT loaded ...")

        self.resolve_mft(resolve_mft_file)

    def _get_dump_mft_entry(self, idx: int):
        """Get the dump of the given MFT entry

        Args:
            idx: Index of the MFT entry to look up

        Returns:
            The dumped entry, or None if it does not exist
        """
        if self.is_mft_dump:
            try:
                return self.dump_mft["mft"][str(idx)]
            except KeyError:
                return None

        try:
            return self.dump_mft["mft"][idx]
        except KeyError:
            return None

    def _resolve_path(self, mft_entry) -> list:
        """Resolve the path of the given mft entry

        Args:
            mft_entry: MFT entry to resolve

        Returns:
            The list of the possible paths of the MFT entry
        """
        paths = []

        # if it's a directory
        if mft_entry["is_directory"]:
            path = ""
            parent_dir = mft_entry["parent_directory"]
            path += mft_entry["directory_name"]

            while parent_dir != FILE_root:
                next_entry = self._get_dump_mft_entry(parent_dir)

                if next_entry is None:
                    break

                if next_entry["is_directory"]:
                    parent_dir = next_entry["parent_directory"]
                    path = f'{next_entry["directory_name"]}\\{path}'
                else:
                    return [{"type": "ORPHAN_DIRECTORY", "directory_name": path}]

            path = "C:\\" + path

            paths.append({"type": "DIRECTORY", "directory_name": path})
        else:
            for file in mft_entry["files"]:
                path = ""
                parent_dir = file["parent_directory"]
                path += file["file_name"]

                is_valid = True

                while parent_dir != FILE_root:
                    next_entry = self._get_dump_mft_entry(parent_dir)

                    if next_entry is None:
                        is_valid = False
                        break

                    if next_entry["is_directory"]:
                        parent_dir = next_entry["parent_directory"]
                        path = f'{next_entry["directory_name"]}\\{path}'
                    else:
                        return [{"type": "ORPHAN_FILE", "file_name": path}]

                if not is_valid:
                    continue

                path = "C:\\" + path

                paths.append({"type": "FILE", "file_name": path})

        return paths

    def resolve_mft(self, json_outfile: str):
        """Resolve the MFT paths and save it to outfile

        Args:
            json_outfile: Where to save the output
        """
        self.resolved_mft = {}

        print("[+] Resolving paths from MFT ...")

        for entry_idx in self.dump_mft["mft"].keys():
            entry = self._get_dump_mft_entry(entry_idx)
            path_infos = self._resolve_path(entry)

            if path_infos:
                obj_type = path_infos[0]["type"]
                if obj_type in ["DIRECTORY", "ORPHAN_DIRECTORY"]:
                    self.resolved_mft[int(entry_idx)] = {
                        "type": obj_type,
                        "info": path_infos,
                        "dates": entry["dates"],
                    }

                if obj_type in ["FILE", "ORPHAN_FILE"]:
                    # case not handled in AT_DATA attribute
                    data = None

                    if "data" in entry:
                        data = entry["data"]
                    else:
                        # need to fix this issue
                        pass

                    self.resolved_mft[int(entry_idx)] = {
                        "type": obj_type,
                        "info": path_infos,
                        "dates": entry["dates"],
                        "data": data,
                    }

        print("[+] MFT paths resolved ...")

        if json_outfile and type(json_outfile) is str:
            with open(f"{json_outfile}.{self.partition_idx}", "w") as dmp:
                dmp.write(json.dumps(self.resolved_mft))
                dmp.close()
            print("[+] %s successfully written." % (json_outfile))

    def analyze_mft(self, out_file: str):
        """Analyze the MFT

        Args:
            out_file: Where to store the output
        """
        print("[?] Analyzing MFT")

        mft_entry_nb = -1

        while True:
            mft_entry_nb += 1

            mft_file = self.read_mft_entry(mft_entry_nb, verbose=False)
            # print(f"Reading MFT entry {mft_entry_nb}")

            if mft_file.raw[0:4] == b"\x00" * 4:
                continue

            mft_file.parse_mft_header()

            if not mft_file.is_valid_entry:
                break

            mft_file.parse_attr_header()

            self.mft[mft_entry_nb] = mft_file.record

        self.dump_mft = {
            "disk_filename": self.ewf_image.filename,
            "total_entries": mft_entry_nb,
            "mft": self.mft,
        }

        with open(out_file, "w") as dmp_file:
            dmp_file.write(json.dumps(self.dump_mft))
            dmp_file.close()

    def _dump_data(self, lcn_dict: dict) -> bytes:
        """Rebuild a file's content from its data descriptor

        Args:
            lcn_dict: Data dict holding either resident hex data or a list of data runs

        Returns:
            The reconstructed file content
        """
        raw_data = lcn_dict["raw_data"]

        buf = b""

        if lcn_dict["size"] == 0 and len(raw_data) == 0:
            return b""

        if type(raw_data) is str:
            return bytes.fromhex(raw_data)

        for lcn in raw_data:
            for idx in range(lcn["lcn_length"]):
                buf += self._read_cluster(lcn["lcn_offset"] + idx)

        return buf[: lcn_dict["init_size"]]

    def write_to_file(self, dump_dir, filename: str, data: bytes):
        """Write dumped file content to disk

        Args:
            dump_dir: Output directory (defaults to the current directory)
            filename: Resolved filename, used to build the output file name
            data: File content to write
        """
        if dump_dir and type(dump_dir) is str:
            out_filename = normpath(dump_dir + "/dump_" + filename.replace('\\', '_').replace(':', ''))
        else:
            out_filename = "./dump_" + filename.replace('\\', '_').replace(':', '')

        with open(out_filename, "wb") as f:
            f.write(data)
            f.close()

        print("[?] %s successfully dumped to %s." % (filename, out_filename))

    def dump_file(self, filenames: list, dump_dir: str) -> None:
        """Dump files matching the given filename patterns

        Args:
            filenames: List of filename patterns (regexes) to dump
            dump_dir: Directory where the dumped files are written
        """

        files_list_match = '(?:%s)' % '|'.join(filenames)

        for key in self.resolved_mft:

            obj_type = self.resolved_mft[key]["type"]

            if obj_type not in ["FILE", "ORPHAN_FILE"]:
                continue

            info = self.resolved_mft[key]["info"]

            for file in info:
                if re.match(files_list_match, file["file_name"], flags=re.IGNORECASE):
                    data = self.resolved_mft[key]["data"]
                    if data:
                        self.write_to_file(
                            dump_dir,
                            file["file_name"],
                            self._dump_data(data)
                        )

    def _analyze_registry(self):
        print("[?] Analyzing registries")

    def _analyze_winsec(self):
        print("[?] Analyzing Windows Security")
__init__(self, ewf_image, partition) special

Initializes the NTFS object

Parameters:

Name       Type                           Description                 Default
ewf_image  theforensicator.app.EWFImage   EWF object we are based on  required
partition                                 Partition we will parse     required
Source code in theforensicator/fs/ntfs.py
def __init__(self, ewf_image: "theforensicator.app.EWFImage", partition) -> None:
    """Initializes the NTFS object

    Args:
        ewf_image: EWF object we are based on
        partition: Partition we will parse
    """
    self.ewf_image = ewf_image

    self.handle = self.ewf_image.handle
    self.verbose = self.ewf_image.verbose
    self.partition = partition
    self._start = self.partition["first_lba"]
    self._end = self.partition["last_lba"]

    self.is_mft_dump = None
    self.dump_mft = None

    self.handle.seek(self._start * SECTOR_SIZE)
    self.ntfs_header = NTFSHeader(
        self._read_nsectors(0, SECTOR_NB(SECTOR_SIZE))
    ).ntfs_header
    self.cluster_block_size = (
        self.ntfs_header["bytes_per_sector"]
        * self.ntfs_header["sectors_per_cluster"]
    )

    print("[+] NTFS partition at sector %#x" % (self._start))

    if self.verbose:
        pass
        #self._pretty_print()

    self.mft = {}
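
cluster_block_size is simply bytes_per_sector * sectors_per_cluster, and _read_mft_entry then addresses fixed-size records from the cluster where the MFT starts. A small arithmetic sketch with typical values (all numbers here are illustrative, and MFT_ENTRY_SIZE is assumed to be the conventional 1024 bytes):

bytes_per_sector = 512      # from the boot sector (typical value)
sectors_per_cluster = 8     # from the boot sector (typical value)
mft_lcn = 0xC0000           # example LCN of $MFT
MFT_ENTRY_SIZE = 1024       # assumed conventional MFT record size

cluster_block_size = bytes_per_sector * sectors_per_cluster   # 4096 bytes
entry_idx = 5                                                  # e.g. the root directory record
offset = mft_lcn * cluster_block_size + MFT_ENTRY_SIZE * entry_idx
print(hex(offset))  # 0xc0001400 -> byte offset of the record within the partition
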
analyze_mft(self, out_file)

Analyze the MFT

Parameters:

Name      Type  Description                Default
out_file  str   Where to store the output  required
Source code in theforensicator/fs/ntfs.py
def analyze_mft(self, out_file: str):
    """Analyze the MFT

    Args:
        out_file: Where to store the output
    """
    print("[?] Analyzing MFT")

    mft_entry_nb = -1

    while True:
        mft_entry_nb += 1

        mft_file = self.read_mft_entry(mft_entry_nb, verbose=False)
        # print(f"Reading MFT entry {mft_entry_nb}")

        if mft_file.raw[0:4] == b"\x00" * 4:
            continue

        mft_file.parse_mft_header()

        if not mft_file.is_valid_entry:
            break

        mft_file.parse_attr_header()

        self.mft[mft_entry_nb] = mft_file.record

    self.dump_mft = {
        "disk_filename": self.ewf_image.filename,
        "total_entries": mft_entry_nb,
        "mft": self.mft,
    }

    with open(out_file, "w") as dmp_file:
        dmp_file.write(json.dumps(self.dump_mft))
        dmp_file.close()
analyze_ntfs_header(self, partition_idx, resolve_mft_file, clear_cache)

Analyze the NTFS header and load (or build) the MFT dump

Parameters:

Name              Type  Description                                                 Default
partition_idx     str   Index of the partition, used to name the MFT cache file    required
resolve_mft_file  str   Where the resolved MFT in JSON format will be stored       required
clear_cache             If set, delete any existing MFT cache file before parsing  required
Source code in theforensicator/fs/ntfs.py
def analyze_ntfs_header(self, partition_idx: str, resolve_mft_file: str, clear_cache):
    """Analyze the NTFS header and load (or build) the MFT dump

    Args:
        partition_idx: Index of the partition, used to name the MFT cache file
        resolve_mft_file: Where the resolved MFT in JSON format will be stored
        clear_cache: If set, delete any existing MFT cache file before parsing
    """
    self.partition_idx = partition_idx
    mft_dump_filepath = f"MFT{partition_idx}.dump"

    if clear_cache:
        if isfile(mft_dump_filepath):
            unlink(mft_dump_filepath)
            print("[+] Cache cleared.")

    self.mft_start = self.ntfs_header["mft_lcn"]

    print("[+] Loading and analyzing MFT ...")

    if not isfile(mft_dump_filepath):
        self.is_mft_dump = False
        self.analyze_mft(mft_dump_filepath)
    else:
        print("[+] Found %s, loading cache file." % (mft_dump_filepath))
        self.is_mft_dump = True
        self.load_mft_dump(mft_dump_filepath)
        print("[+] Cache file loaded.")

    print("[+] MFT loaded ...")

    self.resolve_mft(resolve_mft_file)
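
A hypothetical call illustrating the caching behaviour above (ntfs stands for an already-initialized NTFS object; the file names are placeholders): the first run parses the MFT and writes MFT1.dump, later runs reload that cache unless clear_cache is set.

# Assumes `ntfs` is an NTFS object built from an EWFImage partition
ntfs.analyze_ntfs_header(
    partition_idx="1",                     # cache file becomes MFT1.dump
    resolve_mft_file="resolved_mft.json",  # resolved paths written to resolved_mft.json.1
    clear_cache=False,                     # set True to delete MFT1.dump and re-parse
)
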
dump_file(self, filenames, dump_dir)

Dump files matching the given filename patterns

Parameters:

Name       Type  Description                                   Default
filenames  list  List of filename patterns (regexes) to dump   required
dump_dir   str   Directory where the dumped files are written  required

Source code in theforensicator/fs/ntfs.py
def dump_file(self, filenames: list, dump_dir: str) -> None:
    """Dump files matching the given filename patterns

    Args:
        filenames: List of filename patterns (regexes) to dump
        dump_dir: Directory where the dumped files are written
    """

    files_list_match = '(?:%s)' % '|'.join(filenames)

    for key in self.resolved_mft:

        obj_type = self.resolved_mft[key]["type"]

        if obj_type not in ["FILE", "ORPHAN_FILE"]:
            continue

        info = self.resolved_mft[key]["info"]

        for file in info:
            if re.match(files_list_match, file["file_name"], flags=re.IGNORECASE):
                data = self.resolved_mft[key]["data"]
                if data:
                    self.write_to_file(
                        dump_dir,
                        file["file_name"],
                        self._dump_data(data)
                    )
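
dump_file OR-joins the given patterns into one regular expression and matches it case-insensitively against each resolved path, so entries can be selected by exact path or by regex. A small standalone sketch of that matching step (the patterns are hypothetical):

import re

filenames = [r"C:\\Windows\\System32\\config\\SYSTEM", r".*\\NTUSER\.DAT"]
pattern = '(?:%s)' % '|'.join(filenames)

print(re.match(pattern, r"C:\Users\bob\NTUSER.DAT", flags=re.IGNORECASE) is not None)           # True
print(re.match(pattern, r"C:\Windows\System32\config\SOFTWARE", flags=re.IGNORECASE) is not None)  # False
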
load_mft_dump(self, dump_file)

Load a MFT dump

Parameters:

Name       Type  Description       Default
dump_file  str   Path of the dump  required
Source code in theforensicator/fs/ntfs.py
def load_mft_dump(self, dump_file: str):
    """Load a MFT dump

    Args:
        dump_file: Path of the dump
    """
    with open(dump_file, "r") as dmp_file:
        self.dump_mft = json.loads(dmp_file.read())
        dmp_file.close()
read_mft_entry(self, mft_entry_idx, verbose=False)

Reads and parses an MFT entry

Parameters:

Name           Type  Description                             Default
mft_entry_idx  int   Index of the MFT entry we want to read  required
verbose              How much logs we want                   False

Returns:

Type  Description
MFT   The parsed MFT entry object

Source code in theforensicator/fs/ntfs.py
def read_mft_entry(self, mft_entry_idx: int, verbose=False) -> "MFT":
    """Reads and parses an MFT entry

    Args:
        mft_entry_idx: Index of the MFT entry we want to read
        verbose: How much logs we want

    Returns:
        The parsed MFT entry object
    """
    mft_entry_raw = self._read_mft_entry(mft_entry_idx)
    mft_entry = MFT(mft_entry_raw, self, verbose)
    return mft_entry
resolve_mft(self, json_outfile)

Resolve the MFT paths and save it to outfile

Parameters:

Name          Type  Description               Default
json_outfile  str   Where to save the output  required
Source code in theforensicator/fs/ntfs.py
def resolve_mft(self, json_outfile: str):
    """Resolve the MFT paths and save it to outfile

    Args:
        json_outfile: Where to save the output
    """
    self.resolved_mft = {}

    print("[+] Resolving paths from MFT ...")

    for entry_idx in self.dump_mft["mft"].keys():
        entry = self._get_dump_mft_entry(entry_idx)
        path_infos = self._resolve_path(entry)

        if path_infos:
            obj_type = path_infos[0]["type"]
            if obj_type in ["DIRECTORY", "ORPHAN_DIRECTORY"]:
                self.resolved_mft[int(entry_idx)] = {
                    "type": obj_type,
                    "info": path_infos,
                    "dates": entry["dates"],
                }

            if obj_type in ["FILE", "ORPHAN_FILE"]:
                # case not handled in AT_DATA attribute
                data = None

                if "data" in entry:
                    data = entry["data"]
                else:
                    # need to fix this issue
                    pass

                self.resolved_mft[int(entry_idx)] = {
                    "type": obj_type,
                    "info": path_infos,
                    "dates": entry["dates"],
                    "data": data,
                }

    print("[+] MFT paths resolved ...")

    if json_outfile and type(json_outfile) is str:
        with open(f"{json_outfile}.{self.partition_idx}", "w") as dmp:
            dmp.write(json.dumps(self.resolved_mft))
            dmp.close()
        print("[+] %s successfully written." % (json_outfile))

NTFSHeader

NTFS Header

Source code in theforensicator/fs/ntfs.py
class NTFSHeader(object):
    """NTFS Header"""

    def __init__(self, header: bytes) -> None:
        """Initialize the NTFSHeader class

        Args:
            header: Bytes of the header
        """
        self._fields = [
            "jump",
            "oem_id",
            "bytes_per_sector",
            "sectors_per_cluster",
            "reserved_sectors",
            "fats",
            "root_entries",
            "sectors",
            "media_type",
            "sectors_per_fat",
            "sectors_per_track",
            "heads",
            "hidden_sectors",
            "large_sectors",
            "unused",
            "number_of_sectors",
            "mft_lcn",
            "mftmirr_lcn",
            "clusters_per_mft_record",
            "reserved0",
            "clusters_per_index_record",
            "reserved1",
            "volume_serial_number",
            "checksum",
            "bootstrap",
            "end_of_sector_marker",
        ]

        self.header = header
        self.ntfs_header = {}

        for (field, value) in zip(
            self._fields, unpack(NTFS_BOOT_SECTOR_T, self.header)
        ):
            if field == "bootstrap":
                self.ntfs_header[field] = value.hex()
            else:
                self.ntfs_header.update({field: value})
__init__(self, header) special

Initialize the NTFSHeader class

Parameters:

Name    Type   Description          Default
header  bytes  Bytes of the header  required
Source code in theforensicator/fs/ntfs.py
def __init__(self, header: bytes) -> None:
    """Initialize the NTFSHeader class

    Args:
        header: Bytes of the header
    """
    self._fields = [
        "jump",
        "oem_id",
        "bytes_per_sector",
        "sectors_per_cluster",
        "reserved_sectors",
        "fats",
        "root_entries",
        "sectors",
        "media_type",
        "sectors_per_fat",
        "sectors_per_track",
        "heads",
        "hidden_sectors",
        "large_sectors",
        "unused",
        "number_of_sectors",
        "mft_lcn",
        "mftmirr_lcn",
        "clusters_per_mft_record",
        "reserved0",
        "clusters_per_index_record",
        "reserved1",
        "volume_serial_number",
        "checksum",
        "bootstrap",
        "end_of_sector_marker",
    ]

    self.header = header
    self.ntfs_header = {}

    for (field, value) in zip(
        self._fields, unpack(NTFS_BOOT_SECTOR_T, self.header)
    ):
        if field == "bootstrap":
            self.ntfs_header[field] = value.hex()
        else:
            self.ntfs_header.update({field: value})