diff --git a/docker/conda/environments/cuda11.8_examples.yml b/docker/conda/environments/cuda11.8_examples.yml
index c1a07c983e..63050fbcdd 100644
--- a/docker/conda/environments/cuda11.8_examples.yml
+++ b/docker/conda/environments/cuda11.8_examples.yml
@@ -35,6 +35,7 @@ dependencies:
   - dgl=1.0.2
   - dill=0.3.6
   - distributed>=2023.1.1
+  - python-docx==1.1.0
   - huggingface_hub=0.10.1 # work-around for https://github.com/UKPLab/sentence-transformers/issues/1762
   - langchain=0.0.190
   - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
@@ -59,7 +60,7 @@ dependencies:
 
   ####### Pip Dependencies (keep sorted!) #######
   - pip:
-      - farm-haystack[file-conversion]
       - google-search-results==2.4
       - grpcio-status==1.58 # To keep in sync with 1.58 grpcio which is installed for Morpheus
      - nemollm
+      - PyMuPDF==1.23.20
diff --git a/examples/llm/common/content_extractor_module.py b/examples/llm/common/content_extractor_module.py
old mode 100644
new mode 100755
index d0338d24ab..ca89d4a316
--- a/examples/llm/common/content_extractor_module.py
+++ b/examples/llm/common/content_extractor_module.py
@@ -12,26 +12,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import io
 import logging
 import os
 import typing
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
-from pathlib import Path
+from functools import wraps
 
+import fitz
 import fsspec
 import mrc
 import mrc.core.operators as ops
 import pandas as pd
-from haystack import Document
-from haystack.nodes import DocxToTextConverter
-from haystack.nodes import PDFToTextConverter
-from haystack.nodes import TextConverter
-from haystack.nodes.file_converter import BaseConverter
+from docx import Document
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 from pydantic import ValidationError
 
-from morpheus.common import read_file_to_df
 from morpheus.messages import MessageMeta
 from morpheus.modules.schemas.examples.llm.content_extractor_schema import ContentExtractorSchema
 from morpheus.utils.module_utils import ModuleLoaderFactory
@@ -51,72 +48,10 @@ class FileMeta:
     file_type: str
 
 
-class CsvTextConverter(BaseConverter):
-    """
-    Converts a CSV file column content to text documents.
-    """
-
-    outgoing_edges = 1
-
-    def convert(self,
-                file_path: Path | str | list[Path | str],
-                meta: typing.Optional[dict[str, typing.Any]] = None,
-                remove_numeric_tables: typing.Optional[bool] = None,
-                valid_languages: typing.Optional[list[str]] = None,
-                encoding: typing.Optional[str] = "UTF-8",
-                id_hash_keys: typing.Optional[list[str]] = None) -> list[Document]:
-        """
-        Load a CSV file and convert it to Documents.
-
-        Parameters
-        ----------
-        file_path:
-            Path to the CSV file you want to convert.
-        meta:
-            Optional dictionary of metadata key-value pairs that you want to append to the returned document.
-        encoding:
-            Optional file encoding format, default: `UTF-8`.
-        id_hash_keys:
-            Generates the document ID from a custom list of strings that refer to the document's attributes.
-        remove_numeric_tables: bool
-            Removes numeric tables from the csv.
-        valid_languages:
-            Valid languages
-
-        Returns
-        -------
-        list[haystack.Document]
-            List of documents, 1 document per line in the CSV.
-        """
-        if not isinstance(file_path, list):
-            file_path = [file_path]
-
-        docs: list[Document] = []
-        text_column_names = set("content")
-
-        if meta is not None:
-            text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names))
-
-        for path in file_path:
-            logger.error("Processing file: %s", path)
-            df = pd.read_csv(path, encoding=encoding)
-            if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))):
-                raise ValueError("The CSV file must either include a 'content' column or have a "
-                                 "columns specified in the meta configuration with key 'text_column_names'.")
-
-            df.fillna(value='', inplace=True)
-            df["content"] = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1)
-
-            docs_dicts = df.to_dict(orient="records")
-
-            for dictionary in docs_dicts:
-                if meta:
-                    dictionary["meta"] = meta
-                if id_hash_keys:
-                    dictionary["id_hash_keys"] = id_hash_keys
-                docs.append(Document.from_dict(dictionary))
-
-        return docs
+@dataclass
+class ConverterInputInfo:
+    io_bytes: io.BytesIO
+    meta: dict
 
 
 def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta:
@@ -150,14 +85,105 @@ def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta:
         raise
 
 
-def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]:
+def read_file_to_bytesio(file_path: str) -> io.BytesIO:
+    """
+    Read the content of the file and return it as an io.BytesIO object.
+
+    Parameters
+    ----------
+    file_path: str
+        Path to the file.
+
+    Returns
+    -------
+    io.BytesIO or None
+        Returns io.BytesIO object if the file is successfully read. Returns
+        None if there is an error reading the file.
+    """
+
+    io_bytes = None
+
+    try:
+        with open(file_path, 'rb') as file:
+            io_bytes = io.BytesIO(file.read())
+    except FileNotFoundError:
+        logger.error(f"Error: File not found - {file_path}")
+    except PermissionError:
+        logger.error(f"Error: Permission denied - {file_path}")
+    except Exception as e:
+        logger.error(f"Error reading file {file_path}: {e}")
+
+    return io_bytes
+
+
+def _converter_error_handler(func: typing.Callable) -> typing.Callable:
+
+    @wraps(func)
+    def wrapper(input_info: ConverterInputInfo, *args, **kwargs):
+        try:
+            # Common logic for instance check
+            if not isinstance(input_info.io_bytes, io.BytesIO):
+                raise ValueError("Invalid input type. Supported type: io.BytesIO.")
+
+            return func(input_info, *args, **kwargs)
+        except Exception as exec_info:
+            logger.error(f"Error in {func.__name__}: {exec_info}")
+            return func.__annotations__.get("return_type", None)()
+
+    return wrapper
+
+
+@_converter_error_handler
+def _pdf_to_text_converter(input_info: ConverterInputInfo) -> str:
+    text = ""
+    pdf_document = fitz.open(stream=input_info.io_bytes, filetype="pdf")
+    for page_num in range(pdf_document.page_count):
+        page = pdf_document[page_num]
+        text += page.get_text()
+    return text
+
+
+@_converter_error_handler
+def _docx_to_text_converter(input_info: ConverterInputInfo) -> str:
+    text = ""
+    doc = Document(io.BytesIO(input_info.io_bytes.read()))
+    text = '\n'.join([paragraph.text for paragraph in doc.paragraphs])
+    return text
+
+
+@_converter_error_handler
+def _csv_to_text_converter(input_info: ConverterInputInfo) -> list[str]:
+    text_arr = []
+    text_column_names = set("content")
+    if input_info.meta is not None:
+        text_column_names = set(input_info.meta.get("csv", {}).get("text_column_names", text_column_names))
+    df = pd.read_csv(input_info.io_bytes)
+    if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))):
+        raise ValueError("The CSV file must either include a 'content' column or have a "
+                         "columns specified in the meta configuration with key 'text_column_names'.")
+    df.fillna(value='', inplace=True)
+    text_arr = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1).tolist()
+    return text_arr
+
+
+@_converter_error_handler
+def _text_converter(input_info: ConverterInputInfo) -> str:
+    text = ""
+    convertor_conf = input_info.meta.get("txt", {})
+    encoding = convertor_conf.get("encoding", "utf-8")
+    input_info.io_bytes.seek(0)
+    text = input_info.io_bytes.read().decode(encoding)
+    return text
+
+
+def process_content(docs: str | list[str], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]:
     """
     Processes the content of a file and splits it into chunks.
 
     Parameters
     ----------
-    docs : list[Document]
-        List of documents.
+    docs : str | list[str]
+        Documents content.
     file_meta: FileMeta
         FileMeta parsed information of a file path.
     chunk_size : int
@@ -177,9 +203,12 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int,
 
     processed_data = []
 
+    if isinstance(docs, str):
+        docs = [docs]
+
     for document in docs:
         try:
-            split_text = text_splitter.split_text(document.content)
+            split_text = text_splitter.split_text(document)
 
             for chunk in split_text:
                 processed_data.append({
@@ -246,10 +275,10 @@ def file_content_extractor(builder: mrc.Builder):
     converters_meta = extractor_config.converters_meta
 
     converters = {
-        "pdf": PDFToTextConverter(),
-        "csv": CsvTextConverter(),
-        "docx": DocxToTextConverter(valid_languages=["de", "en"]),
-        "txt": TextConverter()
+        "pdf": _pdf_to_text_converter,
+        "csv": _csv_to_text_converter,
+        "docx": _docx_to_text_converter,
+        "txt": _text_converter
     }
 
     chunk_params = {
@@ -282,20 +311,23 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:
 
             try:
                 file_meta: FileMeta = get_file_meta(open_file=open_file)
-                converter = converters.get(file_meta.file_type, TextConverter())
-                futures.append(executor.submit(converter.convert, file_meta.file_path, converters_meta))
+                futures.append(executor.submit(read_file_to_bytesio, file_meta.file_path))
                 files_meta.append(file_meta)
 
             except Exception as e:
                 logger.error(f"Error processing file {open_file.path}: {e}")
 
         for file_meta, future in zip(files_meta, futures):
-            docs = future.result()
-            if docs:
+            io_bytes = future.result()
+
+            if io_bytes:
+                converter = converters.get(file_meta.file_type, _text_converter)
+                input_info = ConverterInputInfo(io_bytes=io_bytes, meta=converters_meta)
+                result = converter(input_info)
                 # Get chunk params for the file type, default to txt
                 file_type_chunk_params = chunk_params[
                     file_meta.file_type] if file_meta.file_type in chunk_params else chunk_params['txt']
-                result = process_content(docs,
+                result = process_content(result,
                                          file_meta,
                                          file_type_chunk_params["chunk_size"],
                                          file_type_chunk_params["chunk_overlap"])
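Note: with the Haystack converters removed, PDF text is pulled with PyMuPDF (fitz) and DOCX text with python-docx, both fed from an in-memory io.BytesIO. Below is a minimal sketch of exercising the new converter path outside the pipeline. The import path and the sample.pdf file name are hypothetical (in the repo the module is loaded by the pipeline's module loader, not imported like this); the other names are the ones added in this diff, and an empty meta dict is enough here because only the csv/txt converters consult it.

    import fitz  # PyMuPDF, used here only to fabricate a small test PDF

    from examples.llm.common.content_extractor_module import ConverterInputInfo
    from examples.llm.common.content_extractor_module import _pdf_to_text_converter
    from examples.llm.common.content_extractor_module import read_file_to_bytesio

    # Fabricate a one-page PDF so the sketch is self-contained.
    doc = fitz.open()
    doc.new_page().insert_text((72, 72), "Hello from PyMuPDF")
    doc.save("sample.pdf")

    # Mirror what parse_files() now does: read the raw bytes, wrap them with the
    # per-type converter settings, then run the matching converter.
    io_bytes = read_file_to_bytesio("sample.pdf")
    input_info = ConverterInputInfo(io_bytes=io_bytes, meta={})
    print(_pdf_to_text_converter(input_info))
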
diff --git a/examples/llm/vdb_upload/module/file_source_pipe.py b/examples/llm/vdb_upload/module/file_source_pipe.py
index 2a93676eaf..4e74bf8509 100644
--- a/examples/llm/vdb_upload/module/file_source_pipe.py
+++ b/examples/llm/vdb_upload/module/file_source_pipe.py
@@ -97,6 +97,8 @@ def _file_source_pipe(builder: mrc.Builder):
     file_content_extractor_config = {
         "batch_size": validated_config.batch_size,
         "num_threads": validated_config.num_threads,
+        "chunk_size": validated_config.chunk_size,
+        "chunk_overlap": validated_config.chunk_overlap,
         "converters_meta": validated_config.converters_meta
     }
     extractor_loader = ContentExtractorLoaderFactory.get_instance("file_content_extractor",
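The chunking parameters are now threaded from the validated file source config into the extractor module config alongside converters_meta. A sketch of the resulting dictionary is below; the values are hypothetical, while the "csv"/"text_column_names" and "txt"/"encoding" keys are the ones the new csv and txt converters look up.

    # Hypothetical values; the real ones come from the validated vdb_upload file source config.
    file_content_extractor_config = {
        "batch_size": 32,
        "num_threads": 10,
        "chunk_size": 512,
        "chunk_overlap": 51,
        "converters_meta": {
            "csv": {"text_column_names": ["content"]},  # columns joined into the text payload
            "txt": {"encoding": "utf-8"},  # decoding used by _text_converter
        },
    }
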
diff --git a/examples/llm/vdb_upload/module/schema_transform.py b/examples/llm/vdb_upload/module/schema_transform.py
index 83e061005a..6cf7405f2f 100644
--- a/examples/llm/vdb_upload/module/schema_transform.py
+++ b/examples/llm/vdb_upload/module/schema_transform.py
@@ -27,8 +27,6 @@
 from morpheus.utils.column_info import RenameColumn
 from morpheus.utils.module_utils import ModuleLoaderFactory
 from morpheus.utils.module_utils import register_module
-from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow
-from morpheus.utils.schema_transforms import process_dataframe
 
 logger = logging.getLogger(__name__)
 
@@ -78,6 +76,7 @@ def _schema_transform(builder: mrc.Builder):
     schema_config = validated_config.schema_transform_config
 
     source_column_info = []
+    preserve_columns = []
 
     for col_name, col_config in schema_config.items():
         op_type = col_config.get("op_type")
@@ -91,8 +90,9 @@ def _schema_transform(builder: mrc.Builder):
         else:
             raise ValueError(f"Unknown op_type '{op_type}' for column '{col_name}'")
 
+        preserve_columns.append(col_name)
+
     source_schema = DataFrameInputSchema(column_info=source_column_info)
-    source_schema = create_and_attach_nvt_workflow(input_schema=source_schema)
 
     def do_transform(message: MessageMeta):
         if (message is None):
@@ -101,9 +101,17 @@ def do_transform(message: MessageMeta):
         with message.mutable_dataframe() as mdf:
             if (len(mdf) == 0):
                 return None
-            _df = process_dataframe(mdf, source_schema)
 
-            return MessageMeta(df=cudf.DataFrame(_df))
+            for ci in source_schema.column_info:
+                try:
+                    mdf[ci.name] = ci._process_column(mdf)
+                except Exception as exc_info:
+                    logger.exception("Failed to process column '%s'. Dataframe: \n%s", ci.name, mdf, exc_info)
+                    return None
+
+            mdf = mdf[preserve_columns]
+
+            return MessageMeta(df=cudf.DataFrame(mdf))
 
     node = builder.make_node("schema_transform", ops.map(do_transform), ops.filter(lambda x: x is not None))
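With the NVT workflow removed, do_transform now evaluates each ColumnInfo directly and keeps only the configured columns. A minimal sketch of that loop is below, run against a plain pandas DataFrame for brevity (the module itself operates on a cudf DataFrame inside MessageMeta); the column names and values are hypothetical, and it assumes the usual RenameColumn(name=..., dtype=..., input_name=...) constructor from morpheus.utils.column_info.

    import pandas as pd

    from morpheus.utils.column_info import DataFrameInputSchema
    from morpheus.utils.column_info import RenameColumn

    # Hypothetical raw frame as produced by the file source.
    df = pd.DataFrame({"page_content": ["hello world"], "source": ["a.txt"]})

    # Same shape of schema the module builds from schema_transform_config.
    source_schema = DataFrameInputSchema(column_info=[
        RenameColumn(name="content", dtype=str, input_name="page_content"),
        RenameColumn(name="source", dtype=str, input_name="source"),
    ])
    preserve_columns = ["content", "source"]

    # Mirror the new do_transform() body: evaluate every ColumnInfo, then
    # drop everything that is not explicitly preserved.
    for ci in source_schema.column_info:
        df[ci.name] = ci._process_column(df)

    df = df[preserve_columns]
    print(df)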