Move graph walkers to separate module (#3581)
## Changes
Move all graph walkers into a separate module, `graph_walkers.py`, under the `linters` module. This centralizes them (they are used for linting only) and allows them to be reused instead of recreated.
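
A minimal sketch of what a custom walker could look like after the move, assuming `DependencyGraphWalker` is exposed from the new `databricks.labs.ucx.source_code.linters.graph_walkers` module with the constructor and `_process_dependency` hook shown in the diff below; the `PathCollectingWalker` name is hypothetical:

```python
from collections.abc import Iterable
from pathlib import Path

from databricks.labs.ucx.source_code.graph import Dependency, DependencyGraph
from databricks.labs.ucx.source_code.linters.graph_walkers import DependencyGraphWalker
from databricks.labs.ucx.source_code.path_lookup import PathLookup
from databricks.labs.ucx.source_code.python.python_ast import Tree


class PathCollectingWalker(DependencyGraphWalker[Path]):
    """Hypothetical walker that yields the path of every dependency it visits."""

    def _process_dependency(
        self,
        dependency: Dependency,
        path_lookup: PathLookup,
        inherited_tree: Tree | None,
    ) -> Iterable[Path]:
        # The base class handles de-duplication (walked_paths) and lineage tracking;
        # a subclass only decides what to emit for each visited dependency.
        yield dependency.path


def visited_paths(graph: DependencyGraph, path_lookup: PathLookup) -> list[Path]:
    # Passing a shared set lets several walkers skip files that were already walked.
    walker = PathCollectingWalker(graph, set(), path_lookup)
    return list(walker)
```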

### Linked issues

Progresses #3514
Breaks up #3520

### Functionality

- [x] modified existing command: `databricks labs ucx migrate-local-code`

### Tests

- [ ] manually tested
- [x] modified and added unit tests
- [x] modified and added integration tests
JCZuurmond authored Feb 4, 2025
1 parent f8bf94c commit 3ca395c
Showing 8 changed files with 337 additions and 312 deletions.
56 changes: 1 addition & 55 deletions src/databricks/labs/ucx/source_code/graph.py
@@ -6,8 +6,7 @@
import logging
from dataclasses import dataclass
from pathlib import Path
from collections.abc import Callable, Iterable, Iterator
from typing import TypeVar, Generic
from collections.abc import Callable, Iterable

from astroid import ( # type: ignore
    NodeNG,
@@ -601,56 +600,3 @@ def finalize(self) -> InheritedContext:
            return self
        tree = self.tree.renumber(-1)
        return InheritedContext(tree, self.found, [])


T = TypeVar("T")


class DependencyGraphWalker(abc.ABC, Generic[T]):

    def __init__(self, graph: DependencyGraph, walked_paths: set[Path], path_lookup: PathLookup):
        self._graph = graph
        self._walked_paths = walked_paths
        self._path_lookup = path_lookup
        self._lineage: list[Dependency] = []

    def __iter__(self) -> Iterator[T]:
        for dependency in self._graph.root_dependencies:
            # the dependency is a root, so its path is the one to use
            # for computing lineage and building python global context
            root_path = dependency.path
            yield from self._iter_one(dependency, self._graph, root_path)

    def _iter_one(self, dependency: Dependency, graph: DependencyGraph, root_path: Path) -> Iterable[T]:
        if dependency.path in self._walked_paths:
            return
        self._lineage.append(dependency)
        self._walked_paths.add(dependency.path)
        self._log_walk_one(dependency)
        if dependency.path.is_file() or is_a_notebook(dependency.path):
            inherited_tree = graph.root.build_inherited_tree(root_path, dependency.path)
            path_lookup = self._path_lookup.change_directory(dependency.path.parent)
            yield from self._process_dependency(dependency, path_lookup, inherited_tree)
            maybe_graph = graph.locate_dependency(dependency.path)
            # missing graph problems have already been reported while building the graph
            if maybe_graph.graph:
                child_graph = maybe_graph.graph
                for child_dependency in child_graph.local_dependencies:
                    yield from self._iter_one(child_dependency, child_graph, root_path)
        self._lineage.pop()

    def _log_walk_one(self, dependency: Dependency) -> None:
        logger.debug(f'Analyzing dependency: {dependency}')

    @abc.abstractmethod
    def _process_dependency(
        self,
        dependency: Dependency,
        path_lookup: PathLookup,
        inherited_tree: Tree | None,
    ) -> Iterable[T]: ...

    @property
    def lineage(self) -> list[LineageAtom]:
        lists: list[list[LineageAtom]] = [dependency.lineage for dependency in self._lineage]
        return list(itertools.chain(*lists))
186 changes: 12 additions & 174 deletions src/databricks/labs/ucx/source_code/jobs.py
@@ -3,14 +3,12 @@
import logging
import shutil
import tempfile
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterable, Callable
from collections.abc import Generator, Iterable
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from importlib import metadata
from pathlib import Path
from typing import TypeVar
from urllib import parse

from databricks.labs.blueprint.parallel import Threads
@@ -21,20 +19,15 @@
from databricks.sdk.service import compute, jobs
from databricks.sdk.service.compute import DataSecurityMode
from databricks.sdk.service.jobs import Source
from databricks.sdk.service.workspace import Language

from databricks.labs.ucx.assessment.crawlers import runtime_version_tuple
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache, InvalidPath
from databricks.labs.ucx.source_code.base import (
    CurrentSessionState,
    LocatedAdvice,
    is_a_notebook,
    file_language,
    SourceInfo,
    UsedTable,
    LineageAtom,
    safe_read_text,
)
from databricks.labs.ucx.source_code.directfs_access import (
    DirectFsAccessCrawler,
@@ -47,12 +40,13 @@
    DependencyResolver,
    SourceContainer,
    WrappingLoader,
    DependencyGraphWalker,
)
from databricks.labs.ucx.source_code.linters.graph_walkers import (
    LintingWalker,
    DfsaCollectorWalker,
    TablesCollectorWalker,
)
from databricks.labs.ucx.source_code.linters.context import LinterContext
from databricks.labs.ucx.source_code.notebooks.cells import CellLanguage
from databricks.labs.ucx.source_code.python.python_ast import MaybeTree, Tree, PythonSequentialLinter
from databricks.labs.ucx.source_code.notebooks.sources import FileLinter, Notebook
from databricks.labs.ucx.source_code.path_lookup import PathLookup
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler

@@ -532,7 +526,12 @@ def _lint_task(
        linted_paths: set[Path],
    ) -> Iterable[LocatedAdvice]:
        walker = LintingWalker(
            graph, linted_paths, self._path_lookup, task.task_key, session_state, self._migration_index
            graph,
            linted_paths,
            self._path_lookup,
            task.task_key,
            session_state,
            lambda: LinterContext(self._migration_index, session_state),
        )
        yield from walker

@@ -569,164 +568,3 @@ def _collect_task_tables(
                LineageAtom(object_type="TASK", object_id=f"{job_id}/{task.task_key}"),
            ]
            yield dataclasses.replace(used_table, source_lineage=atoms + used_table.source_lineage)


class LintingWalker(DependencyGraphWalker[LocatedAdvice]):

    def __init__(
        self,
        graph: DependencyGraph,
        walked_paths: set[Path],
        path_lookup: PathLookup,
        key: str,
        session_state: CurrentSessionState,
        migration_index: TableMigrationIndex,
    ):
        super().__init__(graph, walked_paths, path_lookup)
        self._key = key
        self._session_state = session_state
        self._linter_context = LinterContext(migration_index, session_state)

    def _log_walk_one(self, dependency: Dependency) -> None:
        logger.info(f'Linting {self._key} dependency: {dependency}')

    def _process_dependency(
        self,
        dependency: Dependency,
        path_lookup: PathLookup,
        inherited_tree: Tree | None,
    ) -> Iterable[LocatedAdvice]:
        # FileLinter determines which file/notebook linter to use
        linter = FileLinter(self._linter_context, path_lookup, self._session_state, dependency.path, inherited_tree)
        for advice in linter.lint():
            yield LocatedAdvice(advice, dependency.path)


T = TypeVar("T", bound=SourceInfo)


class _CollectorWalker(DependencyGraphWalker[T], ABC):

    def __init__(
        self,
        graph: DependencyGraph,
        walked_paths: set[Path],
        path_lookup: PathLookup,
        session_state: CurrentSessionState,
        migration_index: TableMigrationIndex,
    ):
        super().__init__(graph, walked_paths, path_lookup)
        self._session_state = session_state
        self._linter_context = LinterContext(migration_index, session_state)

    def _process_dependency(
        self,
        dependency: Dependency,
        path_lookup: PathLookup,
        inherited_tree: Tree | None,
    ) -> Iterable[T]:
        language = file_language(dependency.path)
        if not language:
            logger.warning(f"Unknown language for {dependency.path}")
            return
        cell_language = CellLanguage.of_language(language)
        source = safe_read_text(dependency.path)
        if not source:
            return
        if is_a_notebook(dependency.path):
            yield from self._collect_from_notebook(source, cell_language, dependency.path, inherited_tree)
        elif dependency.path.is_file():
            yield from self._collect_from_source(source, cell_language, dependency.path, inherited_tree)

    def _collect_from_notebook(
        self,
        source: str,
        language: CellLanguage,
        path: Path,
        inherited_tree: Tree | None,
    ) -> Iterable[T]:
        notebook = Notebook.parse(path, source, language.language)
        src_timestamp = datetime.fromtimestamp(path.stat().st_mtime, timezone.utc)
        src_id = str(path)
        for cell in notebook.cells:
            for item in self._collect_from_source(cell.original_code, cell.language, path, inherited_tree):
                yield item.replace_source(source_id=src_id, source_lineage=self.lineage, source_timestamp=src_timestamp)
            if cell.language is CellLanguage.PYTHON:
                if inherited_tree is None:
                    inherited_tree = Tree.new_module()
                maybe_tree = MaybeTree.from_source_code(cell.original_code)
                if maybe_tree.failure:
                    logger.warning(maybe_tree.failure.message)
                    continue
                assert maybe_tree.tree is not None
                inherited_tree.attach_child_tree(maybe_tree.tree)

    def _collect_from_source(
        self,
        source: str,
        language: CellLanguage,
        path: Path,
        inherited_tree: Tree | None,
    ) -> Iterable[T]:
        if language is CellLanguage.PYTHON:
            iterable = self._collect_from_python(source, inherited_tree)
        else:
            fn: Callable[[str], Iterable[T]] | None = getattr(self, f"_collect_from_{language.name.lower()}", None)
            if not fn:
                raise ValueError(f"Language {language.name} not supported yet!")
            # the below is for disabling a false pylint positive
            # pylint: disable=not-callable
            iterable = fn(source)
        src_timestamp = datetime.fromtimestamp(path.stat().st_mtime, timezone.utc)
        src_id = str(path)
        for item in iterable:
            yield item.replace_source(source_id=src_id, source_lineage=self.lineage, source_timestamp=src_timestamp)

    @abstractmethod
    def _collect_from_python(self, source: str, inherited_tree: Tree | None) -> Iterable[T]: ...

    def _collect_from_sql(self, _source: str) -> Iterable[T]:
        return []

    def _collect_from_r(self, _source: str) -> Iterable[T]:
        logger.warning("Language R not supported yet!")
        return []

    def _collect_from_scala(self, _source: str) -> Iterable[T]:
        logger.warning("Language scala not supported yet!")
        return []

    def _collect_from_shell(self, _source: str) -> Iterable[T]:
        return []

    def _collect_from_markdown(self, _source: str) -> Iterable[T]:
        return []

    def _collect_from_run(self, _source: str) -> Iterable[T]:
        return []

    def _collect_from_pip(self, _source: str) -> Iterable[T]:
        return []


class DfsaCollectorWalker(_CollectorWalker[DirectFsAccess]):

    def _collect_from_python(self, source: str, inherited_tree: Tree | None) -> Iterable[DirectFsAccess]:
        collector = self._linter_context.dfsa_collector(Language.PYTHON)
        yield from collector.collect_dfsas(source)

    def _collect_from_sql(self, source: str) -> Iterable[DirectFsAccess]:
        collector = self._linter_context.dfsa_collector(Language.SQL)
        yield from collector.collect_dfsas(source)


class TablesCollectorWalker(_CollectorWalker[UsedTable]):

    def _collect_from_python(self, source: str, inherited_tree: Tree | None) -> Iterable[UsedTable]:
        collector = self._linter_context.tables_collector(Language.PYTHON)
        assert isinstance(collector, PythonSequentialLinter)
        yield from collector.collect_tables(source)

    def _collect_from_sql(self, source: str) -> Iterable[UsedTable]:
        collector = self._linter_context.tables_collector(Language.SQL)
        yield from collector.collect_tables(source)
22 changes: 4 additions & 18 deletions src/databricks/labs/ucx/source_code/linters/files.py
@@ -8,9 +8,7 @@
from typing import TextIO

from databricks.labs.ucx.source_code.base import LocatedAdvice, CurrentSessionState, file_language, is_a_notebook
from databricks.labs.ucx.source_code.python.python_ast import Tree
from databricks.labs.ucx.source_code.notebooks.loaders import NotebookLoader
from databricks.labs.ucx.source_code.notebooks.sources import FileLinter
from databricks.labs.ucx.source_code.path_lookup import PathLookup
from databricks.labs.ucx.source_code.known import KnownList
from databricks.sdk.service.workspace import Language
@@ -29,8 +27,8 @@
    SourceContainer,
    DependencyResolver,
    InheritedContext,
    DependencyGraphWalker,
)
from databricks.labs.ucx.source_code.linters.graph_walkers import LintingWalker

logger = logging.getLogger(__name__)

@@ -171,23 +169,11 @@ def lint_path(self, path: Path, linted_paths: set[Path] | None = None) -> Iterab
        problems = container.build_dependency_graph(graph)
        for problem in problems:
            yield problem.as_located_advice()
        context_factory = self._context_factory
        session_state = self._session_state

        class LintingWalker(DependencyGraphWalker[LocatedAdvice]):

            def _process_dependency(
                self, dependency: Dependency, path_lookup: PathLookup, inherited_tree: Tree | None
            ) -> Iterable[LocatedAdvice]:
                ctx = context_factory()
                # FileLinter will determine which file/notebook linter to use
                linter = FileLinter(ctx, path_lookup, session_state, dependency.path, inherited_tree)
                for advice in linter.lint():
                    yield LocatedAdvice(advice, dependency.path)

        if linted_paths is None:
            linted_paths = set()
        walker = LintingWalker(graph, linted_paths, self._path_lookup)
        walker = LintingWalker(
            graph, linted_paths, self._path_lookup, path.name, self._session_state, self._context_factory
        )
        yield from walker
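
Both updated call sites, `_lint_task` in `jobs.py` and `lint_path` in `files.py`, now hand the shared `LintingWalker` a callable that builds a `LinterContext` instead of a `TableMigrationIndex`. A rough sketch of the pattern, assuming `graph`, `linted_paths`, `path_lookup`, `session_state`, and `migration_index` already exist in the caller's scope:

```python
from databricks.labs.ucx.source_code.linters.context import LinterContext
from databricks.labs.ucx.source_code.linters.graph_walkers import LintingWalker

walker = LintingWalker(
    graph,
    linted_paths,
    path_lookup,
    "my-task-key",  # label used in the walker's "Linting ... dependency" log messages
    session_state,
    # A callable that produces a LinterContext, mirroring the lambda passed in _lint_task above.
    lambda: LinterContext(migration_index, session_state),
)
for located_advice in walker:  # iterating performs the walk and yields LocatedAdvice items
    print(located_advice)
```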

