diff --git a/ci/conda/recipes/morpheus/meta.yaml b/ci/conda/recipes/morpheus/meta.yaml
index 5e94706072..5bd1da8b98 100644
--- a/ci/conda/recipes/morpheus/meta.yaml
+++ b/ci/conda/recipes/morpheus/meta.yaml
@@ -92,12 +92,15 @@ outputs:
       - python-confluent-kafka 1.9.2
       - pytorch 2.0.1
       - pytorch-cuda
+      - requests=2.31
+      - requests-cache=1.1
       - scikit-learn 1.2.2.*
       - sqlalchemy <=2.0 # 2.0 is incompatible with pandas=1.3
       - tqdm 4.*
       - tritonclient 2.26.*
       - typing_utils 0.1.*
       - watchdog 2.1.*
+      - websockets
     run_constrained:
       # Since we dont explicitly require this but other packages might, constrain the versions.
       - {{ pin_compatible('cudatoolkit', min_pin='x.x', max_pin='x') }}
diff --git a/morpheus/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py
index 1c4bca4dfa..058ad5ee7c 100644
--- a/morpheus/controllers/elasticsearch_controller.py
+++ b/morpheus/controllers/elasticsearch_controller.py
@@ -16,12 +16,21 @@
 import time

 import pandas as pd
-from elasticsearch import ConnectionError as ESConnectionError
-from elasticsearch import Elasticsearch
-from elasticsearch.helpers import parallel_bulk
+
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

+REQUIRED_DEPS = ('ESConnectionError', 'Elasticsearch', 'parallel_bulk')
+IMPORT_ERROR_MESSAGE = "ElasticsearchController requires the elasticsearch package to be installed."
+
+try:
+    from elasticsearch import ConnectionError as ESConnectionError
+    from elasticsearch import Elasticsearch
+    from elasticsearch.helpers import parallel_bulk
+except ImportError:
+    pass
+

 class ElasticsearchController:
     """
@@ -38,7 +47,7 @@ class ElasticsearchController:
     """

     def __init__(self, connection_kwargs: dict, raise_on_exception: bool = False, refresh_period_secs: int = 2400):
-
+        _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
         self._client = None
         self._last_refresh_time = None
         self._raise_on_exception = raise_on_exception
diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py
index a1b64a926f..3774bac9d4 100644
--- a/morpheus/controllers/rss_controller.py
+++ b/morpheus/controllers/rss_controller.py
@@ -19,14 +19,23 @@
 from dataclasses import dataclass
 from urllib.parse import urlparse

-import feedparser
 import pandas as pd
 import requests
 import requests_cache
-from bs4 import BeautifulSoup
+
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

+REQUIRED_DEPS = ('BeautifulSoup', 'feedparser')
+IMPORT_ERROR_MESSAGE = "RSSController requires the bs4 and feedparser packages to be installed."
+
+try:
+    import feedparser
+    from bs4 import BeautifulSoup
+except ImportError:
+    pass
+

 @dataclass
 class FeedStats:
@@ -72,7 +81,7 @@ def __init__(self,
                  cache_dir: str = "./.cache/http",
                  cooldown_interval: int = 600,
                  request_timeout: float = 2.0):
-
+        _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
         if (isinstance(feed_input, str)):
             feed_input = [feed_input]
@@ -151,7 +160,7 @@ def _read_file_content(self, file_path: str) -> str:
         with open(file_path, 'r', encoding="utf-8") as file:
             return file.read()

-    def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> feedparser.FeedParserDict:
+    def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) -> "feedparser.FeedParserDict":

         feed_input = self._get_response_text(feed_input) if is_url else self._read_file_content(feed_input)

@@ -191,7 +200,7 @@ def _try_parse_feed_with_beautiful_soup(self, feed_input: str, is_url: bool) ->

         return feed

-    def _try_parse_feed(self, url: str) -> feedparser.FeedParserDict:
+    def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":
         is_url = RSSController.is_url(url)

         fallback = False
diff --git a/morpheus/llm/services/nemo_llm_service.py b/morpheus/llm/services/nemo_llm_service.py
index c08a41717d..b9d5dc7a9f 100644
--- a/morpheus/llm/services/nemo_llm_service.py
+++ b/morpheus/llm/services/nemo_llm_service.py
@@ -19,6 +19,7 @@

 from morpheus.llm.services.llm_service import LLMClient
 from morpheus.llm.services.llm_service import LLMService
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

@@ -29,15 +30,7 @@
 try:
     import nemollm
 except ImportError:
-    logger.error(IMPORT_ERROR_MESSAGE)
-
-
-def _verify_nemo_llm():
-    """
-    When NemoLLM is not installed, raise an ImportError with a helpful message, rather than an attribute error.
-    """
-    if 'nemollm' not in globals():
-        raise ImportError(IMPORT_ERROR_MESSAGE)
+    pass


 class NeMoLLMClient(LLMClient):
@@ -58,7 +51,7 @@ class NeMoLLMClient(LLMClient):

     def __init__(self, parent: "NeMoLLMService", model_name: str, **model_kwargs: dict[str, typing.Any]) -> None:
         super().__init__()
-        _verify_nemo_llm()
+        _verify_deps(('nemollm', ), IMPORT_ERROR_MESSAGE, globals())

         self._parent = parent
         self._model_name = model_name
@@ -154,7 +147,7 @@ class NeMoLLMService(LLMService):

     def __init__(self, *, api_key: str = None, org_id: str = None) -> None:
         super().__init__()
-        _verify_nemo_llm()
+        _verify_deps(('nemollm', ), IMPORT_ERROR_MESSAGE, globals())

         api_key = api_key if api_key is not None else os.environ.get("NGC_API_KEY", None)
         org_id = org_id if org_id is not None else os.environ.get("NGC_ORG_ID", None)
diff --git a/morpheus/llm/services/openai_chat_service.py b/morpheus/llm/services/openai_chat_service.py
index dda00d032d..d601c97b20 100644
--- a/morpheus/llm/services/openai_chat_service.py
+++ b/morpheus/llm/services/openai_chat_service.py
@@ -19,6 +19,7 @@

 from morpheus.llm.services.llm_service import LLMClient
 from morpheus.llm.services.llm_service import LLMService
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

@@ -30,12 +31,7 @@
 try:
     import openai
 except ImportError:
-    logger.error(IMPORT_ERROR_MESSAGE)
-
-
-def _verify_openai():
-    if 'openai' not in globals():
-        raise ImportError(IMPORT_ERROR_MESSAGE)
+    pass


 class OpenAIChatClient(LLMClient):
@@ -57,7 +53,7 @@ class OpenAIChatClient(LLMClient):

     def __init__(self, model_name: str, set_assistant: bool = False, **model_kwargs: dict[str, typing.Any]) -> None:
         super().__init__()
-        _verify_openai()
+        _verify_deps(('openai', ), IMPORT_ERROR_MESSAGE, globals())

         self._model_name = model_name
         self._set_assistant = set_assistant
@@ -191,7 +187,7 @@ class OpenAIChatService(LLMService):

     def __init__(self) -> None:
         super().__init__()
-        _verify_openai()
+        _verify_deps(('openai', ), IMPORT_ERROR_MESSAGE, globals())

     def get_client(self,
                    model_name: str,
diff --git a/morpheus/service/vdb/milvus_client.py b/morpheus/service/vdb/milvus_client.py
index ff2956a93c..7f2436eb46 100644
--- a/morpheus/service/vdb/milvus_client.py
+++ b/morpheus/service/vdb/milvus_client.py
@@ -15,26 +15,9 @@
 import typing

 from pymilvus import Collection
-from pymilvus import DataType
 from pymilvus import MilvusClient as PyMilvusClient
 from pymilvus.orm.mutation import MutationResult

-# Milvus data type mapping dictionary
-MILVUS_DATA_TYPE_MAP = {
-    "int8": DataType.INT8,
-    "int16": DataType.INT16,
-    "int32": DataType.INT32,
-    "int64": DataType.INT64,
-    "bool": DataType.BOOL,
-    "float": DataType.FLOAT,
-    "double": DataType.DOUBLE,
-    "binary_vector": DataType.BINARY_VECTOR,
-    "float_vector": DataType.FLOAT_VECTOR,
-    "string": DataType.STRING,
-    "varchar": DataType.VARCHAR,
-    "json": DataType.JSON,
-}
-

 def handle_exceptions(func_name: str, error_message: str) -> typing.Callable:
     """
diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py
index 614208d0ed..ef1b741488 100644
--- a/morpheus/service/vdb/milvus_vector_db_service.py
+++ b/morpheus/service/vdb/milvus_vector_db_service.py
@@ -21,17 +21,26 @@
 from functools import wraps

 import pandas as pd
-import pymilvus
-from pymilvus.orm.mutation import MutationResult

 import cudf

-from morpheus.service.vdb.milvus_client import MilvusClient
 from morpheus.service.vdb.vector_db_service import VectorDBResourceService
 from morpheus.service.vdb.vector_db_service import VectorDBService
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

+REQUIRED_DEPS = ('pymilvus', 'MilvusClient', 'MutationResult')
+IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed."
+
+try:
+    import pymilvus
+    from pymilvus.orm.mutation import MutationResult
+
+    from morpheus.service.vdb.milvus_client import MilvusClient  # pylint: disable=ungrouped-imports
+except ImportError:
+    pass
+

 class FieldSchemaEncoder(json.JSONEncoder):

@@ -75,7 +84,7 @@ def object_hook(obj: dict) -> dict:
         return obj

     @staticmethod
-    def dump(field: pymilvus.FieldSchema, f: typing.IO) -> str:
+    def dump(field: "pymilvus.FieldSchema", f: typing.IO) -> str:
         """
         Serialize a FieldSchema object to a JSON file.

@@ -94,7 +103,7 @@ def dump(field: pymilvus.FieldSchema, f: typing.IO) -> str:
         return json.dump(field, f, cls=FieldSchemaEncoder)

     @staticmethod
-    def dumps(field: pymilvus.FieldSchema) -> str:
+    def dumps(field: "pymilvus.FieldSchema") -> str:
         """
         Serialize a FieldSchema object to a JSON-compatible string format.

@@ -112,7 +121,7 @@ def dumps(field: pymilvus.FieldSchema) -> str:
         return json.dumps(field, cls=FieldSchemaEncoder)

     @staticmethod
-    def load(f_obj: typing.IO) -> pymilvus.FieldSchema:
+    def load(f_obj: typing.IO) -> "pymilvus.FieldSchema":
         """
         Deserialize a JSON file to a FieldSchema object.

@@ -129,7 +138,7 @@ def load(f_obj: typing.IO) -> pymilvus.FieldSchema:
         return pymilvus.FieldSchema.construct_from_dict(json.load(f_obj, object_hook=FieldSchemaEncoder.object_hook))

     @staticmethod
-    def loads(field: str) -> pymilvus.FieldSchema:
+    def loads(field: str) -> "pymilvus.FieldSchema":
         """
         Deserialize a JSON-compatible string to a FieldSchema object.

@@ -147,7 +156,7 @@ def loads(field: str) -> pymilvus.FieldSchema:
         return pymilvus.FieldSchema.construct_from_dict(json.loads(field, object_hook=FieldSchemaEncoder.object_hook))

     @staticmethod
-    def from_dict(field: dict) -> pymilvus.FieldSchema:
+    def from_dict(field: dict) -> "pymilvus.FieldSchema":
         """
         Convert a dictionary to a FieldSchema object.

@@ -216,7 +225,8 @@ class MilvusVectorDBResourceService(VectorDBResourceService):
         An instance of the MilvusClient for interaction with the Milvus Vector Database.
""" - def __init__(self, name: str, client: MilvusClient) -> None: + def __init__(self, name: str, client: "MilvusClient") -> None: + _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals()) super().__init__() self._name = name @@ -525,7 +535,7 @@ def drop(self, **kwargs: dict[str, typing.Any]) -> None: self._collection.drop(**kwargs) - def _insert_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]: + def _insert_result_to_dict(self, result: "MutationResult") -> dict[str, typing.Any]: result_dict = { "primary_keys": result.primary_keys, "insert_count": result.insert_count, @@ -539,7 +549,7 @@ def _insert_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any } return result_dict - def _update_delete_result_to_dict(self, result: MutationResult) -> dict[str, typing.Any]: + def _update_delete_result_to_dict(self, result: "MutationResult") -> dict[str, typing.Any]: result_dict = { "insert_count": result.insert_count, "delete_count": result.delete_count, @@ -613,7 +623,7 @@ def list_store_objects(self, **kwargs: dict[str, typing.Any]) -> list[str]: """ return self._client.list_collections(**kwargs) - def _create_schema_field(self, field_conf: dict) -> pymilvus.FieldSchema: + def _create_schema_field(self, field_conf: dict) -> "pymilvus.FieldSchema": field_schema = pymilvus.FieldSchema.construct_from_dict(field_conf) diff --git a/morpheus/stages/input/databricks_deltalake_source_stage.py b/morpheus/stages/input/databricks_deltalake_source_stage.py index ff267c9c8d..f85639b770 100644 --- a/morpheus/stages/input/databricks_deltalake_source_stage.py +++ b/morpheus/stages/input/databricks_deltalake_source_stage.py @@ -15,9 +15,6 @@ import logging import mrc -from databricks.connect import DatabricksSession -from pyspark.sql import functions as sf -from pyspark.sql.window import Window import cudf @@ -27,9 +24,20 @@ from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_output_source import SingleOutputSource from morpheus.pipeline.stage_schema import StageSchema +from morpheus.utils.verify_dependencies import _verify_deps logger = logging.getLogger(__name__) +REQUIRED_DEPS = ('DatabricksSession', 'sf', 'Window') +IMPORT_ERROR_MESSAGE = "DatabricksDeltaLakeSourceStage requires the databricks-connect package to be installed." 
+
+try:
+    from databricks.connect import DatabricksSession
+    from pyspark.sql import functions as sf
+    from pyspark.sql.window import Window
+except ImportError:
+    pass
+

 @register_stage("from-databricks-deltalake")
 class DataBricksDeltaLakeSourceStage(PreallocatorMixin, SingleOutputSource):
@@ -59,7 +67,7 @@ def __init__(self,
                  databricks_host: str = None,
                  databricks_token: str = None,
                  databricks_cluster_id: str = None):
-
+        _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
         super().__init__(config)
         self.spark_query = spark_query
         self.spark = DatabricksSession.builder.remote(host=databricks_host,
diff --git a/morpheus/stages/output/write_to_databricks_deltalake_stage.py b/morpheus/stages/output/write_to_databricks_deltalake_stage.py
index 05f7c4de77..c8ef52443b 100644
--- a/morpheus/stages/output/write_to_databricks_deltalake_stage.py
+++ b/morpheus/stages/output/write_to_databricks_deltalake_stage.py
@@ -17,17 +17,7 @@
 import mrc
 import pandas as pd
-from databricks.connect import DatabricksSession
 from mrc.core import operators as ops
-from pyspark.sql.types import BooleanType
-from pyspark.sql.types import DoubleType
-from pyspark.sql.types import FloatType
-from pyspark.sql.types import IntegerType
-from pyspark.sql.types import LongType
-from pyspark.sql.types import StringType
-from pyspark.sql.types import StructField
-from pyspark.sql.types import StructType
-from pyspark.sql.types import TimestampType

 import cudf

@@ -36,9 +26,19 @@
 from morpheus.messages import MessageMeta
 from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
 from morpheus.pipeline.single_port_stage import SinglePortStage
+from morpheus.utils.verify_dependencies import _verify_deps

 logger = logging.getLogger(__name__)

+REQUIRED_DEPS = ('DatabricksSession', 'sql_types')
+IMPORT_ERROR_MESSAGE = "DataBricksDeltaLakeSinkStage requires the databricks-connect package to be installed."
+
+try:
+    from databricks.connect import DatabricksSession
+    from pyspark.sql import types as sql_types
+except ImportError:
+    pass
+

 @register_stage("to-databricks-deltalake")
 class DataBricksDeltaLakeSinkStage(PassThruTypeMixin, SinglePortStage):
@@ -68,7 +68,7 @@ def __init__(self,
                  databricks_token: str = None,
                  databricks_cluster_id: str = None,
                  delta_table_write_mode: str = "append"):
-
+        _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
         super().__init__(config)
         self.delta_path = delta_path
         self.delta_table_write_mode = delta_table_write_mode
@@ -113,7 +113,7 @@ def write_to_deltalake(meta: MessageMeta):
         return node

     @staticmethod
-    def _extract_schema_from_pandas_dataframe(df: pd.DataFrame) -> StructType:
+    def _extract_schema_from_pandas_dataframe(df: pd.DataFrame) -> "sql_types.StructType":
         """
         Extract approximate schemas from pandas dataframe
         """
@@ -121,21 +121,21 @@ def _extract_schema_from_pandas_dataframe(df: pd.DataFrame) -> StructType:
         for col, dtype in df.dtypes.items():
             try:
                 if dtype == "bool":
-                    spark_dtype = StructField(col, BooleanType())
+                    spark_dtype = sql_types.StructField(col, sql_types.BooleanType())
                 elif dtype == "int64":
-                    spark_dtype = StructField(col, LongType())
+                    spark_dtype = sql_types.StructField(col, sql_types.LongType())
                 elif dtype == "int32":
-                    spark_dtype = StructField(col, IntegerType())
+                    spark_dtype = sql_types.StructField(col, sql_types.IntegerType())
                 elif dtype == "float64":
-                    spark_dtype = StructField(col, DoubleType())
+                    spark_dtype = sql_types.StructField(col, sql_types.DoubleType())
                 elif dtype == "float32":
-                    spark_dtype = StructField(col, FloatType())
+                    spark_dtype = sql_types.StructField(col, sql_types.FloatType())
                 elif dtype == "datetime64[ns]":
-                    spark_dtype = StructField(col, TimestampType())
+                    spark_dtype = sql_types.StructField(col, sql_types.TimestampType())
                 else:
-                    spark_dtype = StructField(col, StringType())
+                    spark_dtype = sql_types.StructField(col, sql_types.StringType())
             except Exception as e:
                 logger.error("Encountered error %s while converting columns %s with data type %s", e, col, dtype)
-                spark_dtype = StructField(col, StringType())
+                spark_dtype = sql_types.StructField(col, sql_types.StringType())
             spark_schema.append(spark_dtype)
-        return StructType(spark_schema)
+        return sql_types.StructType(spark_schema)
diff --git a/morpheus/utils/verify_dependencies.py b/morpheus/utils/verify_dependencies.py
new file mode 100644
index 0000000000..c5db25e2fa
--- /dev/null
+++ b/morpheus/utils/verify_dependencies.py
@@ -0,0 +1,26 @@
+# Copyright (c) 2023, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def _verify_deps(deps: tuple[str, ...], error_message: str, namespace: dict):
+    """
+    Some dependencies are used only by a specific stage and are not installed by default.
+    Such packages are imported in a guarded try-except block, so the requirement is
+    enforced only when the stage that needs them is actually used.
+
+    Raises an ImportError if any of the dependencies are not installed.
+    """
+    for dep in deps:
+        if dep not in namespace:
+            raise ImportError(error_message)
diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py
index 091d99faca..a8c57fec87 100644
--- a/tests/test_milvus_vector_db_service.py
+++ b/tests/test_milvus_vector_db_service.py
@@ -20,13 +20,29 @@
 import numpy as np
 import pymilvus
 import pytest
+from pymilvus import DataType

 import cudf

-from morpheus.service.vdb.milvus_client import MILVUS_DATA_TYPE_MAP
 from morpheus.service.vdb.milvus_vector_db_service import FieldSchemaEncoder
 from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService

+# Milvus data type mapping dictionary
+MILVUS_DATA_TYPE_MAP = {
+    "int8": DataType.INT8,
+    "int16": DataType.INT16,
+    "int32": DataType.INT32,
+    "int64": DataType.INT64,
+    "bool": DataType.BOOL,
+    "float": DataType.FLOAT,
+    "double": DataType.DOUBLE,
+    "binary_vector": DataType.BINARY_VECTOR,
+    "float_vector": DataType.FLOAT_VECTOR,
+    "string": DataType.STRING,
+    "varchar": DataType.VARCHAR,
+    "json": DataType.JSON,
+}
+

 @pytest.fixture(scope="module", name="milvus_service")
 def milvus_service_fixture(milvus_server_uri: str):
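
Every module touched above follows the same recipe: move the optional import into a guarded `try/except ImportError`, record the names it binds in `REQUIRED_DEPS`, and call `_verify_deps()` in the constructor of the class that actually needs them. A minimal sketch of the pattern, assuming only the `_verify_deps` helper added in this diff; `some_optional_pkg` and `ExampleController` are hypothetical stand-ins:

```python
import logging

from morpheus.utils.verify_dependencies import _verify_deps

logger = logging.getLogger(__name__)

REQUIRED_DEPS = ('some_optional_pkg', )
IMPORT_ERROR_MESSAGE = "ExampleController requires the some_optional_pkg package to be installed."

try:
    # Guarded import: a missing optional package no longer breaks module import.
    import some_optional_pkg
except ImportError:
    pass


class ExampleController:

    def __init__(self) -> None:
        # The requirement is enforced at construction time, so pipelines that
        # never instantiate this controller can run without the dependency.
        _verify_deps(REQUIRED_DEPS, IMPORT_ERROR_MESSAGE, globals())
        self._client = some_optional_pkg.Client()
```

The shared helper replaces the per-module `_verify_nemo_llm()` and `_verify_openai()` functions while keeping the failure mode identical: an `ImportError` carrying an actionable message rather than a later `NameError` or `AttributeError`.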
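
For the controllers and stages that previously imported their dependencies unconditionally (Elasticsearch, RSS, Milvus, Databricks), this changes when a missing package surfaces: importing the module now succeeds, and the error is raised on construction instead. Sketched for `ElasticsearchController`, whose error message is defined in this diff:

```python
# With the elasticsearch package absent, this import now succeeds where it
# previously raised ImportError as soon as the module was loaded...
from morpheus.controllers.elasticsearch_controller import ElasticsearchController

try:
    # ...and the failure surfaces here instead, with an actionable message.
    controller = ElasticsearchController(connection_kwargs={})
except ImportError as exc:
    print(exc)  # ElasticsearchController requires the elasticsearch package to be installed.
```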
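
The quoted return annotations (`"feedparser.FeedParserDict"`, `"pymilvus.FieldSchema"`, `"sql_types.StructType"`, and so on) are what make the guarded imports safe. A small self-contained illustration; the function name is hypothetical:

```python
try:
    import feedparser  # optional dependency, may be absent
except ImportError:
    pass

# An unquoted annotation such as
#
#     def try_parse_feed(url: str) -> feedparser.FeedParserDict: ...
#
# is evaluated when the `def` statement runs, so a missing feedparser would
# raise NameError at module-import time. The quoted form is stored as a plain
# string and is only resolved if something calls typing.get_type_hints().
def try_parse_feed(url: str) -> "feedparser.FeedParserDict":
    ...
```

Adding `from __future__ import annotations` (PEP 563) to each module would achieve the same deferral without the quotes, at the cost of touching every annotation in the file.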
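
`verify_dependencies.py` itself lands without tests in this diff; a sketch of what coverage could look like, assuming pytest as in the existing test suite (the test names are hypothetical):

```python
import pytest

from morpheus.utils.verify_dependencies import _verify_deps


def test_verify_deps_passes_when_names_present():
    namespace = {'DatabricksSession': object(), 'sql_types': object()}

    # All required names are bound in the namespace: no exception.
    _verify_deps(('DatabricksSession', 'sql_types'), "unused message", namespace)


def test_verify_deps_raises_on_missing_name():
    # Note the check is on *names*, not modules: aliased imports such as
    # `from pyspark.sql import types as sql_types` must be listed by alias.
    with pytest.raises(ImportError, match="requires the databricks-connect"):
        _verify_deps(('DatabricksSession', ),
                     "DataBricksDeltaLakeSinkStage requires the databricks-connect package to be installed.",
                     {})
```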