diff --git a/dtformats/data_format.py b/dtformats/data_format.py index 74c846a..e4953f1 100644 --- a/dtformats/data_format.py +++ b/dtformats/data_format.py @@ -394,6 +394,17 @@ def _FormatString(self, string): """ return string.rstrip('\x00') + def _FormatByteAsString(self, bytes_object): + """Formats a bytes object as a string. + + Args: + bytes_object (bytes): bytes object + + Returns: + str: a string decoded from bytes + """ + return bytes_object.decode('utf-8') + def _FormatStructureObject(self, structure_object, debug_info): """Formats a structure object debug information. diff --git a/dtformats/index_directory_entry.py b/dtformats/index_directory_entry.py new file mode 100644 index 0000000..70882ba --- /dev/null +++ b/dtformats/index_directory_entry.py @@ -0,0 +1,164 @@ +# -*- coding: utf-8 -*- +"""NTFS $I30 index entries.""" + +import os + +from dtformats import data_format +from dtformats import errors + + +class NTFSIndexI30Record(data_format.BinaryDataFile): + """Class that represents an NTFS $I30 index record.""" + + _FABRIC = data_format.BinaryDataFile.ReadDefinitionFile( + 'index_directory_entry.yml') + + _DEBUG_INDX_ENTRY_HEADER = [ + ('signature', 'signature', '_FormatByteAsString'), + ('fixup_value_offset', 'fixup_value_offset', '_FormatIntegerAsDecimal'), + ('num_fixup_values', 'num_fixup_values', '_FormatIntegerAsDecimal'), + ('logfile_sequence_number', 'logfile_sequence_number', + '_FormatIntegerAsDecimal'), + ('virtual_cluster_number', 'virtual_cluster_number', + '_FormatIntegerAsDecimal')] + + _DEBUG_INDX_NODE_HEADER = [ + ('index_values_offset', 'index_values_offset', + '_FormatIntegerAsDecimal'), + ('index_node_size', 'index_node_size', '_FormatIntegerAsDecimal'), + ('allocated_index_node_size', 'allocated_index_node_size', + '_FormatIntegerAsDecimal'), + ('index_node_flags', 'index_node_flags', '_FormatIntegerAsDecimal')] + + _DEBUG_INDX_DIR_RECORD = [ + ('file_reference', 'file_reference', '_FormatIntegerAsDecimal'), + ('index_value_size', 'index_value_size', '_FormatIntegerAsDecimal'), + ('index_key_data_size', 'index_key_data_size', '_FormatIntegerAsDecimal'), + ('index_value_flags', 'index_value_flags', '_FormatIntegerAsDecimal')] + + _DEBUG_FILE_NAME_ATTR = [ + ('parent_file_reference', 'parent_file_reference', + '_FormatIntegerAsDecimal'), + ('creation_time', 'creation_time', '_FormatIntegerAsDecimal'), + ('modification_time', 'modification_time', '_FormatIntegerAsDecimal'), + ('entry_modification_time', 'entry_modification_time', + '_FormatIntegerAsDecimal'), + ('access_time', 'access_time', '_FormatIntegerAsDecimal'), + ('allocated_file_size', 'allocated_file_size', '_FormatIntegerAsDecimal'), + ('file_size', 'file_size', '_FormatIntegerAsDecimal'), + ('file_attribute_flags', 'file_attribute_flags', + '_FormatIntegerAsDecimal'), + ('extended_data', 'extended_data', '_FormatIntegerAsDecimal'), + ('name_size', 'name_size', '_FormatIntegerAsDecimal'), + ('name_space', 'name_space', '_FormatIntegerAsDecimal'), + ('filename', 'filename', '_FormatString')] + + def PrintRecord(self, record): + """Prints a human readable version of the NTFS $I30 + index record. + + Args: + record (index_directory_entry): An index_directory_entry structure. + """ + if record: + if self._debug: + self._DebugPrintStructureObject( + record.entry_header, self._DEBUG_INDX_ENTRY_HEADER) + self._DebugPrintStructureObject( + record.node_header, self._DEBUG_INDX_NODE_HEADER) + self._DebugPrintStructureObject( + record, self._DEBUG_INDX_DIR_RECORD) + self._DebugPrintStructureObject( + record.index_key_data, self._DEBUG_FILE_NAME_ATTR) + + def _ParseIndexEntryHeader(self, file_object): + """Parses an NTFS index entry header. + + Args: + file_object: A file-like object. + + Returns: + tuple[index_entry_header, integer]: A tuple containing an + index entry header structure and data size. + """ + file_offset = file_object.tell() + data_type_map = self._GetDataTypeMap('index_entry_header') + + index_entry_header, data_size = self._ReadStructureFromFileObject( + file_object, file_offset, data_type_map, 'INDX Entry Header') + return index_entry_header, data_size + + def _ParseIndexNodeHeader(self, file_object): + """Parses an NTFS index node header. + + Args: + file_object: A file-like object. + + Returns: + tuple[index_node_header, integer]: A tuple containing an index node + header structure and data size. + """ + file_offset = file_object.tell() + data_type_map = self._GetDataTypeMap('index_node_header') + + index_node_header, data_size = self._ReadStructureFromFileObject( + file_object, file_offset, data_type_map, 'INDX Node Header') + return index_node_header, data_size + + def _ParseIndexDirectoryEntry(self, file_object): + """Parses an NTFS $I30 index record that contains directory entries + + Args: + file_object: A file-like object. + + Returns: + tuple[index_directory_entry, integer]: A tuple containing a + NTFS $I30 index structure and data size. + """ + file_offset = file_object.tell() + data_type_map = self._GetDataTypeMap('index_directory_entry') + + try: + index_directory_entry, data_size = self._ReadStructureFromFileObject( + file_object, file_offset, data_type_map, + '4KB block with possible INDX Directory Entry') + return index_directory_entry, data_size + except errors.ParseError as exception: + if self._debug: + print(exception) + return None, None + + def ReadFileObject(self, file_object): + """Reads a file-like object containing INDX records. + + Args: + file_object (file): file-like object. + + Raises: + ParseError: if the file cannot be read. + """ + self._file_object = file_object + + def ReadRecords(self): + """Reads NTFS $I30 INDX records. + + Yields: + index_directory_entry: An NTFS $I30 index record. + + Raises: + ParseError: if a record cannot be read. + """ + self._file_object.seek(0, os.SEEK_SET) + file_offset = 0 + + # NTFS INDX entries allocated in 4096-byte chunks + block_size = 4096 + + while file_offset < self._file_size: + self._file_object.seek(file_offset, os.SEEK_SET) + index_directory_entry, _ = self._ParseIndexDirectoryEntry( + self._file_object) + if index_directory_entry: + yield index_directory_entry + + file_offset += block_size diff --git a/dtformats/index_directory_entry.yml b/dtformats/index_directory_entry.yml new file mode 100644 index 0000000..0ee319a --- /dev/null +++ b/dtformats/index_directory_entry.yml @@ -0,0 +1,158 @@ +name: ntfs_i30_index +type: format +description: NTFS $I30 index record that contains directory entries +urls: ["https://github.com/libyal/libfsntfs/blob/main/documentation/New%20Technologies%20File%20System%20(NTFS).asciidoc#index"] +metadata: + authors: ['Juan Leaniz 64 + data_type: file_name_attribute diff --git a/tests/index_directory_entry.py b/tests/index_directory_entry.py new file mode 100644 index 0000000..989ec73 --- /dev/null +++ b/tests/index_directory_entry.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +"""Tests for NTFS INDX directory entries.""" + +import os +import unittest + +from dtformats import index_directory_entry + +from tests import test_lib + + + # pylint: disable=protected-access +class IndexRecordFileTest(test_lib.BaseTestCase): + """NTFS $I30 index record tests.""" + + expected_values = [ + {'entry_header_signature': b'INDX', + 'entry_header_fixup_value_offset': 40, + 'entry_header_num_fixup_values': 9, + 'entry_header_logfile_sequence_number': 1143279, + 'entry_header_virtual_cluster_number': 0, + 'node_header_index_values_offset': 40, + 'node_header_index_node_size': 3176, + 'node_header_allocated_index_node_size': 4072, + 'node_header_index_node_flags': 0, + 'file_reference' : 281474976710748, + 'index_value_size' : 88, + 'index_key_data_size' : 72, + 'index_value_flags' : 0, + 'index_key_data_parent_file_reference': 281474976710691, + 'index_key_data_creation_time': 130795153668593750, + 'index_key_data_modification_time': 130870367262995000, + 'index_key_data_entry_modification_time': 130849843578932500, + 'index_key_data_access_time': 130870367262995000, + 'index_key_data_allocated_file_size': 24576, + 'index_key_data_file_size': 24576, + 'index_key_data_file_attribute_flags': 38, + 'index_key_data_extended_data': 0, + 'index_key_data_name_size': 3, + 'index_key_data_name_space': 3, + 'index_key_data_filename': 'BCD', + }, + {'entry_header_signature': b'INDX', + 'entry_header_fixup_value_offset': 40, + 'entry_header_num_fixup_values': 9, + 'entry_header_logfile_sequence_number': 1060954, + 'entry_header_virtual_cluster_number': 0, + 'node_header_index_values_offset': 40, + 'node_header_index_node_size': 784, + 'node_header_allocated_index_node_size': 4072, + 'node_header_index_node_flags': 0, + 'file_reference' : 281474976710687, + 'index_value_size' : 96, + 'index_key_data_size' : 76, + 'index_value_flags' : 0, + 'index_key_data_parent_file_reference': 281474976710685, + 'index_key_data_creation_time': 130795138741093750, + 'index_key_data_modification_time': 130795138741093750, + 'index_key_data_entry_modification_time': 130795138741093750, + 'index_key_data_access_time': 130795138741093750, + 'index_key_data_allocated_file_size': 0, + 'index_key_data_file_size': 0, + 'index_key_data_file_attribute_flags': 38, + 'index_key_data_extended_data': 0, + 'index_key_data_name_size': 5, + 'index_key_data_name_space': 0, + 'index_key_data_filename': '$Tops', + }] + + def _GetTestStructure(self, path): + """ Helper method to create an NTFSIndexI30Record object. + + Args: + path (str): Path to a test file. + + Returns: + NTFSIndexI30Record: An NTFS I30 index object. + """ + test_file_path = self._GetTestFilePath(path) + self._SkipIfPathNotExists(test_file_path) + + output_writer = test_lib.TestOutputWriter() + test_file = index_directory_entry.NTFSIndexI30Record( + debug=True, output_writer=output_writer) + + test_file.Open(test_file_path) + return test_file + + def testReadFileObject(self): + """Tests the ReadFileObject function.""" + test_file = self._GetTestStructure(['ntfs_i30_records.raw']) + self.assertIsNotNone(test_file) + + def testParseIndexEntryHeader(self): + """Tests the _ParseIndexEntryHeader function. """ + test_file = self._GetTestStructure(['ntfs_i30_records.raw']) + record, _ = test_file._ParseIndexEntryHeader(test_file._file_object) + + self.assertEqual(record.signature, + self.expected_values[0].get('entry_header_signature')) + self.assertEqual(record.fixup_value_offset, + self.expected_values[0].get('entry_header_fixup_value_offset')) + self.assertEqual(record.num_fixup_values, + self.expected_values[0].get('entry_header_num_fixup_values')) + self.assertEqual(record.logfile_sequence_number, + self.expected_values[0].get('entry_header_logfile_sequence_number')) + self.assertEqual(record.virtual_cluster_number, + self.expected_values[0].get('entry_header_virtual_cluster_number')) + + def testParseIndexNodeHeader(self): + """Tests the _ParseIndexNodeHeader function. """ + test_file = self._GetTestStructure(['ntfs_i30_records.raw']) + test_file._file_object.seek(24, os.SEEK_SET) + record, _ = test_file._ParseIndexNodeHeader(test_file._file_object) + + self.assertEqual(record.index_values_offset, + self.expected_values[0].get('node_header_index_values_offset')) + self.assertEqual(record.index_node_size, + self.expected_values[0].get('node_header_index_node_size')) + self.assertEqual(record.allocated_index_node_size, + self.expected_values[0].get('node_header_allocated_index_node_size')) + self.assertEqual(record.index_node_flags, + self.expected_values[0].get('node_header_index_node_flags')) + + def testReadRecords(self): + """ + Method to test class functionality. + """ + test_file = self._GetTestStructure(['ntfs_i30_records.raw']) + + i = 0 + for r in test_file.ReadRecords(): + self.assertEqual(r.entry_header.signature, + self.expected_values[i].get('entry_header_signature')) + self.assertEqual(r.entry_header.fixup_value_offset, + self.expected_values[i].get('entry_header_fixup_value_offset')) + self.assertEqual(r.entry_header.num_fixup_values, + self.expected_values[i].get('entry_header_num_fixup_values')) + self.assertEqual(r.entry_header.logfile_sequence_number, + self.expected_values[i].get('entry_header_logfile_sequence_number')) + self.assertEqual(r.entry_header.virtual_cluster_number, + self.expected_values[i].get('entry_header_virtual_cluster_number')) + self.assertEqual(r.node_header.index_values_offset, + self.expected_values[i].get('node_header_index_values_offset')) + self.assertEqual(r.node_header.index_node_size, + self.expected_values[i].get('node_header_index_node_size')) + self.assertEqual(r.node_header.allocated_index_node_size, + self.expected_values[i].get('node_header_allocated_index_node_size')) + self.assertEqual(r.node_header.index_node_flags, + self.expected_values[i].get('node_header_index_node_flags')) + self.assertEqual(r.file_reference, + self.expected_values[i].get('file_reference')) + self.assertEqual(r.index_value_size, + self.expected_values[i].get('index_value_size')) + self.assertEqual(r.index_key_data_size, + self.expected_values[i].get('index_key_data_size')) + self.assertEqual(r.index_value_flags, + self.expected_values[i].get('index_value_flags')) + self.assertEqual(r.index_key_data.parent_file_reference, + self.expected_values[i].get('index_key_data_parent_file_reference')) + self.assertEqual(r.index_key_data.creation_time, + self.expected_values[i].get('index_key_data_creation_time')) + self.assertEqual(r.index_key_data.modification_time, + self.expected_values[i].get('index_key_data_modification_time')) + self.assertEqual(r.index_key_data.entry_modification_time, + self.expected_values[i].get('index_key_data_entry_modification_time')) + self.assertEqual(r.index_key_data.access_time, + self.expected_values[i].get('index_key_data_access_time')) + self.assertEqual(r.index_key_data.allocated_file_size, + self.expected_values[i].get('index_key_data_allocated_file_size')) + self.assertEqual(r.index_key_data.file_size, + self.expected_values[i].get('index_key_data_file_size')) + self.assertEqual(r.index_key_data.file_attribute_flags, + self.expected_values[i].get('index_key_data_file_attribute_flags')) + self.assertEqual(r.index_key_data.extended_data, + self.expected_values[i].get('index_key_data_extended_data')) + self.assertEqual(r.index_key_data.name_size, + self.expected_values[i].get('index_key_data_name_size')) + self.assertEqual(r.index_key_data.name_space, + self.expected_values[i].get('index_key_data_name_space')) + self.assertEqual(r.index_key_data.filename, + self.expected_values[i].get('index_key_data_filename')) + + i += 1 + + +if __name__ == '__main__': + unittest.main()