Skip to content

Commit

Permalink
Issue #3853 - s5cmd support WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mzueva committed Dec 30, 2024
1 parent 5ada355 commit a4cc9bb
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions workflows/pipe-common/scripts/transfer_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ def transfer_async_without_file_list(chunk):


class InputDataTask:
def __init__(self, input_dir, common_dir, analysis_dir, task_name, bucket, report_file, rules, upload, env_suffix):
def __init__(self, input_dir, common_dir, analysis_dir, task_name, bucket, report_file, rules, upload, env_suffix,
transfer_tool):
self.input_dir = input_dir
self.common_dir = common_dir
self.analysis_dir = get_path_with_trailing_delimiter(analysis_dir)
Expand All @@ -282,6 +283,7 @@ def __init__(self, input_dir, common_dir, analysis_dir, task_name, bucket, repor
self.is_upload = upload
self.env_suffix = env_suffix
self.extra_args = os.getenv('CP_TRANSFER_PIPE_INPUT_ARGS') if self.is_upload else os.getenv('CP_TRANSFER_PIPE_OUTPUT_ARGS')
self.transfer_tool = transfer_tool

def run(self):
Logger.info('Starting localization of remote data...', task_name=self.task_name)
Expand Down Expand Up @@ -578,9 +580,19 @@ def localize_data(self, remote_locations, rules=None):
if files:
self.perform_cluster_file_transfer(files, cluster, rules=rules)

def transfer_s5cmd(self, source, destination):
    """Copy ``source`` to ``destination`` by running the ``s5cmd cp`` command.

    When ``self.is_file(source)`` is falsy, ``source`` is rewritten to
    ``<source without trailing slashes>/*`` so that the wildcard form is
    handed to ``s5cmd``. The command is executed through
    ``S3Bucket().execute_command`` with ``TRANSFER_ATTEMPTS``.

    :param source: path or URL to copy from.
    :param destination: path or URL to copy to.
    """
    if not self.is_file(source):
        # Not a single file: strip trailing slashes and append the wildcard.
        source = source.rstrip('/') + '/*'
    # NOTE(review): source and destination are interpolated unquoted. If
    # execute_command runs the string through a shell, paths with spaces and
    # the '*' wildcard may be split or expanded before s5cmd sees them --
    # confirm against S3Bucket.execute_command and quote if needed.
    cmd = 's5cmd cp {} {}'.format(source, destination)
    # Pass the task name so this message is attributed to the same task log
    # as the other Logger calls of this class.
    Logger.info('Uploading files using s5cmd. Executing command: "{}"'.format(cmd),
                task_name=self.task_name)
    S3Bucket().execute_command(cmd, TRANSFER_ATTEMPTS)

def perform_local_transfer(self, source, destination):
Logger.info('Uploading files from {} to {} using local pipe'.format(source, destination), self.task_name)
threads = Cluster.get_slots_per_node() if ParallelType.current() == ParallelType.Threaded else None
if self.transfer_tool == 's5cmd':
self.transfer_s5cmd(source, destination)
return
Logger.info('Uploading files from {} to {} using local pipe'.format(source, destination), self.task_name)
if self.is_upload or self.rules is None:
S3Bucket().pipe_copy(source, destination, TRANSFER_ATTEMPTS, threads=threads, extra_args=self.extra_args)
else:
Expand Down Expand Up @@ -708,6 +720,7 @@ def main():
parser.add_argument('--report-file', required=False, default=None)
parser.add_argument('--task', required=False, default=LOCALIZATION_TASK_NAME)
parser.add_argument('--env-suffix', required=False, default='_PARAM_TYPE')
transfer_tool = os.getenv('CP_TRANSFER_TOOL', 'pipe')
args = parser.parse_args()
if args.operation == 'upload':
upload = True
Expand All @@ -720,7 +733,7 @@ def main():
bucket = os.environ['CP_TRANSFER_BUCKET']
InputDataTask(args.input_dir, args.common_dir, args.analysis_dir,
args.task, bucket, args.report_file, args.storage_rules, upload,
args.env_suffix).run()
args.env_suffix, transfer_tool).run()


if __name__ == '__main__':
Expand Down

0 comments on commit a4cc9bb

Please sign in to comment.