
Commit

Moved document processing out of threadpool execution
bsuryadevara committed Jan 30, 2024
1 parent 6c854bc commit c1bc895
Showing 5 changed files with 119 additions and 94 deletions.
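The gist of the refactor, as a minimal sketch (illustrative only, not the module itself: read_file_to_bytesio and the converter dispatch mirror the diff below, while file_paths, file_types, and the converters argument are hypothetical stand-ins for the fsspec inputs). Previously the Haystack converters ran convert() inside the ThreadPoolExecutor; after this commit only the I/O-bound file reads run on the pool, and document conversion happens on the main thread:

import io
from concurrent.futures import ThreadPoolExecutor


def read_file_to_bytesio(file_path: str) -> io.BytesIO:
    # I/O-bound work: this is what stays on the thread pool
    with open(file_path, 'rb') as f:
        return io.BytesIO(f.read())


def parse(file_paths: list[str], file_types: list[str], converters: dict) -> list[str]:
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Submit only the reads to the pool...
        futures = [executor.submit(read_file_to_bytesio, path) for path in file_paths]
        # ...and run the document converters on the main thread as results arrive.
        for file_type, future in zip(file_types, futures):
            io_bytes = future.result()
            if io_bytes is not None:
                results.append(converters[file_type](io_bytes))
    return results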
3 changes: 2 additions & 1 deletion docker/conda/environments/cuda11.8_examples.yml
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ dependencies:
  - dgl=1.0.2
  - dill=0.3.6
  - distributed>=2023.1.1
  - python-docx==1.1.0
  - huggingface_hub=0.10.1 # work-around for https://github.com/UKPLab/sentence-transformers/issues/1762
  - langchain=0.0.190
  - libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
@@ -59,7 +60,7 @@ dependencies:

  ####### Pip Dependencies (keep sorted!) #######
  - pip:
      - farm-haystack[file-conversion]
      - google-search-results==2.4
      - grpcio-status==1.58 # To keep in sync with 1.58 grpcio which is installed for Morpheus
      - nemollm
      - PyMuPDF==1.23.20
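These dependency changes track the import changes in content_extractor_module.py below: python-docx provides the docx module and PyMuPDF provides fitz. A quick import sanity check (illustrative):

import fitz  # installed by PyMuPDF==1.23.20
from docx import Document  # installed by python-docx==1.1.0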
201 changes: 112 additions & 89 deletions examples/llm/common/content_extractor_module.py
Original file line number Diff line number Diff line change
@@ -12,26 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import logging
import os
import typing
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path

import fitz
import fsspec
import mrc
import mrc.core.operators as ops
import pandas as pd
from haystack import Document
from haystack.nodes import DocxToTextConverter
from haystack.nodes import PDFToTextConverter
from haystack.nodes import TextConverter
from haystack.nodes.file_converter import BaseConverter
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pydantic import ValidationError

from morpheus.common import read_file_to_df
from morpheus.messages import MessageMeta
from morpheus.modules.schemas.examples.llm.content_extractor_schema import ContentExtractorSchema
from morpheus.utils.module_utils import ModuleLoaderFactory
@@ -51,74 +47,6 @@ class FileMeta:
    file_type: str


class CsvTextConverter(BaseConverter):
    """
    Converts a CSV file column content to text documents.
    """

    outgoing_edges = 1

    def convert(self,
                file_path: Path | str | list[Path | str],
                meta: typing.Optional[dict[str, typing.Any]] = None,
                remove_numeric_tables: typing.Optional[bool] = None,
                valid_languages: typing.Optional[list[str]] = None,
                encoding: typing.Optional[str] = "UTF-8",
                id_hash_keys: typing.Optional[list[str]] = None) -> list[Document]:
        """
        Load a CSV file and convert it to Documents.
        Parameters
        ----------
        file_path:
            Path to the CSV file you want to convert.
        meta:
            Optional dictionary of metadata key-value pairs that you want to append to the returned document.
        encoding:
            Optional file encoding format, default: `UTF-8`.
        id_hash_keys:
            Generates the document ID from a custom list of strings that refer to the document's attributes.
        remove_numeric_tables: bool
            Removes numeric tables from the csv.
        valid_languages:
            Valid languages
        Returns
        -------
        list[haystack.Document]
            List of documents, 1 document per line in the CSV.
        """
        if not isinstance(file_path, list):
            file_path = [file_path]

        docs: list[Document] = []
        text_column_names = set("content")

        if meta is not None:
            text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names))

        for path in file_path:
            logger.error("Processing file: %s", path)
            df = pd.read_csv(path, encoding=encoding)
            if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))):
                raise ValueError("The CSV file must either include a 'content' column or have a "
                                 "columns specified in the meta configuration with key 'text_column_names'.")

            df.fillna(value='', inplace=True)
            df["content"] = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1)

            docs_dicts = df.to_dict(orient="records")

            for dictionary in docs_dicts:
                if meta:
                    dictionary["meta"] = meta
                if id_hash_keys:
                    dictionary["id_hash_keys"] = id_hash_keys
                docs.append(Document.from_dict(dictionary))

        return docs


def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta:
    """
    Extract file metadata from the given open file.
@@ -150,14 +78,104 @@ def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta:
        raise


def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]:
def read_file_to_bytesio(file_path: str) -> io.BytesIO:
    """
    Read the content of the file and return it as an io.BytesIO object.
    Parameters
    ----------
    file_path: str
        Path to the file.
    Returns
    -------
    io.BytesIO or None
        Returns an io.BytesIO object if the file is successfully read. Returns
        None if there is an error reading the file.
    """

    io_bytes = None

    try:
        with open(file_path, 'rb') as file:
            io_bytes = io.BytesIO(file.read())
    except FileNotFoundError:
        logger.error("Error: File not found - %s", file_path)
    except PermissionError:
        logger.error("Error: Permission denied - %s", file_path)
    except Exception as e:
        logger.error("Error reading file %s: %s", file_path, e)

    return io_bytes


def _pdf_to_text_converter(bytes_io: io.BytesIO, meta: dict = None) -> str:
    if isinstance(bytes_io, io.BytesIO):
        pdf_document = fitz.open(stream=bytes_io, filetype="pdf")
        text = ''
        for page_num in range(pdf_document.page_count):
            page = pdf_document[page_num]
            text += page.get_text()
    else:
        raise ValueError("Invalid input type. Supported type: io.BytesIO.")

    return text


def _docx_to_text_converter(bytes_io: io.BytesIO, meta: dict) -> str:
    if isinstance(bytes_io, io.BytesIO):
        # Wrap the content in a fresh io.BytesIO object to effectively reset
        # the cursor position
        doc = Document(io.BytesIO(bytes_io.read()))
        text = '\n'.join([paragraph.text for paragraph in doc.paragraphs])
    else:
        raise ValueError("Invalid input type. Supported type: io.BytesIO.")

    return text


def _csv_to_text_converter(bytes_io: io.BytesIO, meta: dict) -> list[str]:
    if isinstance(bytes_io, io.BytesIO):
        text_column_names = {"content"}

        if meta is not None:
            text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names))

        df = pd.read_csv(bytes_io)
        if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))):
            raise ValueError("The CSV file must either include a 'content' column or have the "
                             "columns specified in the meta configuration with key 'text_column_names'.")

        df.fillna(value='', inplace=True)
        text_arr = df[sorted(text_column_names)].apply(lambda x: ' '.join(map(str, x)), axis=1).tolist()
    else:
        raise ValueError("Invalid input type. Supported type: io.BytesIO.")

    return text_arr


def _text_converter(io_bytes: io.BytesIO, meta: dict) -> str:

    converter_conf = meta.get("txt", {})
    encoding = converter_conf.get("encoding", "utf-8")

    # Ensure the cursor is at the beginning of the stream
    io_bytes.seek(0)

    # Decode the bytes to a string using the specified encoding
    text = io_bytes.read().decode(encoding)

    return text


def process_content(docs: str | list[str], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]:
    """
    Processes the content of a file and splits it into chunks.
    Parameters
    ----------
    docs : list[Document]
        List of documents.
    docs : str | list[str]
        Document content, as a single string or a list of strings.
    file_meta: FileMeta
        FileMeta parsed information of a file path.
    chunk_size : int
@@ -177,9 +195,12 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int,

    processed_data = []

    if isinstance(docs, str):
        docs = [docs]

    for document in docs:
        try:
            split_text = text_splitter.split_text(document.content)
            split_text = text_splitter.split_text(document)

            for chunk in split_text:
                processed_data.append({
Expand Down Expand Up @@ -246,10 +267,10 @@ def file_content_extractor(builder: mrc.Builder):
    converters_meta = extractor_config.converters_meta

    converters = {
        "pdf": PDFToTextConverter(),
        "csv": CsvTextConverter(),
        "docx": DocxToTextConverter(valid_languages=["de", "en"]),
        "txt": TextConverter()
        "pdf": _pdf_to_text_converter,
        "csv": _csv_to_text_converter,
        "docx": _docx_to_text_converter,
        "txt": _text_converter
    }

    chunk_params = {
@@ -282,20 +303,22 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:

                try:
                    file_meta: FileMeta = get_file_meta(open_file=open_file)
                    converter = converters.get(file_meta.file_type, TextConverter())
                    futures.append(executor.submit(converter.convert, file_meta.file_path, converters_meta))
                    futures.append(executor.submit(read_file_to_bytesio, file_meta.file_path))
                    files_meta.append(file_meta)

                except Exception as e:
                    logger.error("Error processing file %s: %s", open_file.path, e)

            for file_meta, future in zip(files_meta, futures):
                io_bytes = future.result()

                if io_bytes:
                    converter = converters.get(file_meta.file_type, _text_converter)
                    result = converter(io_bytes, meta=converters_meta)
                    # Get chunk params for the file type, default to txt
                    file_type_chunk_params = chunk_params.get(file_meta.file_type, chunk_params['txt'])
                    result = process_content(docs,
                    result = process_content(result,
                                             file_meta,
                                             file_type_chunk_params["chunk_size"],
                                             file_type_chunk_params["chunk_overlap"])
@@ -308,4 +331,4 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:

    node = builder.make_node("text_extractor", ops.map(parse_files), ops.filter(lambda x: x is not None))
    builder.register_module_input("input", node)
    builder.register_module_output("output", node)
    builder.register_module_output("output", node)
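Taken together, the new helpers compose like this outside the pipeline (an illustrative sketch, not part of the diff: 'report.pdf' is a made-up path, and the converters_meta shape is inferred from the meta.get("csv", ...) and meta.get("txt", ...) lookups above):

converters_meta = {
    "csv": {"text_column_names": ["title", "summary"]},  # hypothetical column names
    "txt": {"encoding": "utf-8"},
}

io_bytes = read_file_to_bytesio("report.pdf")  # made-up path
if io_bytes is not None:
    text = _pdf_to_text_converter(io_bytes, meta=converters_meta)
    print(text[:200])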
6 changes: 3 additions & 3 deletions examples/llm/vdb_upload/module/schema_transform.py
@@ -89,7 +89,7 @@ def _schema_transform(builder: mrc.Builder):
            source_column_info.append(ColumnInfo(name=col_name, dtype=col_config["dtype"]))
        else:
            raise ValueError(f"Unknown op_type '{op_type}' for column '{col_name}'")

        preserve_columns.append(col_name)

    source_schema = DataFrameInputSchema(column_info=source_column_info)
@@ -101,14 +101,14 @@ def do_transform(message: MessageMeta):
        with message.mutable_dataframe() as mdf:
            if (len(mdf) == 0):
                return None

            for ci in source_schema.column_info:
                try:
                    mdf[ci.name] = ci._process_column(mdf)
                except Exception:
                    logger.exception("Failed to process column '%s'. Dataframe: \n%s", ci.name, mdf)
                    return None

            mdf = mdf[preserve_columns]

        return MessageMeta(df=cudf.DataFrame(mdf))
1 change: 1 addition & 0 deletions examples/llm/vdb_upload/vdb_config.yaml
@@ -88,6 +88,7 @@ vdb_pipeline:
      batch_size: 1024
      extractor_config:
        chunk_size: 512
        num_threads: 10
        chunk_overlap: 51
      enable_monitor: True
      filenames:
2 changes: 1 addition & 1 deletion morpheus/modules/schemas/examples/llm/content_extractor_schema.py
@@ -37,7 +37,7 @@ class ContentExtractorSchema(BaseModel):
    chunk_overlap: int = 51
    chunk_size: int = 512
    converters_meta: Dict[str, Dict] = Field(default_factory=dict)
    num_threads: 10
    num_threads: int = 10

    @validator('converters_meta', pre=True)
    def validate_converters_meta(cls, v):
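The one-line schema fix matters because num_threads: 10 annotates the field with the literal 10 rather than a type, so pydantic cannot build the model (the exact error varies by version). A before/after sketch, assuming pydantic v1 to match the @validator usage above:

from pydantic import BaseModel

# Broken: the annotation is the literal 10, not a type, so pydantic
# rejects the model at class definition time.
# class Broken(BaseModel):
#     num_threads: 10

class Fixed(BaseModel):
    num_threads: int = 10  # proper type annotation with a default


print(Fixed().num_threads)               # 10
print(Fixed(num_threads=4).num_threads)  # 4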
