Skip to content

Commit

Permalink
feat: ✨ OpenAI parser
Browse files Browse the repository at this point in the history
  • Loading branch information
chadell committed Oct 25, 2023
1 parent a5dee28 commit 3aba61f
Show file tree
Hide file tree
Showing 11 changed files with 722 additions and 37 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ By default, there is a `GenericProvider` that support a `SimpleProcessor` using

> Note: Because these providers do not support the BCOP standard natively, maybe there are some gaps on the implemented parser that will be refined with new test cases. We encourage you to report related **issues**!
#### LLM-powered Parsers

The library supports an optional parser option leveraging Large Language Model (LLM) to provide a best-effort parsing when the specific parsers have not been successful.

These LLM parsers are automatically appended as a processor option after the already available for all the Providers when the integration environmental variable is set (check the below integrations).

> These integrations may involve some costs for API usage. Use it carefully! As an order of magnitude, a parsing of an email with OpenAI GPT gpt-3.5-turbo model costs $0.004.
These are the current supported LLM integrations:

- [OpenAI](https://openai.com/product), these are the supported ENVs:
- `OPENAI_TOKEN` (Required): OpenAI token.
- `OPENAI_MODEL` (Optional): Model to use, it defaults to "gpt-3.5-turbo".

## Installation

The library is available as a Python package in pypi and can be installed with pip:
Expand Down Expand Up @@ -319,6 +333,7 @@ The project is following Network to Code software development guidelines and is
...omitted debug logs...
====================================================== 99 passed, 174 deselected, 17 warnings in 10.35s ======================================================
```

7. Run some final CI tests locally to ensure that there is no linting/formatting issues with your changes. You should look to get a code score of 10/10. See the example below: `invoke tests --local`

```
Expand Down
144 changes: 133 additions & 11 deletions circuit_maintenance_parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import quopri
from typing import Dict, List
from email.utils import parsedate_tz, mktime_tz
import hashlib

import bs4 # type: ignore
from bs4.element import ResultSet # type: ignore

from pydantic import BaseModel, Extra
from pydantic import BaseModel
from icalendar import Calendar # type: ignore

from circuit_maintenance_parser.errors import ParserError
Expand All @@ -23,7 +24,7 @@
logger = logging.getLogger(__name__)


class Parser(BaseModel, extra=Extra.forbid):
class Parser(BaseModel):
"""Parser class.
A Parser handles one or more specific data type(s) (specified in `data_types`).
Expand All @@ -34,14 +35,15 @@ class Parser(BaseModel, extra=Extra.forbid):
# _data_types are used to match the Parser to to each type of DataPart
_data_types = ["text/plain", "plain"]

# TODO: move it to where it is used, Cogent parser
_geolocator = Geolocator()

@classmethod
def get_data_types(cls) -> List[str]:
"""Return the expected data type."""
return cls._data_types

def parser_hook(self, raw: bytes) -> List[Dict]:
def parser_hook(self, raw: bytes, content_type: str) -> List[Dict]:
"""Custom parser logic.
This method is used by the main `Parser` classes (such as `ICal` or `Html` parser) to define a shared
Expand All @@ -53,14 +55,14 @@ def parser_hook(self, raw: bytes) -> List[Dict]:
"""
raise NotImplementedError

def parse(self, raw: bytes) -> List[Dict]:
def parse(self, raw: bytes, content_type: str) -> List[Dict]:
"""Execute parsing.
Do not override this method!
Instead, each main `Parser` class should implement its own custom logic within the `parser_hook` method.
"""
try:
result = self.parser_hook(raw)
result = self.parser_hook(raw, content_type)
except Exception as exc:
raise ParserError from exc
if any(not partial_result for partial_result in result):
Expand All @@ -86,7 +88,7 @@ class ICal(Parser):

_data_types = ["text/calendar", "ical", "icalendar"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
# iCalendar data sometimes comes encoded with base64
# TODO: add a test case
Expand Down Expand Up @@ -159,7 +161,7 @@ def remove_hex_characters(string):
"""Convert any hex characters to standard ascii."""
return string.encode("ascii", errors="ignore").decode("utf-8")

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
Expand Down Expand Up @@ -191,7 +193,7 @@ class EmailDateParser(Parser):

_data_types = [EMAIL_HEADER_DATE]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
parsed_date = parsedate_tz(raw.decode())
if parsed_date:
Expand All @@ -204,7 +206,7 @@ class EmailSubjectParser(Parser):

_data_types = [EMAIL_HEADER_SUBJECT]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_subject(self.bytes_to_string(raw).replace("\r", "").replace("\n", "")):
Expand All @@ -226,7 +228,7 @@ class Csv(Parser):

_data_types = ["application/csv", "text/csv", "application/octet-stream"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_csv(raw):
Expand All @@ -245,7 +247,7 @@ class Text(Parser):

_data_types = ["text/plain"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
text = self.get_text_hook(raw)
Expand All @@ -261,3 +263,123 @@ def get_text_hook(raw: bytes) -> str:
def parse_text(self, text) -> List[Dict]:
"""Custom text parsing."""
raise NotImplementedError


class LLM(Parser):
"""LLM parser."""

_data_types = ["text/html", "html", "text/plain"]

_llm_question = (
"Can you extract the maintenance_id, the account_id, the impact, the status "
"(e.g., confirmed, cancelled), the summary, the circuit ids, and the global "
"start_time and end_time as EPOCH timestamps in JSON format with keys in "
"lowercase underscore format? Reply with only the answer in JSON form and "
"include no other commentary"
)

def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
if content_type in ["html", "text/html"]:
soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
content = soup.text
else:
content = self.get_text_hook(raw)
for data in self.parse_content(content):
result.append(data)
return result

@staticmethod
def get_text_hook(raw: bytes) -> str:
"""Can be overwritten by subclasses."""
return raw.decode()

@staticmethod
def get_key_with_string(dictionary: dict, string: str):
"""Returns the key in the dictionary that contains the given string."""
for key in dictionary.keys():
if string in key:
return key
return None

def get_llm_response(self, content):
"""Method to retrieve the response from the LLM for some content."""
raise NotImplementedError

def _get_impact(self, generated_json: dict):
"""Method to get a general Impact for all Circuits."""
impact_key = self.get_key_with_string(generated_json, "impact")
if impact_key:
if "no impact" in generated_json[impact_key].lower():
return Impact.NO_IMPACT

return Impact.OUTAGE

def _get_circuit_ids(self, generated_json: dict, impact: Impact):
"""Method to get the Circuit IDs and use a general Impact."""
circuits = []
circuits_ids = self.get_key_with_string(generated_json, "circuit")
for circuit_id in generated_json[circuits_ids]:
circuits.append(CircuitImpact(circuit_id=circuit_id, impact=impact))
return circuits

def _get_start(self, generated_json: dict):
"""Method to get the Start Time."""
return generated_json[self.get_key_with_string(generated_json, "start_time")]

def _get_end(self, generated_json: dict):
"""Method to get the End Time."""
return generated_json[self.get_key_with_string(generated_json, "end_time")]

def _get_summary(self, generated_json: dict):
"""Method to get the Summary."""
return generated_json[self.get_key_with_string(generated_json, "summary")]

def _get_status(self, generated_json: dict):
"""Method to get the Status."""
status_key = self.get_key_with_string(generated_json, "status")

if generated_json[status_key] == "confirmed":
return Status.CONFIRMED

return Status.CONFIRMED

def _get_account(self, generated_json: dict):
"""Method to get the Account."""
return generated_json[self.get_key_with_string(generated_json, "account")]

def _get_maintenance_id(self, generated_json: dict, start, end, circuits):
"""Method to get the Maintenance ID."""
maintenance_key = self.get_key_with_string(generated_json, "maintenance")
if maintenance_key and generated_json["maintenance_id"] != "N/A":
return generated_json["maintenance_id"]

maintenace_id = str(start) + str(end) + "".join(list(circuits))
return hashlib.md5(maintenace_id.encode("utf-8")).hexdigest() # nosec

def parse_content(self, content):
"""Parse content via LLM."""
generated_json = self.get_llm_response(content)
if not generated_json:
return []

impact = self._get_impact(generated_json)

data = {
"circuits": self._get_circuit_ids(generated_json, impact),
"start": self._get_start(generated_json),
"end": self._get_end(generated_json),
"summary": self._get_summary(generated_json),
"status": self._get_status(generated_json),
"account": self._get_account(generated_json),
}

data["maintenance_id"] = self._get_maintenance_id(
generated_json,
data["start"],
data["end"],
data["circuits"],
)

return [data]
48 changes: 48 additions & 0 deletions circuit_maintenance_parser/parsers/openai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""OpenAI Parser."""
import os
import logging
import json
from typing import List, Optional

import openai

from circuit_maintenance_parser.parser import LLM

logger = logging.getLogger(__name__)


class OpenAIParser(LLM):
"""Notifications Parser powered by OpenAI ChatGPT."""

def get_llm_response(self, content) -> Optional[List]:
"""Get LLM processing from OpenAI."""
openai.api_key = os.getenv("OPENAI_TOKEN")
openai_model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
try:
response = openai.ChatCompletion.create(
model=openai_model,
messages=[
{
"role": "system",
"content": content,
},
{
"role": "user",
"content": self._llm_question,
},
],
)
except openai.error.InvalidRequestError as err:
logger.error(err)
return None

logger.info("Used OpenAI tokens: %s", response["usage"])
generated_text = response.choices[0].message["content"]
logger.info("Response from LLM: %s", generated_text)
try:
return json.loads(generated_text)
except ValueError as err:
logger.error(err)
return None

return None
18 changes: 9 additions & 9 deletions circuit_maintenance_parser/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ def process(self, data: NotificationData, extended_data: Dict) -> Iterable[Maint
notification data may have in order to create the `Maintenance` object.
There are 2 hooks available, to be implemented by custom `Processors`:
process_hook: Method that recieves the parsed output and manipulates the extracted data. It could create
process_hook: Method that receives the parsed output and manipulates the extracted data. It could create
the final `Maintenances` or just accumulate them.
post_process_hook (optional): Used to be able to do a final action on the extracted data before returing
post_process_hook (optional): Used to be able to do a final action on the extracted data before returning
the final `Maintenances`.
Attributes:
Expand All @@ -54,13 +54,13 @@ def process(self, data: NotificationData, extended_data: Dict) -> Iterable[Maint
self.extended_data = extended_data
maintenances_data: List = []

# First, we generate a list of tuples with a `DataPart` and `Parser` if the data type from the first is
# supported by the second.
data_part_and_parser_combinations = [
(data_part, data_parser)
# First, we generate a set with the key `Parser` and `DataPart` if the data type from the first is
# supported by the second. This avoids reusing the same Parser for different data types if supported.
data_part_and_parser_combinations = {
data_parser: data_part
for (data_part, data_parser) in itertools.product(data.data_parts, self.data_parsers)
if data_part.type in data_parser.get_data_types()
]
}

if not data_part_and_parser_combinations:
error_message = (
Expand All @@ -72,9 +72,9 @@ def process(self, data: NotificationData, extended_data: Dict) -> Iterable[Maint
logger.debug(error_message)
raise ProcessorError(error_message)

for data_part, data_parser in data_part_and_parser_combinations:
for data_parser, data_part in data_part_and_parser_combinations.items():
try:
self.process_hook(data_parser().parse(data_part.content), maintenances_data)
self.process_hook(data_parser().parse(data_part.content, data_part.type), maintenances_data)

except (ParserError, ValidationError) as exc:
error_message = "Parser class %s from %s was not successful.\n%s"
Expand Down
6 changes: 5 additions & 1 deletion circuit_maintenance_parser/provider.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Definition of Provider class as the entry point to the library."""
import logging
import os
import re
import traceback

Expand Down Expand Up @@ -39,7 +40,7 @@
from circuit_maintenance_parser.parsers.turkcell import HtmlParserTurkcell1
from circuit_maintenance_parser.parsers.verizon import HtmlParserVerizon1
from circuit_maintenance_parser.parsers.zayo import HtmlParserZayo1, SubjectParserZayo1

from circuit_maintenance_parser.parsers.openai import OpenAIParser

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -122,6 +123,9 @@ def get_maintenances(self, data: NotificationData) -> Iterable[Maintenance]:
logger.debug("Skipping notification %s due filtering policy for %s.", data, self.__class__.__name__)
return []

if os.getenv("OPENAI_TOKEN"):
self._processors.append(CombinedProcessor(data_parsers=[EmailDateParser, OpenAIParser]))

for processor in self._processors:
try:
return processor.process(data, self.get_extended_data())
Expand Down
Loading

0 comments on commit 3aba61f

Please sign in to comment.