From 1f75c796dfbd6b9f66659fba11706d7c8628098a Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Mon, 29 Jan 2024 12:24:07 -0800 Subject: [PATCH 1/4] Removed threadpool executor --- .../llm/common/content_extractor_module.py | 72 ++++++++----------- .../llm/vdb_upload/module/file_source_pipe.py | 5 +- .../schemas/file_source_pipe_schema.py | 1 - examples/llm/vdb_upload/vdb_config.yaml | 1 - .../examples/llm/content_extractor_schema.py | 2 - 5 files changed, 31 insertions(+), 50 deletions(-) diff --git a/examples/llm/common/content_extractor_module.py b/examples/llm/common/content_extractor_module.py index d0338d24ab..6a0ffdca2f 100644 --- a/examples/llm/common/content_extractor_module.py +++ b/examples/llm/common/content_extractor_module.py @@ -212,8 +212,6 @@ def file_content_extractor(builder: mrc.Builder): Notes ----- The `module_config` should contain: - - 'batch_size': int, the number of files to process in parallel. - - 'num_threads': int, the number of threads to use for parallel file reading. - 'chunk_size' : int, size of each chunk of document. - 'chunk_overlap' : int, overlap between consecutive chunks. - 'converters_meta' : dict, converters configuration. @@ -223,8 +221,8 @@ def file_content_extractor(builder: mrc.Builder): Example `module_config` ----------------------- { - "batch_size": 32, - "num_threads": 10 + "chunk_size": 516, + "chunk_overlap": 10 } """ module_config = builder.get_current_module_config() @@ -239,8 +237,6 @@ def file_content_extractor(builder: mrc.Builder): raise ValueError(log_error_message) # Use validated configurations - batch_size = extractor_config.batch_size - num_threads = extractor_config.num_threads chunk_size = extractor_config.chunk_size chunk_overlap = extractor_config.chunk_overlap converters_meta = extractor_config.converters_meta @@ -264,43 +260,33 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta: data = [] _fs = fsspec.filesystem(protocol='file') - with ThreadPoolExecutor(max_workers=num_threads) as executor: - for i in range(0, len(open_files), batch_size): - batch = open_files[i:i + batch_size] - futures = [] - files_meta = [] - - for open_file in batch: - # Check if file exists - if (not _fs.exists(open_file.path)): - logger.warning(f"File does not exist: {open_file.path}. Skipping...") - continue - - if (_fs.isdir(open_file.path)): - logger.warning(f"File is a directory: {open_file.path}. Skipping...") - continue - - try: - file_meta: FileMeta = get_file_meta(open_file=open_file) - converter = converters.get(file_meta.file_type, TextConverter()) - futures.append(executor.submit(converter.convert, file_meta.file_path, converters_meta)) - files_meta.append(file_meta) - - except Exception as e: - logger.error(f"Error processing file {open_file.path}: {e}") - - for file_meta, future in zip(files_meta, futures): - docs = future.result() - if docs: - # Get chunk params for the file type, default to txt - file_type_chunk_params = chunk_params[ - file_meta.file_type] if file_meta.file_type in chunk_params else chunk_params['txt'] - result = process_content(docs, - file_meta, - file_type_chunk_params["chunk_size"], - file_type_chunk_params["chunk_overlap"]) - if result: - data.extend(result) + for open_file in open_files: + # Check if file exists + if not _fs.exists(open_file.path): + logger.warning(f"File does not exist: {open_file.path}. Skipping...") + continue + + if _fs.isdir(open_file.path): + logger.warning(f"File is a directory: {open_file.path}. 
Skipping...") + continue + + try: + file_meta: FileMeta = get_file_meta(open_file=open_file) + converter = converters.get(file_meta.file_type, TextConverter()) + docs = converter.convert(file_meta.file_path, converters_meta) + + if docs: + # Get chunk params for the file type, default to txt + file_type_chunk_params = chunk_params.get(file_meta.file_type, chunk_params['txt']) + result = process_content(docs, + file_meta, + file_type_chunk_params["chunk_size"], + file_type_chunk_params["chunk_overlap"]) + if result: + data.extend(result) + + except Exception as e: + logger.error(f"Error processing file {open_file.path}: {e}") df_final = pd.DataFrame(data) diff --git a/examples/llm/vdb_upload/module/file_source_pipe.py b/examples/llm/vdb_upload/module/file_source_pipe.py index 2a93676eaf..0cc427cdf8 100644 --- a/examples/llm/vdb_upload/module/file_source_pipe.py +++ b/examples/llm/vdb_upload/module/file_source_pipe.py @@ -62,7 +62,6 @@ def _file_source_pipe(builder: mrc.Builder): - **enable_monitor**: Boolean to enable monitoring for this module. - **extractor_config**: Configuration for the file content extractor module. - **chunk_size**: Size of chunks for the extractor. - - **num_threads**: Number of threads for file content extraction. - **filenames**: List of file paths to be processed. - **watch**: Boolean to watch for file changes. @@ -95,8 +94,8 @@ def _file_source_pipe(builder: mrc.Builder): # Configure and load the file content extractor module file_content_extractor_config = { - "batch_size": validated_config.batch_size, - "num_threads": validated_config.num_threads, + "chunk_size": validated_config.chunk_size, + "chunk_overlap": validated_config.chunk_overlap, "converters_meta": validated_config.converters_meta } extractor_loader = ContentExtractorLoaderFactory.get_instance("file_content_extractor", diff --git a/examples/llm/vdb_upload/schemas/file_source_pipe_schema.py b/examples/llm/vdb_upload/schemas/file_source_pipe_schema.py index 5bd8789c00..cfafda8aa8 100644 --- a/examples/llm/vdb_upload/schemas/file_source_pipe_schema.py +++ b/examples/llm/vdb_upload/schemas/file_source_pipe_schema.py @@ -32,7 +32,6 @@ class FileSourcePipeSchema(BaseModel): enable_monitor: bool = False extractor_config: Optional[Dict[Any, Any]] = {} # Flexible dictionary for extractor configuration filenames: List[str] = Field(default_factory=list) # List of file paths - num_threads: int = 1 # Number of threads for processing vdb_resource_name: str watch: bool = False # Flag to watch file changes watch_interval: float = -5.0 # Interval to watch file changes diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml index 8ba3f5c22d..32d2a97bc7 100644 --- a/examples/llm/vdb_upload/vdb_config.yaml +++ b/examples/llm/vdb_upload/vdb_config.yaml @@ -88,7 +88,6 @@ vdb_pipeline: batch_size: 1024 extractor_config: chunk_size: 512 - num_threads: 10 chunk_overlap: 51 enable_monitor: True filenames: diff --git a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py index 03878aee46..2985ecbc31 100644 --- a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py +++ b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py @@ -33,11 +33,9 @@ class Config: class ContentExtractorSchema(BaseModel): - batch_size: int = 32 chunk_overlap: int = 51 chunk_size: int = 512 converters_meta: Dict[str, Dict] = Field(default_factory=dict) - num_threads: int = 10 @validator('converters_meta', 
pre=True) def validate_converters_meta(cls, v): From 6c854bc2a4c5b96096a7362ba838d4ef7114034f Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 30 Jan 2024 08:17:08 -0800 Subject: [PATCH 2/4] Updated schema tranform module --- .../llm/common/content_extractor_module.py | 74 +++++++++++-------- .../llm/vdb_upload/module/file_source_pipe.py | 3 + .../llm/vdb_upload/module/schema_transform.py | 20 +++-- .../schemas/file_source_pipe_schema.py | 1 + .../examples/llm/content_extractor_schema.py | 2 + 5 files changed, 64 insertions(+), 36 deletions(-) diff --git a/examples/llm/common/content_extractor_module.py b/examples/llm/common/content_extractor_module.py index 6a0ffdca2f..5b9d89e929 100644 --- a/examples/llm/common/content_extractor_module.py +++ b/examples/llm/common/content_extractor_module.py @@ -212,6 +212,8 @@ def file_content_extractor(builder: mrc.Builder): Notes ----- The `module_config` should contain: + - 'batch_size': int, the number of files to process in parallel. + - 'num_threads': int, the number of threads to use for parallel file reading. - 'chunk_size' : int, size of each chunk of document. - 'chunk_overlap' : int, overlap between consecutive chunks. - 'converters_meta' : dict, converters configuration. @@ -221,8 +223,8 @@ def file_content_extractor(builder: mrc.Builder): Example `module_config` ----------------------- { - "chunk_size": 516, - "chunk_overlap": 10 + "batch_size": 32, + "num_threads": 10 } """ module_config = builder.get_current_module_config() @@ -237,6 +239,8 @@ def file_content_extractor(builder: mrc.Builder): raise ValueError(log_error_message) # Use validated configurations + batch_size = extractor_config.batch_size + num_threads = extractor_config.num_threads chunk_size = extractor_config.chunk_size chunk_overlap = extractor_config.chunk_overlap converters_meta = extractor_config.converters_meta @@ -260,33 +264,43 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta: data = [] _fs = fsspec.filesystem(protocol='file') - for open_file in open_files: - # Check if file exists - if not _fs.exists(open_file.path): - logger.warning(f"File does not exist: {open_file.path}. Skipping...") - continue - - if _fs.isdir(open_file.path): - logger.warning(f"File is a directory: {open_file.path}. Skipping...") - continue - - try: - file_meta: FileMeta = get_file_meta(open_file=open_file) - converter = converters.get(file_meta.file_type, TextConverter()) - docs = converter.convert(file_meta.file_path, converters_meta) - - if docs: - # Get chunk params for the file type, default to txt - file_type_chunk_params = chunk_params.get(file_meta.file_type, chunk_params['txt']) - result = process_content(docs, - file_meta, - file_type_chunk_params["chunk_size"], - file_type_chunk_params["chunk_overlap"]) - if result: - data.extend(result) - - except Exception as e: - logger.error(f"Error processing file {open_file.path}: {e}") + with ThreadPoolExecutor(max_workers=num_threads) as executor: + for i in range(0, len(open_files), batch_size): + batch = open_files[i:i + batch_size] + futures = [] + files_meta = [] + + for open_file in batch: + # Check if file exists + if (not _fs.exists(open_file.path)): + logger.warning(f"File does not exist: {open_file.path}. Skipping...") + continue + + if (_fs.isdir(open_file.path)): + logger.warning(f"File is a directory: {open_file.path}. 
Skipping...") + continue + + try: + file_meta: FileMeta = get_file_meta(open_file=open_file) + converter = converters.get(file_meta.file_type, TextConverter()) + futures.append(executor.submit(converter.convert, file_meta.file_path, converters_meta)) + files_meta.append(file_meta) + + except Exception as e: + logger.error(f"Error processing file {open_file.path}: {e}") + + for file_meta, future in zip(files_meta, futures): + docs = future.result() + if docs: + # Get chunk params for the file type, default to txt + file_type_chunk_params = chunk_params[ + file_meta.file_type] if file_meta.file_type in chunk_params else chunk_params['txt'] + result = process_content(docs, + file_meta, + file_type_chunk_params["chunk_size"], + file_type_chunk_params["chunk_overlap"]) + if result: + data.extend(result) df_final = pd.DataFrame(data) @@ -294,4 +308,4 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta: node = builder.make_node("text_extractor", ops.map(parse_files), ops.filter(lambda x: x is not None)) builder.register_module_input("input", node) - builder.register_module_output("output", node) + builder.register_module_output("output", node) \ No newline at end of file diff --git a/examples/llm/vdb_upload/module/file_source_pipe.py b/examples/llm/vdb_upload/module/file_source_pipe.py index 0cc427cdf8..4e74bf8509 100644 --- a/examples/llm/vdb_upload/module/file_source_pipe.py +++ b/examples/llm/vdb_upload/module/file_source_pipe.py @@ -62,6 +62,7 @@ def _file_source_pipe(builder: mrc.Builder): - **enable_monitor**: Boolean to enable monitoring for this module. - **extractor_config**: Configuration for the file content extractor module. - **chunk_size**: Size of chunks for the extractor. + - **num_threads**: Number of threads for file content extraction. - **filenames**: List of file paths to be processed. - **watch**: Boolean to watch for file changes. 
@@ -94,6 +95,8 @@ def _file_source_pipe(builder: mrc.Builder): # Configure and load the file content extractor module file_content_extractor_config = { + "batch_size": validated_config.batch_size, + "num_threads": validated_config.num_threads, "chunk_size": validated_config.chunk_size, "chunk_overlap": validated_config.chunk_overlap, "converters_meta": validated_config.converters_meta diff --git a/examples/llm/vdb_upload/module/schema_transform.py b/examples/llm/vdb_upload/module/schema_transform.py index 83e061005a..243f850e26 100644 --- a/examples/llm/vdb_upload/module/schema_transform.py +++ b/examples/llm/vdb_upload/module/schema_transform.py @@ -27,8 +27,6 @@ from morpheus.utils.column_info import RenameColumn from morpheus.utils.module_utils import ModuleLoaderFactory from morpheus.utils.module_utils import register_module -from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow -from morpheus.utils.schema_transforms import process_dataframe logger = logging.getLogger(__name__) @@ -78,6 +76,7 @@ def _schema_transform(builder: mrc.Builder): schema_config = validated_config.schema_transform_config source_column_info = [] + preserve_columns = [] for col_name, col_config in schema_config.items(): op_type = col_config.get("op_type") @@ -90,9 +89,10 @@ def _schema_transform(builder: mrc.Builder): source_column_info.append(ColumnInfo(name=col_name, dtype=col_config["dtype"])) else: raise ValueError(f"Unknown op_type '{op_type}' for column '{col_name}'") + + preserve_columns.append(col_name) source_schema = DataFrameInputSchema(column_info=source_column_info) - source_schema = create_and_attach_nvt_workflow(input_schema=source_schema) def do_transform(message: MessageMeta): if (message is None): @@ -101,9 +101,17 @@ def do_transform(message: MessageMeta): with message.mutable_dataframe() as mdf: if (len(mdf) == 0): return None - _df = process_dataframe(mdf, source_schema) - - return MessageMeta(df=cudf.DataFrame(_df)) + + for ci in source_schema.column_info: + try: + mdf[ci.name] = ci._process_column(mdf) + except Exception as exc_info: + logger.exception("Failed to process column '%s'. 
Dataframe: \n%s", ci.name, mdf, exc_info) + return None + + mdf = mdf[preserve_columns] + + return MessageMeta(df=cudf.DataFrame(mdf)) node = builder.make_node("schema_transform", ops.map(do_transform), ops.filter(lambda x: x is not None)) diff --git a/examples/llm/vdb_upload/schemas/file_source_pipe_schema.py b/examples/llm/vdb_upload/schemas/file_source_pipe_schema.py index cfafda8aa8..5bd8789c00 100644 --- a/examples/llm/vdb_upload/schemas/file_source_pipe_schema.py +++ b/examples/llm/vdb_upload/schemas/file_source_pipe_schema.py @@ -32,6 +32,7 @@ class FileSourcePipeSchema(BaseModel): enable_monitor: bool = False extractor_config: Optional[Dict[Any, Any]] = {} # Flexible dictionary for extractor configuration filenames: List[str] = Field(default_factory=list) # List of file paths + num_threads: int = 1 # Number of threads for processing vdb_resource_name: str watch: bool = False # Flag to watch file changes watch_interval: float = -5.0 # Interval to watch file changes diff --git a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py index 2985ecbc31..1f1ca8ecb9 100644 --- a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py +++ b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py @@ -33,9 +33,11 @@ class Config: class ContentExtractorSchema(BaseModel): + batch_size: int = 32 chunk_overlap: int = 51 chunk_size: int = 512 converters_meta: Dict[str, Dict] = Field(default_factory=dict) + num_threads: 10 @validator('converters_meta', pre=True) def validate_converters_meta(cls, v): From c1bc89529540c66fd9f8d4ceda3d3b961b5ddf56 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 30 Jan 2024 11:23:08 -0800 Subject: [PATCH 3/4] Moved document processing out of threadpool execution --- .../conda/environments/cuda11.8_examples.yml | 3 +- .../llm/common/content_extractor_module.py | 201 ++++++++++-------- .../llm/vdb_upload/module/schema_transform.py | 6 +- examples/llm/vdb_upload/vdb_config.yaml | 1 + .../examples/llm/content_extractor_schema.py | 2 +- 5 files changed, 119 insertions(+), 94 deletions(-) diff --git a/docker/conda/environments/cuda11.8_examples.yml b/docker/conda/environments/cuda11.8_examples.yml index c1a07c983e..63050fbcdd 100644 --- a/docker/conda/environments/cuda11.8_examples.yml +++ b/docker/conda/environments/cuda11.8_examples.yml @@ -35,6 +35,7 @@ dependencies: - dgl=1.0.2 - dill=0.3.6 - distributed>=2023.1.1 + - python-docx==1.1.0 - huggingface_hub=0.10.1 # work-around for https://github.com/UKPLab/sentence-transformers/issues/1762 - langchain=0.0.190 - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863 @@ -59,7 +60,7 @@ dependencies: ####### Pip Dependencies (keep sorted!) ####### - pip: - - farm-haystack[file-conversion] - google-search-results==2.4 - grpcio-status==1.58 # To keep in sync with 1.58 grpcio which is installed for Morpheus - nemollm + - PyMuPDF==1.23.20 diff --git a/examples/llm/common/content_extractor_module.py b/examples/llm/common/content_extractor_module.py index 5b9d89e929..089517cedb 100644 --- a/examples/llm/common/content_extractor_module.py +++ b/examples/llm/common/content_extractor_module.py @@ -12,26 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import io import logging import os import typing from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from pathlib import Path +import fitz import fsspec import mrc import mrc.core.operators as ops import pandas as pd -from haystack import Document -from haystack.nodes import DocxToTextConverter -from haystack.nodes import PDFToTextConverter -from haystack.nodes import TextConverter -from haystack.nodes.file_converter import BaseConverter +from docx import Document from langchain.text_splitter import RecursiveCharacterTextSplitter from pydantic import ValidationError -from morpheus.common import read_file_to_df from morpheus.messages import MessageMeta from morpheus.modules.schemas.examples.llm.content_extractor_schema import ContentExtractorSchema from morpheus.utils.module_utils import ModuleLoaderFactory @@ -51,74 +47,6 @@ class FileMeta: file_type: str -class CsvTextConverter(BaseConverter): - """ - Converts a CSV file column content to text documents. - """ - - outgoing_edges = 1 - - def convert(self, - file_path: Path | str | list[Path | str], - meta: typing.Optional[dict[str, typing.Any]] = None, - remove_numeric_tables: typing.Optional[bool] = None, - valid_languages: typing.Optional[list[str]] = None, - encoding: typing.Optional[str] = "UTF-8", - id_hash_keys: typing.Optional[list[str]] = None) -> list[Document]: - """ - Load a CSV file and convert it to Documents. - - Parameters - ---------- - file_path: - Path to the CSV file you want to convert. - meta: - Optional dictionary of metadata key-value pairs that you want to append to the returned document. - encoding: - Optional file encoding format, default: `UTF-8`. - id_hash_keys: - Generates the document ID from a custom list of strings that refer to the document's attributes. - remove_numeric_tables: bool - Removes numeric tables from the csv. - valid_languages: - Valid languages - - Returns - ------- - list[haystack.Document] - List of documents, 1 document per line in the CSV. - """ - if not isinstance(file_path, list): - file_path = [file_path] - - docs: list[Document] = [] - text_column_names = set("content") - - if meta is not None: - text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names)) - - for path in file_path: - logger.error("Processing file: %s", path) - df = pd.read_csv(path, encoding=encoding) - if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))): - raise ValueError("The CSV file must either include a 'content' column or have a " - "columns specified in the meta configuration with key 'text_column_names'.") - - df.fillna(value='', inplace=True) - df["content"] = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1) - - docs_dicts = df.to_dict(orient="records") - - for dictionary in docs_dicts: - if meta: - dictionary["meta"] = meta - if id_hash_keys: - dictionary["id_hash_keys"] = id_hash_keys - docs.append(Document.from_dict(dictionary)) - - return docs - - def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta: """ Extract file metadata from the given open file. @@ -150,14 +78,104 @@ def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta: raise -def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]: +def read_file_to_bytesio(file_path: str) -> io.BytesIO: + """ + Read the content of the file and return it as an io.BytesIO object. + + Parameters + ---------- + file_path: str + Path to the file. 
+ + Returns + ------- + io.BytesIO or None + Returns io.BytesIO object if the file is successfully read. Returns + None if there is an error reading the file. + """ + + io_bytes = None + + try: + with open(file_path, 'rb') as file: + io_bytes = io.BytesIO(file.read()) + except FileNotFoundError: + logger.error(f"Error: File not found - {file_path}") + except PermissionError: + logger.error(f"Error: Permission denied - {file_path}") + except Exception as e: + logger.error(f"Error reading file {file_path}: {e}") + + return io_bytes + + +def _pdf_to_text_converter(bytes_io: io.BytesIO, meta: dict = None) -> str: + if isinstance(bytes_io, io.BytesIO): + pdf_document = fitz.open(stream=bytes_io, filetype="pdf") + text = '' + for page_num in range(pdf_document.page_count): + page = pdf_document[page_num] + text += page.get_text() + else: + raise ValueError("Invalid input type. Supported type bytes (io.BytesIO).") + + return text + + +def _docx_to_text_converter(bytes_io: io.BytesIO, meta: dict) -> str: + if isinstance(bytes_io, io.BytesIO): + # New io.BytesIO object and copying the content from the original io.BytesIO object + # to effectively reset cursor position + doc = Document(io.BytesIO(bytes_io.read())) + text = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) + else: + raise ValueError("Invalid input type. Supported type bytes (io.BytesIO).") + + return text + + +def _csv_to_text_converter(bytes_io: io.BytesIO, meta: dict) -> list[str]: + if isinstance(bytes_io, io.BytesIO): + text_column_names = set("content") + + if meta is not None: + text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names)) + + df = pd.read_csv(bytes_io) + if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))): + raise ValueError("The CSV file must either include a 'content' column or have a " + "columns specified in the meta configuration with key 'text_column_names'.") + + df.fillna(value='', inplace=True) + text_arr = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1).tolist() + else: + raise ValueError("Invalid input type. Supported type bytes (io.BytesIO).") + + return text_arr + + +def _text_converter(io_bytes: io.BytesIO, meta: dict) -> str: + + convertor_conf = meta.get("txt", {}) + encoding = convertor_conf.get("encoding", "utf-8") + + # Ensure the cursor is at the beginning of the stream + io_bytes.seek(0) + + # Decode the bytes to a string using the specified encoding + text = io_bytes.read().decode(encoding) + + return text + + +def process_content(docs: str | list[str], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]: """ Processes the content of a file and splits it into chunks. Parameters ---------- - docs : list[Document] - List of documents. + docs : str | list[str] + Documents content. file_meta: FileMeta FileMeta parsed information of a file path. 
chunk_size : int @@ -177,9 +195,12 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int, processed_data = [] + if isinstance(docs, str): + docs = [docs] + for document in docs: try: - split_text = text_splitter.split_text(document.content) + split_text = text_splitter.split_text(document) for chunk in split_text: processed_data.append({ @@ -246,10 +267,10 @@ def file_content_extractor(builder: mrc.Builder): converters_meta = extractor_config.converters_meta converters = { - "pdf": PDFToTextConverter(), - "csv": CsvTextConverter(), - "docx": DocxToTextConverter(valid_languages=["de", "en"]), - "txt": TextConverter() + "pdf": _pdf_to_text_converter, + "csv": _csv_to_text_converter, + "docx": _docx_to_text_converter, + "txt": _text_converter } chunk_params = { @@ -282,20 +303,22 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta: try: file_meta: FileMeta = get_file_meta(open_file=open_file) - converter = converters.get(file_meta.file_type, TextConverter()) - futures.append(executor.submit(converter.convert, file_meta.file_path, converters_meta)) + futures.append(executor.submit(read_file_to_bytesio, file_meta.file_path)) files_meta.append(file_meta) except Exception as e: logger.error(f"Error processing file {open_file.path}: {e}") for file_meta, future in zip(files_meta, futures): - docs = future.result() - if docs: + io_bytes = future.result() + + if io_bytes: + converter = converters.get(file_meta.file_type, _text_converter) + result = converter(io_bytes, meta=converters_meta) # Get chunk params for the file type, default to txt file_type_chunk_params = chunk_params[ file_meta.file_type] if file_meta.file_type in chunk_params else chunk_params['txt'] - result = process_content(docs, + result = process_content(result, file_meta, file_type_chunk_params["chunk_size"], file_type_chunk_params["chunk_overlap"]) @@ -308,4 +331,4 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta: node = builder.make_node("text_extractor", ops.map(parse_files), ops.filter(lambda x: x is not None)) builder.register_module_input("input", node) - builder.register_module_output("output", node) \ No newline at end of file + builder.register_module_output("output", node) diff --git a/examples/llm/vdb_upload/module/schema_transform.py b/examples/llm/vdb_upload/module/schema_transform.py index 243f850e26..6cf7405f2f 100644 --- a/examples/llm/vdb_upload/module/schema_transform.py +++ b/examples/llm/vdb_upload/module/schema_transform.py @@ -89,7 +89,7 @@ def _schema_transform(builder: mrc.Builder): source_column_info.append(ColumnInfo(name=col_name, dtype=col_config["dtype"])) else: raise ValueError(f"Unknown op_type '{op_type}' for column '{col_name}'") - + preserve_columns.append(col_name) source_schema = DataFrameInputSchema(column_info=source_column_info) @@ -101,14 +101,14 @@ def do_transform(message: MessageMeta): with message.mutable_dataframe() as mdf: if (len(mdf) == 0): return None - + for ci in source_schema.column_info: try: mdf[ci.name] = ci._process_column(mdf) except Exception as exc_info: logger.exception("Failed to process column '%s'. 
Dataframe: \n%s", ci.name, mdf, exc_info) return None - + mdf = mdf[preserve_columns] return MessageMeta(df=cudf.DataFrame(mdf)) diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml index 32d2a97bc7..8ba3f5c22d 100644 --- a/examples/llm/vdb_upload/vdb_config.yaml +++ b/examples/llm/vdb_upload/vdb_config.yaml @@ -88,6 +88,7 @@ vdb_pipeline: batch_size: 1024 extractor_config: chunk_size: 512 + num_threads: 10 chunk_overlap: 51 enable_monitor: True filenames: diff --git a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py index 1f1ca8ecb9..03878aee46 100644 --- a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py +++ b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py @@ -37,7 +37,7 @@ class ContentExtractorSchema(BaseModel): chunk_overlap: int = 51 chunk_size: int = 512 converters_meta: Dict[str, Dict] = Field(default_factory=dict) - num_threads: 10 + num_threads: int = 10 @validator('converters_meta', pre=True) def validate_converters_meta(cls, v): From e40ddbe9c29d612266aa235f8a091d7896037781 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 31 Jan 2024 13:48:32 -0800 Subject: [PATCH 4/4] Moved document processing out of threadpool execution --- .../llm/common/content_extractor_module.py | 97 ++++++++++--------- 1 file changed, 53 insertions(+), 44 deletions(-) mode change 100644 => 100755 examples/llm/common/content_extractor_module.py diff --git a/examples/llm/common/content_extractor_module.py b/examples/llm/common/content_extractor_module.py old mode 100644 new mode 100755 index 089517cedb..ca89d4a316 --- a/examples/llm/common/content_extractor_module.py +++ b/examples/llm/common/content_extractor_module.py @@ -18,6 +18,7 @@ import typing from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass +from functools import wraps import fitz import fsspec @@ -47,6 +48,12 @@ class FileMeta: file_type: str +@dataclass +class ConverterInputInfo: + io_bytes: io.BytesIO + meta: dict + + def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta: """ Extract file metadata from the given open file. @@ -109,62 +116,63 @@ def read_file_to_bytesio(file_path: str) -> io.BytesIO: return io_bytes -def _pdf_to_text_converter(bytes_io: io.BytesIO, meta: dict = None) -> str: - if isinstance(bytes_io, io.BytesIO): - pdf_document = fitz.open(stream=bytes_io, filetype="pdf") - text = '' - for page_num in range(pdf_document.page_count): - page = pdf_document[page_num] - text += page.get_text() - else: - raise ValueError("Invalid input type. Supported type bytes (io.BytesIO).") - - return text +def _converter_error_handler(func: typing.Callable) -> typing.Callable: + @wraps(func) + def wrapper(input_info: ConverterInputInfo, *args, **kwargs): + try: + # Common logic for instance check + if not isinstance(input_info.io_bytes, io.BytesIO): + raise ValueError("Invalid input type. Supported type: io.BytesIO.") -def _docx_to_text_converter(bytes_io: io.BytesIO, meta: dict) -> str: - if isinstance(bytes_io, io.BytesIO): - # New io.BytesIO object and copying the content from the original io.BytesIO object - # to effectively reset cursor position - doc = Document(io.BytesIO(bytes_io.read())) - text = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) - else: - raise ValueError("Invalid input type. 
Supported type bytes (io.BytesIO).") + return func(input_info, *args, **kwargs) + except Exception as exec_info: + logger.error(f"Error in {func.__name__}: {exec_info}") + return func.__annotations__.get("return_type", None)() - return text + return wrapper -def _csv_to_text_converter(bytes_io: io.BytesIO, meta: dict) -> list[str]: - if isinstance(bytes_io, io.BytesIO): - text_column_names = set("content") +@_converter_error_handler +def _pdf_to_text_converter(input_info: ConverterInputInfo) -> str: + text = "" + pdf_document = fitz.open(stream=input_info.io_bytes, filetype="pdf") + for page_num in range(pdf_document.page_count): + page = pdf_document[page_num] + text += page.get_text() + return text - if meta is not None: - text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names)) - df = pd.read_csv(bytes_io) - if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))): - raise ValueError("The CSV file must either include a 'content' column or have a " - "columns specified in the meta configuration with key 'text_column_names'.") +@_converter_error_handler +def _docx_to_text_converter(input_info: ConverterInputInfo) -> str: + text = "" + doc = Document(io.BytesIO(input_info.io_bytes.read())) + text = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) + return text - df.fillna(value='', inplace=True) - text_arr = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1).tolist() - else: - raise ValueError("Invalid input type. Supported type bytes (io.BytesIO).") +@_converter_error_handler +def _csv_to_text_converter(input_info: ConverterInputInfo) -> list[str]: + text_arr = [] + text_column_names = set("content") + if input_info.meta is not None: + text_column_names = set(input_info.meta.get("csv", {}).get("text_column_names", text_column_names)) + df = pd.read_csv(input_info.io_bytes) + if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))): + raise ValueError("The CSV file must either include a 'content' column or have a " + "columns specified in the meta configuration with key 'text_column_names'.") + df.fillna(value='', inplace=True) + text_arr = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1).tolist() return text_arr -def _text_converter(io_bytes: io.BytesIO, meta: dict) -> str: - - convertor_conf = meta.get("txt", {}) +@_converter_error_handler +def _text_converter(input_info: ConverterInputInfo) -> str: + text = "" + convertor_conf = input_info.meta.get("txt", {}) encoding = convertor_conf.get("encoding", "utf-8") - - # Ensure the cursor is at the beginning of the stream - io_bytes.seek(0) - - # Decode the bytes to a string using the specified encoding - text = io_bytes.read().decode(encoding) - + input_info.io_bytes.seek(0) + text = input_info.io_bytes.read().decode(encoding) return text @@ -314,7 +322,8 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta: if io_bytes: converter = converters.get(file_meta.file_type, _text_converter) - result = converter(io_bytes, meta=converters_meta) + input_info = ConverterInputInfo(io_bytes=io_bytes, meta=converters_meta) + result = converter(input_info) # Get chunk params for the file type, default to txt file_type_chunk_params = chunk_params[ file_meta.file_type] if file_meta.file_type in chunk_params else chunk_params['txt']