From 0220c422980a740b5ce645c10d5593a1c89f45af Mon Sep 17 00:00:00 2001
From: Cor <jczuurmond@protonmail.com>
Date: Tue, 21 Jan 2025 18:10:44 +0100
Subject: [PATCH] Let `PipelinesMigrator` skip unfound jobs (#3554)

## Changes
Let `PipelinesMigrator` skip unfound jobs. This might happen when jobs
are deleted inbetween assessment and running pipelines migration

### Linked issues

Resolves #3490

### Functionality

- [x] modified existing command: `databricks labs ucx
migrate-dlt-pipelines`

### Tests

- [x] added unit tests

---------

Co-authored-by: Guenia Izquierdo Delgado <guenia.izquierdo@databricks.com>
---
 .../ucx/hive_metastore/pipelines_migrate.py   |  8 +++--
 .../hive_metastore/test_pipeline_migrate.py   |  8 ++++-
 .../hive_metastore/test_pipeline_migrate.py   | 29 +++++++++++++++----
 3 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py b/src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py
index b01233c427..497b864227 100644
--- a/src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py
+++ b/src/databricks/labs/ucx/hive_metastore/pipelines_migrate.py
@@ -4,7 +4,7 @@
 
 from databricks.labs.blueprint.parallel import Threads
 from databricks.sdk import WorkspaceClient
-from databricks.sdk.errors import DatabricksError
+from databricks.sdk.errors import DatabricksError, NotFound
 from databricks.sdk.service.jobs import PipelineTask, Task, JobSettings
 
 from databricks.labs.ucx.assessment.jobs import JobsCrawler
@@ -53,7 +53,11 @@ def _populate_pipeline_job_tasks_mapping(self) -> None:
             if not job.job_id:
                 continue
 
-            job_details = self._ws.jobs.get(int(job.job_id))
+            try:
+                job_details = self._ws.jobs.get(int(job.job_id))
+            except NotFound:
+                logger.warning(f"Skipping non-existing job: {job.job_id}")
+                continue
             if not job_details.settings or not job_details.settings.tasks:
                 continue
 
diff --git a/tests/integration/hive_metastore/test_pipeline_migrate.py b/tests/integration/hive_metastore/test_pipeline_migrate.py
index 24ccfa6ba3..b1335e85e4 100644
--- a/tests/integration/hive_metastore/test_pipeline_migrate.py
+++ b/tests/integration/hive_metastore/test_pipeline_migrate.py
@@ -11,7 +11,13 @@
 
 
 def test_pipeline_migrate(
-    ws, make_pipeline, make_random, watchdog_purge_suffix, make_directory, runtime_ctx, make_mounted_location
+    ws,
+    make_pipeline,
+    make_random,
+    watchdog_purge_suffix,
+    make_directory,
+    runtime_ctx,
+    make_mounted_location,
 ) -> None:
     src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")
 
diff --git a/tests/unit/hive_metastore/test_pipeline_migrate.py b/tests/unit/hive_metastore/test_pipeline_migrate.py
index c07c460aa7..8294dea2c6 100644
--- a/tests/unit/hive_metastore/test_pipeline_migrate.py
+++ b/tests/unit/hive_metastore/test_pipeline_migrate.py
@@ -4,6 +4,7 @@
 
 from databricks.labs.lsql.backends import MockBackend
 from databricks.sdk.service.jobs import BaseJob, JobSettings, Task, PipelineTask
+from databricks.sdk.errors import NotFound
 
 from databricks.labs.ucx.assessment.jobs import JobsCrawler
 from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
@@ -91,14 +92,30 @@ def test_migrate_pipelines(ws, mock_installation, pipeline_spec, include_flag, e
         ws.api_client.do.assert_has_calls([api_calls])
 
 
-def test_migrate_pipelines_no_pipelines(
-    ws,
-):
-    errors = {}
-    rows = {}
-    sql_backend = MockBackend(fails_on_first=errors, rows=rows)
+def test_migrate_pipelines_no_pipelines(ws) -> None:
+    sql_backend = MockBackend()
     pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database")
     jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database")
     pipelines_migrator = PipelinesMigrator(ws, pipelines_crawler, jobs_crawler, "catalog_name")
     ws.jobs.list.return_value = [BaseJob(job_id=536591785949415), BaseJob(), BaseJob(job_id=536591785949417)]
     pipelines_migrator.migrate_pipelines()
+
+
+def test_migrate_pipelines_skips_not_found_job(caplog, ws) -> None:
+    job_columns = MockBackend.rows("job_id", "success", "failures", "job_name", "creator")
+    sql_backend = MockBackend(
+        rows={
+            "`hive_metastore`.`inventory_database`.`jobs`": job_columns[
+                ("536591785949415", 1, [], "single-job", "anonymous@databricks.com")
+            ]
+        }
+    )
+    pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database")
+    jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database")
+    pipelines_migrator = PipelinesMigrator(ws, pipelines_crawler, jobs_crawler, "catalog_name")
+
+    ws.jobs.get.side_effect = NotFound
+
+    with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore.pipelines_migrate"):
+        pipelines_migrator.migrate_pipelines()
+    assert "Skipping non-existing job: 536591785949415" in caplog.messages