Commit: unstructured data processing (#4584)
* initial commit

* apply suggestions

* lint

* mypy

* use udfs

* changelog

* doc test

* black

* add deps to toml

* unstructured as extra

* use unicodedata

* import inside func

* disable test for extra

* move to llm-app

* endline

* clean

* flake8

* undo flake8 (conflict with black)

GitOrigin-RevId: 893f531ff2cfc7ba6c2421fa9375703fcd09b561
Mohamed Malhou authored and Manul from Pathway committed Sep 19, 2023
1 parent 5740fe2 commit 3cf03c1
Showing 11 changed files with 2,434 additions and 524 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Introduced `llm_app.extract_texts()`, which leverages unstructured-io to extract text from raw file bytes.
- Added the chunking UDF `llm_app.chunk_texts()`, which splits text into chunks based on token count.

## [0.1.1] - 2023-07-04

Binary file added examples/data/finance/20230203_alphabet_10K.pdf
Binary file not shown.
Binary file not shown.
3 changes: 3 additions & 0 deletions examples/pipelines/unstructured/__init__.py
@@ -0,0 +1,3 @@
from .app import run

__all__ = ["run"]
122 changes: 122 additions & 0 deletions examples/pipelines/unstructured/app.py
@@ -0,0 +1,122 @@
"""
Microservice for a context-aware ChatGPT assistant.
The following program reads in a collection of documents,
embeds each document using the OpenAI document embedding model,
then builds an index for fast retrieval of documents relevant to a question,
effectively replacing a vector database.
The program then starts a REST API endpoint serving queries about programming in Pathway.
Each query text is first turned into a vector using OpenAI embedding service,
then relevant documentation pages are found using a Nearest Neighbor index computed
for documents in the corpus. A prompt is build from the relevant documentations pages
and sent to the OpenAI GPT-4 chat service for processing.
Usage:
In the root of this repository run:
`poetry run ./run_examples.py unstruct`
or, if all dependencies are managed manually rather than using poetry
`python examples/pipelines/unstructured/app.py`
You can also run this example directly in the environment with llm_app installed.
On another terminal, navigate to `examples/pipelines/unstructured/ui` and run
`streamlit run server.py`. You can interact with the app at `localhost:8501`
"""

import os

import pathway as pw
from pathway.stdlib.ml.index import KNNIndex

from llm_app import chunk_texts, extract_texts
from llm_app.model_wrappers import OpenAIChatGPTModel, OpenAIEmbeddingModel


class DocumentInputSchema(pw.Schema):
    doc: str


class QueryInputSchema(pw.Schema):
    query: str
    user: str


def run(
    *,
    data_dir: str = os.environ.get("PATHWAY_DATA_DIR", "./examples/data/finance/"),
    api_key: str = os.environ.get("OPENAI_API_TOKEN", ""),
    host: str = "0.0.0.0",
    port: int = 8080,
    embedder_locator: str = "text-embedding-ada-002",
    embedding_dimension: int = 1536,
    model_locator: str = "gpt-3.5-turbo",
    max_tokens: int = 300,
    temperature: float = 0.0,
    **kwargs,
):
    embedder = OpenAIEmbeddingModel(api_key=api_key)

    # Watch the data directory and ingest new files as raw bytes.
    files = pw.io.fs.read(
        data_dir,
        mode="streaming",
        format="binary",
        autocommit_duration_ms=50,
    )
    # Extract text from the raw bytes, chunk it, and flatten to one chunk per row.
    documents = files.select(texts=extract_texts(pw.this.data))
    documents = documents.select(chunks=chunk_texts(pw.this.texts))
    documents = documents.flatten(pw.this.chunks).rename_columns(chunk=pw.this.chunks)

    # Embed each chunk and build a KNN index over the embeddings.
    enriched_documents = documents + documents.select(
        vector=embedder.apply(text=pw.this.chunk, locator=embedder_locator)
    )

    index = KNNIndex(
        enriched_documents.vector, enriched_documents, n_dimensions=embedding_dimension
    )

    # Accept queries over HTTP.
    query, response_writer = pw.io.http.rest_connector(
        host=host,
        port=port,
        schema=QueryInputSchema,
        autocommit_duration_ms=50,
    )

    query += query.select(
        vector=embedder.apply(text=pw.this.query, locator=embedder_locator),
    )

    # Retrieve the 3 nearest chunks for each query.
    query_context = query + index.get_nearest_items(
        query.vector, k=3, collapse_rows=True
    ).select(documents_list=pw.this.chunk).promise_universe_is_equal_to(query)

    @pw.udf
    def build_prompt(documents, query):
        docs_str = "\n".join(documents)
        prompt = f"Given the following documents : \n {docs_str} \nanswer this query: {query}"
        return prompt

    prompt = query_context.select(
        prompt=build_prompt(pw.this.documents_list, pw.this.query)
    )

    model = OpenAIChatGPTModel(api_key=api_key)

    responses = prompt.select(
        query_id=pw.this.id,
        result=model.apply(
            pw.this.prompt,
            locator=model_locator,
            temperature=temperature,
            max_tokens=max_tokens,
        ),
    )

    response_writer(responses)

    pw.run()


if __name__ == "__main__":
    run()
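
Once the pipeline is running, the REST endpoint can also be exercised without the UI. A minimal sketch, assuming the default host/port from run() and mirroring the request shape the Streamlit UI below sends (QueryInputSchema: query, user); the question text is only an illustrative placeholder:

import requests

# POST a query to the pipeline's REST connector (defaults: 0.0.0.0:8080).
response = requests.post(
    "http://localhost:8080/",
    json={"query": "What was Alphabet's total revenue?", "user": "user"},
)
print(response.json())  # the model's answer as returned by the pipeline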
60 changes: 60 additions & 0 deletions examples/pipelines/unstructured/ui/server.py
@@ -0,0 +1,60 @@
import os

import requests
import streamlit as st
from dotenv import load_dotenv

with st.sidebar:
    st.markdown(
        "[View the source code on GitHub](https://github.com/pathwaycom/llm-app)"
    )

# Load environment variables
load_dotenv()
api_host = os.environ.get("PATHWAY_REST_CONNECTOR_HOST", "127.0.0.1")
api_port = int(os.environ.get("PATHWAY_REST_CONNECTOR_PORT", 8080))
data_path = "../../../../examples/data/finance/"

# Streamlit UI elements
st.title("LLM App")

uploaded_files = st.file_uploader("Upload a text file", accept_multiple_files=True)
if uploaded_files:
    for file in uploaded_files:
        print(file.name)
        with open(os.path.join(data_path, file.name), "wb") as f:
            f.write(file.read())

# Initialize chat history
if "messages" not in st.session_state:
    st.session_state.messages = []

# Display chat messages from history on app rerun
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])


# React to user input
if prompt := st.chat_input("How can I help you today?"):
    # Display user message in chat message container
    with st.chat_message("user"):
        st.markdown(prompt)

    # Add user message to chat history
    st.session_state.messages.append({"role": "user", "content": prompt})

    url = f"http://{api_host}:{api_port}/"
    data = {"query": prompt, "user": "user"}

    response = requests.post(url, json=data)

    if response.status_code == 200:
        response = response.json()
        with st.chat_message("assistant"):
            st.markdown(response)
        st.session_state.messages.append({"role": "assistant", "content": response})
    else:
        st.error(
            f"Failed to send data to the LLM App API. Status code: {response.status_code}"
        )
3 changes: 2 additions & 1 deletion llm_app/__init__.py
@@ -1,3 +1,4 @@
 from llm_app import model_wrappers as model_wrappers
+from llm_app.processing import chunk_texts, extract_texts

-__all__ = ["model_wrappers"]
+__all__ = ["model_wrappers", "extract_texts", "chunk_texts"]
117 changes: 117 additions & 0 deletions llm_app/processing.py
@@ -0,0 +1,117 @@
import logging
import unicodedata
from io import BytesIO

import pathway as pw

CHARS_PER_TOKEN = 3
PUNCTUATION = [".", "?", "!", "\n"]


@pw.udf
def chunk_texts(
    texts: str | list[str],
    min_tokens: int = 50,
    max_tokens: int = 500,
    encoding_name: str = "cl100k_base",
) -> list[str]:
    """
    Splits a given string or a list of strings into chunks based on token count.

    This function tokenizes the input texts and splits them into smaller parts
    ("chunks"), ensuring that each chunk has a token count between `min_tokens`
    and `max_tokens`. It also attempts to break chunks at sensible points such
    as punctuation marks.

    Arguments:
        texts: string or list of strings.
        min_tokens: minimum tokens in a chunk of text.
        max_tokens: maximum size of a chunk in tokens.
        encoding_name: name of the encoding from tiktoken.

    Example:
    # >>> from llm_app import chunk_texts
    # >>> import pathway as pw
    # >>> t = pw.debug.table_from_markdown(
    # ...     '''| text
    # ...     1| cooltext'''
    # ... )
    # >>> t += t.select(chunks=chunk_texts(pw.this.text, min_tokens=1, max_tokens=1))
    # >>> pw.debug.compute_and_print(t, include_id=False)
    # text     | chunks
    # cooltext | ('cool', 'text')
    """
    import tiktoken

    if not isinstance(texts, str):
        texts = "\n".join(texts)

    tokenizer = tiktoken.get_encoding(encoding_name)
    text: str = texts
    text = normalize_unicode(text)
    tokens = tokenizer.encode_ordinary(text)
    output = []
    i = 0
    while i < len(tokens):
        # Decode a window of at most max_tokens tokens...
        chunk_tokens = tokens[i : i + max_tokens]
        chunk = tokenizer.decode(chunk_tokens)
        # ...and cut it back to the last punctuation mark, provided the cut
        # still leaves roughly min_tokens worth of characters in the chunk.
        last_punctuation = max([chunk.rfind(p) for p in PUNCTUATION], default=-1)
        if last_punctuation != -1 and last_punctuation > CHARS_PER_TOKEN * min_tokens:
            chunk = chunk[: last_punctuation + 1]

        i += len(tokenizer.encode_ordinary(chunk))

        output.append(chunk)
    return output


def normalize_unicode(text: str):
    """
    Get rid of ligatures.
    """
    return unicodedata.normalize("NFKC", text)


@pw.udf
def extract_texts(data: bytes) -> list[str]:
    """
    Extract text elements from binary data using the partition function from
    unstructured-io.

    Visit [unstructured-io](https://unstructured-io.github.io/unstructured/) to
    learn more.

    Arguments:
        data (bytes): Binary data representing the text format file.

    Returns:
        list[str]: A list of extracted text elements.

    Example:
    # >>> from llm_app import extract_texts
    # >>> import pathway as pw
    # >>> t = pw.debug.table_from_markdown(
    # ...     '''| text
    # ...     1| cooltext'''
    # ... )
    # >>> t += t.select(bytes=pw.apply(str.encode, pw.this.text))
    # >>> t = t.select(decoded=extract_texts(pw.this.bytes))
    # >>> pw.debug.compute_and_print(t, include_id=False)
    # decoded
    # ('cooltext',)
    """
    from unstructured.partition.auto import partition

    file_like = BytesIO(data)
    try:
        elements = partition(file=file_like)
        texts = [element.text for element in elements]
    except ValueError as ve:
        logging.error(f"Value Error: {str(ve)}")
        return []
    except Exception as e:
        logging.exception(f"An unexpected error occurred: {str(e)}")
        return []
    return texts
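
The two UDFs compose naturally in a small pipeline. A minimal sketch mirroring the disabled doctests above (assumes the unstructured extra and tiktoken are installed; exact chunk boundaries depend on the tokenizer):

import pathway as pw

from llm_app import chunk_texts, extract_texts

# Build a one-row table, encode the text to bytes, then extract and chunk it.
t = pw.debug.table_from_markdown(
    """| text
    1| cooltext"""
)
t += t.select(bytes=pw.apply(str.encode, pw.this.text))
t += t.select(texts=extract_texts(pw.this.bytes))
t += t.select(chunks=chunk_texts(pw.this.texts, min_tokens=1, max_tokens=1))
pw.debug.compute_and_print(t, include_id=False)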
