Skip to content

Commit

Permalink
feat: ✨ OpenAI parser
Browse files Browse the repository at this point in the history
  • Loading branch information
chadell committed Oct 20, 2023
1 parent a5dee28 commit 2900420
Show file tree
Hide file tree
Showing 7 changed files with 632 additions and 16 deletions.
144 changes: 133 additions & 11 deletions circuit_maintenance_parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import quopri
from typing import Dict, List
from email.utils import parsedate_tz, mktime_tz
import hashlib

import bs4 # type: ignore
from bs4.element import ResultSet # type: ignore

from pydantic import BaseModel, Extra
from pydantic import BaseModel
from icalendar import Calendar # type: ignore

from circuit_maintenance_parser.errors import ParserError
Expand All @@ -23,7 +24,7 @@
logger = logging.getLogger(__name__)


class Parser(BaseModel, extra=Extra.forbid):
class Parser(BaseModel):
"""Parser class.
A Parser handles one or more specific data type(s) (specified in `data_types`).
Expand All @@ -34,14 +35,15 @@ class Parser(BaseModel, extra=Extra.forbid):
# _data_types are used to match the Parser to to each type of DataPart
_data_types = ["text/plain", "plain"]

# TODO: move it to where it is used, Cogent parser
_geolocator = Geolocator()

@classmethod
def get_data_types(cls) -> List[str]:
"""Return the expected data type."""
return cls._data_types

def parser_hook(self, raw: bytes) -> List[Dict]:
def parser_hook(self, raw: bytes, content_type: str) -> List[Dict]:
"""Custom parser logic.
This method is used by the main `Parser` classes (such as `ICal` or `Html` parser) to define a shared
Expand All @@ -53,14 +55,14 @@ def parser_hook(self, raw: bytes) -> List[Dict]:
"""
raise NotImplementedError

def parse(self, raw: bytes) -> List[Dict]:
def parse(self, raw: bytes, content_type: str) -> List[Dict]:
"""Execute parsing.
Do not override this method!
Instead, each main `Parser` class should implement its own custom logic within the `parser_hook` method.
"""
try:
result = self.parser_hook(raw)
result = self.parser_hook(raw, content_type)
except Exception as exc:
raise ParserError from exc
if any(not partial_result for partial_result in result):
Expand All @@ -86,7 +88,7 @@ class ICal(Parser):

_data_types = ["text/calendar", "ical", "icalendar"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
# iCalendar data sometimes comes encoded with base64
# TODO: add a test case
Expand Down Expand Up @@ -159,7 +161,7 @@ def remove_hex_characters(string):
"""Convert any hex characters to standard ascii."""
return string.encode("ascii", errors="ignore").decode("utf-8")

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
Expand Down Expand Up @@ -191,7 +193,7 @@ class EmailDateParser(Parser):

_data_types = [EMAIL_HEADER_DATE]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
parsed_date = parsedate_tz(raw.decode())
if parsed_date:
Expand All @@ -204,7 +206,7 @@ class EmailSubjectParser(Parser):

_data_types = [EMAIL_HEADER_SUBJECT]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_subject(self.bytes_to_string(raw).replace("\r", "").replace("\n", "")):
Expand All @@ -226,7 +228,7 @@ class Csv(Parser):

_data_types = ["application/csv", "text/csv", "application/octet-stream"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_csv(raw):
Expand All @@ -245,7 +247,7 @@ class Text(Parser):

_data_types = ["text/plain"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
text = self.get_text_hook(raw)
Expand All @@ -261,3 +263,123 @@ def get_text_hook(raw: bytes) -> str:
def parse_text(self, text) -> List[Dict]:
"""Custom text parsing."""
raise NotImplementedError


class LLM(Parser):
    """Base parser that delegates notification understanding to a Large Language Model.

    Subclasses must implement `get_llm_response` to call a concrete LLM backend
    (e.g. OpenAI) and return the generated JSON as a dict (or a falsy value on
    failure). This class then normalizes that JSON into the parsed-data dicts
    the processors expect.
    """

    _data_types = ["text/html", "html", "text/plain"]

    # Prompt sent to the LLM together with the notification content.
    # NOTE: keep this text stable — the downstream key-matching helpers
    # (`get_key_with_string`) depend on the key names it requests.
    _llm_question = (
        "Can you extract the maintenance_id, the account_id, the impact, the status "
        "(e.g., confirmed, cancelled), the summary, the {circuit ids}, and the global "
        "start_time and end_time as EPOCH timestamps in JSON format with keys in "
        "lowercase underscore format? Reply with only the answer in JSON form and "
        "include no other commentary"
    )

    def parser_hook(self, raw: bytes, content_type: str):
        """Execute parsing.

        HTML payloads may be quoted-printable encoded, so they are decoded and
        stripped to plain text before being handed to the LLM; plain text goes
        through `get_text_hook`.
        """
        if content_type in ["html", "text/html"]:
            soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
            content = soup.text
        else:
            content = self.get_text_hook(raw)
        return list(self.parse_content(content))

    @staticmethod
    def get_text_hook(raw: bytes) -> str:
        """Decode raw bytes to text. Can be overwritten by subclasses."""
        return raw.decode()

    @staticmethod
    def get_key_with_string(dictionary: dict, string: str):
        """Return the first key in `dictionary` that contains `string`, or None.

        LLM output key names are not fully deterministic (e.g. "circuit_ids" vs
        "circuits"), so we match by substring instead of exact key.
        """
        for key in dictionary.keys():
            if string in key:
                return key
        return None

    def get_llm_response(self, content):
        """Retrieve the response from the LLM for some content.

        Must be implemented by backend-specific subclasses.
        """
        raise NotImplementedError

    def _get_impact(self, generated_json: dict):
        """Get a general Impact for all Circuits (defaults to OUTAGE)."""
        impact_key = self.get_key_with_string(generated_json, "impact")
        if impact_key:
            if "no impact" in generated_json[impact_key]:
                return Impact.NO_IMPACT

        return Impact.OUTAGE

    def _get_circuit_ids(self, generated_json: dict, impact: Impact):
        """Get the Circuit IDs, applying the same general Impact to each."""
        circuits = []
        # NOTE(review): if no key contains "circuit" this raises KeyError(None),
        # which surfaces as a ParserError upstream — presumably intentional.
        circuits_ids = self.get_key_with_string(generated_json, "circuit")
        for circuit_id in generated_json[circuits_ids]:
            circuits.append(CircuitImpact(circuit_id=circuit_id, impact=impact))
        return circuits

    def _get_start(self, generated_json: dict):
        """Get the Start Time (EPOCH timestamp as returned by the LLM)."""
        return generated_json[self.get_key_with_string(generated_json, "start_time")]

    def _get_end(self, generated_json: dict):
        """Get the End Time (EPOCH timestamp as returned by the LLM)."""
        return generated_json[self.get_key_with_string(generated_json, "end_time")]

    def _get_summary(self, generated_json: dict):
        """Get the Summary."""
        return generated_json[self.get_key_with_string(generated_json, "summary")]

    def _get_status(self, generated_json: dict):
        """Get the Status, defaulting to CONFIRMED when not recognized."""
        status_key = self.get_key_with_string(generated_json, "status")

        # Fix: the original returned CONFIRMED on every path; map a cancelled
        # notification to Status.CANCELLED as the prompt explicitly requests.
        if status_key and "cancel" in str(generated_json[status_key]).lower():
            return Status.CANCELLED

        return Status.CONFIRMED

    def _get_account(self, generated_json: dict):
        """Get the Account."""
        return generated_json[self.get_key_with_string(generated_json, "account")]

    def _get_maintenance_id(self, generated_json: dict, start, end, circuits):
        """Get the Maintenance ID, synthesizing a deterministic one if absent.

        When the LLM does not provide a usable maintenance id, derive one by
        hashing start/end plus the circuit list so repeated notifications of
        the same maintenance map to the same id.
        """
        maintenance_key = self.get_key_with_string(generated_json, "maintenance")
        # Fix: use the discovered key instead of a hard-coded "maintenance_id",
        # consistent with the substring matching used everywhere else.
        if maintenance_key and generated_json[maintenance_key] != "N/A":
            return generated_json[maintenance_key]

        # Fix: circuits holds CircuitImpact objects, which str.join rejects —
        # stringify each element before joining. (Also fixes "maintenace" typo.)
        maintenance_id = str(start) + str(end) + "".join(str(circuit) for circuit in circuits)
        return hashlib.md5(maintenance_id.encode("utf-8")).hexdigest()  # nosec

    def parse_content(self, content):
        """Parse content via the LLM and normalize it into parsed-data dicts.

        Returns an empty list when the LLM yields no usable JSON.
        """
        generated_json = self.get_llm_response(content)
        if not generated_json:
            return []

        impact = self._get_impact(generated_json)

        data = {
            "circuits": self._get_circuit_ids(generated_json, impact),
            "start": self._get_start(generated_json),
            "end": self._get_end(generated_json),
            "summary": self._get_summary(generated_json),
            "status": self._get_status(generated_json),
            "account": self._get_account(generated_json),
        }

        data["maintenance_id"] = self._get_maintenance_id(
            generated_json,
            data["start"],
            data["end"],
            data["circuits"],
        )

        return [data]
46 changes: 46 additions & 0 deletions circuit_maintenance_parser/parsers/openai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""OpenAI Parser."""
import os
import logging
import json
import openai

from circuit_maintenance_parser.parser import LLM

logger = logging.getLogger(__name__)


class OpenAIParser(LLM):
    """Notifications Parser powered by OpenAI ChatGPT."""

    def get_llm_response(self, content):
        """Get LLM processing from OpenAI.

        Sends the notification `content` (as the system message) together with
        the canned extraction question (as the user message) to the configured
        chat model, and returns the model's answer parsed as JSON.

        Returns:
            The parsed JSON dict, or None when the API call fails or the model
            output is not valid JSON. (Fix: the original annotated `-> list`,
            but json.loads of the prompt's answer yields a dict.)

        Environment:
            OPENAI_TOKEN: API key (required).
            OPENAI_MODEL: model name, defaults to "gpt-3.5-turbo".
        """
        openai.api_key = os.getenv("OPENAI_TOKEN")
        openai_model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
        try:
            response = openai.ChatCompletion.create(
                model=openai_model,
                messages=[
                    {
                        "role": "system",
                        "content": content,
                    },
                    {
                        "role": "user",
                        "content": self._llm_question,
                    },
                ],
            )
        except openai.error.InvalidRequestError as err:
            logger.error(err)
            return None

        logger.info("Used OpenAI tokens: %s", response["usage"])
        generated_text = response.choices[0].message["content"]

        try:
            # json.JSONDecodeError is a subclass of ValueError.
            return json.loads(generated_text)
        except ValueError as err:
            logger.error(err)
            return None
        # Fix: removed the unreachable trailing `return None` — every path in
        # the try/except above already returns.
2 changes: 1 addition & 1 deletion circuit_maintenance_parser/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def process(self, data: NotificationData, extended_data: Dict) -> Iterable[Maint

for data_part, data_parser in data_part_and_parser_combinations:
try:
self.process_hook(data_parser().parse(data_part.content), maintenances_data)
self.process_hook(data_parser().parse(data_part.content, data_part.type), maintenances_data)

except (ParserError, ValidationError) as exc:
error_message = "Parser class %s from %s was not successful.\n%s"
Expand Down
6 changes: 5 additions & 1 deletion circuit_maintenance_parser/provider.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Definition of Provider class as the entry point to the library."""
import logging
import os
import re
import traceback

Expand Down Expand Up @@ -39,7 +40,7 @@
from circuit_maintenance_parser.parsers.turkcell import HtmlParserTurkcell1
from circuit_maintenance_parser.parsers.verizon import HtmlParserVerizon1
from circuit_maintenance_parser.parsers.zayo import HtmlParserZayo1, SubjectParserZayo1

from circuit_maintenance_parser.parsers.openai import OpenAIParser

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -122,6 +123,9 @@ def get_maintenances(self, data: NotificationData) -> Iterable[Maintenance]:
logger.debug("Skipping notification %s due filtering policy for %s.", data, self.__class__.__name__)
return []

if os.getenv("OPENAI_TOKEN"):
self._processors.append(CombinedProcessor(data_parsers=[EmailDateParser, OpenAIParser]))

for processor in self._processors:
try:
return processor.process(data, self.get_extended_data())
Expand Down
Loading

0 comments on commit 2900420

Please sign in to comment.