From d4548ad14a3fe0e8da142eb89d16f0fb49cfe173 Mon Sep 17 00:00:00 2001 From: Martynas Asipauskas Date: Thu, 5 Sep 2024 12:03:28 +0100 Subject: [PATCH] Clarify that id = armada job-id (#3905) --- third_party/airflow/armada/operators/armada.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/third_party/airflow/armada/operators/armada.py b/third_party/airflow/armada/operators/armada.py index c7e51f1a536..a1087d46f30 100644 --- a/third_party/airflow/armada/operators/armada.py +++ b/third_party/airflow/armada/operators/armada.py @@ -213,7 +213,7 @@ def on_kill(self) -> None: if self.job_context is not None: self.log.info( f"on_kill called, " - f"cancelling job with id {self.job_context.job_id} in queue " + f"cancelling armada job with job-id {self.job_context.job_id} in queue " f"{self.job_context.armada_queue}" ) self.hook.cancel_job(self.job_context) @@ -264,7 +264,8 @@ def _reattach_or_submit_job( ) if existing_run is not None: self.log.info( - f"Attached to existing job with id {existing_run['armada_job_id']}." + "Attached to existing armada job " + f"with job-id {existing_run['armada_job_id']}." f" {self._trigger_tracking_message(existing_run['armada_job_id'])}" ) return RunningJobContext( @@ -277,7 +278,7 @@ def _reattach_or_submit_job( # We haven't got a running job, submit a new one and persist state to xcom. ctx = self.hook.submit_job(self.armada_queue, job_set_id, job_request) tracking_msg = self._trigger_tracking_message(ctx.job_id) - self.log.info(f"Submitted job with id {ctx.job_id}. {tracking_msg}") + self.log.info(f"Submitted job to armada with job-id {ctx.job_id}. {tracking_msg}") ti.xcom_push( key=f"{ti.try_number}", @@ -325,8 +326,8 @@ def _check_job_status_and_fetch_logs(self) -> None: if self._not_acknowledged_within_timeout(): self.log.info( - f"Job {self.job_context.job_id} not acknowledged by the Armada within " - f"timeout ({self.job_acknowledgement_timeout}), terminating" + f"Armada job with job-id: {self.job_context.job_id} not acknowledged " + f"within timeout ({self.job_acknowledgement_timeout}), terminating" ) self.job_context = self.hook.cancel_job(self.job_context) return