Skip to content

Commit

Permalink
feat(ingestion/s3): ignore depth mismatched path
Browse files Browse the repository at this point in the history
  • Loading branch information
eagle-25 committed Jan 22, 2025
1 parent fbfa487 commit f0a1541
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 18 deletions.
19 changes: 14 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,8 +866,21 @@ def get_folder_info(
Returns:
List[Folder]: A list of Folder objects representing the partitions found.
"""

def _is_allowed_path(path_spec_: PathSpec, s3_uri: str) -> bool:
allowed = path_spec_.allowed(s3_uri)
if not allowed:
logger.debug(f"File {s3_uri} not allowed and skipping")
self.report.report_file_dropped(s3_uri)
return allowed

s3_objects = (
obj
for obj in bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
if _is_allowed_path(path_spec, f"s3://{obj.bucket_name}/{obj.key}")
)

partitions: List[Folder] = []
s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
grouped_s3_objects_by_dirname = groupby_unsorted(
s3_objects,
key=lambda obj: obj.key.rsplit("/", 1)[0],
Expand All @@ -878,10 +891,6 @@ def get_folder_info(
modification_time = None

for item in group:
file_path = self.create_s3_path(item.bucket_name, item.key)
if not path_spec.allowed(file_path):
logger.debug(f"File {file_path} not allowed and skipping")
continue
file_size += item.size
if creation_time is None or item.last_modified < creation_time:
creation_time = item.last_modified
Expand Down
31 changes: 31 additions & 0 deletions metadata-ingestion/tests/unit/data_lake/test_path_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pytest

from datahub.ingestion.source.data_lake_common.path_spec import PathSpec


@pytest.mark.parametrize(
"include, s3_uri, expected",
[
(
"s3://bucket/{table}/{partition0}/*.csv",
"s3://bucket/table/p1/test.csv",
True,
),
(
"s3://bucket/{table}/{partition0}/*.csv",
"s3://bucket/table/p1/p2/test.csv",
False,
),
],
)
def test_allowed_ignores_depth_mismatch(
include: str, s3_uri: str, expected: bool
) -> None:
# arrange
path_spec = PathSpec(
include=include,
table_name="{table}",
)

# act, assert
assert path_spec.allowed(s3_uri) == expected
73 changes: 60 additions & 13 deletions metadata-ingestion/tests/unit/s3/test_s3_source.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime
from typing import List, Tuple
from unittest.mock import Mock
from unittest.mock import Mock, call

import pytest

Expand All @@ -12,6 +12,18 @@
from datahub.ingestion.source.s3.source import S3Source, partitioned_folder_comparator


def _get_s3_source(path_spec_: PathSpec) -> S3Source:
return S3Source.create(
config_dict={
"path_spec": {
"include": path_spec_.include,
"table_name": path_spec_.table_name,
},
},
ctx=PipelineContext(run_id="test-s3"),
)


def test_partition_comparator_numeric_folder_name():
folder1 = "3"
folder2 = "12"
Expand Down Expand Up @@ -249,18 +261,6 @@ def test_get_folder_info():
"""
Test S3Source.get_folder_info returns the latest file in each folder
"""

def _get_s3_source(path_spec_: PathSpec) -> S3Source:
return S3Source.create(
config_dict={
"path_spec": {
"include": path_spec_.include,
"table_name": path_spec_.table_name,
},
},
ctx=PipelineContext(run_id="test-s3"),
)

# arrange
path_spec = PathSpec(
include="s3://my-bucket/{table}/{partition0}/*.csv",
Expand Down Expand Up @@ -303,3 +303,50 @@ def _get_s3_source(path_spec_: PathSpec) -> S3Source:
assert len(res) == 2
assert res[0].sample_file == "s3://my-bucket/my-folder/dir1/0002.csv"
assert res[1].sample_file == "s3://my-bucket/my-folder/dir2/0001.csv"


def test_get_folder_info_ignores_disallowed_path(
caplog: pytest.LogCaptureFixture,
) -> None:
"""
Test S3Source.get_folder_info skips disallowed files and logs a message
"""
# arrange
path_spec = Mock(
spec=PathSpec,
include="s3://my-bucket/{table}/{partition0}/*.csv",
table_name="{table}",
)
path_spec.allowed = Mock(return_value=False)

bucket = Mock()
bucket.objects.filter().page_size = Mock(
return_value=[
Mock(
bucket_name="my-bucket",
key="my-folder/ignore/this/path/0001.csv",
creation_time=datetime(2025, 1, 1, 1),
last_modified=datetime(2025, 1, 1, 1),
size=100,
),
]
)

s3_source = _get_s3_source(path_spec)

# act
res = s3_source.get_folder_info(path_spec, bucket, prefix="/my-folder")

# assert
expected_called_s3_uri = "s3://my-bucket/my-folder/ignore/this/path/0001.csv"

assert path_spec.allowed.call_args_list == [call(expected_called_s3_uri)], (
"File should be checked if it's allowed"
)
assert f"File {expected_called_s3_uri} not allowed and skipping" in caplog.text, (
"Dropped file should be logged"
)
assert s3_source.get_report().filtered == [expected_called_s3_uri], (
"Dropped file should be in the report.filtered"
)
assert res == [], "Dropped file should not be in the result"

0 comments on commit f0a1541

Please sign in to comment.