
Commit 5d56aa9: Step3
sasha370 committed Apr 11, 2024
1 parent d64f6b9 commit 5d56aa9
Showing 28 changed files with 316 additions and 513 deletions.
11 changes: 5 additions & 6 deletions api/endpoint.py
@@ -42,14 +42,14 @@ class InteractionEmbedRequest(BaseModel):

@processor.post("/api/v1/questions")
def create_question(question_event: QuestionEvent):
thread = threading.Thread(target=process_question, args=question_event)
thread = threading.Thread(target=process_question, args=(question_event,))
thread.start()
return {"message": "Question received, processing in background", "data": question_event}


@processor.post("/api/v1/feedback")
def create_feedback(feedback_event: FeedbackEvent):
thread = threading.Thread(target=process_feedback, args=feedback_event)
thread = threading.Thread(target=process_feedback, args=(feedback_event,))
thread.start()
return {"message": "Feedback received, processing in background", "data": feedback_event}

@@ -61,22 +61,21 @@ def create_embeds(EmbedRequest: EmbedRequest):
Endpoint to initiate the embedding generation and storage process in the background.
"""
page_id = EmbedRequest.page_id
print(f"*******Received embed request for ID: {page_id}") # Debugging line
thread = threading.Thread(target=vectorize_document_and_store_in_db, args=page_id)
thread = threading.Thread(target=vectorize_document_and_store_in_db, args=(page_id,))
thread.start()
return {"message": "Embedding generation initiated, processing in background", "page_id": page_id}


@processor.post("/api/v1/interaction_embeds")
def create_interaction_embeds(InteractionEmbedRequest: InteractionEmbedRequest, db_session=Depends(get_db_session)):
def create_interaction_embeds(InteractionEmbedRequest: InteractionEmbedRequest):
"""
Endpoint to initiate the embedding generation and storage process in the background.
"""
interaction_id = InteractionEmbedRequest.interaction_id
print(f"Received interaction embed request for ID: {interaction_id}") # Debugging line

# Use threading to process the embedding generation and storage without blocking the endpoint response
thread = threading.Thread(target=vectorize_interaction_and_store_in_db, args=(interaction_id, db_session))
thread = threading.Thread(target=vectorize_interaction_and_store_in_db, args=(interaction_id,))
thread.start()

# Make sure to return a response that matches what your client expects
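The recurring fix in this file is the `args=(question_event,)` form: `threading.Thread` unpacks `args` with `*args` when the target runs, so it must be a sequence containing the positional arguments rather than the argument itself. A minimal sketch of the difference, using a plain dict as a stand-in for the project's event models:

```python
import threading

def process_question(question_event):
    # Stand-in for the real background handler.
    print(f"processing: {question_event}")

event = {"question": "How do I rotate the API key?"}  # illustrative payload

# Passing the object directly means it gets unpacked: here the dict's single
# key ("question") would become the argument instead of the dict itself.
# broken = threading.Thread(target=process_question, args=event)

# A one-element tuple passes the object itself as the only positional argument.
thread = threading.Thread(target=process_question, args=(event,))
thread.start()
thread.join()
```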
23 changes: 23 additions & 0 deletions api/request.py
@@ -0,0 +1,23 @@
import requests
import logging


def post_request(url, payload, headers=None, data_type=None):
"""
Post a request to a given URL.
:param data_type: Type of payload data.
:param url: The URL to post to.
:param payload: The payload to send.
:param headers: The headers to send.
:return: The response from the server.
"""
if headers is None:
headers = {"Content-Type": "application/json"}
try:
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
logging.info(f"INFO: {data_type} request submitted to {url}")
except requests.exceptions.HTTPError as e:
logging.error(f"ERROR: An HTTP error with {data_type} request occurred: {e}")
except Exception as e:
logging.error(f"ERROR: An error with {data_type} request occurred: {e}")
6 changes: 6 additions & 0 deletions configuration.py
@@ -55,6 +55,12 @@ def get_project_root() -> str:
api_host = os.environ.get("NUR_API_HOST", "localhost")
api_port = int(os.environ.get("NUR_API_PORT", "8000"))

# Endpoints
embeds_endpoint = f'http://{api_host}:{api_port}/api/v1/embeds'
feedback_endpoint = f'http://{api_host}:{api_port}/api/v1/feedback'
questions_endpoint = f'http://{api_host}:{api_port}/api/v1/questions'
interaction_embeds_endpoint = f'http://{api_host}:{api_port}/api/v1/interaction_embeds'

# Name of the vector collection
pages_collection_name = "pages"
interactions_collection_name = "interactions"
45 changes: 15 additions & 30 deletions confluence/importer.py
@@ -1,9 +1,9 @@
from datetime import datetime
import logging
import requests
import time

from configuration import api_host, api_port
from api.request import post_request
from configuration import embeds_endpoint
from database.page_manager import PageManager
from database.space_manager import SpaceManager
from vector.create_vector_db import add_embeds_to_vector_db
@@ -16,21 +16,10 @@


def submit_embedding_creation_request(page_id: str):
endpoint_url = f'http://{api_host}:{api_port}/api/v1/embeds'
headers = {"Content-Type": "application/json"}
payload = {"page_id": page_id}
post_request(embeds_endpoint, {"page_id": page_id}, data_type='Embed')

try:
response = requests.post(endpoint_url, json=payload, headers=headers)
response.raise_for_status() # This will raise for HTTP errors
logging.info(f"Embedding creation request successful for page ID {page_id}.")
except requests.exceptions.HTTPError as e:
logging.error(f"HTTP error occurred while submitting embedding creation request for page ID {page_id}: {e}")
except Exception as e:
logging.error(f"An error occurred while submitting embedding creation request for page ID {page_id}: {e}")


def generate_missing_page_embeddings(page_manager, session, retry_limit: int = 3, wait_time: int = 5) -> None:
def generate_missing_page_embeddings(page_manager, session, retry_limit: int = 3, wait_time: int = 5) -> None:
for attempt in range(retry_limit):
# Retrieve the IDs of pages that are still missing embeddings.
page_ids = page_manager.get_page_ids_missing_embeds(session)
@@ -41,7 +30,6 @@ def generate_missing_page_embeddings(page_manager, session, retry_limit: int =

logging.info(f"Attempt {attempt + 1} of {retry_limit}: Processing {len(page_ids)} pages missing embeddings.")
for page_id in page_ids:
# Submit a request to generate an embedding for each page ID.
submit_embedding_creation_request(page_id)
# A brief delay between requests to manage load and potentially avoid rate limiting.
time.sleep(0.2)
@@ -79,20 +67,17 @@ def tui_choose_space():
return spaces[choice]['key'], spaces[choice]['name']


def import_space(space_key, space_name, db_session):
import_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
pages = retrieve_space(space_key)
def import_space(space_key, space_name, session):
page_manager = PageManager()
page_manager.store_pages_data(space_key, retrieve_space(space_key), session)

with db_session as session:
page_manager = PageManager()
page_manager.store_pages_data(space_key, pages, session)
generate_missing_page_embeddings(page_manager, session)
generate_missing_page_embeddings(page_manager, session)

SpaceManager().upsert_space_info(
space_key=space_key,
space_name=space_name,
last_import_date=import_date,
session=session
)
SpaceManager().upsert_space_info(
session,
space_key=space_key,
space_name=space_name,
last_import_date=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
)

add_embeds_to_vector_db(session, space_key)
add_embeds_to_vector_db(session, space_key)
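With `import_space` now taking the session as a parameter, the caller owns the session scope instead of the function opening one itself. A minimal sketch of the resulting flow; the module paths are inferred from the file layout, not confirmed by the repository:

```python
from database.database import get_db_session
from confluence.importer import tui_choose_space, import_space

space_key, space_name = tui_choose_space()

# One session spans storing the pages, generating embeddings, upserting the
# space record, and loading the embeds into the vector DB.
with get_db_session() as session:
    import_space(space_key, space_name, session)
```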
28 changes: 10 additions & 18 deletions database/bookmarked_conversation_manager.py
@@ -1,30 +1,22 @@
from sqlalchemy.exc import SQLAlchemyError
from models.bookmarked_conversation import BookmarkedConversation
from datetime import datetime, timezone


class BookmarkedConversationManager:
def __init__(self, db_session):
self.db_session = db_session
def __init__(self, session):
self.session = session

def add_bookmarked_conversation(self, title, body, thread_id):
with self.db_session as session:
BookmarkedConversation().create_or_update(session,
title=title,
body=body,
thread_id=thread_id)
BookmarkedConversation().create_or_update(self.session,
title=title,
body=body,
thread_id=thread_id)

def update_posted_on_confluence(self, thread_id):
with self.db_session as session:
BookmarkedConversation().create_or_update(session,
thread_id=thread_id,
posted_on_confluence=datetime.now(timezone.utc))
BookmarkedConversation().create_or_update(self.session,
thread_id=thread_id,
posted_on_confluence=datetime.now(timezone.utc))
print(f"Updated conversation with thread ID {thread_id} with timestamp")

def get_unposted_conversations(self):
try:
with self.db_session as session:
return session.query(BookmarkedConversation).filter_by(posted_on_confluence=None).all()
except SQLAlchemyError as e:
print(f"Error getting unposted conversations: {e}")
return None
return self.session.query(BookmarkedConversation).filter_by(posted_on_confluence=None).all()
100 changes: 3 additions & 97 deletions database/database.py
@@ -11,15 +11,13 @@
from contextlib import contextmanager

engine = create_engine(db_url)

SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)

# from sqlalchemy.ext.declarative import declarative_base
# Base = declarative_base()

# TODO: Extract and uncomment
for model in [QAInteraction, SpaceInfo, PageData, BookmarkedConversation, QuizQuestion, UserScore]:
model.metadata.create_all(engine)

# from sqlalchemy.ext.declarative import declarative_base
# Base = declarative_base()

@contextmanager
def get_db_session():
@@ -32,95 +30,3 @@ def get_db_session():
raise
finally:
session.close()

#
# from sqlalchemy import create_engine
# from sqlalchemy.orm import sessionmaker
# from sqlalchemy.exc import SQLAlchemyError
# from configuration import db_url
# from models.qa_interaction import QAInteraction
# from models.space_info import SpaceInfo
# from models.page_data import PageData
# from models.bookmarked_conversation import BookmarkedConversation
# from models.quiz_question import QuizQuestion
# from models.user_score import UserScore
#
#
# class Database:
# """
# Class providing access to a SQLAlchemy database.
#
# This class implements the Singleton pattern for creating and managing a connection to a SQLAlchemy database.
# It provides methods for getting database sessions and accessing the SQLAlchemy Engine object.
#
# Attributes:
# engine (sqlalchemy.engine.Engine): The SQLAlchemy Engine object representing the connection to the database.
# Session (sqlalchemy.orm.Session): The SQLAlchemy session factory used for creating database sessions.
# """
#
# _instance = None
#
# def __new__(cls):
# """
# Create a new instance of the Database class.
#
# If an instance of the class has not been created yet, it is created; otherwise, the existing instance is returned.
#
# Returns:
# Database: An instance of the Database class.
# """
# if cls._instance is None:
# cls._instance = super().__new__(cls)
# cls._instance._init_engine()
# cls._instance._create_tables()
# return cls._instance
#
# def _create_tables(self):
# """
# Create tables in the database if they do not exist.
# """
# for model in [QAInteraction, SpaceInfo, PageData, BookmarkedConversation, QuizQuestion, UserScore]:
# model.metadata.create_all(self.engine)
#
# def _init_engine(self):
# """
# Initialize the SQLAlchemy Engine object and session factory.
#
# Creates the Engine object for connecting to the database and the session factory for creating database sessions.
# """
# self.engine = create_engine(db_url)
# self.Session = sessionmaker(bind=self.engine)
#
# def get_session(self):
# """
# Get a new database session.
#
# Returns:
# sqlalchemy.orm.Session: A new database session.
# """
# return self.Session()
#
# def add_object(self, obj):
# """
# Adds the given object to the database.
#
# Args:
# obj: The object to add to the database.
#
# Returns:
# int or None: The ID of the added object if successful, None otherwise.
#
# Raises:
# None
#
# """
# try:
# with self.get_session() as session:
# session.add(obj)
# session.commit()
# return obj
# except SQLAlchemyError as e:
# class_name = obj.__class__.__name__
# print(f"Error adding object of type {class_name}: {e}")
# session.rollback()
# return None
33 changes: 17 additions & 16 deletions database/interaction_manager.py
@@ -1,20 +1,18 @@
# ./database/interaction_manager.py
from datetime import datetime, timezone
from models.qa_interaction import QAInteraction
from sqlalchemy.exc import SQLAlchemyError
import json


class QAInteractionManager:
def __init__(self, db_session):
self.db_session = db_session
def __init__(self, session):
self.session = session

def add_question_and_answer(self, question, answer, thread_id, assistant_thread_id, channel_id, question_ts,
answer_ts, slack_user_id):

serialized_answer = json.dumps(answer.__dict__) if not isinstance(answer, str) else answer
QAInteraction().create_or_update(
self.db_session,
interaction = QAInteraction(
question_text=question,
thread_id=thread_id,
assistant_thread_id=assistant_thread_id,
@@ -25,6 +23,8 @@ def add_comment_to_interaction(self, thread_id, comment):
comments=json.dumps([]),
slack_user_id=slack_user_id
)
self.session.add(interaction)
self.session.commit()

def add_comment_to_interaction(self, thread_id, comment):
interaction = self.get_interaction_by_thread_id(thread_id)
@@ -34,35 +34,36 @@ def add_comment_to_interaction(self, thread_id, comment):
comments = json.loads(interaction.comments)
comments.append(comment)
interaction.comments = json.dumps(comments)
interaction.update(self.db_session)
self.session.commit()

def get_interaction_by_thread_id(self, thread_id):
return QAInteraction().create_or_update(self.db_session, thread_id=thread_id)
return self.session.query(QAInteraction).filter_by(thread_id=thread_id).first()

def get_interaction_by_interaction_id(self, interaction_id):
return QAInteraction().create_or_update(self.db_session, id=interaction_id)
return self.session.query(QAInteraction).filter_by(id=interaction_id).first()

def get_interactions_by_interaction_ids(self, interaction_ids):
return self.db_session.query(QAInteraction).filter(QAInteraction.id.in_(interaction_ids)).all()
return self.session.query(QAInteraction).filter(QAInteraction.id.in_(interaction_ids)).all()

def get_qa_interactions(self):
return self.db_session.query(QAInteraction).all()
return self.session.query(QAInteraction).all()

def add_embed_to_interaction(self, interaction_id, embed):
QAInteraction().create_or_update(self.db_session,
id=interaction_id,
embed=json.dumps(embed),
last_embedded=datetime.now(timezone.utc))
interaction = self.session.query(QAInteraction).filter_by(id=interaction_id).first()
if interaction:
interaction.embed = json.dumps(embed)
interaction.last_embedded = datetime.now(timezone.utc)
self.session.commit()

def get_interactions_without_embeds(self):
return self.db_session.query(QAInteraction).filter(
return self.session.query(QAInteraction).filter(
(QAInteraction.embed.is_(None)) |
(QAInteraction.embed == json.dumps([])) |
(QAInteraction.embed == '')
).all()

def get_interactions_with_embeds(self):
return self.db_session.query(QAInteraction).filter(
return self.session.query(QAInteraction).filter(
(QAInteraction.embed.is_not(None)) |
(QAInteraction.embed == json.dumps([])) |
(QAInteraction.embed == '')
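The manager classes follow the same session-injection refactor: they now receive an externally owned session instead of wrapping their own `db_session` context manager. A minimal sketch of the intended usage; import paths are inferred from the file layout:

```python
from database.database import get_db_session
from database.interaction_manager import QAInteractionManager

with get_db_session() as session:
    manager = QAInteractionManager(session)
    # List interactions whose embeddings still need to be generated.
    for interaction in manager.get_interactions_without_embeds():
        print(interaction.id)
```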
