Update Content Extraction #24

Merged
3 changes: 2 additions & 1 deletion docker/conda/environments/cuda11.8_examples.yml
@@ -35,6 +35,7 @@ dependencies:
- dgl=1.0.2
- dill=0.3.6
- distributed>=2023.1.1
- python-docx==1.1.0
- huggingface_hub=0.10.1 # work-around for https://github.com/UKPLab/sentence-transformers/issues/1762
- langchain=0.0.190
- libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
@@ -59,7 +60,7 @@ dependencies:

####### Pip Dependencies (keep sorted!) #######
- pip:
- farm-haystack[file-conversion]
- google-search-results==2.4
- grpcio-status==1.58 # To keep in sync with 1.58 grpcio which is installed for Morpheus
- nemollm
- PyMuPDF==1.23.20
204 changes: 118 additions & 86 deletions examples/llm/common/content_extractor_module.py
100644 → 100755
@@ -12,26 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import logging
import os
import typing
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from functools import wraps

import fitz
import fsspec
import mrc
import mrc.core.operators as ops
import pandas as pd
from haystack import Document
from haystack.nodes import DocxToTextConverter
from haystack.nodes import PDFToTextConverter
from haystack.nodes import TextConverter
from haystack.nodes.file_converter import BaseConverter
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pydantic import ValidationError

from morpheus.common import read_file_to_df
from morpheus.messages import MessageMeta
from morpheus.modules.schemas.examples.llm.content_extractor_schema import ContentExtractorSchema
from morpheus.utils.module_utils import ModuleLoaderFactory
@@ -51,72 +48,10 @@ class FileMeta:
file_type: str


class CsvTextConverter(BaseConverter):
"""
Converts a CSV file column content to text documents.
"""

outgoing_edges = 1

def convert(self,
file_path: Path | str | list[Path | str],
meta: typing.Optional[dict[str, typing.Any]] = None,
remove_numeric_tables: typing.Optional[bool] = None,
valid_languages: typing.Optional[list[str]] = None,
encoding: typing.Optional[str] = "UTF-8",
id_hash_keys: typing.Optional[list[str]] = None) -> list[Document]:
"""
Load a CSV file and convert it to Documents.

Parameters
----------
file_path:
Path to the CSV file you want to convert.
meta:
Optional dictionary of metadata key-value pairs that you want to append to the returned document.
encoding:
Optional file encoding format, default: `UTF-8`.
id_hash_keys:
Generates the document ID from a custom list of strings that refer to the document's attributes.
remove_numeric_tables: bool
Removes numeric tables from the csv.
valid_languages:
Valid languages

Returns
-------
list[haystack.Document]
List of documents, 1 document per line in the CSV.
"""
if not isinstance(file_path, list):
file_path = [file_path]

docs: list[Document] = []
text_column_names = set("content")

if meta is not None:
text_column_names = set(meta.get("csv", {}).get("text_column_names", text_column_names))

for path in file_path:
logger.error("Processing file: %s", path)
df = pd.read_csv(path, encoding=encoding)
if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))):
raise ValueError("The CSV file must either include a 'content' column or have a "
"columns specified in the meta configuration with key 'text_column_names'.")

df.fillna(value='', inplace=True)
df["content"] = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1)

docs_dicts = df.to_dict(orient="records")

for dictionary in docs_dicts:
if meta:
dictionary["meta"] = meta
if id_hash_keys:
dictionary["id_hash_keys"] = id_hash_keys
docs.append(Document.from_dict(dictionary))

return docs
@dataclass
class ConverterInputInfo:
io_bytes: io.BytesIO
meta: dict


def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta:
@@ -150,14 +85,105 @@ def get_file_meta(open_file: fsspec.core.OpenFile) -> FileMeta:
raise


def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]:
def read_file_to_bytesio(file_path: str) -> io.BytesIO:
"""
Read the content of the file and return it as an io.BytesIO object.

Parameters
----------
file_path: str
Path to the file.

Returns
-------
io.BytesIO or None
Returns io.BytesIO object if the file is successfully read. Returns
None if there is an error reading the file.
"""

io_bytes = None

try:
with open(file_path, 'rb') as file:
io_bytes = io.BytesIO(file.read())
except FileNotFoundError:
logger.error(f"Error: File not found - {file_path}")
except PermissionError:
logger.error(f"Error: Permission denied - {file_path}")
except Exception as e:
logger.error(f"Error reading file {file_path}: {e}")

return io_bytes


def _converter_error_handler(func: typing.Callable) -> typing.Callable:

@wraps(func)
def wrapper(input_info: ConverterInputInfo, *args, **kwargs):
try:
# Common logic for instance check
if not isinstance(input_info.io_bytes, io.BytesIO):
raise ValueError("Invalid input type. Supported type: io.BytesIO.")

return func(input_info, *args, **kwargs)
except Exception as exc:
logger.error(f"Error in {func.__name__}: {exc}")
# The return annotation is stored under the "return" key; default to str
# so a failed conversion yields '' (or [] when the annotation is list[str]).
return func.__annotations__.get("return", str)()

return wrapper
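
A quick sketch (not part of the diff) of the wrapper's failure semantics: a conversion error is logged and swallowed, and the caller receives an empty instance of the converter's return annotation, so downstream code never sees an exception.

# Illustrative only: a non-PDF stream should make fitz raise, which the
# decorator converts into the annotated empty value ('' for str).
bad_input = ConverterInputInfo(io_bytes=io.BytesIO(b"not a pdf"), meta={})
assert _pdf_to_text_converter(bad_input) == ""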


@_converter_error_handler
def _pdf_to_text_converter(input_info: ConverterInputInfo) -> str:
text = ""
pdf_document = fitz.open(stream=input_info.io_bytes, filetype="pdf")
for page_num in range(pdf_document.page_count):
page = pdf_document[page_num]
text += page.get_text()
return text


@_converter_error_handler
def _docx_to_text_converter(input_info: ConverterInputInfo) -> str:
input_info.io_bytes.seek(0)
doc = Document(input_info.io_bytes)
text = '\n'.join(paragraph.text for paragraph in doc.paragraphs)
return text


@_converter_error_handler
def _csv_to_text_converter(input_info: ConverterInputInfo) -> list[str]:
text_arr = []
text_column_names = {"content"}
if input_info.meta is not None:
text_column_names = set(input_info.meta.get("csv", {}).get("text_column_names", text_column_names))
df = pd.read_csv(input_info.io_bytes)
if len(df.columns) == 0 or (not text_column_names.issubset(set(df.columns))):
raise ValueError("The CSV file must either include a 'content' column or have the "
"columns specified in the meta configuration with key 'text_column_names'.")
df.fillna(value='', inplace=True)
# pandas cannot index by a set; use a sorted list for deterministic column order
text_arr = df[sorted(text_column_names)].apply(lambda x: ' '.join(map(str, x)), axis=1).tolist()
return text_arr


@_converter_error_handler
def _text_converter(input_info: ConverterInputInfo) -> str:
text = ""
converter_conf = input_info.meta.get("txt", {})
encoding = converter_conf.get("encoding", "utf-8")
input_info.io_bytes.seek(0)
text = input_info.io_bytes.read().decode(encoding)
return text


def process_content(docs: str | list[str], file_meta: FileMeta, chunk_size: int, chunk_overlap: int) -> list[dict]:
"""
Processes the content of a file and splits it into chunks.

Parameters
----------
docs : list[Document]
List of documents.
docs : str | list[str]
Document content as a single string or a list of strings.
file_meta: FileMeta
FileMeta parsed information of a file path.
chunk_size : int
@@ -177,9 +203,12 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int,

processed_data = []

if isinstance(docs, str):
docs = [docs]

for document in docs:
try:
split_text = text_splitter.split_text(document.content)
split_text = text_splitter.split_text(document)

for chunk in split_text:
processed_data.append({
Expand Down Expand Up @@ -246,10 +275,10 @@ def file_content_extractor(builder: mrc.Builder):
converters_meta = extractor_config.converters_meta

converters = {
"pdf": PDFToTextConverter(),
"csv": CsvTextConverter(),
"docx": DocxToTextConverter(valid_languages=["de", "en"]),
"txt": TextConverter()
"pdf": _pdf_to_text_converter,
"csv": _csv_to_text_converter,
"docx": _docx_to_text_converter,
"txt": _text_converter
}

chunk_params = {
@@ -282,20 +311,23 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:

try:
file_meta: FileMeta = get_file_meta(open_file=open_file)
converter = converters.get(file_meta.file_type, TextConverter())
futures.append(executor.submit(converter.convert, file_meta.file_path, converters_meta))
futures.append(executor.submit(read_file_to_bytesio, file_meta.file_path))
files_meta.append(file_meta)

except Exception as e:
logger.error(f"Error processing file {open_file.path}: {e}")

for file_meta, future in zip(files_meta, futures):
docs = future.result()
if docs:
io_bytes = future.result()

if io_bytes:
converter = converters.get(file_meta.file_type, _text_converter)
input_info = ConverterInputInfo(io_bytes=io_bytes, meta=converters_meta)
result = converter(input_info)
# Get chunk params for the file type, default to txt
file_type_chunk_params = chunk_params.get(file_meta.file_type, chunk_params['txt'])
result = process_content(docs,
result = process_content(result,
file_meta,
file_type_chunk_params["chunk_size"],
file_type_chunk_params["chunk_overlap"])
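Taken together, the new flow reads each file into memory once, then dispatches on file extension to a plain function instead of a Haystack converter object. A minimal standalone sketch of that dispatch (the path and meta dict are hypothetical; inside the module this runs through a ThreadPoolExecutor in parse_files):

# Illustrative usage of the new function-based converters.
io_bytes = read_file_to_bytesio("notes.txt")  # hypothetical input file
if io_bytes is not None:
    info = ConverterInputInfo(io_bytes=io_bytes, meta={"txt": {"encoding": "utf-8"}})
    text = _text_converter(info)  # str for txt/pdf/docx; the csv converter returns list[str]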
2 changes: 2 additions & 0 deletions examples/llm/vdb_upload/module/file_source_pipe.py
@@ -97,6 +97,8 @@ def _file_source_pipe(builder: mrc.Builder):
file_content_extractor_config = {
"batch_size": validated_config.batch_size,
"num_threads": validated_config.num_threads,
"chunk_size": validated_config.chunk_size,
"chunk_overlap": validated_config.chunk_overlap,
"converters_meta": validated_config.converters_meta
}
extractor_loader = ContentExtractorLoaderFactory.get_instance("file_content_extractor",
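With chunk_size and chunk_overlap now forwarded from the validated config, the dict assembled here would look roughly like this sketch (field names follow the diff; the concrete values are illustrative):

file_content_extractor_config = {
    "batch_size": 32,  # illustrative; the real values come from validated_config
    "num_threads": 4,
    "chunk_size": 512,
    "chunk_overlap": 51,
    "converters_meta": {
        "csv": {"text_column_names": ["title", "content"]},  # hypothetical columns
        "txt": {"encoding": "utf-8"},
    },
}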
18 changes: 13 additions & 5 deletions examples/llm/vdb_upload/module/schema_transform.py
@@ -27,8 +27,6 @@
from morpheus.utils.column_info import RenameColumn
from morpheus.utils.module_utils import ModuleLoaderFactory
from morpheus.utils.module_utils import register_module
from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow
from morpheus.utils.schema_transforms import process_dataframe

logger = logging.getLogger(__name__)

@@ -78,6 +76,7 @@ def _schema_transform(builder: mrc.Builder):
schema_config = validated_config.schema_transform_config

source_column_info = []
preserve_columns = []

for col_name, col_config in schema_config.items():
op_type = col_config.get("op_type")
@@ -91,8 +90,9 @@
else:
raise ValueError(f"Unknown op_type '{op_type}' for column '{col_name}'")

preserve_columns.append(col_name)

source_schema = DataFrameInputSchema(column_info=source_column_info)
source_schema = create_and_attach_nvt_workflow(input_schema=source_schema)

def do_transform(message: MessageMeta):
if (message is None):
@@ -101,9 +101,17 @@ def do_transform(message: MessageMeta):
with message.mutable_dataframe() as mdf:
if (len(mdf) == 0):
return None
_df = process_dataframe(mdf, source_schema)

return MessageMeta(df=cudf.DataFrame(_df))
for ci in source_schema.column_info:
try:
mdf[ci.name] = ci._process_column(mdf)
except Exception:
logger.exception("Failed to process column '%s'. Dataframe:\n%s", ci.name, mdf)
return None

mdf = mdf[preserve_columns]

return MessageMeta(df=cudf.DataFrame(mdf))

node = builder.make_node("schema_transform", ops.map(do_transform), ops.filter(lambda x: x is not None))
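
The net effect of the rewritten do_transform is a per-column apply followed by a projection onto the configured columns, roughly equivalent to this sketch (column names are hypothetical):

import cudf

mdf = cudf.DataFrame({"page_content": ["hello"], "source": ["a.txt"], "extra": [1]})
preserve_columns = ["content", "source"]

# Each ColumnInfo materializes its output column onto the frame...
mdf["content"] = mdf["page_content"]  # stand-in for ci._process_column(mdf)

# ...then only the configured columns survive in the outgoing MessageMeta.
mdf = mdf[preserve_columns]  # "extra" is dropped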
