Skip to content

Commit

Permalink
Added setup_input_data support for diff
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir2217 committed Dec 9, 2024
1 parent 1c535f9 commit dafbe0c
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions dags/roger/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,16 @@ def setup_input_data(context, exec_conf):
# Download files from lakefs and store them in this new input_path
client = init_lakefs_client(config=config)
repos = exec_conf['repos']
dag_params = context["dag"]["params"]

if dag_params.get("repository_id"):
repos=[{
'repo': dag_params.get("repository_id"),
'branch': dag_params.get("branch_name"),
'commit_from': dag_params.get("commitid_from"),
'commit_to': dag_params.get("commitid_to")
}]

# If no external repo is provided, fall back to the upstream tasks' datasets.
if not repos or len(repos) == 0:
# merge destination branch
Expand All @@ -279,7 +289,9 @@ def setup_input_data(context, exec_conf):
repos = [{
'repo': repo,
'branch': branch,
'path': f'{dag_id}/{upstream_id}'
'path': f'{dag_id}/{upstream_id}',
'commit_from': None,
'commit_to': None
} for upstream_id in upstream_ids]

# input_repo = exec_conf['input_repo']
Expand All @@ -306,7 +318,9 @@ def setup_input_data(context, exec_conf):
remote_path=r['path'],
branch=r['branch'],
repo=r['repo'],
changes_only=False,
changes_only=r.get("commitid_from") is None,
commit_from=r.get("commitid_from"),
commit_to=r.get("commitid_to"),
lake_fs_client=client
)
else:
Expand Down

0 comments on commit dafbe0c

Please sign in to comment.