Skip to content

Commit

Permalink
add email ingestor for better processing (#182)
Browse files Browse the repository at this point in the history
* add email ingestor for better processing

* fix

* fixes

* handle emails and attachments

* handle emails and attachments

* email only returns tokens
  • Loading branch information
saraswatpuneet authored Dec 10, 2023
1 parent 22a89ac commit 96457ea
Show file tree
Hide file tree
Showing 23 changed files with 419 additions and 15 deletions.
9 changes: 6 additions & 3 deletions querent/collectors/email/email_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from querent.common import common_errors
from querent.common.types.collected_bytes import CollectedBytes
from querent.common.uri import Uri
from querent.config.collector.collector_config import CollectorBackend, EmailCollectorConfig
from querent.config.collector.collector_config import (
CollectorBackend,
EmailCollectorConfig,
)
from querent.logging.logger import setup_logger


Expand Down Expand Up @@ -53,11 +56,11 @@ async def poll(self) -> AsyncGenerator[CollectedBytes, None]:
message = response_part[1]
yield CollectedBytes(
data=message,
file=f"{self.config.imap_folder}/{i}.email",
file=f"{self.config.imap_username}:{self.config.imap_folder}/{i}.email",
)
yield CollectedBytes(
data=None,
file=f"{self.config.imap_folder}/{i}.email",
file=f"{self.config.imap_username}:{self.config.imap_folder}/{i}.email",
eof=True,
)
except imaplib.IMAP4.error as e:
Expand Down
15 changes: 15 additions & 0 deletions querent/core/base_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
from querent.callback.event_callback_dispatcher import EventCallbackDispatcher
from querent.callback.event_callback_interface import EventCallbackInterface
from querent.common.types.ingested_images import IngestedImages
from querent.common.types.ingested_messages import IngestedMessages
from querent.common.types.ingested_tokens import IngestedTokens
from querent.common.types.ingested_code import IngestedCode
Expand Down Expand Up @@ -113,6 +114,18 @@ async def process_code(self, data: IngestedCode):
"""
raise NotImplementedError

@abstractmethod
async def process_images(self, data: IngestedImages):
"""
Process images asynchronously.
Args:
data (IngestedImages): The input data to process.
Returns:
EventState: The state of the event is set with the event type and the timestamp
of the event and set using `self.set_state(event_state)`.
"""
raise NotImplementedError

@abstractmethod
def validate(self) -> bool:
"""
Expand Down Expand Up @@ -200,6 +213,8 @@ async def _inner_worker():
await self.process_tokens(data)
elif isinstance(data, IngestedCode):
await self.process_code(data)
elif isinstance(data, IngestedImages):
await self.process_images(data)
else:
raise Exception(
f"Invalid data type {type(data)} for {self.__class__.__name__}. Supported type: {IngestedTokens, IngestedMessages}"
Expand Down
File renamed without changes.
213 changes: 213 additions & 0 deletions querent/ingestors/email/email_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
from typing import List, AsyncGenerator
import uuid
from PIL import Image
import pybase64
import pypdf
from querent.common import common_errors
from querent.common.types.collected_bytes import CollectedBytes
from querent.common.types.ingested_images import IngestedImages
from querent.ingestors.base_ingestor import BaseIngestor
from querent.ingestors.email.email_reader import EmailReader
from querent.ingestors.ingestor_factory import IngestorFactory
from querent.logging.logger import setup_logger
from querent.processors.async_processor import AsyncProcessor
from querent.config.ingestor.ingestor_config import IngestorBackend
from querent.common.types.ingested_tokens import IngestedTokens
import email
import pytesseract
from io import BytesIO
import io


class EmailIngestorFactory(IngestorFactory):
SUPPORTED_EXTENSIONS = {"email", "eml"}

async def supports(self, file_extension: str) -> bool:
return file_extension.lower() in self.SUPPORTED_EXTENSIONS

async def create(
self, file_extension: str, processors: List[AsyncProcessor]
) -> BaseIngestor:
if not await self.supports(file_extension):
return None
return EmailIngestor(processors)


class EmailIngestor(BaseIngestor):
def __init__(self, processors: List[AsyncProcessor]):
super().__init__(IngestorBackend.Email)
self.processors = processors
self.logger = setup_logger(__name__, "EmailIngestor")
self.email_reader = EmailReader()

async def ingest(
self, poll_function: AsyncGenerator[CollectedBytes, None]
) -> AsyncGenerator[IngestedTokens, None]:
collected_bytes = b""
current_file = None
try:
async for chunk_bytes in poll_function:
if chunk_bytes.is_error() or chunk_bytes.is_eof():
continue

if current_file is None:
current_file = chunk_bytes.file
elif current_file != chunk_bytes.file:
email = await self.extract_and_process_email(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield IngestedTokens(
file=current_file,
data=email, # Wrap line in a list
error=None,
)
yield IngestedTokens(
file=current_file,
data=None,
error=None,
)
collected_bytes = b""
current_file = chunk_bytes.file
collected_bytes += chunk_bytes.data
except Exception as e:
yield IngestedTokens(file=current_file, data=None, error=f"Exception: {e}")
finally:
if current_file is not None:
email = await self.extract_and_process_email(
CollectedBytes(file=current_file, data=collected_bytes)
)
yield IngestedTokens(
file=current_file,
data=email, # Wrap line in a list
error=None,
)
yield IngestedTokens(
file=current_file,
data=None,
error=None,
)

async def extract_and_process_email(
self, collected_bytes: CollectedBytes
) -> List[str]:
text = await self.extract_text_from_email(collected_bytes)
processed_text = await self.process_data(text)
return processed_text

async def extract_text_from_email(self, collected_bytes: CollectedBytes) -> str:
text = ""
try:
msg = email.message_from_bytes(collected_bytes.data)
email_msg = {}
(
email_msg["From"],
email_msg["To"],
email_msg["Date"],
email_msg["Subject"],
) = self.email_reader.obtain_header(msg)
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
try:
body = part.get_payload(decode=True)
except Exception as e:
continue
if body is None:
continue
text += await self.handle_sub_part(part)
text += "\n"
else:
content_type = msg.get_content_type()
body = msg.get_payload(decode=True).decode()
if content_type == "text/plain":
text = self.email_reader.clean_email_body(body)
except Exception as e:
self.logger.error(f"Error extracting text from email: {e}")
return text

async def handle_sub_part(self, sub_part):
sub_content_type = sub_part.get_content_type()
sub_content_disposition = str(sub_part.get("Content-Disposition"))
if (
sub_content_type == "text/plain"
and "attachment" not in sub_content_disposition
):
return self.email_reader.clean_email_body(sub_part.get_payload())
elif "attachment" in sub_content_disposition:
# Handle attachment as needed
return await self.handle_attachment(sub_part)
elif sub_content_type == "text/html":
# Handle HTML content as needed
# You can choose to ignore or process it
return ""
else:
# Handle other content types as needed
return ""

async def handle_attachment(self, attachment_part) -> str:
# Get attachment data and type
attachment_data = attachment_part.get_payload(decode=True)
attachment_type = attachment_part.get_content_type()

# Check attachment type and handle accordingly
if attachment_type.startswith("image/"):
return await self.handle_image_attachment(attachment_data)
elif attachment_type == "application/pdf":
return await self.handle_pdf_attachment(attachment_data)
else:
# Handle other attachment types as needed
self.logger.warning(f"Unsupported attachment type: {attachment_type}")

async def handle_image_attachment(self, image_data: bytes) -> str:
try:
ocr_text = await self.get_ocr_from_image(image_data)
return ocr_text
except Exception as e:
self.logger.error(f"Error handling image attachment: {e}")

async def handle_pdf_attachment(self, pdf_data) -> str:
try:
pdf_text = await self.extract_and_process_pdf(pdf_data)
except Exception as e:
self.logger.error(f"Error handling PDF attachment: {e}")
return pdf_text

async def get_ocr_from_image(self, image):
"""Implement this to return ocr text of the image"""
image = Image.open(io.BytesIO(image))
text = pytesseract.image_to_string(image)
return str(text).encode("utf-8").decode("unicode_escape")

async def extract_and_process_pdf(self, pdf_data: bytes) -> str:
pdf_text = ""
try:
path = BytesIO(pdf_data)
loader = pypdf.PdfReader(path)

for _, page in enumerate(loader.pages):
text = page.extract_text()
pdf_text += text + "\n"
pdf_text += await self.extract_images_and_ocr(page)

except TypeError as exc:
self.logger.error(f"Exception while extracting email {exc}")
except Exception as exc:
self.logger.error(f"Exception while extracting email {exc}")
return pdf_text

async def extract_images_and_ocr(self, page) -> str:
ocr_text = ""
try:
for image_path in page.images:
ocr_text += await self.get_ocr_from_image(image_path)
except Exception as e:
self.logger.error(f"Error extracting images and OCR: {e}")
return ocr_text

async def process_data(self, text: str) -> List[str]:
if self.processors is None or len(self.processors) == 0:
return [text]
processed_data = text
for processor in self.processors:
processed_data = await processor.process_text(processed_data)
return processed_data
87 changes: 87 additions & 0 deletions querent/ingestors/email/email_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os
import re
from email.header import decode_header

from bs4 import BeautifulSoup


class EmailReader:
def clean_email_body(self, email_body):
"""
Function to clean the email body.
Args:
email_body (str): The email body to be cleaned.
Returns:
str: The cleaned email body.
"""
if email_body is None:
email_body = ""
email_body = BeautifulSoup(email_body, "html.parser")
email_body = email_body.get_text()
email_body = "".join(email_body.splitlines())
email_body = " ".join(email_body.split())
email_body = email_body.encode("ascii", "ignore")
email_body = email_body.decode("utf-8", "ignore")
email_body = re.sub(r"http\S+", "", email_body)
return email_body

def clean(self, text):
"""
Function to clean the text.
Args:
text (str): The text to be cleaned.
Returns:
str: The cleaned text.
"""
return "".join(c if c.isalnum() else "_" for c in text)

def obtain_header(self, msg):
"""
Function to obtain the header of the email.
Args:
msg (email.message.Message): The email message.
Returns:
str: The From field of the email.
"""
if msg["Subject"] is not None:
Subject, encoding = decode_header(msg["Subject"])[0]
else:
Subject = ""
encoding = ""
if isinstance(Subject, bytes):
try:
if encoding is not None:
Subject = Subject.decode(encoding)
else:
Subject = ""
except [LookupError] as err:
pass
From = msg["From"]
To = msg["To"]
Date = msg["Date"]
return From, To, Date, Subject

def download_attachment(self, part, subject):
"""
Function to download the attachment from the email.
Args:
part (email.message.Message): The email message.
subject (str): The subject of the email.
Returns:
None
"""
filename = part.get_filename()
if filename:
folder_name = self.clean(subject)
if not os.path.isdir(folder_name):
os.mkdir(folder_name)
filepath = os.path.join(folder_name, filename)
open(filepath, "wb").write(part.get_payload(decode=True))
4 changes: 3 additions & 1 deletion querent/ingestors/ingestor_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from querent.common.types.ingested_tokens import IngestedTokens
from querent.config.ingestor.ingestor_config import IngestorBackend
from querent.ingestors.base_ingestor import BaseIngestor
from querent.ingestors.email.email_ingestor import EmailIngestorFactory
from querent.ingestors.ingestor_factory import IngestorFactory, UnsupportedIngestor
from querent.ingestors.pdfs.pdf_ingestor_v1 import PdfIngestorFactory
from querent.ingestors.texts.text_ingestor import TextIngestorFactory
Expand Down Expand Up @@ -103,7 +104,8 @@ def __init__(
IngestorBackend.HTML.value: HtmlIngestorFactory(),
IngestorBackend.MP4.value: VideoIngestorFactory(),
IngestorBackend.GITHUB.value: GithubIngestorFactory(),
IngestorBackend.Slack.value: TextIngestorFactory(),
IngestorBackend.Slack.value: TextIngestorFactory(is_token_stream=True),
IngestorBackend.Email.value: EmailIngestorFactory(),
# Add more mappings as needed
}
self.file_caches = LRUCache(maxsize=cache_size)
Expand Down
Loading

0 comments on commit 96457ea

Please sign in to comment.