Skip to content

Commit

Permalink
[GEN-1406]: import plugin and clear plugin message for failed source …
Browse files Browse the repository at this point in the history
…import (#17788)

* support side effects on source classes by always importing source class

* streamlined error message

* fixed service type extraction for test suite pipeline

* - replaced "custom" with constant
- added quotes for the plugin exception for copy/paste ergonomics

(cherry picked from commit 59854de)
  • Loading branch information
sushi30 committed Sep 13, 2024
1 parent 05cb36f commit 6cd5a43
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 41 deletions.
15 changes: 13 additions & 2 deletions ingestion/src/metadata/data_quality/source/test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
Expand All @@ -36,6 +37,8 @@
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
from metadata.utils.importer import import_source_class
from metadata.utils.logger import test_suite_logger

logger = test_suite_logger()
Expand Down Expand Up @@ -73,7 +76,7 @@ def _get_table_entity(self) -> Optional[Table]:
table: Table = self.metadata.get_by_name(
entity=Table,
fqn=self.source_config.entityFullyQualifiedName.root,
fields=["tableProfilerConfig", "testSuite"],
fields=["tableProfilerConfig", "testSuite", "serviceType"],
)

return table
Expand Down Expand Up @@ -104,8 +107,16 @@ def test_connection(self) -> None:

def _iter(self) -> Iterable[Either[TableAndTests]]:
table: Table = self._get_table_entity()

if table:
source_type = table.serviceType.value.lower()
if source_type.startswith(CUSTOM_CONNECTOR_PREFIX):
logger.warning(
"Data quality tests might not work as expected with custom sources"
)
else:
import_source_class(
service_type=ServiceType.Database, source_type=source_type
)
yield from self._process_table_suite(table)

else:
Expand Down
2 changes: 2 additions & 0 deletions ingestion/src/metadata/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,5 @@
# Inverse of ENTITY_REFERENCE_CLASS_MAP: maps each class' ``__name__`` back to
# the key it is registered under in that map.
ENTITY_REFERENCE_TYPE_MAP = {
value.__name__: key for key, value in ENTITY_REFERENCE_CLASS_MAP.items()
}

# Prefix matched with ``str.startswith`` against the lower-cased source /
# connection type name to detect custom connectors.
CUSTOM_CONNECTOR_PREFIX = "custom"
44 changes: 38 additions & 6 deletions ingestion/src/metadata/utils/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink
from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
from metadata.utils.class_helper import get_service_type_from_source_type
from metadata.utils.client_version import get_client_version
from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
from metadata.utils.logger import utils_logger
from metadata.utils.singleton import Singleton

Expand All @@ -43,6 +45,38 @@ class DynamicImportException(Exception):
Raise it when having issues dynamically importing objects
"""

def __init__(self, module: str, key: str = None, cause: Exception = None):
self.module = module
self.key = key
self.cause = cause

def __str__(self):
import_path = self.module
if self.key:
import_path += f".{self.key}"
return f"Cannot import {import_path} due to {self.cause}"


class MissingPluginException(Exception):
    """
    An exception that captures a missing openmetadata-ingestion plugin for a specific connector.

    Its string representation is a ready-to-copy ``pip install`` hint for the
    missing plugin, pinned to the client version when that can be resolved.
    """

    def __init__(self, plugin: str):
        """
        Args:
            plugin: name of the ``openmetadata-ingestion`` extra that seems
                to be missing
        """
        # Forward to Exception so ``args`` is populated regardless of whether
        # the plugin is passed positionally or by keyword; ``args`` is what
        # pickle / copy use to re-create the exception.
        super().__init__(plugin)
        self.plugin = plugin

    def __str__(self):
        try:
            version = "==" + get_client_version()
        except Exception:
            # Best effort only: the hint is still useful without a pinned
            # version, so never let a version lookup failure hide it.
            logger.warning("unable to get client version")
            logger.debug(traceback.format_exc())
            version = ""
        return (
            f"You might be missing the plugin [{self.plugin}]. Try:\n"
            f'pip install "openmetadata-ingestion[{self.plugin}]{version}"'
        )


def get_module_dir(type_: str) -> str:
"""
Expand Down Expand Up @@ -93,13 +127,13 @@ def import_from_module(key: str) -> Type[Any]:
Dynamically import an object from a module path
"""

module_name, obj_name = key.rsplit(MODULE_SEPARATOR, 1)
try:
module_name, obj_name = key.rsplit(MODULE_SEPARATOR, 1)
obj = getattr(importlib.import_module(module_name), obj_name)
return obj
except Exception as err:
logger.debug(traceback.format_exc())
raise DynamicImportException(f"Cannot load object from {key} due to {err}")
raise DynamicImportException(module=module_name, key=obj_name, cause=err)


# module building strings read better with .format instead of f-strings
Expand Down Expand Up @@ -200,7 +234,7 @@ def import_connection_fn(connection: BaseModel, function_name: str) -> Callable:
# module building strings read better with .format instead of f-strings
# pylint: disable=consider-using-f-string

if connection.type.value.lower().startswith("custom"):
if connection.type.value.lower().startswith(CUSTOM_CONNECTOR_PREFIX):
python_class_parts = connection.sourcePythonClass.rsplit(".", 1)
python_module_path = ".".join(python_class_parts[:-1])

Expand Down Expand Up @@ -261,9 +295,7 @@ def import_side_effects(self, *modules):
SideEffectsLoader.modules.add(module.__name__)
except Exception as err:
logger.debug(traceback.format_exc())
raise DynamicImportException(
f"Cannot load object from {module} due to {err}"
)
raise DynamicImportException(module=module, cause=err)
else:
logger.debug(f"Module {module} already imported")

Expand Down
10 changes: 7 additions & 3 deletions ingestion/src/metadata/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,14 @@ def set_loggers_level(level: Union[int, str] = logging.INFO):


def log_ansi_encoded_string(
color: Optional[ANSI] = None, bold: bool = False, message: str = ""
color: Optional[ANSI] = None,
bold: bool = False,
message: str = "",
level=logging.INFO,
):
utils_logger().info(
f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}"
utils_logger().log(
level=level,
msg=f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}",
)


Expand Down
28 changes: 27 additions & 1 deletion ingestion/src/metadata/workflow/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"""
import traceback
from abc import ABC, abstractmethod
from typing import List, Tuple, cast
from typing import List, Tuple, Type, cast

from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
Expand All @@ -46,6 +46,13 @@
get_service_class_from_service_type,
get_service_type_from_source_type,
)
from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
from metadata.utils.importer import (
DynamicImportException,
MissingPluginException,
import_from_module,
import_source_class,
)
from metadata.utils.logger import ingestion_logger
from metadata.workflow.base import BaseWorkflow, InvalidWorkflowJSONException
from metadata.workflow.workflow_status_mixin import SUCCESS_THRESHOLD_VALUE
Expand Down Expand Up @@ -218,3 +225,22 @@ def validate(self):
raise WorkflowExecutionError(
f"Profiler is not supported for the service connection: {self.config.source.serviceConnection}"
)

def import_source_class(self) -> Type[Source]:
    """
    Import the Source class configured for this workflow.

    Custom connectors (source type starting with CUSTOM_CONNECTOR_PREFIX)
    are loaded from the ``sourcePythonClass`` path of the service
    connection config; any other source is resolved from the workflow
    service type and the source type.

    Returns:
        The imported Source class.

    Raises:
        DynamicImportException: if a custom connector class cannot be imported.
        MissingPluginException: if a non-custom source cannot be imported.
            The original DynamicImportException is chained as its cause.
    """
    source_type = self.config.source.type.lower()
    is_custom = source_type.startswith(CUSTOM_CONNECTOR_PREFIX)
    try:
        if is_custom:
            return import_from_module(
                self.config.source.serviceConnection.root.config.sourcePythonClass
            )
        # Module-level helper from metadata.utils.importer, not this method
        return import_source_class(
            service_type=self.service_type, source_type=source_type
        )
    except DynamicImportException as exc:
        if is_custom:
            # A plugin hint makes no sense for user provided classes:
            # surface the import error untouched.
            raise
        logger.debug(traceback.format_exc())
        logger.error(f"Failed to import source of type '{source_type}'")
        # Chain explicitly so the traceback reports the import failure as
        # the direct cause instead of an error raised while handling it.
        raise MissingPluginException(source_type) from exc
16 changes: 2 additions & 14 deletions ingestion/src/metadata/workflow/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@

from metadata.config.common import WorkflowExecutionError
from metadata.ingestion.api.steps import Sink, Source
from metadata.utils.importer import (
import_from_module,
import_sink_class,
import_source_class,
)
from metadata.utils.importer import import_sink_class
from metadata.utils.logger import ingestion_logger
from metadata.workflow.ingestion import IngestionWorkflow

Expand Down Expand Up @@ -47,15 +43,7 @@ def _get_source(self) -> Source:
"configuration here: https://docs.open-metadata.org/connectors"
)

source_class = (
import_from_module(
self.config.source.serviceConnection.root.config.sourcePythonClass
)
if source_type.startswith("custom")
else import_source_class(
service_type=self.service_type, source_type=source_type
)
)
source_class = self.import_source_class()

pipeline_name = (
self.ingestion_pipeline.fullyQualifiedName.root
Expand Down
1 change: 1 addition & 0 deletions ingestion/src/metadata/workflow/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self, config: OpenMetadataWorkflowConfig):

def _get_source_class(self):
if self.config.source.serviceName:
self.import_source_class()
return OpenMetadataSource
logger.info(
"Database Service name not provided, we will scan all the tables "
Expand Down
13 changes: 1 addition & 12 deletions ingestion/src/metadata/workflow/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
from metadata.ingestion.api.steps import BulkSink, Processor, Source, Stage
from metadata.utils.importer import (
import_bulk_sink_type,
import_from_module,
import_processor_class,
import_source_class,
import_stage_class,
)
from metadata.utils.logger import ingestion_logger
Expand Down Expand Up @@ -51,16 +49,7 @@ def _get_source(self) -> Source:
"configuration here: https://docs.open-metadata.org/connectors"
)

source_class = (
import_from_module(
self.config.source.serviceConnection.root.config.sourcePythonClass
)
if source_type.startswith("custom")
else import_source_class(
service_type=self.service_type, source_type=source_type
)
)

source_class = self.import_source_class()
source: Source = source_class.create(
self.config.source.model_dump(), self.metadata
)
Expand Down
18 changes: 15 additions & 3 deletions ingestion/src/metadata/workflow/workflow_init_error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""
Module handles the init error messages from different workflows
"""
import logging
import traceback
from pathlib import Path
from typing import Any, Dict, Optional, Type, Union
Expand All @@ -25,7 +26,7 @@
ParsingConfigurationError,
)
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ANSI, log_ansi_encoded_string
from metadata.utils.logger import ANSI, log_ansi_encoded_string, utils_logger

EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows"

Expand Down Expand Up @@ -74,10 +75,10 @@ def print_init_error(
source_type_name, pipeline_type
)
else:
utils_logger().debug(traceback.format_exc())
WorkflowInitErrorHandler._print_error_msg(
f"\nError initializing {pipeline_type.name}: {exc}"
)
WorkflowInitErrorHandler._print_error_msg(traceback.format_exc())

WorkflowInitErrorHandler._print_more_info(pipeline_type)

Expand Down Expand Up @@ -151,4 +152,15 @@ def _print_error_msg(msg: str) -> None:
"""
Print message with error style
"""
log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}")
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}", level=logging.ERROR
)

@staticmethod
def _print_debug_msg(msg: str) -> None:
    """
    Print message with debug style: yellow, non-bold text logged at DEBUG level
    """
    log_ansi_encoded_string(
        color=ANSI.YELLOW, bold=False, message=f"{msg}", level=logging.DEBUG
    )
4 changes: 4 additions & 0 deletions ingestion/tests/unit/data_quality/source/test_test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

from metadata.data_quality.source.test_suite import TestSuiteSource
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
Expand Down Expand Up @@ -76,6 +79,7 @@ def test_source_config(parameters, expected, monkeypatch):
name="test_table",
columns=[],
testSuite=MOCK_ENTITY_REFERENCE,
serviceType=DatabaseServiceType.Postgres,
)
mock_metadata.list_all_entities.return_value = [
TestCase(
Expand Down

0 comments on commit 6cd5a43

Please sign in to comment.