From d732c0bf20aed717b09720a17a74dfbe3b1fd6ed Mon Sep 17 00:00:00 2001 From: Devin Robison Date: Wed, 24 Jan 2024 13:19:49 -0700 Subject: [PATCH] Unit test fixes, formatting updates --- examples/llm/common/vdb_resource_tagging_module.py | 3 +-- examples/llm/common/web_scraper_module.py | 3 +-- .../schemas/examples/llm/content_extractor_schema.py | 4 ++-- morpheus/pipeline/stage_base.py | 2 +- morpheus/stages/input/file_source_stage.py | 11 +++++++---- morpheus/stages/input/in_memory_source_stage.py | 7 +++---- tests/examples/llm/common/test_web_scraper_module.py | 4 ++-- tests/pipeline/test_pipeline.py | 2 +- tests/pipeline/test_preallocation_pipe.py | 6 +++--- tests/test_add_classifications_stage_pipe.py | 4 ++-- tests/test_deserialize_stage_pipe.py | 4 ++-- tests/test_filter_detections_stage_pipe.py | 4 ++-- tests/test_validation_stage_pipe.py | 2 +- tests/test_write_to_databricks_deltalake_stage.py | 2 +- 14 files changed, 29 insertions(+), 29 deletions(-) diff --git a/examples/llm/common/vdb_resource_tagging_module.py b/examples/llm/common/vdb_resource_tagging_module.py index 51ab5190f0..3dd123dd64 100644 --- a/examples/llm/common/vdb_resource_tagging_module.py +++ b/examples/llm/common/vdb_resource_tagging_module.py @@ -18,11 +18,10 @@ from pydantic import ValidationError from morpheus.messages import ControlMessage +from morpheus.modules.schemas.examples.llm.vdb_resource_tagging_schema import VDBResourceTaggingSchema from morpheus.utils.module_utils import ModuleLoaderFactory from morpheus.utils.module_utils import register_module -from .vdb_resource_tagging_schema import VDBResourceTaggingSchema - logger = logging.getLogger(__name__) VDBResourceTaggingLoaderFactory = ModuleLoaderFactory("vdb_resource_tagging", diff --git a/examples/llm/common/web_scraper_module.py b/examples/llm/common/web_scraper_module.py index c371957f43..5495b9cd12 100644 --- a/examples/llm/common/web_scraper_module.py +++ b/examples/llm/common/web_scraper_module.py @@ -33,11 +33,10 @@ import cudf from morpheus.messages import MessageMeta +from morpheus.modules.schemas.examples.llm.web_scraper_schema import WebScraperSchema from morpheus.utils.module_utils import ModuleLoaderFactory from morpheus.utils.module_utils import register_module -from .web_scraper_schema import WebScraperSchema - logger = logging.getLogger(__name__) WebScraperLoaderFactory = ModuleLoaderFactory("web_scraper", "morpheus_examples_llm", WebScraperSchema) diff --git a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py index d549a958f9..b2c2686d4a 100644 --- a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py +++ b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py @@ -23,7 +23,7 @@ logger = logging.getLogger(__name__) -class CSVConverterParamContract(BaseModel): +class CSVConverterSchema(BaseModel): chunk_overlap: int = 102 # Example default value chunk_size: int = 1024 text_column_names: List[str] @@ -44,7 +44,7 @@ def validate_converters_meta(cls, v): validated_meta = {} for key, value in v.items(): if key.lower() == 'csv': - validated_meta[key] = CSVConverterParamContract(**value) + validated_meta[key] = CSVConverterSchema(**value) else: validated_meta[key] = value return validated_meta diff --git a/morpheus/pipeline/stage_base.py b/morpheus/pipeline/stage_base.py index b0bbfe7daa..4cb1fbc134 100644 --- a/morpheus/pipeline/stage_base.py +++ b/morpheus/pipeline/stage_base.py @@ -279,7 +279,7 @@ def supports_cpp_node(self): def 
_build_cpp_node(self): """ - Specifies whether or not to build a C++ node. Only should be called during the build phase. + Specifies whether to build a C++ node. Only should be called during the build phase. """ return CppConfig.get_should_use_cpp() and self.supports_cpp_node() diff --git a/morpheus/stages/input/file_source_stage.py b/morpheus/stages/input/file_source_stage.py index d05c8c2190..48fa217517 100644 --- a/morpheus/stages/input/file_source_stage.py +++ b/morpheus/stages/input/file_source_stage.py @@ -19,6 +19,7 @@ import mrc +from morpheus._lib.messages import MessageMeta as CppMessageMeta from morpheus.cli import register_stage from morpheus.common import FileTypes from morpheus.config import Config @@ -55,7 +56,7 @@ class FileSourceStage(PreallocatorMixin, SingleOutputSource): repeat : int, default = 1, min = 1 Repeats the input dataset multiple times. Useful to extend small datasets for debugging. filter_null : bool, default = True - Whether or not to filter rows with null 'data' column. Null values in the 'data' column can cause issues down + Whether to filter rows with null 'data' column. Null values in the 'data' column can cause issues down the line with processing. Setting this to True is recommended. parser_kwargs : dict, default = {} Extra options to pass to the file parser. @@ -98,7 +99,7 @@ def input_count(self) -> int: return self._input_count def supports_cpp_node(self) -> bool: - """Indicates whether or not this stage supports a C++ node""" + """Indicates whether this stage supports a C++ node""" return True def compute_schema(self, schema: StageSchema): @@ -129,8 +130,10 @@ def _generate_frames(self) -> typing.Iterable[MessageMeta]: ) for i in range(self._repeat_count): - - x = MessageMeta(df) + if (self._build_cpp_node()): + x = CppMessageMeta(df) + else: + x = MessageMeta(df) # If we are looping, copy the object. Do this before we push the object in case it changes if (i + 1 < self._repeat_count): diff --git a/morpheus/stages/input/in_memory_source_stage.py b/morpheus/stages/input/in_memory_source_stage.py index 36728a4da0..a9b4787b15 100644 --- a/morpheus/stages/input/in_memory_source_stage.py +++ b/morpheus/stages/input/in_memory_source_stage.py @@ -40,19 +40,18 @@ class InMemorySourceStage(PreallocatorMixin, SingleOutputSource): Repeats the input dataset multiple times. Useful to extend small datasets for debugging. 
""" - def __init__(self, c: Config, dataframes: typing.List[cudf.DataFrame], repeat: int = 1, use_cpp_message_meta=False): + def __init__(self, c: Config, dataframes: typing.List[cudf.DataFrame], repeat: int = 1): super().__init__(c) self._dataframes = dataframes self._repeat_count = repeat - self._use_cpp_message_meta = use_cpp_message_meta @property def name(self) -> str: return "from-mem" def supports_cpp_node(self) -> bool: - return False + return True def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MessageMeta) @@ -60,7 +59,7 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]: for i in range(self._repeat_count): for k, df in enumerate(self._dataframes): - if (self._use_cpp_message_meta): + if (self._build_cpp_node()): x = MessageMetaCpp(df) else: x = MessageMeta(df) diff --git a/tests/examples/llm/common/test_web_scraper_module.py b/tests/examples/llm/common/test_web_scraper_module.py index 8cfc0fe2dd..4d21bfe3eb 100644 --- a/tests/examples/llm/common/test_web_scraper_module.py +++ b/tests/examples/llm/common/test_web_scraper_module.py @@ -39,7 +39,7 @@ def test_web_scraper_module(config: Config, mock_rest_server: str, import_mod: t df = cudf.DataFrame({"link": [url]}) df_expected = cudf.DataFrame({"link": [url], "page_content": "website title some paragraph"}) - web_scraper_definition = import_mod.WebScraperInterface.get_instance( + web_scraper_loader = import_mod.WebScraperLoaderFactory.get_instance( "web_scraper", module_config={ "web_scraper_config": { @@ -55,7 +55,7 @@ def test_web_scraper_module(config: Config, mock_rest_server: str, import_mod: t pipe.set_source(InMemorySourceStage(config, [df])) pipe.add_stage( LinearModulesStage(config, - web_scraper_definition, + web_scraper_loader, input_type=MessageMeta, output_type=MessageMeta, input_port_name="input", diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index c528f87684..40ca0b9612 100755 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -153,7 +153,7 @@ def test_pipeline_narrowing_types(config: Config, filter_probs_df: DataFrameType expected_df = expected_df.rename(columns=dict(zip(expected_df.columns, config.class_labels))) pipe = LinearPipeline(config) - pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True)) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(ConvMsg(config)) pipe.add_stage(MultiMessagePassThruStage(config)) diff --git a/tests/pipeline/test_preallocation_pipe.py b/tests/pipeline/test_preallocation_pipe.py index a0f16cf963..3dbfe01dff 100755 --- a/tests/pipeline/test_preallocation_pipe.py +++ b/tests/pipeline/test_preallocation_pipe.py @@ -44,7 +44,7 @@ def test_preallocation(config, filter_probs_df, probs_type): for c in config.class_labels}) pipe = LinearPipeline(config) - mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True)) + mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(ConvMsg(config, columns=list(filter_probs_df.columns), probs_type=probs_np_type)) pipe.add_stage(CheckPreAlloc(config, probs_type=probs_type)) @@ -76,7 +76,7 @@ def test_preallocation_multi_segment_pipe(config, filter_probs_df, probs_type): for c in config.class_labels}) pipe = LinearPipeline(config) - mem_src = pipe.set_source(InMemorySourceStage(config, 
[filter_probs_df], use_cpp_message_meta=True)) + mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_segment_boundary(MessageMeta) pipe.add_stage(DeserializeStage(config)) pipe.add_segment_boundary(MultiMessage) @@ -108,7 +108,7 @@ def test_preallocation_error(config, filter_probs_df): config.class_labels = ['frogs', 'lizards', 'toads', 'turtles'] pipe = LinearPipeline(config) - mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True)) + mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(ConvMsg(config, columns=list(filter_probs_df.columns), probs_type='f4')) add_scores = pipe.add_stage(AddScoresStage(config)) diff --git a/tests/test_add_classifications_stage_pipe.py b/tests/test_add_classifications_stage_pipe.py index 2ec4883dab..79a9d2e0e3 100755 --- a/tests/test_add_classifications_stage_pipe.py +++ b/tests/test_add_classifications_stage_pipe.py @@ -48,7 +48,7 @@ def test_add_classifications_stage_pipe(config, filter_probs_df): threshold = 0.75 pipe = LinearPipeline(config) - pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True)) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(ConvMsg(config, filter_probs_df)) pipe.add_stage(AddClassificationsStage(config, threshold=threshold)) @@ -67,7 +67,7 @@ def test_add_classifications_stage_multi_segment_pipe(config, filter_probs_df): threshold = 0.75 pipe = LinearPipeline(config) - pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True)) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_segment_boundary(MessageMeta) pipe.add_stage(DeserializeStage(config)) pipe.add_segment_boundary(MultiMessage) diff --git a/tests/test_deserialize_stage_pipe.py b/tests/test_deserialize_stage_pipe.py index 71fffd1691..67e6879dbd 100755 --- a/tests/test_deserialize_stage_pipe.py +++ b/tests/test_deserialize_stage_pipe.py @@ -65,7 +65,7 @@ def test_deserialize_pipe(config: Config, dataset: DatasetManager, dup_index: bo filter_probs_df = dataset.replace_index(filter_probs_df, {8: 7}) pipe = LinearPipeline(config) - pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True)) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(SerializeStage(config, include=[r'^v\d+$'])) comp_stage = pipe.add_stage(CompareDataFrameStage(config, dataset.pandas["filter_probs.csv"])) @@ -86,7 +86,7 @@ def test_deserialize_multi_segment_pipe(config: Config, dataset: DatasetManager, filter_probs_df = dataset.replace_index(filter_probs_df, {8: 7}) pipe = LinearPipeline(config) - pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True)) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_segment_boundary(MessageMeta) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(SerializeStage(config, include=[r'^v\d+$'])) diff --git a/tests/test_filter_detections_stage_pipe.py b/tests/test_filter_detections_stage_pipe.py index 198d069715..7ffad88bf6 100755 --- a/tests/test_filter_detections_stage_pipe.py +++ b/tests/test_filter_detections_stage_pipe.py @@ -58,7 +58,7 @@ def _test_filter_detections_stage_pipe(config: Config, threshold = 0.75 pipe = LinearPipeline(config) - pipe.set_source(InMemorySourceStage(config, 
[cudf.DataFrame(input_df)], use_cpp_message_meta=True)) + pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)])) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(ConvMsg(config, order=order, columns=list(input_df.columns))) pipe.add_stage(FilterDetectionsStage(config, threshold=threshold, copy=copy)) @@ -75,7 +75,7 @@ def _test_filter_detections_stage_multi_segment_pipe(config: Config, dataset_pan input_df = dataset_pandas["filter_probs.csv"] pipe = LinearPipeline(config) - pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)], use_cpp_message_meta=True)) + pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)])) pipe.add_segment_boundary(MessageMeta) pipe.add_stage(DeserializeStage(config)) pipe.add_segment_boundary(MultiMessage) diff --git a/tests/test_validation_stage_pipe.py b/tests/test_validation_stage_pipe.py index b43d288f04..b094fd9458 100644 --- a/tests/test_validation_stage_pipe.py +++ b/tests/test_validation_stage_pipe.py @@ -35,7 +35,7 @@ def test_file_rw_serialize_deserialize_pipe(tmp_path, config, filter_probs_df, w results_file_name = None pipe = LinearPipeline(config) - pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True)) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(DeserializeStage(config)) val_stage = pipe.add_stage( ValidationStage(config, val_file_name=filter_probs_df.to_pandas(), results_file_name=results_file_name)) diff --git a/tests/test_write_to_databricks_deltalake_stage.py b/tests/test_write_to_databricks_deltalake_stage.py index 90d56369f3..bb1daedc7e 100644 --- a/tests/test_write_to_databricks_deltalake_stage.py +++ b/tests/test_write_to_databricks_deltalake_stage.py @@ -47,7 +47,7 @@ def test_databricks_deltalake_sink_stage_pipe(config: Config, dataset: DatasetMa mock_spark_df = mock.Mock() databricks_deltalake_sink_stage.spark.createDataFrame.return_value = mock_spark_df pipeline = LinearPipeline(config) - pipeline.set_source(InMemorySourceStage(config, [df_input_a], use_cpp_message_meta=True)) + pipeline.set_source(InMemorySourceStage(config, [df_input_a])) pipeline.add_stage(DeserializeStage(config)) pipeline.add_stage(SerializeStage(config)) pipeline.add_stage(databricks_deltalake_sink_stage)