Skip to content

Commit

Permalink
Bit by the snake
Browse files Browse the repository at this point in the history
  • Loading branch information
moradology committed May 30, 2024
1 parent bc11c27 commit f18c2b1
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ class TransferFilesWithConcurrency(beam.DoFn):
transfer_target: The target directory to which files will be transferred.
concurrency_per_executor: The number of concurrent threads per executor.
secrets: Optional dictionary containing secrets required for accessing the transfer target.
open_kwargs: Dictionary of keyword arguments to be passed when opening files.
open_kwargs: Optional dictionary of keyword arguments to be passed when opening files.
"""

transfer_target: CacheFSSpecTarget
max_concurrency: int
secrets: Optional[Dict] = None
open_kwargs: Dict = {}
open_kwargs: Optional[Dict] = None

def process(self, element):
# key here is assigned solely to limit number of workers; we drop it immediately
Expand All @@ -171,7 +171,8 @@ def process(self, element):
logger.error(f"Error transferring file {url}: {e}")

def transfer_file(self, index: Index, url: str) -> Tuple[Index, str]:
self.transfer_target.cache_file(url, self.secrets, **self.open_kwargs)
open_kwargs = self.open_kwargs or {}
self.transfer_target.cache_file(url, self.secrets, **open_kwargs)
return (index, self.transfer_target._full_path(url))


Expand Down

0 comments on commit f18c2b1

Please sign in to comment.