
Commit

Merge branch 'main' into fix/remove-tree-from-python-sequential-linter
JCZuurmond committed Feb 4, 2025
2 parents 02e2254 + 3ca395c commit 3362158
Showing 40 changed files with 724 additions and 516 deletions.
4 changes: 3 additions & 1 deletion docs/ucx/docs/dev/contributing.mdx
@@ -302,7 +302,9 @@ rdd-in-shared-clusters
spark-logging-in-shared-clusters
sql-parse-error
sys-path-cannot-compute-value
-table-migrated-to-uc
+table-migrated-to-uc-python
+table-migrated-to-uc-python-sql
+table-migrated-to-uc-sql
to-json-in-shared-clusters
unsupported-magic-line
```
45 changes: 43 additions & 2 deletions docs/ucx/docs/process/index.mdx
@@ -9,8 +9,9 @@ On a high level, the steps in the migration process are:
2. [group migration](/docs/reference/workflows#group-migration-workflow)
3. [table migration](/docs/process/#table-migration-process)
4. [data reconciliation](/docs/reference/workflows#post-migration-data-reconciliation-workflow)
-5. [code migration](#code-migration)
-6. [final details](#final-details)
+6. [code migration](/docs/reference/commands#code-migration-commands)
+7. [delta live table pipeline migration](/docs/process#delta-live-table-pipeline-migration-process)
+8. [final details](#final-details)

The migration process can be visualized schematically as:

@@ -288,6 +289,7 @@ databricks labs ucx revert-migrated-tables --schema X --table Y [--delete-managed]
The [`revert-migrated-tables` command](/docs/reference/commands#revert-migrated-tables) drops the Unity Catalog table or view and resets
the `upgraded_to` property on the source object. Use this command to allow a table or view to be migrated again.
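
For example, to revert a single migrated table (the schema and table names are illustrative):

```text
$ databricks labs ucx revert-migrated-tables --schema inventory --table sales
```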


## Code Migration

After you're done with the [table migration](#table-migration-process) and
@@ -307,6 +309,45 @@ After investigating the code linter advice, the code can be migrated. We recommend:
- Use the [`migrate-` commands](/docs/reference/commands#code-migration-commands) to migrate resources.
- Set the [default catalog](https://docs.databricks.com/en/catalogs/default.html) to Unity Catalog.
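
A typical sequence might look like the sketch below; it assumes the `lint-local-code` and `migrate-local-code` commands from the [code migration commands](/docs/reference/commands#code-migration-commands) reference:

```text
$ databricks labs ucx lint-local-code       # surface linter advice for code on your machine
$ databricks labs ucx migrate-local-code    # apply available fixes to the local source files
```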


## Delta Live Table Pipeline Migration Process

> You are required to complete the [assessment workflow](/docs/reference/workflows#assessment-workflow) before starting the pipeline migration workflow.

The pipeline migration process is a workflow that clones Hive Metastore Delta Live Table (DLT) pipelines to Unity Catalog.
On the first update, the cloned pipeline copies over all data and checkpoints; thereafter it runs normally. Once the cloned pipeline reaches the 'RUNNING' state, the original and the cloned pipeline can run independently.

### Example

Suppose the existing HMS DLT pipeline is called "dlt_pipeline". During migration the pipeline is stopped and renamed to "dlt_pipeline [OLD]", and the new cloned pipeline takes the name "dlt_pipeline".

### Known issues and limitations
- Only clones from HMS to UC are supported.
- Pipelines may only be cloned within the same workspace.
- HMS pipelines must currently be publishing tables to some target schema.
- Only the following streaming sources are supported:
  - Delta
  - [Autoloader](https://docs.databricks.com/en/ingestion/cloud-object-storage/auto-loader/index.html)
    - If your pipeline uses Autoloader with file notification events, do not run the original HMS pipeline after cloning, as this would cause some file notification events to be dropped from the UC clone. If the HMS original was started accidentally, the missed files can be backfilled with the `cloudFiles.backfillInterval` option in Autoloader (see the sketch after this list).
  - Kafka, where `kafka.group.id` is not set
  - Kinesis, where `consumerMode` is not "efo"
- [Maintenance](https://docs.databricks.com/en/delta-live-tables/index.html#maintenance-tasks-performed-by-delta-live-tables) is automatically paused (for both pipelines) while migration is in progress.
- If an Autoloader source specifies an explicit `cloudFiles.schemaLocation`, `mergeSchema` needs to be set to `true` for the HMS original and the UC clone to operate concurrently.
- Pipelines that publish tables to custom schemas are not supported.
- On tables cloned to UC, time travel queries are undefined when querying by timestamp to versions originally written on HMS. Time travel queries by version work correctly, as do time travel queries by timestamp to versions written on UC.
- [All existing limitations](https://docs.databricks.com/en/delta-live-tables/unity-catalog.html#limitations) of using DLT on UC apply.
- [Existing UC limitations](https://docs.databricks.com/en/data-governance/unity-catalog/index.html#limitations) apply.
- If tables in the HMS pipeline specify storage locations (using the `path` parameter in Python or the `LOCATION` clause in SQL), the configuration `pipelines.migration.ignoreExplicitPath` can be set to `true` to ignore the parameter in the cloned pipeline.
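
For the Autoloader caveat above, a minimal sketch of enabling backfills on the UC clone's stream; the source format, path, and interval are illustrative:

```python
# Auto Loader stream in file notification mode with periodic backfills
# enabled, so files whose notification events were dropped during cloning
# are eventually picked up.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")             # source format (assumed)
    .option("cloudFiles.useNotifications", "true")   # file notification mode
    .option("cloudFiles.backfillInterval", "1 day")  # backfill missed files daily
    .load("s3://example-bucket/landing/")            # source path (assumed)
)
```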


### Considerations
- Do not edit the notebooks that define the pipeline during cloning.
- The original pipeline should not be running when requesting the clone.
- When a clone is requested, DLT will automatically start an update to migrate the existing data and metadata for Streaming Tables, allowing them to pick up where the original pipeline left off.
- It is expected that the update metrics do not include the migrated data.
- Make sure all name-based references in the HMS pipeline are fully qualified, e.g. `hive_metastore.schema.table` (see the example after this list).
- After the UC clone reaches the 'RUNNING' state, both the original pipeline and the cloned pipeline may run independently.
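
As an illustration of the fully-qualified-name recommendation above (the schema and table names are hypothetical):

```python
# Ambiguous: resolves against the current default catalog, which differs
# between the HMS original and the UC clone.
df = spark.read.table("schema.table")

# Fully qualified: always resolves to the Hive Metastore table.
df = spark.read.table("hive_metastore.schema.table")
```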


## Final details

Once you're done with the [code migration](#code-migration), you can run the:
10 changes: 10 additions & 0 deletions docs/ucx/docs/reference/commands/index.mdx
@@ -660,6 +660,16 @@ It takes a `WorkspaceClient` object and `from` and `to` parameters, which it passes to
the `TableMove` class. This command is useful for developers and administrators who want to create an alias for a table.
It can also be used to debug issues related to table aliasing.

## Pipeline migration commands

These commands are for [pipeline migration](/docs/process#delta-live-table-pipeline-migration-process) and require the [assessment workflow](/docs/reference/workflows#assessment-workflow) to be completed.

### `migrate-dlt-pipelines`

```text
$ databricks labs ucx migrate-dlt-pipelines [--include-pipeline-ids <comma separated list of pipeline ids>] [--exclude-pipeline-ids <comma separated list of pipeline ids>]
```
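
For example, to clone only two specific pipelines (the pipeline IDs are illustrative):

```text
$ databricks labs ucx migrate-dlt-pipelines --include-pipeline-ids pipeline-id-1,pipeline-id-2
```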

## Utility commands

### `logs`
14 changes: 9 additions & 5 deletions docs/ucx/docs/reference/linter_codes.mdx
@@ -31,11 +31,11 @@ spark.table(f"foo_{some_table_name}")

We even detect string constants, whether they come from `dbutils.widgets.get` (via job named parameters) or through
loop variables. If the `old.things` table is migrated to `brand.new.stuff` in Unity Catalog, the following code will
-trigger two messages: [`table-migrated-to-uc`](#table-migrated-to-uc) for the first query, as the contents are clearly
-analysable, and `cannot-autofix-table-reference` for the second query.
+trigger two messages: [`table-migrated-to-uc-{sql,python,python-sql}`](#table-migrated-to-uc-sqlpythonpython-sql) for
+the first query, as the contents are clearly analysable, and `cannot-autofix-table-reference` for the second query.

```python
-# ucx[table-migrated-to-uc:+4:4:+4:20] Table old.things is migrated to brand.new.stuff in Unity Catalog
+# ucx[table-migrated-to-uc-python-sql:+4:4:+4:20] Table old.things is migrated to brand.new.stuff in Unity Catalog
# ucx[cannot-autofix-table-reference:+3:4:+3:20] Can't migrate table_name argument in 'spark.sql(query)' because its value cannot be computed
table_name = f"table_{index}"
for query in ["SELECT * FROM old.things", f"SELECT * FROM {table_name}"]:
@@ -247,12 +247,16 @@ analysis where the path is located.



-## `table-migrated-to-uc`
+## `table-migrated-to-uc-{sql,python,python-sql}`

This message indicates that the linter has found a reference to a table that has been migrated to Unity Catalog. The user
must ensure that the table is available in Unity Catalog.


| Postfix | Explanation |
|------------|-------------------------------------------------|
| sql | Table reference in SparkSQL |
| python | Table reference in PySpark |
| python-sql | Table reference in SparkSQL called from PySpark |
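
As a sketch of where each postfix applies (reusing the `old.things` example above):

```python
# table-migrated-to-uc-python: table reference in PySpark code
df = spark.table("old.things")

# table-migrated-to-uc-python-sql: SparkSQL reference issued from PySpark
df = spark.sql("SELECT * FROM old.things")

# A reference in a plain SQL cell or query would instead trigger
# table-migrated-to-uc-sql.
```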

## `to-json-in-shared-clusters`

7 changes: 6 additions & 1 deletion src/databricks/labs/ucx/source_code/base.py
@@ -164,7 +164,12 @@ class Fixer(ABC):

    @property
    @abstractmethod
-    def name(self) -> str: ...
+    def diagnostic_code(self) -> str:
+        """The diagnostic code that this fixer fixes."""
+
+    def is_supported(self, diagnostic_code: str) -> bool:
+        """Indicate if the diagnostic code is supported by this fixer."""
+        return self.diagnostic_code is not None and diagnostic_code == self.diagnostic_code

    @abstractmethod
    def apply(self, code: str) -> str: ...
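
For context, a minimal sketch of a concrete fixer under the revised contract; the subclass and its rewrite logic are hypothetical and not part of this diff:

```python
from databricks.labs.ucx.source_code.base import Fixer


class TableMigrationFixer(Fixer):
    """Hypothetical fixer keyed to a single diagnostic code."""

    @property
    def diagnostic_code(self) -> str:
        return "table-migrated-to-uc-python"

    def apply(self, code: str) -> str:
        # Simplified illustration: rewrite one known HMS reference to its UC target.
        return code.replace('"old.things"', '"brand.new.stuff"')
```

A linter engine can then route advice by code: `fixer.is_supported("table-migrated-to-uc-python")` returns `True`, while any other diagnostic code returns `False`.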
56 changes: 1 addition & 55 deletions src/databricks/labs/ucx/source_code/graph.py
@@ -6,8 +6,7 @@
import logging
from dataclasses import dataclass
from pathlib import Path
-from collections.abc import Callable, Iterable, Iterator
-from typing import TypeVar, Generic
+from collections.abc import Callable, Iterable

from astroid import ( # type: ignore
NodeNG,
@@ -601,56 +600,3 @@ def finalize(self) -> InheritedContext:
            return self
        tree = self.tree.renumber(-1)
        return InheritedContext(tree, self.found, [])


-T = TypeVar("T")
-
-
-class DependencyGraphWalker(abc.ABC, Generic[T]):
-
-    def __init__(self, graph: DependencyGraph, walked_paths: set[Path], path_lookup: PathLookup):
-        self._graph = graph
-        self._walked_paths = walked_paths
-        self._path_lookup = path_lookup
-        self._lineage: list[Dependency] = []
-
-    def __iter__(self) -> Iterator[T]:
-        for dependency in self._graph.root_dependencies:
-            # the dependency is a root, so its path is the one to use
-            # for computing lineage and building python global context
-            root_path = dependency.path
-            yield from self._iter_one(dependency, self._graph, root_path)
-
-    def _iter_one(self, dependency: Dependency, graph: DependencyGraph, root_path: Path) -> Iterable[T]:
-        if dependency.path in self._walked_paths:
-            return
-        self._lineage.append(dependency)
-        self._walked_paths.add(dependency.path)
-        self._log_walk_one(dependency)
-        if dependency.path.is_file() or is_a_notebook(dependency.path):
-            inherited_tree = graph.root.build_inherited_tree(root_path, dependency.path)
-            path_lookup = self._path_lookup.change_directory(dependency.path.parent)
-            yield from self._process_dependency(dependency, path_lookup, inherited_tree)
-            maybe_graph = graph.locate_dependency(dependency.path)
-            # missing graph problems have already been reported while building the graph
-            if maybe_graph.graph:
-                child_graph = maybe_graph.graph
-                for child_dependency in child_graph.local_dependencies:
-                    yield from self._iter_one(child_dependency, child_graph, root_path)
-        self._lineage.pop()
-
-    def _log_walk_one(self, dependency: Dependency) -> None:
-        logger.debug(f'Analyzing dependency: {dependency}')
-
-    @abc.abstractmethod
-    def _process_dependency(
-        self,
-        dependency: Dependency,
-        path_lookup: PathLookup,
-        inherited_tree: Tree | None,
-    ) -> Iterable[T]: ...
-
-    @property
-    def lineage(self) -> list[LineageAtom]:
-        lists: list[list[LineageAtom]] = [dependency.lineage for dependency in self._lineage]
-        return list(itertools.chain(*lists))
