Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a5d0278

Browse files
committedJan 1, 2025·
fix(ingestion/s3): groupby group-splitting issue
1 parent 96c6058 commit a5d0278

File tree

2 files changed

+33
-11
lines changed

2 files changed

+33
-11
lines changed
 

‎metadata-ingestion/src/datahub/ingestion/source/s3/source.py

+11-10
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import pathlib
66
import re
77
import time
8+
from collections import defaultdict
89
from datetime import datetime
9-
from itertools import groupby
1010
from pathlib import PurePath
1111
from typing import Any, Dict, Iterable, List, Optional, Tuple
1212
from urllib.parse import urlparse
@@ -139,6 +139,14 @@ def partitioned_folder_comparator(folder1: str, folder2: str) -> int:
139139
return 1 if folder1 > folder2 else -1
140140

141141

142+
def _group_s3_objects_by_dirname(s3_objects: Any) -> Dict[str, List[Any]]:
143+
grouped_objects = defaultdict(list)
144+
for obj in s3_objects:
145+
dirname = os.path.dirname(obj.key)
146+
grouped_objects[dirname].append(obj)
147+
return grouped_objects
148+
149+
142150
@dataclasses.dataclass
143151
class Folder:
144152
creation_time: datetime
@@ -863,16 +871,9 @@ def get_folder_info(
863871
Returns:
864872
List[Folder]: A list of Folder objects representing the partitions found.
865873
"""
866-
867-
prefix_to_list = prefix
868-
files = list(
869-
bucket.objects.filter(Prefix=f"{prefix_to_list}").page_size(PAGE_SIZE)
870-
)
871-
files = sorted(files, key=lambda a: a.last_modified)
872-
grouped_files = groupby(files, lambda x: x.key.rsplit("/", 1)[0])
873-
874874
partitions: List[Folder] = []
875-
for key, group in grouped_files:
875+
s3_objects = bucket.objects.filter(Prefix=prefix).page_size(PAGE_SIZE)
876+
for key, group in _group_s3_objects_by_dirname(s3_objects).items():
876877
file_size = 0
877878
creation_time = None
878879
modification_time = None

‎metadata-ingestion/tests/unit/s3/test_s3_source.py

+22-1
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
from typing import List, Tuple
2+
from unittest.mock import Mock
23

34
import pytest
45

56
from datahub.emitter.mcp import MetadataChangeProposalWrapper
67
from datahub.ingestion.api.workunit import MetadataWorkUnit
78
from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
89
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
9-
from datahub.ingestion.source.s3.source import partitioned_folder_comparator
10+
from datahub.ingestion.source.s3.source import (
11+
_group_s3_objects_by_dirname,
12+
partitioned_folder_comparator,
13+
)
1014

1115

1216
def test_partition_comparator_numeric_folder_name():
@@ -240,3 +244,20 @@ def container_properties_filter(x: MetadataWorkUnit) -> bool:
240244
"folder_abs_path": "my-bucket/my-dir/my-dir2",
241245
"platform": "s3",
242246
}
247+
248+
249+
def test_group_s3_objects_by_dir_name():
250+
# arrange
251+
s3_objects = [
252+
Mock(key="/dir1/file1.txt"),
253+
Mock(key="/dir1/file2.txt"),
254+
Mock(key="/dir2/file3.txt"),
255+
Mock(key="/dir2/file4.txt"),
256+
]
257+
258+
# act
259+
grouped_objects = _group_s3_objects_by_dirname(s3_objects)
260+
261+
# assert
262+
assert grouped_objects["/dir1"] == [s3_objects[0], s3_objects[1]]
263+
assert grouped_objects["/dir2"] == [s3_objects[2], s3_objects[3]]

0 commit comments

Comments
 (0)
Please sign in to comment.