Unit test fixes, formatting updates
drobison00 committed Jan 24, 2024
1 parent c2fefb4 commit d732c0b
Showing 14 changed files with 29 additions and 29 deletions.
3 changes: 1 addition & 2 deletions examples/llm/common/vdb_resource_tagging_module.py
@@ -18,11 +18,10 @@
from pydantic import ValidationError

from morpheus.messages import ControlMessage
from morpheus.modules.schemas.examples.llm.vdb_resource_tagging_schema import VDBResourceTaggingSchema
from morpheus.utils.module_utils import ModuleLoaderFactory
from morpheus.utils.module_utils import register_module

from .vdb_resource_tagging_schema import VDBResourceTaggingSchema

logger = logging.getLogger(__name__)

VDBResourceTaggingLoaderFactory = ModuleLoaderFactory("vdb_resource_tagging",
3 changes: 1 addition & 2 deletions examples/llm/common/web_scraper_module.py
@@ -33,11 +33,10 @@
import cudf

from morpheus.messages import MessageMeta
from morpheus.modules.schemas.examples.llm.web_scraper_schema import WebScraperSchema
from morpheus.utils.module_utils import ModuleLoaderFactory
from morpheus.utils.module_utils import register_module

from .web_scraper_schema import WebScraperSchema

logger = logging.getLogger(__name__)

WebScraperLoaderFactory = ModuleLoaderFactory("web_scraper", "morpheus_examples_llm", WebScraperSchema)
@@ -23,7 +23,7 @@
logger = logging.getLogger(__name__)


class CSVConverterParamContract(BaseModel):
class CSVConverterSchema(BaseModel):
chunk_overlap: int = 102 # Example default value
chunk_size: int = 1024
text_column_names: List[str]
@@ -44,7 +44,7 @@ def validate_converters_meta(cls, v):
validated_meta = {}
for key, value in v.items():
if key.lower() == 'csv':
validated_meta[key] = CSVConverterParamContract(**value)
validated_meta[key] = CSVConverterSchema(**value)
else:
validated_meta[key] = value
return validated_meta
2 changes: 1 addition & 1 deletion morpheus/pipeline/stage_base.py
@@ -279,7 +279,7 @@ def supports_cpp_node(self):

def _build_cpp_node(self):
"""
Specifies whether or not to build a C++ node. Only should be called during the build phase.
Specifies whether to build a C++ node. Only should be called during the build phase.
"""
return CppConfig.get_should_use_cpp() and self.supports_cpp_node()

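The two-line helper above is what the source-stage changes below key off: a stage only gets a C++ node when C++ execution is enabled globally and the stage itself reports supports_cpp_node() == True. A minimal sketch of the pattern (not part of this diff; the enclosing SingleOutputSource subclass is assumed, and note that _build_cpp_node is a method, so it must be called):

    import cudf

    from morpheus.messages import MessageMeta
    from morpheus._lib.messages import MessageMeta as CppMessageMeta

    # Methods of a hypothetical SingleOutputSource subclass:

    def supports_cpp_node(self) -> bool:
        # Opt this stage into C++ execution when it is enabled globally.
        return True

    def _generate_frames(self):
        df = cudf.DataFrame({"data": ["example"]})  # illustrative data
        # _build_cpp_node() is CppConfig.get_should_use_cpp() and self.supports_cpp_node()
        if self._build_cpp_node():
            yield CppMessageMeta(df)  # C++ MessageMeta implementation
        else:
            yield MessageMeta(df)     # Python MessageMeta implementation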
11 changes: 7 additions & 4 deletions morpheus/stages/input/file_source_stage.py
@@ -19,6 +19,7 @@

import mrc

from morpheus._lib.messages import MessageMeta as CppMessageMeta
from morpheus.cli import register_stage
from morpheus.common import FileTypes
from morpheus.config import Config
@@ -55,7 +56,7 @@ class FileSourceStage(PreallocatorMixin, SingleOutputSource):
repeat : int, default = 1, min = 1
Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
filter_null : bool, default = True
Whether or not to filter rows with null 'data' column. Null values in the 'data' column can cause issues down
Whether to filter rows with null 'data' column. Null values in the 'data' column can cause issues down
the line with processing. Setting this to True is recommended.
parser_kwargs : dict, default = {}
Extra options to pass to the file parser.
@@ -98,7 +99,7 @@ def input_count(self) -> int:
return self._input_count

def supports_cpp_node(self) -> bool:
"""Indicates whether or not this stage supports a C++ node"""
"""Indicates whether this stage supports a C++ node"""
return True

def compute_schema(self, schema: StageSchema):
@@ -129,8 +130,10 @@ def _generate_frames(self) -> typing.Iterable[MessageMeta]:
)

for i in range(self._repeat_count):

x = MessageMeta(df)
if (self._build_cpp_node):
x = CppMessageMeta(df)
else:
x = MessageMeta(df)

# If we are looping, copy the object. Do this before we push the object in case it changes
if (i + 1 < self._repeat_count):
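A usage sketch of the updated FileSourceStage (not part of this diff): the repeat, filter_null and parser_kwargs parameters come from the docstring above, while the LinearPipeline import path, the filename keyword and its value are illustrative assumptions. The stage now yields a C++ MessageMeta on its own whenever _build_cpp_node() is true, so callers pass no flag for it:

    from morpheus.config import Config
    from morpheus.pipeline import LinearPipeline  # assumed import path
    from morpheus.stages.input.file_source_stage import FileSourceStage

    config = Config()

    pipe = LinearPipeline(config)
    pipe.set_source(
        FileSourceStage(config,
                        filename="input.jsonlines",      # assumed parameter name and value
                        repeat=2,                        # replay the dataset twice
                        filter_null=True,                # drop rows with a null 'data' column
                        parser_kwargs={"lines": True}))  # extra options forwarded to the file parser
    pipe.run()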
7 changes: 3 additions & 4 deletions morpheus/stages/input/in_memory_source_stage.py
@@ -40,27 +40,26 @@ class InMemorySourceStage(PreallocatorMixin, SingleOutputSource):
Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
"""

def __init__(self, c: Config, dataframes: typing.List[cudf.DataFrame], repeat: int = 1, use_cpp_message_meta=False):
def __init__(self, c: Config, dataframes: typing.List[cudf.DataFrame], repeat: int = 1):
super().__init__(c)

self._dataframes = dataframes
self._repeat_count = repeat
self._use_cpp_message_meta = use_cpp_message_meta

@property
def name(self) -> str:
return "from-mem"

def supports_cpp_node(self) -> bool:
return False
return True

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)

def _generate_frames(self) -> typing.Iterator[MessageMeta]:
for i in range(self._repeat_count):
for k, df in enumerate(self._dataframes):
if (self._use_cpp_message_meta):
if (self._build_cpp_node):
x = MessageMetaCpp(df)
else:
x = MessageMeta(df)
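With the use_cpp_message_meta flag removed, callers construct the stage exactly as before and the MessageMeta implementation follows from _build_cpp_node(), which is what the test updates below reflect. A minimal sketch of the updated call (not part of this diff; the LinearPipeline import path and the dataframe contents are illustrative):

    import cudf

    from morpheus.config import Config
    from morpheus.pipeline import LinearPipeline  # assumed import path
    from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage

    config = Config()
    df = cudf.DataFrame({"data": ["a", "b", "c"]})

    pipe = LinearPipeline(config)
    # No use_cpp_message_meta keyword anymore; repeat still controls how often the data is replayed.
    pipe.set_source(InMemorySourceStage(config, [df], repeat=2))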
4 changes: 2 additions & 2 deletions tests/examples/llm/common/test_web_scraper_module.py
@@ -39,7 +39,7 @@ def test_web_scraper_module(config: Config, mock_rest_server: str, import_mod: t
df = cudf.DataFrame({"link": [url]})
df_expected = cudf.DataFrame({"link": [url], "page_content": "website title some paragraph"})

web_scraper_definition = import_mod.WebScraperInterface.get_instance(
web_scraper_loader = import_mod.WebScraperLoaderFactory.get_instance(
"web_scraper",
module_config={
"web_scraper_config": {
@@ -55,7 +55,7 @@ def test_web_scraper_module(config: Config, mock_rest_server: str, import_mod: t
pipe.set_source(InMemorySourceStage(config, [df]))
pipe.add_stage(
LinearModulesStage(config,
web_scraper_definition,
web_scraper_loader,
input_type=MessageMeta,
output_type=MessageMeta,
input_port_name="input",
2 changes: 1 addition & 1 deletion tests/pipeline/test_pipeline.py
@@ -153,7 +153,7 @@ def test_pipeline_narrowing_types(config: Config, filter_probs_df: DataFrameType
expected_df = expected_df.rename(columns=dict(zip(expected_df.columns, config.class_labels)))

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True))
pipe.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(ConvMsg(config))
pipe.add_stage(MultiMessagePassThruStage(config))
6 changes: 3 additions & 3 deletions tests/pipeline/test_preallocation_pipe.py
@@ -44,7 +44,7 @@ def test_preallocation(config, filter_probs_df, probs_type):
for c in config.class_labels})

pipe = LinearPipeline(config)
mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True))
mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(ConvMsg(config, columns=list(filter_probs_df.columns), probs_type=probs_np_type))
pipe.add_stage(CheckPreAlloc(config, probs_type=probs_type))
@@ -76,7 +76,7 @@ def test_preallocation_multi_segment_pipe(config, filter_probs_df, probs_type):
for c in config.class_labels})

pipe = LinearPipeline(config)
mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True))
mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_segment_boundary(MessageMeta)
pipe.add_stage(DeserializeStage(config))
pipe.add_segment_boundary(MultiMessage)
@@ -108,7 +108,7 @@ def test_preallocation_error(config, filter_probs_df):
config.class_labels = ['frogs', 'lizards', 'toads', 'turtles']

pipe = LinearPipeline(config)
mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True))
mem_src = pipe.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(ConvMsg(config, columns=list(filter_probs_df.columns), probs_type='f4'))
add_scores = pipe.add_stage(AddScoresStage(config))
4 changes: 2 additions & 2 deletions tests/test_add_classifications_stage_pipe.py
@@ -48,7 +48,7 @@ def test_add_classifications_stage_pipe(config, filter_probs_df):
threshold = 0.75

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True))
pipe.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(ConvMsg(config, filter_probs_df))
pipe.add_stage(AddClassificationsStage(config, threshold=threshold))
@@ -67,7 +67,7 @@ def test_add_classifications_stage_multi_segment_pipe(config, filter_probs_df):
threshold = 0.75

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True))
pipe.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_segment_boundary(MessageMeta)
pipe.add_stage(DeserializeStage(config))
pipe.add_segment_boundary(MultiMessage)
4 changes: 2 additions & 2 deletions tests/test_deserialize_stage_pipe.py
@@ -65,7 +65,7 @@ def test_deserialize_pipe(config: Config, dataset: DatasetManager, dup_index: bo
filter_probs_df = dataset.replace_index(filter_probs_df, {8: 7})

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True))
pipe.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(SerializeStage(config, include=[r'^v\d+$']))
comp_stage = pipe.add_stage(CompareDataFrameStage(config, dataset.pandas["filter_probs.csv"]))
@@ -86,7 +86,7 @@ def test_deserialize_multi_segment_pipe(config: Config, dataset: DatasetManager,
filter_probs_df = dataset.replace_index(filter_probs_df, {8: 7})

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True))
pipe.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_segment_boundary(MessageMeta)
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(SerializeStage(config, include=[r'^v\d+$']))
4 changes: 2 additions & 2 deletions tests/test_filter_detections_stage_pipe.py
@@ -58,7 +58,7 @@ def _test_filter_detections_stage_pipe(config: Config,
threshold = 0.75

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)], use_cpp_message_meta=True))
pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)]))
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(ConvMsg(config, order=order, columns=list(input_df.columns)))
pipe.add_stage(FilterDetectionsStage(config, threshold=threshold, copy=copy))
@@ -75,7 +75,7 @@ def _test_filter_detections_stage_multi_segment_pipe(config: Config, dataset_pan

input_df = dataset_pandas["filter_probs.csv"]
pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)], use_cpp_message_meta=True))
pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)]))
pipe.add_segment_boundary(MessageMeta)
pipe.add_stage(DeserializeStage(config))
pipe.add_segment_boundary(MultiMessage)
2 changes: 1 addition & 1 deletion tests/test_validation_stage_pipe.py
@@ -35,7 +35,7 @@ def test_file_rw_serialize_deserialize_pipe(tmp_path, config, filter_probs_df, w
results_file_name = None

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [filter_probs_df], use_cpp_message_meta=True))
pipe.set_source(InMemorySourceStage(config, [filter_probs_df]))
pipe.add_stage(DeserializeStage(config))
val_stage = pipe.add_stage(
ValidationStage(config, val_file_name=filter_probs_df.to_pandas(), results_file_name=results_file_name))
2 changes: 1 addition & 1 deletion tests/test_write_to_databricks_deltalake_stage.py
@@ -47,7 +47,7 @@ def test_databricks_deltalake_sink_stage_pipe(config: Config, dataset: DatasetMa
mock_spark_df = mock.Mock()
databricks_deltalake_sink_stage.spark.createDataFrame.return_value = mock_spark_df
pipeline = LinearPipeline(config)
pipeline.set_source(InMemorySourceStage(config, [df_input_a], use_cpp_message_meta=True))
pipeline.set_source(InMemorySourceStage(config, [df_input_a]))
pipeline.add_stage(DeserializeStage(config))
pipeline.add_stage(SerializeStage(config))
pipeline.add_stage(databricks_deltalake_sink_stage)
