-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pause test workflow when changes are deployed to production #842
Comments
@panoptikum I am using the Python Databricks SDK for something similar to what you are describing. When testing changes to a pipeline I pause the current production job (production referring to live in Databricks, this could be in dev, stage, or prod) and cancel any running job runs. I then deploy the changed one as an adhoc workflow, run it, and then when it's done unpause the job. Below is a copy/paste from my deployment logic, hopefully it will give you some idea of how to accomplish what you are wanting to do. def dbx_adhoc_run():
args: KnownArgs = ArgsToDataClass(KnownArgs).dclass_instance
print("") # separate next printed line from args
non_default_args = args.get_non_default_args()
if non_default_args:
args.print_non_default_args(non_default_args)
if not non_default_args:
echo("Skipping adhoc run because all args are default")
return
args.adhoc = True
create_deployment(args)
# this will deploy with a different name ("adhoc" added to name)
dbx_deploy(args=args)
w = WorkspaceClient()
with pause_current_job(w, args):
with one_time_job_run(w, args) as run:
print("")
echo(f"Created new run: {run.run_page_url}")
try:
_ = w.jobs.wait_get_run_job_terminated_or_skipped(
run_id=run.run_id, timeout=timedelta(hours=args.timeout_hours)
)
run_status = w.jobs.get_run(run_id=run.run_id)
task_output_dict: dict[str, RunOutput] = {}
for task in run_status.tasks:
task_output = w.jobs.get_run_output(run_id=task.run_id)
task_output_dict[task.task_key] = task_output
echo(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")
for task_key, task_output in task_output_dict.items():
echo(f"Task logs: {task_key}\n")
echo(task_output.logs)
except KeyboardInterrupt:
print("") # skip the line that has the ^C
echo("Keyboard interrupt, canceling run")
w.jobs.cancel_run_and_wait(run_id=run.run_id)
raise
if run_status.state.result_state != RunResultState.SUCCESS:
for task_key, task_output in task_output_dict.items():
if task_output.error:
echo(f"Error: {task_key}\n")
echo("Error: ", task_output.error)
echo("Error Trace:", task_output.error_trace)
raise Exception(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")
@contextmanager
def one_time_job_run(w: WorkspaceClient, args: KnownArgs) -> Generator[Run, None, None]:
    """Start a run of the deployed one-time job and delete that job on exit.

    The job named ``get_package_name(args)`` is looked up, triggered with
    ``run_now`` and the resulting run (as returned by ``get_run``) is yielded
    to the ``with`` block. The job is deleted when the block exits, whether
    it completed normally or raised.

    Args:
        w: Workspace client used for all Jobs API calls.
        args: Deployment arguments; passed to ``get_package_name`` to resolve
            the job name.

    Yields:
        The run that was just started.

    Raises:
        Any exception raised by the lookup, by the Jobs API calls, or by the
        body of the ``with`` block, re-raised as the original instance
        (message and traceback intact). ``KeyboardInterrupt`` is the one
        exception that is suppressed after cleanup.
    """
    # Resolve the job outside the try block: if the lookup fails there is
    # nothing to delete, and the lookup error must surface unchanged instead
    # of being masked by an unbound-variable error during cleanup.
    one_time_job = get_job(w, get_package_name(args))
    try:
        run_wait: Run = w.jobs.run_now(job_id=one_time_job.job_id)
        yield w.jobs.get_run(run_id=run_wait.run_id)
    except KeyboardInterrupt:
        # Deliberately suppressed (as the previous implementation did): a
        # Ctrl-C should end the adhoc run quietly once cleanup has happened.
        pass
    finally:
        echo("Deleting one time job")
        w.jobs.delete(job_id=one_time_job.job_id)
@contextmanager
def pause_current_job(w: WorkspaceClient, args: KnownArgs):
    """Cancel active runs of the standard job and pause its schedule.

    On entry, every active run of the job named
    ``get_package_name(args.catalog)`` is cancelled. If the job has a
    schedule that is not already paused, the schedule is paused for the
    duration of the ``with`` block and the original settings are written back
    on exit. If there is no schedule, or it was already paused, the job
    settings are left untouched.

    Args:
        w: Workspace client used for all Jobs API calls.
        args: Deployment arguments; ``args.catalog`` is passed to
            ``get_package_name`` to resolve the standard job name.

    Raises:
        Any exception raised by the lookup, by the Jobs API calls, or by the
        body of the ``with`` block, re-raised as the original instance
        (message and traceback intact). ``KeyboardInterrupt`` is the one
        exception that is suppressed after cleanup.
    """
    # Resolve the standard job outside the try block so a failed lookup
    # propagates unchanged instead of being masked by unbound names in the
    # cleanup code.
    job_name = get_package_name(args.catalog)
    standard_job = get_job(w, job_name)
    orig_settings = standard_job.settings
    restore_needed = False
    try:
        # Snapshot the active runs before cancelling any of them.
        active_runs = list(w.jobs.list_runs(job_id=standard_job.job_id, active_only=True))
        for job_run in active_runs:
            echo(f"Canceling existing run: {job_run.run_id}")
            w.jobs.cancel_run_and_wait(run_id=job_run.run_id)

        schedule = orig_settings.schedule
        if schedule and schedule.pause_status != PauseStatus.PAUSED:
            echo("Pausing schedule")
            copied_settings = deepcopy(orig_settings)
            copied_settings.schedule.pause_status = PauseStatus.PAUSED
            # Set the flag before the API call so that an interruption in
            # the middle of the update still triggers the restore below.
            restore_needed = True
            w.jobs.update(job_id=standard_job.job_id, new_settings=copied_settings)
        yield
    except KeyboardInterrupt:
        # Deliberately suppressed (as the previous implementation did): a
        # Ctrl-C should end the adhoc run quietly once cleanup has happened.
        pass
    finally:
        # Only restore when this context manager actually paused the job;
        # otherwise there is nothing to unpause.
        if restore_needed:
            echo("Unpausing schedule")
            w.jobs.update(job_id=standard_job.job_id, new_settings=orig_settings)
def get_job(w: WorkspaceClient, job_name: str):
    """Return the full job object for the first job listed under ``job_name``.

    Args:
        w: Workspace client used for the Jobs API calls.
        job_name: Name passed as the ``name`` filter to ``w.jobs.list``.

    Returns:
        The result of ``w.jobs.get`` for the first matching job.

    Raises:
        IndexError: If no job is listed under ``job_name``. The type matches
            what the previous ``job_list[0]`` lookup raised, but the message
            now names the missing job.
    """
    # Only the first match is needed, so do not materialise the whole listing.
    first_match = next(iter(w.jobs.list(name=job_name)), None)
    if first_match is None:
        raise IndexError(f"No job found with name {job_name!r}")
    return w.jobs.get(job_id=first_match.job_id)
1 similar comment
@panoptikum I am using the Python Databricks SDK for something similar to what you are describing. When testing changes to a pipeline I pause the current production job (production referring to live in Databricks, this could be in dev, stage, or prod) and cancel any running job runs. I then deploy the changed one as an adhoc workflow, run it, and then when it's done unpause the job. Below is a copy/paste from my deployment logic, hopefully it will give you some idea of how to accomplish what you are wanting to do. def dbx_adhoc_run():
args: KnownArgs = ArgsToDataClass(KnownArgs).dclass_instance
print("") # separate next printed line from args
non_default_args = args.get_non_default_args()
if non_default_args:
args.print_non_default_args(non_default_args)
if not non_default_args:
echo("Skipping adhoc run because all args are default")
return
args.adhoc = True
create_deployment(args)
# this will deploy with a different name ("adhoc" added to name)
dbx_deploy(args=args)
w = WorkspaceClient()
with pause_current_job(w, args):
with one_time_job_run(w, args) as run:
print("")
echo(f"Created new run: {run.run_page_url}")
try:
_ = w.jobs.wait_get_run_job_terminated_or_skipped(
run_id=run.run_id, timeout=timedelta(hours=args.timeout_hours)
)
run_status = w.jobs.get_run(run_id=run.run_id)
task_output_dict: dict[str, RunOutput] = {}
for task in run_status.tasks:
task_output = w.jobs.get_run_output(run_id=task.run_id)
task_output_dict[task.task_key] = task_output
echo(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")
for task_key, task_output in task_output_dict.items():
echo(f"Task logs: {task_key}\n")
echo(task_output.logs)
except KeyboardInterrupt:
print("") # skip the line that has the ^C
echo("Keyboard interrupt, canceling run")
w.jobs.cancel_run_and_wait(run_id=run.run_id)
raise
if run_status.state.result_state != RunResultState.SUCCESS:
for task_key, task_output in task_output_dict.items():
if task_output.error:
echo(f"Error: {task_key}\n")
echo("Error: ", task_output.error)
echo("Error Trace:", task_output.error_trace)
raise Exception(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")
@contextmanager
def one_time_job_run(w: WorkspaceClient, args: KnownArgs) -> Generator[Run, None, None]:
    """Start a run of the deployed one-time job and delete that job on exit.

    The job named ``get_package_name(args)`` is looked up, triggered with
    ``run_now`` and the resulting run (as returned by ``get_run``) is yielded
    to the ``with`` block. The job is deleted when the block exits, whether
    it completed normally or raised.

    Args:
        w: Workspace client used for all Jobs API calls.
        args: Deployment arguments; passed to ``get_package_name`` to resolve
            the job name.

    Yields:
        The run that was just started.

    Raises:
        Any exception raised by the lookup, by the Jobs API calls, or by the
        body of the ``with`` block, re-raised as the original instance
        (message and traceback intact). ``KeyboardInterrupt`` is the one
        exception that is suppressed after cleanup.
    """
    # Resolve the job outside the try block: if the lookup fails there is
    # nothing to delete, and the lookup error must surface unchanged instead
    # of being masked by an unbound-variable error during cleanup.
    one_time_job = get_job(w, get_package_name(args))
    try:
        run_wait: Run = w.jobs.run_now(job_id=one_time_job.job_id)
        yield w.jobs.get_run(run_id=run_wait.run_id)
    except KeyboardInterrupt:
        # Deliberately suppressed (as the previous implementation did): a
        # Ctrl-C should end the adhoc run quietly once cleanup has happened.
        pass
    finally:
        echo("Deleting one time job")
        w.jobs.delete(job_id=one_time_job.job_id)
@contextmanager
def pause_current_job(w: WorkspaceClient, args: KnownArgs):
    """Cancel active runs of the standard job and pause its schedule.

    On entry, every active run of the job named
    ``get_package_name(args.catalog)`` is cancelled. If the job has a
    schedule that is not already paused, the schedule is paused for the
    duration of the ``with`` block and the original settings are written back
    on exit. If there is no schedule, or it was already paused, the job
    settings are left untouched.

    Args:
        w: Workspace client used for all Jobs API calls.
        args: Deployment arguments; ``args.catalog`` is passed to
            ``get_package_name`` to resolve the standard job name.

    Raises:
        Any exception raised by the lookup, by the Jobs API calls, or by the
        body of the ``with`` block, re-raised as the original instance
        (message and traceback intact). ``KeyboardInterrupt`` is the one
        exception that is suppressed after cleanup.
    """
    # Resolve the standard job outside the try block so a failed lookup
    # propagates unchanged instead of being masked by unbound names in the
    # cleanup code.
    job_name = get_package_name(args.catalog)
    standard_job = get_job(w, job_name)
    orig_settings = standard_job.settings
    restore_needed = False
    try:
        # Snapshot the active runs before cancelling any of them.
        active_runs = list(w.jobs.list_runs(job_id=standard_job.job_id, active_only=True))
        for job_run in active_runs:
            echo(f"Canceling existing run: {job_run.run_id}")
            w.jobs.cancel_run_and_wait(run_id=job_run.run_id)

        schedule = orig_settings.schedule
        if schedule and schedule.pause_status != PauseStatus.PAUSED:
            echo("Pausing schedule")
            copied_settings = deepcopy(orig_settings)
            copied_settings.schedule.pause_status = PauseStatus.PAUSED
            # Set the flag before the API call so that an interruption in
            # the middle of the update still triggers the restore below.
            restore_needed = True
            w.jobs.update(job_id=standard_job.job_id, new_settings=copied_settings)
        yield
    except KeyboardInterrupt:
        # Deliberately suppressed (as the previous implementation did): a
        # Ctrl-C should end the adhoc run quietly once cleanup has happened.
        pass
    finally:
        # Only restore when this context manager actually paused the job;
        # otherwise there is nothing to unpause.
        if restore_needed:
            echo("Unpausing schedule")
            w.jobs.update(job_id=standard_job.job_id, new_settings=orig_settings)
def get_job(w: WorkspaceClient, job_name: str):
    """Return the full job object for the first job listed under ``job_name``.

    Args:
        w: Workspace client used for the Jobs API calls.
        job_name: Name passed as the ``name`` filter to ``w.jobs.list``.

    Returns:
        The result of ``w.jobs.get`` for the first matching job.

    Raises:
        IndexError: If no job is listed under ``job_name``. The type matches
            what the previous ``job_list[0]`` lookup raised, but the message
            now names the missing job.
    """
    # Only the first match is needed, so do not materialise the whole listing.
    first_match = next(iter(w.jobs.list(name=job_name)), None)
    if first_match is None:
        raise IndexError(f"No job found with name {job_name!r}")
    return w.jobs.get(job_id=first_match.job_id)
Hi @NodeJSmith, thank you for sharing your code. I'll look into it and see how I can adapt it to my needs. Best,
Hello,
I am currently thinking about how to implement a process that automatically switches the job attribute "PAUSE_STATUS" to PAUSED when I push changes to production, whether via a tag or some other method. Is this something dbx supports? I could not find anything...
Right now, we usually set up new pipelines in the test environment first and then ship them to production. If I make changes now, we activate the test workflow again, but it is not switched off again once the changes move to production as well.
Any input on this matter is much appreciated.
Best
Felix
The text was updated successfully, but these errors were encountered: