diff --git a/docker/conda/environments/cuda11.8_examples.yml b/docker/conda/environments/cuda11.8_examples.yml index c1a07c983e..63050fbcdd 100644 --- a/docker/conda/environments/cuda11.8_examples.yml +++ b/docker/conda/environments/cuda11.8_examples.yml @@ -35,6 +35,7 @@ dependencies: - dgl=1.0.2 - dill=0.3.6 - distributed>=2023.1.1 + - python-docx==1.1.0 - huggingface_hub=0.10.1 # work-around for https://github.com/UKPLab/sentence-transformers/issues/1762 - langchain=0.0.190 - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863 @@ -59,7 +60,7 @@ dependencies: ####### Pip Dependencies (keep sorted!) ####### - pip: - - farm-haystack[file-conversion] - google-search-results==2.4 - grpcio-status==1.58 # To keep in sync with 1.58 grpcio which is installed for Morpheus - nemollm + - PyMuPDF==1.23.20 diff --git a/examples/llm/common/content_extractor_module.py b/examples/llm/common/content_extractor_module.py index 5b9d89e929..089517cedb 100644 --- a/examples/llm/common/content_extractor_module.py +++ b/examples/llm/common/content_extractor_module.py @@ -12,26 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import io import logging import os import typing from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from pathlib import Path +import fitz import fsspec import mrc import mrc.core.operators as ops import pandas as pd -from haystack import Document -from haystack.nodes import DocxToTextConverter -from haystack.nodes import PDFToTextConverter -from haystack.nodes import TextConverter -from haystack.nodes.file_converter import BaseConverter +from docx import Document from langchain.text_splitter import RecursiveCharacterTextSplitter from pydantic import ValidationError -from morpheus.common import read_file_to_df from morpheus.messages import MessageMeta from morpheus.modules.schemas.examples.llm.content_extractor_schema import ContentExtractorSchema from morpheus.utils.module_utils import ModuleLoaderFactory @@ -51,74 +47,6 @@ class FileMeta: file_type: str -class CsvTextConverter(BaseConverter): - """ - Converts a CSV file column content to text documents. - """ - - outgoing_edges = 1 - - def convert(self, - file_path: Path | str | list[Path | str], - meta: typing.Optional[dict[str, typing.Any]] = None, - remove_numeric_tables: typing.Optional[bool] = None, - valid_languages: typing.Optional[list[str]] = None, - encoding: typing.Optional[str] = "UTF-8", - id_hash_keys: typing.Optional[list[str]] = None) -> list[Document]: - """ - Load a CSV file and convert it to Documents. - - Parameters - ---------- - file_path: - Path to the CSV file you want to convert. - meta: - Optional dictionary of metadata key-value pairs that you want to append to the returned document. - encoding: - Optional file encoding format, default: `UTF-8`. - id_hash_keys: - Generates the document ID from a custom list of strings that refer to the document's attributes. - remove_numeric_tables: bool - Removes numeric tables from the csv. - valid_languages: - Valid languages - - Returns - ------- - list[haystack.Document] - List of documents, 1 document per line in the CSV. - """ - if not isinstance(file_path, list): - file_path = [file_path] - - docs: list[Document] = [] - text_column_names = set("content") - - if meta is not None: - text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names)) - - for path in file_path: - logger.error("Processing file: %s", path) - df = pd.read_csv(path, encoding=encoding) - if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))): - raise ValueError("The CSV file must either include a 'content' column or have a " - "columns specified in the meta configuration with key 'text_column_names'.") - - df.fillna(value='', inplace=True) - df["content"] = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1) - - docs_dicts = df.to_dict(orient="records") - - for dictionary in docs_dicts: - if meta: - dictionary["meta"] = meta - if id_hash_keys: - dictionary["id_hash_keys"] = id_hash_keys - docs.append(Document.from_dict(dictionary)) - - return docs - - def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta: """ Extract file metadata from the given open file. @@ -150,14 +78,104 @@ def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta: raise -def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]: +def read_file_to_bytesio(file_path: str) -> io.BytesIO: + """ + Read the content of the file and return it as an io.BytesIO object. + + Parameters + ---------- + file_path: str + Path to the file. + + Returns + ------- + io.BytesIO or None + Returns io.BytesIO object if the file is successfully read. Returns + None if there is an error reading the file. + """ + + io_bytes = None + + try: + with open(file_path, 'rb') as file: + io_bytes = io.BytesIO(file.read()) + except FileNotFoundError: + logger.error(f"Error: File not found - {file_path}") + except PermissionError: + logger.error(f"Error: Permission denied - {file_path}") + except Exception as e: + logger.error(f"Error reading file {file_path}: {e}") + + return io_bytes + + +def _pdf_to_text_converter(bytes_io: io.BytesIO, meta: dict = None) -> str: + if isinstance(bytes_io, io.BytesIO): + pdf_document = fitz.open(stream=bytes_io, filetype="pdf") + text = '' + for page_num in range(pdf_document.page_count): + page = pdf_document[page_num] + text += page.get_text() + else: + raise ValueError("Invalid input type. Supported type bytes (io.BytesIO).") + + return text + + +def _docx_to_text_converter(bytes_io: io.BytesIO, meta: dict) -> str: + if isinstance(bytes_io, io.BytesIO): + # New io.BytesIO object and copying the content from the original io.BytesIO object + # to effectively reset cursor position + doc = Document(io.BytesIO(bytes_io.read())) + text = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) + else: + raise ValueError("Invalid input type. Supported type bytes (io.BytesIO).") + + return text + + +def _csv_to_text_converter(bytes_io: io.BytesIO, meta: dict) -> list[str]: + if isinstance(bytes_io, io.BytesIO): + text_column_names = set("content") + + if meta is not None: + text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names)) + + df = pd.read_csv(bytes_io) + if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))): + raise ValueError("The CSV file must either include a 'content' column or have a " + "columns specified in the meta configuration with key 'text_column_names'.") + + df.fillna(value='', inplace=True) + text_arr = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1).tolist() + else: + raise ValueError("Invalid input type. Supported type bytes (io.BytesIO).") + + return text_arr + + +def _text_converter(io_bytes: io.BytesIO, meta: dict) -> str: + + convertor_conf = meta.get("txt", {}) + encoding = convertor_conf.get("encoding", "utf-8") + + # Ensure the cursor is at the beginning of the stream + io_bytes.seek(0) + + # Decode the bytes to a string using the specified encoding + text = io_bytes.read().decode(encoding) + + return text + + +def process_content(docs: str | list[str], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]: """ Processes the content of a file and splits it into chunks. Parameters ---------- - docs : list[Document] - List of documents. + docs : str | list[str] + Documents content. file_meta: FileMeta FileMeta parsed information of a file path. chunk_size : int @@ -177,9 +195,12 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int, processed_data = [] + if isinstance(docs, str): + docs = [docs] + for document in docs: try: - split_text = text_splitter.split_text(document.content) + split_text = text_splitter.split_text(document) for chunk in split_text: processed_data.append({ @@ -246,10 +267,10 @@ def file_content_extractor(builder: mrc.Builder): converters_meta = extractor_config.converters_meta converters = { - "pdf": PDFToTextConverter(), - "csv": CsvTextConverter(), - "docx": DocxToTextConverter(valid_languages=["de", "en"]), - "txt": TextConverter() + "pdf": _pdf_to_text_converter, + "csv": _csv_to_text_converter, + "docx": _docx_to_text_converter, + "txt": _text_converter } chunk_params = { @@ -282,20 +303,22 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta: try: file_meta: FileMeta = get_file_meta(open_file=open_file) - converter = converters.get(file_meta.file_type, TextConverter()) - futures.append(executor.submit(converter.convert, file_meta.file_path, converters_meta)) + futures.append(executor.submit(read_file_to_bytesio, file_meta.file_path)) files_meta.append(file_meta) except Exception as e: logger.error(f"Error processing file {open_file.path}: {e}") for file_meta, future in zip(files_meta, futures): - docs = future.result() - if docs: + io_bytes = future.result() + + if io_bytes: + converter = converters.get(file_meta.file_type, _text_converter) + result = converter(io_bytes, meta=converters_meta) # Get chunk params for the file type, default to txt file_type_chunk_params = chunk_params[ file_meta.file_type] if file_meta.file_type in chunk_params else chunk_params['txt'] - result = process_content(docs, + result = process_content(result, file_meta, file_type_chunk_params["chunk_size"], file_type_chunk_params["chunk_overlap"]) @@ -308,4 +331,4 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta: node = builder.make_node("text_extractor", ops.map(parse_files), ops.filter(lambda x: x is not None)) builder.register_module_input("input", node) - builder.register_module_output("output", node) \ No newline at end of file + builder.register_module_output("output", node) diff --git a/examples/llm/vdb_upload/module/schema_transform.py b/examples/llm/vdb_upload/module/schema_transform.py index 243f850e26..6cf7405f2f 100644 --- a/examples/llm/vdb_upload/module/schema_transform.py +++ b/examples/llm/vdb_upload/module/schema_transform.py @@ -89,7 +89,7 @@ def _schema_transform(builder: mrc.Builder): source_column_info.append(ColumnInfo(name=col_name, dtype=col_config["dtype"])) else: raise ValueError(f"Unknown op_type '{op_type}' for column '{col_name}'") - + preserve_columns.append(col_name) source_schema = DataFrameInputSchema(column_info=source_column_info) @@ -101,14 +101,14 @@ def do_transform(message: MessageMeta): with message.mutable_dataframe() as mdf: if (len(mdf) == 0): return None - + for ci in source_schema.column_info: try: mdf[ci.name] = ci._process_column(mdf) except Exception as exc_info: logger.exception("Failed to process column '%s'. Dataframe: \n%s", ci.name, mdf, exc_info) return None - + mdf = mdf[preserve_columns] return MessageMeta(df=cudf.DataFrame(mdf)) diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml index 32d2a97bc7..8ba3f5c22d 100644 --- a/examples/llm/vdb_upload/vdb_config.yaml +++ b/examples/llm/vdb_upload/vdb_config.yaml @@ -88,6 +88,7 @@ vdb_pipeline: batch_size: 1024 extractor_config: chunk_size: 512 + num_threads: 10 chunk_overlap: 51 enable_monitor: True filenames: diff --git a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py index 1f1ca8ecb9..03878aee46 100644 --- a/morpheus/modules/schemas/examples/llm/content_extractor_schema.py +++ b/morpheus/modules/schemas/examples/llm/content_extractor_schema.py @@ -37,7 +37,7 @@ class ContentExtractorSchema(BaseModel): chunk_overlap: int = 51 chunk_size: int = 512 converters_meta: Dict[str, Dict] = Field(default_factory=dict) - num_threads: 10 + num_threads: int = 10 @validator('converters_meta', pre=True) def validate_converters_meta(cls, v):