From ba41ecfa7d847b2efb249ecb76e9a102bfabcace Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 17 Jan 2024 13:08:27 -0800 Subject: [PATCH 1/5] Updated vdb_upload pipe tests --- .../llm/vdb_upload/{common.py => helper.py} | 4 +- examples/llm/vdb_upload/langchain.py | 2 +- .../llm/vdb_upload/module/file_source_pipe.py | 4 +- .../llm/vdb_upload/module/rss_source_pipe.py | 4 +- examples/llm/vdb_upload/pipeline.py | 2 +- examples/llm/vdb_upload/run.py | 8 +- tests/llm/test_vdb_upload_pipe.py | 96 ++++++------------- .../examples/llm/vdb_upload/vdb_config.yaml | 3 + 8 files changed, 43 insertions(+), 80 deletions(-) rename examples/llm/vdb_upload/{common.py => helper.py} (99%) create mode 100644 tests/tests_data/examples/llm/vdb_upload/vdb_config.yaml diff --git a/examples/llm/vdb_upload/common.py b/examples/llm/vdb_upload/helper.py similarity index 99% rename from examples/llm/vdb_upload/common.py rename to examples/llm/vdb_upload/helper.py index ff010aad55..90c0f22156 100644 --- a/examples/llm/vdb_upload/common.py +++ b/examples/llm/vdb_upload/helper.py @@ -21,8 +21,8 @@ from morpheus.messages.multi_message import MultiMessage from morpheus.pipeline.pipeline import Pipeline from morpheus.stages.general.linear_modules_source import LinearModuleSourceStage -from .module.file_source_pipe import FileSourcePipe -from .module.rss_source_pipe import RSSSourcePipe +from module.file_source_pipe import FileSourcePipe +from module.rss_source_pipe import RSSSourcePipe logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/langchain.py b/examples/llm/vdb_upload/langchain.py index f66d75d9d3..bac532af6b 100644 --- a/examples/llm/vdb_upload/langchain.py +++ b/examples/llm/vdb_upload/langchain.py @@ -19,7 +19,7 @@ from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.vectorstores.milvus import Milvus -from llm.vdb_upload.common import build_rss_urls +from examples.llm.vdb_upload.helper import build_rss_urls from morpheus.utils.logging_timer import log_time logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/module/file_source_pipe.py b/examples/llm/vdb_upload/module/file_source_pipe.py index 67f15d456f..2398fbaa15 100644 --- a/examples/llm/vdb_upload/module/file_source_pipe.py +++ b/examples/llm/vdb_upload/module/file_source_pipe.py @@ -28,8 +28,8 @@ from morpheus.modules.preprocess.deserialize import DeserializeInterface from morpheus.utils.module_utils import ModuleInterface from morpheus.utils.module_utils import register_module -from .schema_transform import SchemaTransformInterface -from ...common.content_extractor_module import FileContentExtractorInterface +from module.schema_transform import SchemaTransformInterface +from common.content_extractor_module import FileContentExtractorInterface logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/module/rss_source_pipe.py b/examples/llm/vdb_upload/module/rss_source_pipe.py index f8c9b6bf05..937b41f60f 100644 --- a/examples/llm/vdb_upload/module/rss_source_pipe.py +++ b/examples/llm/vdb_upload/module/rss_source_pipe.py @@ -29,8 +29,8 @@ from morpheus.modules.preprocess.deserialize import DeserializeInterface from morpheus.utils.module_utils import ModuleInterface from morpheus.utils.module_utils import register_module -from .schema_transform import SchemaTransformInterface -from ...common.web_scraper_module import WebScraperInterface +from module.schema_transform import SchemaTransformInterface +from common.web_scraper_module import WebScraperInterface 
logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/pipeline.py b/examples/llm/vdb_upload/pipeline.py index cab3906d48..34049b6974 100644 --- a/examples/llm/vdb_upload/pipeline.py +++ b/examples/llm/vdb_upload/pipeline.py @@ -23,7 +23,7 @@ from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage -from .common import process_vdb_sources +from helper import process_vdb_sources logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index 423a7e52de..877b1c401e 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -19,9 +19,9 @@ import yaml from morpheus.config import Config, PipelineModes -from .common import build_defualt_milvus_config -from ..common.utils import build_rss_urls -from ..common.utils import build_milvus_config +from helper import build_defualt_milvus_config +from common.utils import build_rss_urls +from common.utils import build_milvus_config logger = logging.getLogger(__name__) @@ -315,7 +315,7 @@ def build_final_config(vdb_conf_path, cli_source_conf, cli_embeddings_conf, cli_ source_conf = vdb_pipeline_config.get('sources', []) + list(cli_source_conf.values()) tokenizer_conf = merge_configs(vdb_pipeline_config.get('tokenizer', {}), cli_tokenizer_conf) vdb_conf = vdb_pipeline_config.get('vdb', {}) - resource_schema = vdb_conf.pop("resource_shema", None) + resource_schema = vdb_conf.pop("resource_schema", None) if resource_schema: vdb_conf["resource_kwargs"] = build_milvus_config(resource_schema) diff --git a/tests/llm/test_vdb_upload_pipe.py b/tests/llm/test_vdb_upload_pipe.py index fb0599f938..e0e5e23e31 100644 --- a/tests/llm/test_vdb_upload_pipe.py +++ b/tests/llm/test_vdb_upload_pipe.py @@ -26,78 +26,35 @@ from _utils import TEST_DIRS from _utils import mk_async_infer from _utils.dataset_manager import DatasetManager -from morpheus.config import Config -from morpheus.config import PipelineModes -from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService -from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage -from morpheus.stages.input.rss_source_stage import RSSSourceStage -from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage -from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage EMBEDDING_SIZE = 384 MODEL_MAX_BATCH_SIZE = 64 MODEL_FEA_LENGTH = 512 +@pytest.fixture(scope="session", name="vdb_conf_path") +def vdb_conf_path_fixture(): + vdb_conf_path = os.path.join(TEST_DIRS.tests_data_dir, "examples/llm/vdb_upload/vdb_config.yaml") + return vdb_conf_path -def _run_pipeline(config: Config, - milvus_server_uri: str, - collection_name: str, - rss_files: list[str], - utils_mod: types.ModuleType, - web_scraper_stage_mod: types.ModuleType): - - config.mode = PipelineModes.NLP - config.pipeline_batch_size = 1024 - config.model_max_batch_size = MODEL_MAX_BATCH_SIZE - config.feature_length = MODEL_FEA_LENGTH - config.edge_buffer_size = 128 - config.class_labels = [str(i) for i in range(EMBEDDING_SIZE)] - - pipe = LinearPipeline(config) - - pipe.set_source( - RSSSourceStage(config, feed_input=rss_files, batch_size=128, run_indefinitely=False, 
enable_cache=False)) - pipe.add_stage(web_scraper_stage_mod.WebScraperStage(config, chunk_size=MODEL_FEA_LENGTH, enable_cache=False)) - pipe.add_stage(DeserializeStage(config)) - - pipe.add_stage( - PreprocessNLPStage(config, - vocab_hash_file=os.path.join(TEST_DIRS.data_dir, 'bert-base-uncased-hash.txt'), - do_lower_case=True, - truncation=True, - add_special_tokens=False, - column='page_content')) - - pipe.add_stage( - TritonInferenceStage(config, model_name='test-model', server_url='test:0000', force_convert_inputs=True)) - - pipe.add_stage( - WriteToVectorDBStage(config, - resource_name=collection_name, - resource_kwargs=utils_mod.build_milvus_config(embedding_size=EMBEDDING_SIZE), - recreate=True, - service="milvus", - uri=milvus_server_uri)) - pipe.run() - - + @pytest.mark.milvus @pytest.mark.use_python @pytest.mark.use_pandas @pytest.mark.import_mod([ - os.path.join(TEST_DIRS.examples_dir, 'llm/common/utils.py'), - os.path.join(TEST_DIRS.examples_dir, 'llm/common/web_scraper_stage.py') -]) + os.path.join(TEST_DIRS.examples_dir, 'llm/common'), + os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/helper.py'), + os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/run.py'), + os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/pipeline.py')] +) @mock.patch('requests.Session') @mock.patch('tritonclient.grpc.InferenceServerClient') def test_vdb_upload_pipe(mock_triton_client: mock.MagicMock, mock_requests_session: mock.MagicMock, - config: Config, dataset: DatasetManager, - milvus_server_uri: str, - import_mod: list[types.ModuleType]): + # milvus_server_uri: str, + import_mod: list[types.ModuleType], + vdb_conf_path: dict): # We're going to use this DF to both provide values to the mocked Tritonclient, # but also to verify the values in the Milvus collection. 
expected_values_df = dataset["service/milvus_rss_data.json"] @@ -147,18 +104,21 @@ def mock_get_fn(url: str): mock_requests_session.return_value = mock_requests_session mock_requests_session.get.side_effect = mock_get_fn - (utils_mod, web_scraper_stage_mod) = import_mod - collection_name = "test_vdb_upload_pipe" - rss_source_file = os.path.join(TEST_DIRS.tests_data_dir, 'service/cisa_rss_feed.xml') - - _run_pipeline(config=config, - milvus_server_uri=milvus_server_uri, - collection_name=collection_name, - rss_files=[rss_source_file], - utils_mod=utils_mod, - web_scraper_stage_mod=web_scraper_stage_mod) - - milvus_service = MilvusVectorDBService(uri=milvus_server_uri) + _, _, vdb_upload_run_mod, vdb_upload_pipeline_mod = import_mod + + vdb_pipeline_config = vdb_upload_run_mod.build_final_config(vdb_conf_path=vdb_conf_path, + cli_source_conf={}, + cli_embeddings_conf={}, + cli_pipeline_conf={}, + cli_tokenizer_conf={}, + cli_vdb_conf={}) + + vdb_pipeline_config["vdb_config"]["uri"] = "http://localhost:19530" + collection_name = vdb_pipeline_config["vdb_config"]["resource_name"] + + vdb_upload_pipeline_mod.pipeline(**vdb_pipeline_config) + + milvus_service = MilvusVectorDBService(uri="http://localhost:19530") resource_service = milvus_service.load_resource(name=collection_name) assert resource_service.count() == len(expected_values_df) diff --git a/tests/tests_data/examples/llm/vdb_upload/vdb_config.yaml b/tests/tests_data/examples/llm/vdb_upload/vdb_config.yaml new file mode 100644 index 0000000000..20dec760ed --- /dev/null +++ b/tests/tests_data/examples/llm/vdb_upload/vdb_config.yaml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:cd4874c6695f9807c8456f51604307ec05ece1671969d5eecefe55618f200765 +size 3560 From 3657a0d069568426166339151ed3ced3e6fa1cc2 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 17 Jan 2024 15:15:36 -0800 Subject: [PATCH 2/5] Updated vdb_upload pipeline test --- tests/llm/test_vdb_upload_pipe.py | 49 ++++++++++++++++--------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/tests/llm/test_vdb_upload_pipe.py b/tests/llm/test_vdb_upload_pipe.py index e0e5e23e31..a3f33dde06 100644 --- a/tests/llm/test_vdb_upload_pipe.py +++ b/tests/llm/test_vdb_upload_pipe.py @@ -18,6 +18,7 @@ import os import types from unittest import mock +from morpheus.config import Config import numpy as np import pandas as pd @@ -28,10 +29,6 @@ from _utils.dataset_manager import DatasetManager from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService -EMBEDDING_SIZE = 384 -MODEL_MAX_BATCH_SIZE = 64 -MODEL_FEA_LENGTH = 512 - @pytest.fixture(scope="session", name="vdb_conf_path") def vdb_conf_path_fixture(): vdb_conf_path = os.path.join(TEST_DIRS.tests_data_dir, "examples/llm/vdb_upload/vdb_config.yaml") @@ -52,25 +49,41 @@ def vdb_conf_path_fixture(): def test_vdb_upload_pipe(mock_triton_client: mock.MagicMock, mock_requests_session: mock.MagicMock, dataset: DatasetManager, - # milvus_server_uri: str, + milvus_server_uri: str, import_mod: list[types.ModuleType], vdb_conf_path: dict): # We're going to use this DF to both provide values to the mocked Tritonclient, # but also to verify the values in the Milvus collection. 
expected_values_df = dataset["service/milvus_rss_data.json"] + expected_values_df = expected_values_df.rename(columns={"page_content": "content"}) + expected_values_df["source"] = "rss" with open(os.path.join(TEST_DIRS.tests_data_dir, 'service/cisa_web_responses.json'), encoding='utf-8') as fh: web_responses = json.load(fh) + _, _, vdb_upload_run_mod, vdb_upload_pipeline_mod = import_mod + + vdb_pipeline_config = vdb_upload_run_mod.build_final_config(vdb_conf_path=vdb_conf_path, + cli_source_conf={}, + cli_embeddings_conf={}, + cli_pipeline_conf={}, + cli_tokenizer_conf={}, + cli_vdb_conf={}) + + config: Config = vdb_pipeline_config["pipeline_config"] + + vdb_pipeline_config["vdb_config"]["uri"] = milvus_server_uri + collection_name = vdb_pipeline_config["vdb_config"]["resource_name"] + # Mock Triton results mock_metadata = { "inputs": [{ - "name": "input_ids", "datatype": "INT32", "shape": [-1, MODEL_FEA_LENGTH] + "name": "input_ids", "datatype": "INT32", "shape": [-1, config.feature_length] }, { - "name": "attention_mask", "datatype": "INT32", "shape": [-1, MODEL_FEA_LENGTH] + "name": "attention_mask", "datatype": "INT32", "shape": [-1, config.feature_length] }], "outputs": [{ - "name": "output", "datatype": "FP32", "shape": [-1, EMBEDDING_SIZE] + "name": "output", "datatype": "FP32", "shape": [-1, len(config.class_labels)] }] } mock_model_config = {"config": {"max_batch_size": 256}} @@ -84,7 +97,7 @@ def test_vdb_upload_pipe(mock_triton_client: mock.MagicMock, mock_result_values = expected_values_df['embedding'].to_list() inf_results = np.split(mock_result_values, - range(MODEL_MAX_BATCH_SIZE, len(mock_result_values), MODEL_MAX_BATCH_SIZE)) + range(config.model_max_batch_size, len(mock_result_values), config.model_max_batch_size)) # The triton client is going to perform a logits function, calculate the inverse of it here inf_results = [np.log((1.0 / x) - 1.0) * -1 for x in inf_results] @@ -103,31 +116,19 @@ def mock_get_fn(url: str): mock_requests_session.return_value = mock_requests_session mock_requests_session.get.side_effect = mock_get_fn - - _, _, vdb_upload_run_mod, vdb_upload_pipeline_mod = import_mod - - vdb_pipeline_config = vdb_upload_run_mod.build_final_config(vdb_conf_path=vdb_conf_path, - cli_source_conf={}, - cli_embeddings_conf={}, - cli_pipeline_conf={}, - cli_tokenizer_conf={}, - cli_vdb_conf={}) - - vdb_pipeline_config["vdb_config"]["uri"] = "http://localhost:19530" - collection_name = vdb_pipeline_config["vdb_config"]["resource_name"] vdb_upload_pipeline_mod.pipeline(**vdb_pipeline_config) - milvus_service = MilvusVectorDBService(uri="http://localhost:19530") + milvus_service = MilvusVectorDBService(uri=milvus_server_uri) resource_service = milvus_service.load_resource(name=collection_name) assert resource_service.count() == len(expected_values_df) db_results = resource_service.query("", offset=0, limit=resource_service.count()) db_df = pd.DataFrame(sorted(db_results, key=lambda k: k['id'])) - + # The comparison function performs rounding on the values, but is unable to do so for array columns - dataset.assert_compare_df(db_df, expected_values_df[db_df.columns], exclude_columns=['id', 'embedding']) + dataset.assert_compare_df(db_df, expected_values_df[db_df.columns], exclude_columns=['id', 'source', 'embedding']) db_emb = db_df['embedding'] expected_emb = expected_values_df['embedding'] From cbc7f74e9926b4927f2eb76bc583709e72383448 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 18 Jan 2024 09:15:31 -0800 Subject: [PATCH 3/5] Updated vdb upload pipeline 
test --- examples/llm/vdb_upload/helper.py | 4 +- .../llm/vdb_upload/module/file_source_pipe.py | 2 +- .../llm/vdb_upload/module/rss_source_pipe.py | 2 +- examples/llm/vdb_upload/pipeline.py | 2 +- examples/llm/vdb_upload/run.py | 2 +- tests/llm/test_vdb_upload_pipe.py | 101 ++++++++++++++++-- .../examples/llm/vdb_upload/test_data.csv | 3 + .../llm/vdb_upload/test_data_output.json | 3 + .../vdb_upload/vdb_file_source_config.yaml | 3 + .../llm/vdb_upload/vdb_rss_source_config.yaml | 3 + 10 files changed, 110 insertions(+), 15 deletions(-) create mode 100755 tests/tests_data/examples/llm/vdb_upload/test_data.csv create mode 100755 tests/tests_data/examples/llm/vdb_upload/test_data_output.json create mode 100755 tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml create mode 100755 tests/tests_data/examples/llm/vdb_upload/vdb_rss_source_config.yaml diff --git a/examples/llm/vdb_upload/helper.py b/examples/llm/vdb_upload/helper.py index 90c0f22156..9bc8e9c3c2 100644 --- a/examples/llm/vdb_upload/helper.py +++ b/examples/llm/vdb_upload/helper.py @@ -21,8 +21,8 @@ from morpheus.messages.multi_message import MultiMessage from morpheus.pipeline.pipeline import Pipeline from morpheus.stages.general.linear_modules_source import LinearModuleSourceStage -from module.file_source_pipe import FileSourcePipe -from module.rss_source_pipe import RSSSourcePipe +from vdb_upload.module.file_source_pipe import FileSourcePipe +from vdb_upload.module.rss_source_pipe import RSSSourcePipe logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/module/file_source_pipe.py b/examples/llm/vdb_upload/module/file_source_pipe.py index 2398fbaa15..54ef7962cc 100644 --- a/examples/llm/vdb_upload/module/file_source_pipe.py +++ b/examples/llm/vdb_upload/module/file_source_pipe.py @@ -28,7 +28,7 @@ from morpheus.modules.preprocess.deserialize import DeserializeInterface from morpheus.utils.module_utils import ModuleInterface from morpheus.utils.module_utils import register_module -from module.schema_transform import SchemaTransformInterface +from vdb_upload.module.schema_transform import SchemaTransformInterface from common.content_extractor_module import FileContentExtractorInterface logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/module/rss_source_pipe.py b/examples/llm/vdb_upload/module/rss_source_pipe.py index 937b41f60f..3b6d5de1ae 100644 --- a/examples/llm/vdb_upload/module/rss_source_pipe.py +++ b/examples/llm/vdb_upload/module/rss_source_pipe.py @@ -29,7 +29,7 @@ from morpheus.modules.preprocess.deserialize import DeserializeInterface from morpheus.utils.module_utils import ModuleInterface from morpheus.utils.module_utils import register_module -from module.schema_transform import SchemaTransformInterface +from vdb_upload.module.schema_transform import SchemaTransformInterface from common.web_scraper_module import WebScraperInterface logger = logging.getLogger(__name__) diff --git a/examples/llm/vdb_upload/pipeline.py b/examples/llm/vdb_upload/pipeline.py index 34049b6974..3f977d0bfa 100644 --- a/examples/llm/vdb_upload/pipeline.py +++ b/examples/llm/vdb_upload/pipeline.py @@ -23,7 +23,7 @@ from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage -from helper import process_vdb_sources +from vdb_upload.helper import process_vdb_sources logger = logging.getLogger(__name__) diff 
--git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index 877b1c401e..9f21371b08 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -19,7 +19,7 @@ import yaml from morpheus.config import Config, PipelineModes -from helper import build_defualt_milvus_config +from vdb_upload.helper import build_defualt_milvus_config from common.utils import build_rss_urls from common.utils import build_milvus_config diff --git a/tests/llm/test_vdb_upload_pipe.py b/tests/llm/test_vdb_upload_pipe.py index a3f33dde06..34dd71fbfd 100644 --- a/tests/llm/test_vdb_upload_pipe.py +++ b/tests/llm/test_vdb_upload_pipe.py @@ -29,12 +29,7 @@ from _utils.dataset_manager import DatasetManager from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService -@pytest.fixture(scope="session", name="vdb_conf_path") -def vdb_conf_path_fixture(): - vdb_conf_path = os.path.join(TEST_DIRS.tests_data_dir, "examples/llm/vdb_upload/vdb_config.yaml") - return vdb_conf_path - - + @pytest.mark.milvus @pytest.mark.use_python @pytest.mark.use_pandas @@ -46,23 +41,26 @@ def vdb_conf_path_fixture(): ) @mock.patch('requests.Session') @mock.patch('tritonclient.grpc.InferenceServerClient') -def test_vdb_upload_pipe(mock_triton_client: mock.MagicMock, +def test_vdb_upload_rss_source_pipe(mock_triton_client: mock.MagicMock, mock_requests_session: mock.MagicMock, dataset: DatasetManager, milvus_server_uri: str, - import_mod: list[types.ModuleType], - vdb_conf_path: dict): + import_mod: list[types.ModuleType]): # We're going to use this DF to both provide values to the mocked Tritonclient, # but also to verify the values in the Milvus collection. expected_values_df = dataset["service/milvus_rss_data.json"] + # As page_content is used by other pipelines, we're just renaming it to content. expected_values_df = expected_values_df.rename(columns={"page_content": "content"}) expected_values_df["source"] = "rss" + vdb_conf_path = os.path.join(TEST_DIRS.tests_data_dir, "examples/llm/vdb_upload/vdb_rss_source_config.yaml") + with open(os.path.join(TEST_DIRS.tests_data_dir, 'service/cisa_web_responses.json'), encoding='utf-8') as fh: web_responses = json.load(fh) _, _, vdb_upload_run_mod, vdb_upload_pipeline_mod = import_mod + # Building final configuration. Here we're passing empty dictionaries for cli configuration. 
vdb_pipeline_config = vdb_upload_run_mod.build_final_config(vdb_conf_path=vdb_conf_path, cli_source_conf={}, cli_embeddings_conf={}, @@ -136,3 +134,88 @@ def mock_get_fn(url: str): db_emb_row = pd.DataFrame(db_emb[i], dtype=np.float32) expected_emb_row = pd.DataFrame(expected_emb[i], dtype=np.float32) dataset.assert_compare_df(db_emb_row, expected_emb_row) + + +@pytest.mark.milvus +@pytest.mark.use_python +@pytest.mark.use_pandas +@pytest.mark.import_mod([ + os.path.join(TEST_DIRS.examples_dir, 'llm/common'), + os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/helper.py'), + os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/run.py'), + os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/pipeline.py')] +) +@mock.patch('tritonclient.grpc.InferenceServerClient') +def test_vdb_upload_file_source_pipe(mock_triton_client: mock.MagicMock, + dataset: DatasetManager, + milvus_server_uri: str, + import_mod: list[types.ModuleType]): + + expected_values_df = dataset["examples/llm/vdb_upload/test_data_output.json"] + print(expected_values_df) + vdb_conf_path = os.path.join(TEST_DIRS.tests_data_dir, "examples/llm/vdb_upload/vdb_file_source_config.yaml") + + _, _, vdb_upload_run_mod, vdb_upload_pipeline_mod = import_mod + + # # Building final configuration. Here we're passing empty dictionaries for cli configuration. + vdb_pipeline_config = vdb_upload_run_mod.build_final_config(vdb_conf_path=vdb_conf_path, + cli_source_conf={}, + cli_embeddings_conf={}, + cli_pipeline_conf={}, + cli_tokenizer_conf={}, + cli_vdb_conf={}) + + config: Config = vdb_pipeline_config["pipeline_config"] + + vdb_pipeline_config["vdb_config"]["uri"] = milvus_server_uri + collection_name = vdb_pipeline_config["vdb_config"]["resource_name"] + + # Mock Triton results + mock_metadata = { + "inputs": [{ + "name": "input_ids", "datatype": "INT32", "shape": [-1, config.feature_length] + }, { + "name": "attention_mask", "datatype": "INT32", "shape": [-1, config.feature_length] + }], + "outputs": [{ + "name": "output", "datatype": "FP32", "shape": [-1, len(config.class_labels)] + }] + } + mock_model_config = {"config": {"max_batch_size": 256}} + + mock_triton_client.return_value = mock_triton_client + mock_triton_client.is_server_live.return_value = True + mock_triton_client.is_server_ready.return_value = True + mock_triton_client.is_model_ready.return_value = True + mock_triton_client.get_model_metadata.return_value = mock_metadata + mock_triton_client.get_model_config.return_value = mock_model_config + + mock_result_values = expected_values_df['embedding'].to_list() + inf_results = np.split(mock_result_values, + range(config.model_max_batch_size, len(mock_result_values), config.model_max_batch_size)) + + # The triton client is going to perform a logits function, calculate the inverse of it here + inf_results = [np.log((1.0 / x) - 1.0) * -1 for x in inf_results] + + async_infer = mk_async_infer(inf_results) + mock_triton_client.async_infer.side_effect = async_infer + + vdb_upload_pipeline_mod.pipeline(**vdb_pipeline_config) + + milvus_service = MilvusVectorDBService(uri=milvus_server_uri) + resource_service = milvus_service.load_resource(name=collection_name) + + assert resource_service.count() == len(expected_values_df) + + db_results = resource_service.query("", offset=0, limit=resource_service.count()) + db_df = pd.DataFrame(sorted(db_results, key=lambda k: k['id'])) + + # The comparison function performs rounding on the values, but is unable to do so for array columns + dataset.assert_compare_df(db_df, 
expected_values_df[db_df.columns], exclude_columns=['id', 'embedding']) + db_emb = db_df['embedding'] + expected_emb = expected_values_df['embedding'] + + for i in range(resource_service.count()): + db_emb_row = pd.DataFrame(db_emb[i], dtype=np.float32) + expected_emb_row = pd.DataFrame(expected_emb[i], dtype=np.float32) + dataset.assert_compare_df(db_emb_row, expected_emb_row)@pytest.mark.milvus \ No newline at end of file diff --git a/tests/tests_data/examples/llm/vdb_upload/test_data.csv b/tests/tests_data/examples/llm/vdb_upload/test_data.csv new file mode 100755 index 0000000000..467e516886 --- /dev/null +++ b/tests/tests_data/examples/llm/vdb_upload/test_data.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a2fc74ea761a42ae0230c12289bcd459cd5facda05504afe691cfcd4ecce443b +size 1955 diff --git a/tests/tests_data/examples/llm/vdb_upload/test_data_output.json b/tests/tests_data/examples/llm/vdb_upload/test_data_output.json new file mode 100755 index 0000000000..413b7e9afa --- /dev/null +++ b/tests/tests_data/examples/llm/vdb_upload/test_data_output.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9cea89f494ce1a8006f59f45bbd577c538b7eb3c2688933f9153fa9181643a8a +size 26299 diff --git a/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml b/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml new file mode 100755 index 0000000000..979151c391 --- /dev/null +++ b/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6e2e4eef01af2c9931fc1f0473a10a175da2b7b76903895bdbb70fb45d408b03 +size 4031 diff --git a/tests/tests_data/examples/llm/vdb_upload/vdb_rss_source_config.yaml b/tests/tests_data/examples/llm/vdb_upload/vdb_rss_source_config.yaml new file mode 100755 index 0000000000..20dec760ed --- /dev/null +++ b/tests/tests_data/examples/llm/vdb_upload/vdb_rss_source_config.yaml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:cd4874c6695f9807c8456f51604307ec05ece1671969d5eecefe55618f200765 +size 3560 From 9f3e542549cedee49ca7bb4f0ae83835d8a584b7 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 18 Jan 2024 12:15:17 -0800 Subject: [PATCH 4/5] Added vdb_upload pipeline test --- tests/llm/test_vdb_upload_pipe.py | 168 +++++------------- .../llm/vdb_upload/test_data_output.json | 4 +- .../examples/llm/vdb_upload/vdb_config.yaml | 3 - .../vdb_upload/vdb_file_source_config.yaml | 4 +- 4 files changed, 53 insertions(+), 126 deletions(-) mode change 100644 => 100755 tests/llm/test_vdb_upload_pipe.py delete mode 100644 tests/tests_data/examples/llm/vdb_upload/vdb_config.yaml diff --git a/tests/llm/test_vdb_upload_pipe.py b/tests/llm/test_vdb_upload_pipe.py old mode 100644 new mode 100755 index 34dd71fbfd..1328daac39 --- a/tests/llm/test_vdb_upload_pipe.py +++ b/tests/llm/test_vdb_upload_pipe.py @@ -18,7 +18,6 @@ import os import types from unittest import mock -from morpheus.config import Config import numpy as np import pandas as pd @@ -27,9 +26,10 @@ from _utils import TEST_DIRS from _utils import mk_async_infer from _utils.dataset_manager import DatasetManager +from morpheus.config import Config from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService - + @pytest.mark.milvus @pytest.mark.use_python @pytest.mark.use_pandas @@ -37,136 +37,66 @@ os.path.join(TEST_DIRS.examples_dir, 'llm/common'), os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/helper.py'), 
os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/run.py'),
-    os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/pipeline.py')]
-)
+    os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/pipeline.py')
+])
 @mock.patch('requests.Session')
 @mock.patch('tritonclient.grpc.InferenceServerClient')
-def test_vdb_upload_rss_source_pipe(mock_triton_client: mock.MagicMock,
-                                    mock_requests_session: mock.MagicMock,
-                                    dataset: DatasetManager,
-                                    milvus_server_uri: str,
-                                    import_mod: list[types.ModuleType]):
+@pytest.mark.parametrize('is_rss_source, exclude_columns, expected_output_path, vdb_conf_file',
+                         [(True, ['id', 'embedding', 'source'],
+                           'service/milvus_rss_data.json',
+                           'examples/llm/vdb_upload/vdb_rss_source_config.yaml'),
+                          (False, ['id', 'embedding'],
+                           'examples/llm/vdb_upload/test_data_output.json',
+                           'examples/llm/vdb_upload/vdb_file_source_config.yaml')])
+def test_vdb_upload_pipe(mock_triton_client: mock.MagicMock,
+                         mock_requests_session: mock.MagicMock,
+                         dataset: DatasetManager,
+                         milvus_server_uri: str,
+                         import_mod: list[types.ModuleType],
+                         is_rss_source: bool,
+                         exclude_columns: list[str],
+                         expected_output_path: str,
+                         vdb_conf_file: str):
+
     # We're going to use this DF to both provide values to the mocked Tritonclient,
     # but also to verify the values in the Milvus collection.
-    expected_values_df = dataset["service/milvus_rss_data.json"]
-    # As page_content is used by other pipelines, we're just renaming it to content.
-    expected_values_df = expected_values_df.rename(columns={"page_content": "content"})
-    expected_values_df["source"] = "rss"
-
-    vdb_conf_path = os.path.join(TEST_DIRS.tests_data_dir, "examples/llm/vdb_upload/vdb_rss_source_config.yaml")
-
-    with open(os.path.join(TEST_DIRS.tests_data_dir, 'service/cisa_web_responses.json'), encoding='utf-8') as fh:
-        web_responses = json.load(fh)
-
-    _, _, vdb_upload_run_mod, vdb_upload_pipeline_mod = import_mod
-
-    # Building final configuration. Here we're passing empty dictionaries for cli configuration.
-    vdb_pipeline_config = vdb_upload_run_mod.build_final_config(vdb_conf_path=vdb_conf_path,
-                                                                cli_source_conf={},
-                                                                cli_embeddings_conf={},
-                                                                cli_pipeline_conf={},
-                                                                cli_tokenizer_conf={},
-                                                                cli_vdb_conf={})
-
-    config: Config = vdb_pipeline_config["pipeline_config"]
-
-    vdb_pipeline_config["vdb_config"]["uri"] = milvus_server_uri
-    collection_name = vdb_pipeline_config["vdb_config"]["resource_name"]
+    expected_values_df = dataset[expected_output_path]
 
-    # Mock Triton results
-    mock_metadata = {
-        "inputs": [{
-            "name": "input_ids", "datatype": "INT32", "shape": [-1, config.feature_length]
-        }, {
-            "name": "attention_mask", "datatype": "INT32", "shape": [-1, config.feature_length]
-        }],
-        "outputs": [{
-            "name": "output", "datatype": "FP32", "shape": [-1, len(config.class_labels)]
-        }]
-    }
-    mock_model_config = {"config": {"max_batch_size": 256}}
+    if is_rss_source:
+        with open(os.path.join(TEST_DIRS.tests_data_dir, 'service/cisa_web_responses.json'), encoding='utf-8') as fh:
+            web_responses = json.load(fh)
 
-    mock_triton_client.return_value = mock_triton_client
-    mock_triton_client.is_server_live.return_value = True
-    mock_triton_client.is_server_ready.return_value = True
-    mock_triton_client.is_model_ready.return_value = True
-    mock_triton_client.get_model_metadata.return_value = mock_metadata
-    mock_triton_client.get_model_config.return_value = mock_model_config
+        # Mock requests; since we are feeding the RSSSourceStage with a local file, it won't be using the
+        # requests lib, only web_scraper_stage.py will use it.
+ def mock_get_fn(url: str): + mock_response = mock.MagicMock() + mock_response.ok = True + mock_response.status_code = 200 + mock_response.text = web_responses[url] + return mock_response - mock_result_values = expected_values_df['embedding'].to_list() - inf_results = np.split(mock_result_values, - range(config.model_max_batch_size, len(mock_result_values), config.model_max_batch_size)) + mock_requests_session.return_value = mock_requests_session + mock_requests_session.get.side_effect = mock_get_fn - # The triton client is going to perform a logits function, calculate the inverse of it here - inf_results = [np.log((1.0 / x) - 1.0) * -1 for x in inf_results] + # As page_content is used by other pipelines, we're just renaming it to content. + expected_values_df = expected_values_df.rename(columns={"page_content": "content"}) + expected_values_df["source"] = "rss" - async_infer = mk_async_infer(inf_results) - mock_triton_client.async_infer.side_effect = async_infer - - # Mock requests, since we are feeding the RSSSourceStage with a local file it won't be using the - # requests lib, only web_scraper_stage.py will use it. - def mock_get_fn(url: str): - mock_response = mock.MagicMock() - mock_response.ok = True - mock_response.status_code = 200 - mock_response.text = web_responses[url] - return mock_response - - mock_requests_session.return_value = mock_requests_session - mock_requests_session.get.side_effect = mock_get_fn - - vdb_upload_pipeline_mod.pipeline(**vdb_pipeline_config) - - milvus_service = MilvusVectorDBService(uri=milvus_server_uri) - resource_service = milvus_service.load_resource(name=collection_name) - - assert resource_service.count() == len(expected_values_df) - - db_results = resource_service.query("", offset=0, limit=resource_service.count()) - db_df = pd.DataFrame(sorted(db_results, key=lambda k: k['id'])) + vdb_conf_path = os.path.join(TEST_DIRS.tests_data_dir, vdb_conf_file) - # The comparison function performs rounding on the values, but is unable to do so for array columns - dataset.assert_compare_df(db_df, expected_values_df[db_df.columns], exclude_columns=['id', 'source', 'embedding']) - db_emb = db_df['embedding'] - expected_emb = expected_values_df['embedding'] - - for i in range(resource_service.count()): - db_emb_row = pd.DataFrame(db_emb[i], dtype=np.float32) - expected_emb_row = pd.DataFrame(expected_emb[i], dtype=np.float32) - dataset.assert_compare_df(db_emb_row, expected_emb_row) - - -@pytest.mark.milvus -@pytest.mark.use_python -@pytest.mark.use_pandas -@pytest.mark.import_mod([ - os.path.join(TEST_DIRS.examples_dir, 'llm/common'), - os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/helper.py'), - os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/run.py'), - os.path.join(TEST_DIRS.examples_dir, 'llm/vdb_upload/pipeline.py')] -) -@mock.patch('tritonclient.grpc.InferenceServerClient') -def test_vdb_upload_file_source_pipe(mock_triton_client: mock.MagicMock, - dataset: DatasetManager, - milvus_server_uri: str, - import_mod: list[types.ModuleType]): - - expected_values_df = dataset["examples/llm/vdb_upload/test_data_output.json"] - print(expected_values_df) - vdb_conf_path = os.path.join(TEST_DIRS.tests_data_dir, "examples/llm/vdb_upload/vdb_file_source_config.yaml") - _, _, vdb_upload_run_mod, vdb_upload_pipeline_mod = import_mod - # # Building final configuration. Here we're passing empty dictionaries for cli configuration. 
- vdb_pipeline_config = vdb_upload_run_mod.build_final_config(vdb_conf_path=vdb_conf_path, - cli_source_conf={}, - cli_embeddings_conf={}, - cli_pipeline_conf={}, + # Building final configuration. Here we're passing empty dictionaries for cli configuration. + vdb_pipeline_config = vdb_upload_run_mod.build_final_config(vdb_conf_path=vdb_conf_path, + cli_source_conf={}, + cli_embeddings_conf={}, + cli_pipeline_conf={}, cli_tokenizer_conf={}, cli_vdb_conf={}) config: Config = vdb_pipeline_config["pipeline_config"] + # Overwriting uri provided in the config file with milvus_server_uri vdb_pipeline_config["vdb_config"]["uri"] = milvus_server_uri collection_name = vdb_pipeline_config["vdb_config"]["resource_name"] @@ -199,7 +129,7 @@ def test_vdb_upload_file_source_pipe(mock_triton_client: mock.MagicMock, async_infer = mk_async_infer(inf_results) mock_triton_client.async_infer.side_effect = async_infer - + vdb_upload_pipeline_mod.pipeline(**vdb_pipeline_config) milvus_service = MilvusVectorDBService(uri=milvus_server_uri) @@ -209,13 +139,13 @@ def test_vdb_upload_file_source_pipe(mock_triton_client: mock.MagicMock, db_results = resource_service.query("", offset=0, limit=resource_service.count()) db_df = pd.DataFrame(sorted(db_results, key=lambda k: k['id'])) - + # The comparison function performs rounding on the values, but is unable to do so for array columns - dataset.assert_compare_df(db_df, expected_values_df[db_df.columns], exclude_columns=['id', 'embedding']) + dataset.assert_compare_df(db_df, expected_values_df[db_df.columns], exclude_columns=exclude_columns) db_emb = db_df['embedding'] expected_emb = expected_values_df['embedding'] for i in range(resource_service.count()): db_emb_row = pd.DataFrame(db_emb[i], dtype=np.float32) expected_emb_row = pd.DataFrame(expected_emb[i], dtype=np.float32) - dataset.assert_compare_df(db_emb_row, expected_emb_row)@pytest.mark.milvus \ No newline at end of file + dataset.assert_compare_df(db_emb_row, expected_emb_row) diff --git a/tests/tests_data/examples/llm/vdb_upload/test_data_output.json b/tests/tests_data/examples/llm/vdb_upload/test_data_output.json index 413b7e9afa..a470af6136 100755 --- a/tests/tests_data/examples/llm/vdb_upload/test_data_output.json +++ b/tests/tests_data/examples/llm/vdb_upload/test_data_output.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:9cea89f494ce1a8006f59f45bbd577c538b7eb3c2688933f9153fa9181643a8a -size 26299 +oid sha256:73946921a9863832557a3d7e6e7891b1546e243d28d3a6078405dcd6649d971c +size 26418 diff --git a/tests/tests_data/examples/llm/vdb_upload/vdb_config.yaml b/tests/tests_data/examples/llm/vdb_upload/vdb_config.yaml deleted file mode 100644 index 20dec760ed..0000000000 --- a/tests/tests_data/examples/llm/vdb_upload/vdb_config.yaml +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:cd4874c6695f9807c8456f51604307ec05ece1671969d5eecefe55618f200765 -size 3560 diff --git a/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml b/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml index 979151c391..ce2fa54251 100755 --- a/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml +++ b/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:6e2e4eef01af2c9931fc1f0473a10a175da2b7b76903895bdbb70fb45d408b03 -size 4031 +oid sha256:17cbbf6dea2146e22b64b5a4a87d5c88d2c5abb5629c521499ead15aa9570d55 +size 3411 From 
49c755d756cb2c1d51276ddf5efaf37eaa3b18d4 Mon Sep 17 00:00:00 2001
From: Bhargav Suryadevara 
Date: Mon, 22 Jan 2024 08:29:33 -0800
Subject: [PATCH 5/5] Added multiple column embedding support

---
 examples/llm/common/content_extractor_module.py | 17 ++++++++---------
 morpheus/modules/output/write_to_vector_db.py | 11 +++++++----
 .../llm/vdb_upload/test_data_output.json | 4 ++--
 .../llm/vdb_upload/vdb_file_source_config.yaml | 4 ++--
 4 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/examples/llm/common/content_extractor_module.py b/examples/llm/common/content_extractor_module.py
index aa257e10a0..c6dd8a0a17 100644
--- a/examples/llm/common/content_extractor_module.py
+++ b/examples/llm/common/content_extractor_module.py
@@ -89,21 +89,20 @@ def convert(self,
             file_path = [file_path]
 
         docs: list[Document] = []
-        if meta is None:
-            text_column_name = "content"
-        else:
-            text_column_name = meta.get("csv", {}).get("text_column_name", "content")
+        text_column_names = {"content"}
+
+        if meta is not None:
+            text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names))
 
         for path in file_path:
             df = pd.read_csv(path, encoding=encoding)
-            if len(df.columns) == 0 or (text_column_name not in df.columns):
+            if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))):
                 raise ValueError("The CSV file must either include a 'content' column or have a "
-                                 "column specified in the meta configuraton with key 'text_column_name'.")
+                                 "column or columns specified in the meta configuration with key 'text_column_names'.")
 
             df.fillna(value="", inplace=True)
-            df[text_column_name] = df[text_column_name].apply(lambda x: x.strip())
+            df["content"] = df[list(text_column_names)].apply(lambda x: ' '.join(map(str, x)), axis=1)
 
-            df = df.rename(columns={text_column_name: "content"})
             docs_dicts = df.to_dict(orient="records")
 
             for dictionary in docs_dicts:
@@ -195,7 +194,7 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int,
 
 class CSVConverterParamContract(BaseModel):
     chunk_size: int = 1024
-    text_column_name: str = "raw"
+    text_column_names: list[str] = Field(default_factory=lambda: ["raw"])
     chunk_overlap: int = 102  # Example default value
 
diff --git a/morpheus/modules/output/write_to_vector_db.py b/morpheus/modules/output/write_to_vector_db.py
index 11c36ed0ce..e052b62aa6 100644
--- a/morpheus/modules/output/write_to_vector_db.py
+++ b/morpheus/modules/output/write_to_vector_db.py
@@ -144,10 +144,13 @@ def on_completed():
 
         # Pushing remaining messages
         for key, accum_stats in accumulator_dict.items():
-            if accum_stats.data:
-                merged_df = cudf.concat(accum_stats.data)
-                service.insert_dataframe(name=key, df=merged_df)
-                final_df_references.append(accum_stats.data)
+            try:
+                if accum_stats.data:
+                    merged_df = cudf.concat(accum_stats.data)
+                    service.insert_dataframe(name=key, df=merged_df)
+                    final_df_references.append(accum_stats.data)
+            except Exception as e:
+                logger.error("Unable to upload dataframe entries to vector database: %s", e)
 
         # Close vector database service connection
         service.close()
diff --git a/tests/tests_data/examples/llm/vdb_upload/test_data_output.json b/tests/tests_data/examples/llm/vdb_upload/test_data_output.json
index a470af6136..754bb5d0c1 100755
--- a/tests/tests_data/examples/llm/vdb_upload/test_data_output.json
+++ b/tests/tests_data/examples/llm/vdb_upload/test_data_output.json
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:73946921a9863832557a3d7e6e7891b1546e243d28d3a6078405dcd6649d971c
-size 26418
+oid
sha256:624aee8ddf26ca20cb70601616a343d4b1d01fbfe1938013bfeac5a80e6f40a0 +size 26436 diff --git a/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml b/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml index ce2fa54251..3d0b536c8d 100755 --- a/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml +++ b/tests/tests_data/examples/llm/vdb_upload/vdb_file_source_config.yaml @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:17cbbf6dea2146e22b64b5a4a87d5c88d2c5abb5629c521499ead15aa9570d55 -size 3411 +oid sha256:64cee4c9d168c0248363fbf6bf91c56dea5228f73aba9e33e41d396b820a0136 +size 3456
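
Note (not part of the patch series): a minimal sketch of the multi-column CSV handling introduced in PATCH 5/5, showing how the configured text columns are validated and then concatenated into a single 'content' column before chunking and embedding. The column names ("title", "summary") and the sample row below are illustrative assumptions, not values from the patch; only the validation and concatenation logic mirror content_extractor_module.py.

    import pandas as pd

    # Illustrative stand-ins for the names supplied via the meta key 'text_column_names'.
    text_column_names = ["title", "summary"]

    # Hypothetical CSV contents; real data comes from the file source.
    df = pd.DataFrame({
        "title": ["CVE-2024-0001"],
        "summary": ["Example buffer overflow advisory"],
        "severity": ["high"],
    })

    # Same validation as the patch: every configured text column must be present.
    if not set(text_column_names).issubset(set(df.columns)):
        raise ValueError("The CSV file must either include a 'content' column or have the "
                         "column or columns specified in the meta configuration with key 'text_column_names'.")

    df.fillna(value="", inplace=True)

    # Concatenate the configured columns row-wise into a single 'content' column.
    df["content"] = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1)

    print(df["content"].iloc[0])  # "CVE-2024-0001 Example buffer overflow advisory"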