diff --git a/components/screens/signup/index.tsx b/components/screens/signup/index.tsx
index 5b92a0f..acd8fd6 100644
--- a/components/screens/signup/index.tsx
+++ b/components/screens/signup/index.tsx
@@ -1,9 +1,16 @@
 import { updateUserSettings } from "@/components/screens/auth";
+import { AppContext } from "@/context";
 import { segmentTrackFinishedSignup } from "@/lib/analytics";
 import { signupStyles } from "@/lib/style";
 import { $Enums, Settings } from "@prisma/client";
 import { Session } from "@supabase/supabase-js";
-import React, { ReactElement, useEffect, useRef, useState } from "react";
+import React, {
+  ReactElement,
+  useContext,
+  useEffect,
+  useRef,
+  useState,
+} from "react";
 import { Animated, Easing, View } from "react-native";
 import { AINameSection } from "./aiName";
 import { GenderSection } from "./gender";
@@ -44,6 +51,7 @@ export const SignupFlow: React.FC = ({
   setShowSignupFlow,
   setSettings,
 }) => {
+  const { refetchToken } = useContext(AppContext);
   const [currentStepIndex, setCurrentStepIndex] = useState(0);
   const [isTransitioning, setIsTransitioning] = useState(false);
   const [topSection, setTopSection] = useState<"A" | "B">("A");
@@ -109,6 +117,7 @@ export const SignupFlow: React.FC = ({
       session.user.id
     );
     setSettings(settings);
+    await refetchToken();
 
     if (error === null) {
       Animated.parallel([
diff --git a/server/api/routes/chat.py b/server/api/routes/chat.py
index 791c37a..cbdd067 100644
--- a/server/api/routes/chat.py
+++ b/server/api/routes/chat.py
@@ -16,7 +16,8 @@
 from server.api.constants import SUPABASE_AUDIO_MESSAGES_BUCKET_NAME, LLM
 from server.api.utils import add_memories, authorize_user, get_stream_content
 from prisma import Prisma, enums, types
-from datetime import datetime
+from pydantic import BaseModel
+from datetime import datetime, timedelta
 from server.api.analytics import track_sent_message
 from server.agent.index import generate_response
 from server.logger.index import fetch_logger
@@ -92,8 +93,14 @@ async def call_update_chat(
     audio_messages_enabled: bool,
     audio_id: Optional[str],
 ):
-    new_user_message = next(msg for msg in reversed(messages) if msg["role"] == "user")
-    new_user_message = message_to_fixed_string_content(new_user_message)["content"]
+    # Default new_user_message to empty when the messages array has length 1: this handles
+    # the case where the agent sends the first message in the conversation to greet the user.
+    new_user_message = ""
+    if len(messages) > 1:
+        new_user_message = next(
+            msg for msg in reversed(messages) if msg["role"] == "user"
+        )
+        new_user_message = message_to_fixed_string_content(new_user_message)["content"]
 
     data = {
         "new_user_message": new_user_message,
@@ -129,10 +136,9 @@ def stream_and_update_chat(
     user_first_name: str,
     user_gender: str,
     audio_messages_enabled: bool,
-    audio_id: Optional[str] = None,
-    skip_final_processing: Optional[bool] = False,
+    audio_id: Optional[str],
+    skip_final_processing: bool,
 ):
-    user_message_timestamp = datetime.now()
     client = OpenAI(
         api_key=os.environ.get("OPENAI_API_KEY"),
     )
@@ -171,6 +177,8 @@ def stream_and_update_chat(
                 content = choice.delta.content
                 agent_response += content
 
+    user_message_timestamp = datetime.now()
+    agent_message_timestamp = user_message_timestamp + timedelta(seconds=1)
    # Run asynchronous operations in a separate thread, which is necessary to prevent the main
    # thread from getting blocked during synchronous tasks with high latency, like network requests.
    # This is important when streaming voice responses because the voice will pause in the middle of
@@ -192,6 +200,7 @@ def stream_and_update_chat(
                 user_message_timestamp=user_message_timestamp,
                 audio_messages_enabled=audio_messages_enabled,
                 audio_id=audio_id,
+                agent_message_timestamp=agent_message_timestamp,
             )
         ),
         daemon=True,
@@ -208,9 +217,8 @@ async def final_processing_coroutine(
     user_message_timestamp: datetime,
     audio_messages_enabled: bool,
     audio_id: Optional[str],
+    agent_message_timestamp: datetime,
 ) -> None:
-    agent_message_timestamp = datetime.now()
-
     await call_update_chat(
         messages=messages,
         agent_response=agent_response,
@@ -256,6 +264,7 @@ def stream_text(
         user_gender=user_gender,
         audio_messages_enabled=audio_messages_enabled,
         audio_id=audio_id,
+        skip_final_processing=False,
     )
     for chunk in stream:
         for choice in chunk.choices:
@@ -349,6 +358,7 @@ def sync_function():
             user_id=user_id,
             chat_type="type",
             user_message_timestamp=user_message_timestamp,
+            agent_message_timestamp=datetime.now(),
             audio_messages_enabled=audio_messages_enabled,
             audio_id=audio_id,
         )
@@ -435,15 +445,16 @@ async def handle_update_chat(request: UpdateChatRequest):
     audio_messages_enabled = request.audio_messages_enabled
 
     # Create new user chat message
-    await prisma.chatmessages.create(
-        data=types.ChatMessagesCreateInput(
-            chatId=chat_id,
-            role=enums.OpenAIRole.user,
-            content=new_user_message,
-            created=datetime.fromtimestamp(request.user_message_timestamp),
-            displayType="text",
+    if len(new_user_message) > 0:
+        await prisma.chatmessages.create(
+            data=types.ChatMessagesCreateInput(
+                chatId=chat_id,
+                role=enums.OpenAIRole.user,
+                content=new_user_message,
+                created=datetime.fromtimestamp(request.user_message_timestamp),
+                displayType="text",
+            )
         )
-    )
 
     display_type = "audio" if audio_messages_enabled else "text"
 
diff --git a/server/livekit_worker/llm.py b/server/livekit_worker/llm.py
index 0cf8c19..101e537 100644
--- a/server/livekit_worker/llm.py
+++ b/server/livekit_worker/llm.py
@@ -14,7 +14,7 @@
 from livekit.agents import llm
 from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
 from server.agent.index import generate_response
-from server.api.routes.chat import final_processing_coroutine, stream_and_update_chat
+from server.api.routes.chat import stream_and_update_chat
 from typing import Any, Coroutine
 from dataclasses import dataclass
 from server.logger.index import fetch_logger
@@ -62,6 +62,7 @@ async def wrapper():
             user_gender=user_gender,
             audio_messages_enabled=audio_messages_enabled,
             audio_id=None,
+            skip_final_processing=True,
         )
 
         it = iter(sync_gen)
diff --git a/server/livekit_worker/main.py b/server/livekit_worker/main.py
index a782b57..35e0ba4 100644
--- a/server/livekit_worker/main.py
+++ b/server/livekit_worker/main.py
@@ -21,12 +21,17 @@
 import sys
 from pathlib import Path
 from fastapi import HTTPException, status
-from datetime import datetime
+from datetime import datetime, timedelta
 from sentry_sdk.integrations.asyncio import AsyncioIntegration
 from sentry_sdk.integrations.logging import LoggingIntegration
 from .voices import VoiceSettingMapping
 from server.logger.index import fetch_logger
 from dotenv import load_dotenv
+from livekit.plugins.openai.llm import _build_oai_context, build_oai_message
+from ..api.routes.chat import (
+    final_processing_coroutine,
+    message_to_fixed_string_content,
+)
 
 load_dotenv()
 
@@ -200,10 +205,47 @@ async def entrypoint(ctx: JobContext):
             ),
             api_key=os.environ.get("ELEVEN_LABS_API_KEY"),
         ),
-        min_endpointing_delay=1,
+        min_endpointing_delay=2,
         chat_ctx=initial_ctx,
     )
 
+    def handle_update_conversation(msg: llm.ChatMessage):
+        messages = _build_oai_context(assistant.chat_ctx, id(assistant))
+        new_agent_message = build_oai_message(msg, id(assistant))
+        new_agent_message: str = message_to_fixed_string_content(new_agent_message)[
+            "content"
+        ]
+
+        user_message_timestamp = datetime.now()
+        agent_message_timestamp = datetime.now() + timedelta(seconds=1)
+
+        # Use asyncio.create_task to schedule the coroutine
+        asyncio.create_task(
+            final_processing_coroutine(
+                messages=messages,
+                agent_response=new_agent_message.strip(),
+                chat_id=chat_id,
+                user_id=user_id,
+                chat_type="voice",
+                user_message_timestamp=user_message_timestamp,
+                agent_message_timestamp=agent_message_timestamp,
+                audio_messages_enabled=False,
+                audio_id=None,
+            )
+        )
+
+    # We update the database both when the agent is interrupted and when the agent finishes
+    # talking. We include the interruption because that event reliably fires only when the
+    # agent has actually started talking; it does not fire if the agent never started talking
+    # and the user simply paused long enough for the response process to begin.
+    @assistant.on("agent_speech_interrupted")
+    def on_agent_speech_interrupted(msg: llm.ChatMessage):
+        handle_update_conversation(msg)
+
+    @assistant.on("agent_speech_committed")
+    def on_agent_speech_committed(msg: llm.ChatMessage):
+        handle_update_conversation(msg)
+
     assistant.start(ctx.room, participant)
 
     if send_first_chat_message:
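Note on the scheduling pattern this diff relies on: both servers hand final_processing_coroutine to synchronous code, but in two different ways. chat.py streams tokens inside a plain generator where no event loop is running, so it runs the coroutine via asyncio.run on a daemon thread; main.py's speech callbacks fire inside LiveKit's already-running loop, so asyncio.create_task is enough. A minimal sketch of the two patterns under those assumptions (final_processing and its arguments are illustrative stand-ins, not the real signature):

import asyncio
import threading


async def final_processing(chat_id: str, agent_response: str) -> None:
    # Hypothetical stand-in for final_processing_coroutine: persist the
    # messages, fire analytics, and so on.
    await asyncio.sleep(0)


def schedule_from_sync_generator(chat_id: str, agent_response: str) -> None:
    # chat.py's pattern: no event loop is running in this thread, so the
    # coroutine gets its own loop on a daemon thread, and the token stream
    # keeps flowing while slow network calls run in the background.
    threading.Thread(
        target=lambda: asyncio.run(final_processing(chat_id, agent_response)),
        daemon=True,
    ).start()


def schedule_from_loop_callback(chat_id: str, agent_response: str) -> None:
    # main.py's pattern: the speech callback already executes on the thread
    # running LiveKit's event loop, so create_task schedules the coroutine
    # without blocking the callback (it raises RuntimeError if no loop runs).
    asyncio.create_task(final_processing(chat_id, agent_response))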