Skip to content

Commit

Permalink
[DOP-22144] Change logic of applying limits in FileConnection.walk
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Jan 13, 2025
1 parent 20eb931 commit 354421f
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 26 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/326.breaking.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Change the logic of ``FileConnection.walk`` to exclude file what returned ``True`` from ``limit.stops_at(path)``.
6 changes: 3 additions & 3 deletions onetl/base/base_file_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,11 @@ def walk(
If ``True``, walk in top-down order, otherwise walk in bottom-up order.
filters : list of :obj:`BaseFileFilter <onetl.base.base_file_filter.BaseFileFilter>`, optional
Return only files/directories matching these filters. See :ref:`file-filters`
Return only files/directories matching these filters. See :ref:`file-filters`.
limits : list of :obj:`BaseFileLimit <onetl.base.base_file_limit.BaseFileLimit>`, optional
Apply limits to the list of files/directories, and stop if one of the limits is reached.
See :ref:`file-limits`
Apply limits to the list of files/directories, and immediately stop if any of these limits is reached.
See :ref:`file-limits`.
Returns
-------
Expand Down
23 changes: 10 additions & 13 deletions onetl/connection/file_connection/file_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,9 @@ def list_dir(
limits = reset_limits(limits or [])

for entry in self._scan_entries(remote_dir):
if limits_reached(limits):
break

name = self._extract_name_from_entry(entry)
stat = self._extract_stat_from_entry(remote_dir, entry)

Expand All @@ -423,12 +426,9 @@ def list_dir(
else:
path = RemoteFile(path=name, stats=stat)

if match_all_filters(path, filters):
if match_all_filters(path, filters) and not limits_stop_at(path, limits):
result.append(path)

if limits_stop_at(path, limits):
break

return result

@slot
Expand Down Expand Up @@ -491,6 +491,9 @@ def _walk( # noqa: WPS231
dirs, files = [], []

for entry in self._scan_entries(root):
if limits_reached(limits):
break

name = self._extract_name_from_entry(entry)
stat = self._extract_stat_from_entry(root, entry)

Expand All @@ -499,21 +502,15 @@ def _walk( # noqa: WPS231
yield from self._walk(root=root / name, topdown=topdown, filters=filters, limits=limits)

path = RemoteDirectory(path=root / name, stats=stat)
if match_all_filters(path, filters):
if match_all_filters(path, filters) and not limits_stop_at(path, limits):
dirs.append(RemoteDirectory(path=name, stats=stat))

if limits_stop_at(path, limits):
break
else:
path = RemoteFile(path=root / name, stats=stat)

if match_all_filters(path, filters):
if match_all_filters(path, filters) and not limits_stop_at(path, limits):
files.append(RemoteFile(path=name, stats=stat))

if limits_stop_at(path, limits):
break

if topdown:
if topdown and not limits_reached(limits):
for name in dirs:
yield from self._walk(root=root / name, topdown=topdown, filters=filters, limits=limits)

Expand Down
2 changes: 1 addition & 1 deletion onetl/core/file_limit/file_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def stops_at(self, path: PathProtocol) -> bool:

@property
def is_reached(self) -> bool:
return self._counter >= self.count_limit
return self._counter > self.count_limit

@validator("count_limit")
def _deprecated(cls, value):
Expand Down
2 changes: 1 addition & 1 deletion onetl/file/limit/max_files_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ def stops_at(self, path: PathProtocol) -> bool:

@property
def is_reached(self) -> bool:
return self._handled >= self.limit
return self._handled > self.limit
2 changes: 1 addition & 1 deletion onetl/file/limit/total_files_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,4 @@ def stops_at(self, path: PathProtocol) -> bool:

@property
def is_reached(self) -> bool:
return self._handled >= self.limit
return self._handled > self.limit
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ def finalizer():
downloader.run([not_a_file])


def test_file_downloader_with_file_limit(file_connection_with_path_and_files, tmp_path_factory, caplog):
def test_file_downloader_with_limit(file_connection_with_path_and_files, tmp_path_factory, caplog):
file_connection, remote_path, _ = file_connection_with_path_and_files
limit = 2
local_path = tmp_path_factory.mktemp("local_path")
Expand All @@ -814,7 +814,7 @@ def test_file_downloader_with_file_limit(file_connection_with_path_and_files, tm
assert len(download_result.successful) == limit


def test_file_downloader_file_limit_is_ignored_by_user_input(
def test_file_downloader_limit_is_ignored_by_user_input(
file_connection_with_path_and_files,
tmp_path_factory,
):
Expand Down
11 changes: 7 additions & 4 deletions tests/tests_unit/test_file/test_limit/test_max_files_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ def test_max_files_count():
assert not limit.stops_at(directory)
assert not limit.is_reached

# limit is reached - all check are True, input does not matter
assert limit.stops_at(file3)
assert limit.is_reached
assert not limit.stops_at(file3)
assert not limit.is_reached

# limit is reached - all check are True, input does not matter
assert limit.stops_at(file4)
assert limit.is_reached

Expand All @@ -56,5 +56,8 @@ def test_max_files_count():
assert not limit.stops_at(file1)
assert not limit.is_reached

assert limit.stops_at(file1)
assert not limit.stops_at(file3)
assert not limit.is_reached

assert limit.stops_at(file4)
assert limit.is_reached
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@ def test_total_files_size():
assert not limit.stops_at(file1)
assert not limit.is_reached

assert limit.stops_at(file1)
assert limit.stops_at(file3)
assert limit.is_reached

0 comments on commit 354421f

Please sign in to comment.