diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
index b5bc08912ee94..b1554ad127b7a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -866,8 +866,21 @@ def get_folder_info(
         Returns:
             List[Folder]: A list of Folder objects representing the partitions found.
         """
+
+        def _is_allowed_path(path_spec_: PathSpec, s3_uri: str) -> bool:
+            allowed = path_spec_.allowed(s3_uri)
+            if not allowed:
+                logger.debug(f"File {s3_uri} not allowed and skipping")
+                self.report.report_file_dropped(s3_uri)
+            return allowed
+
+        s3_objects = (
+            obj
+            for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
+            if _is_allowed_path(path_spec, f"s3://{obj.bucket_name}/{obj.key}")
+        )
+
         partitions: List[Folder] = []
-        s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
         grouped_s3_objects_by_dirname = groupby_unsorted(
             s3_objects,
             key=lambda obj: obj.key.rsplit("/", 1)[0],
@@ -878,10 +891,6 @@ def get_folder_info(
             modification_time = None
 
             for item in group:
-                file_path = self.create_s3_path(item.bucket_name, item.key)
-                if not path_spec.allowed(file_path):
-                    logger.debug(f"File {file_path} not allowed and skipping")
-                    continue
                 file_size += item.size
                 if creation_time is None or item.last_modified < creation_time:
                     creation_time = item.last_modified
diff --git a/metadata-ingestion/tests/unit/data_lake/test_path_spec.py b/metadata-ingestion/tests/unit/data_lake/test_path_spec.py
new file mode 100644
index 0000000000000..305f4b72f5329
--- /dev/null
+++ b/metadata-ingestion/tests/unit/data_lake/test_path_spec.py
@@ -0,0 +1,31 @@
+import pytest
+
+from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
+
+
+@pytest.mark.parametrize(
+    "include, s3_uri, expected",
+    [
+        (
+            "s3://bucket/{table}/{partition0}/*.csv",
+            "s3://bucket/table/p1/test.csv",
+            True,
+        ),
+        (
+            "s3://bucket/{table}/{partition0}/*.csv",
+            "s3://bucket/table/p1/p2/test.csv",
+            False,
+        ),
+    ],
+)
+def test_allowed_ignores_depth_mismatch(
+    include: str, s3_uri: str, expected: bool
+) -> None:
+    # arrange
+    path_spec = PathSpec(
+        include=include,
+        table_name="{table}",
+    )
+
+    # act, assert
+    assert path_spec.allowed(s3_uri) == expected
diff --git a/metadata-ingestion/tests/unit/s3/test_s3_source.py b/metadata-ingestion/tests/unit/s3/test_s3_source.py
index 902987213e122..e91874dcb4236 100644
--- a/metadata-ingestion/tests/unit/s3/test_s3_source.py
+++ b/metadata-ingestion/tests/unit/s3/test_s3_source.py
@@ -1,6 +1,6 @@
 from datetime import datetime
 from typing import List, Tuple
-from unittest.mock import Mock
+from unittest.mock import Mock, call
 
 import pytest
 
@@ -12,6 +12,18 @@
 from datahub.ingestion.source.s3.source import S3Source, partitioned_folder_comparator
 
 
+def _get_s3_source(path_spec_: PathSpec) -> S3Source:
+    return S3Source.create(
+        config_dict={
+            "path_spec": {
+                "include": path_spec_.include,
+                "table_name": path_spec_.table_name,
+            },
+        },
+        ctx=PipelineContext(run_id="test-s3"),
+    )
+
+
 def test_partition_comparator_numeric_folder_name():
     folder1 = "3"
     folder2 = "12"
@@ -249,18 +261,6 @@ def test_get_folder_info():
     """
     Test S3Source.get_folder_info returns the latest file in each folder
     """
-
-    def _get_s3_source(path_spec_: PathSpec) -> S3Source:
-        return S3Source.create(
-            config_dict={
-                "path_spec": {
-                    "include": path_spec_.include,
-                    "table_name": path_spec_.table_name,
-                },
-            },
-            ctx=PipelineContext(run_id="test-s3"),
-        )
-
     # arrange
     path_spec = PathSpec(
         include="s3://my-bucket/{table}/{partition0}/*.csv",
@@ -303,3 +303,50 @@ def _get_s3_source(path_spec_: PathSpec) -> S3Source:
     assert len(res) == 2
     assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv"
     assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv"
+
+
+def test_get_folder_info_ignores_disallowed_path(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """
+    Test S3Source.get_folder_info skips disallowed files and logs a message
+    """
+    # arrange
+    path_spec = Mock(
+        spec=PathSpec,
+        include="s3://my-bucket/{table}/{partition0}/*.csv",
+        table_name="{table}",
+    )
+    path_spec.allowed = Mock(return_value=False)
+
+    bucket = Mock()
+    bucket.objects.filter().page_size = Mock(
+        return_value=[
+            Mock(
+                bucket_name="my-bucket",
+                key="my-folder/ignore/this/path/0001.csv",
+                creation_time=datetime(2025, 1, 1, 1),
+                last_modified=datetime(2025, 1, 1, 1),
+                size=100,
+            ),
+        ]
+    )
+
+    s3_source = _get_s3_source(path_spec)
+
+    # act
+    res = s3_source.get_folder_info(path_spec, bucket, prefix="/my-folder")
+
+    # assert
+    expected_called_s3_uri = "s3://my-bucket/my-folder/ignore/this/path/0001.csv"
+
+    assert path_spec.allowed.call_args_list == [call(expected_called_s3_uri)], (
+        "File should be checked if it's allowed"
+    )
+    assert f"File {expected_called_s3_uri} not allowed and skipping" in caplog.text, (
+        "Dropped file should be logged"
+    )
+    assert s3_source.get_report().filtered == [expected_called_s3_uri], (
+        "Dropped file should be in the report.filtered"
+    )
+    assert res == [], "Dropped file should not be in the result"