From 0ba95e480c7725f7d0819b7a20ef25e1198fce28 Mon Sep 17 00:00:00 2001
From: Paulo V C Medeiros
Date: Fri, 1 Mar 2024 17:08:57 +0100
Subject: [PATCH] Further optimisations and fixes

---
 pyrobbot/app/app_page_templates.py | 393 +++++++++++++++--------------
 pyrobbot/app/multipage.py          |  85 +++++--
 pyrobbot/sst_and_tts.py            |  20 +-
 pyrobbot/voice_chat.py             |   3 +
 4 files changed, 292 insertions(+), 209 deletions(-)

diff --git a/pyrobbot/app/app_page_templates.py b/pyrobbot/app/app_page_templates.py
index a4b3a92..8c46b18 100644
--- a/pyrobbot/app/app_page_templates.py
+++ b/pyrobbot/app/app_page_templates.py
@@ -302,7 +302,18 @@ def play_chime(self, chime_type: str = "correct-answer-tone", parent_element=Non
             chime, hidden=True, autoplay=True, parent_element=parent_element
         )
 
-    def render_chat_input_widgets(self):
+    def render_title(self):
+        """Render the title of the chatbot page."""
+        with st.container(height=70, border=False):
+            self.title_container = st.empty()
+        with st.container(height=50, border=False):
+            left, _ = st.columns([0.7, 0.3])
+            with left:
+                self.status_msg_container = st.empty()
+        self.title_container.subheader(self.title, divider="rainbow")
+
+    @property
+    def direct_text_prompt(self):
         """Render chat inut widgets and return the user's input."""
         placeholder = (
             f"Send a message to {self.chat_obj.assistant_name} ({self.chat_obj.model})"
@@ -310,30 +321,48 @@ def render_chat_input_widgets(self):
         with st.container():
             left, right = st.columns([0.95, 0.05])
             with left:
-                if text_prompt := st.chat_input(
-                    placeholder=placeholder, key=f"text_input_widget_{self.page_id}"
-                ):
-                    self.parent.text_prompt_queue.put({"page": self, "text": text_prompt})
-                    return
-
+                text_from_chat_input_widget = st.chat_input(placeholder=placeholder)
             with right:
-                continuous_audio = st.session_state.get(
-                    "toggle_continuous_voice_input", False
-                )
-                continuous_audio = True  # TEST
-
-                audio = AudioSegment.empty()
-                if continuous_audio:
-                    # We won't handle this here. It is handled in listen, ..., sst threads
-                    if not self.parent.listen_thread.is_alive():
-                        raise ValueError("The listen thread is not alive")
+                text_from_manual_audio_recorder = ""
+                if st.session_state.get("toggle_continuous_voice_input"):
+                    st.empty()
                 else:
-                    audio = self.manual_switch_mic_recorder()
-                if audio and (
-                    audio.duration_seconds > self.chat_obj.min_speech_duration_seconds
-                ):
-                    new_input = {"page": self, "text": self.chat_obj.stt(audio).text}
-                    self.parent.text_prompt_queue.put(new_input)
+                    text_from_manual_audio_recorder = self.chat_obj.stt(
+                        self.manual_switch_mic_recorder()
+                    ).text
+        return text_from_chat_input_widget or text_from_manual_audio_recorder
+
+    @property
+    def continuous_text_prompt(self):
+        """Wait until a prompt from the continuous stream is ready and return it."""
+        if not st.session_state.get("toggle_continuous_voice_input"):
+            return None
+
+        if not self.parent.continuous_audio_input_engine_is_running:
+            logger.warning("Continuous audio input engine is not running!!!")
+            self.status_msg_container.error(
+                "The continuous audio input engine is not running!!!"
+            )
+            return None
+
+        logger.debug("Running on continuous audio prompt. Waiting user input...")
+        with self.status_msg_container:
+            self.play_chime()
+            with st.spinner(f"{self.chat_obj.assistant_name} is listening..."):
+                while True:
+                    with contextlib.suppress(queue.Empty):
+                        with self.parent.text_prompt_queue.mutex:
+                            this_page_prompt_queue = filter_page_info_from_queue(
+                                app_page=self, the_queue=self.parent.text_prompt_queue
+                            )
+                        if prompt := this_page_prompt_queue.get_nowait()["text"]:
+                            this_page_prompt_queue.task_done()
+                            break
+                    logger.trace("Still waiting for user text prompt...")
+                    time.sleep(0.1)
+
+        logger.debug("Done getting user input: {}", prompt)
+        return prompt
 
     def _render_chatbot_page(self):  # noqa: PLR0915
         """Render a chatbot page.
@@ -346,192 +375,187 @@ def _render_chatbot_page(self):  # noqa: PLR0915
         question_answer_chunks_queue = queue.Queue()
         partial_audios_queue = queue.Queue()
 
-        with st.container(height=70, border=False):
-            title_container = st.empty()
-        with st.container(height=35, border=False):
-            left, _ = st.columns([0.7, 0.3])
-            with left:
-                status_msg_container = st.empty()
-
-        title_container.subheader(self.title, divider="rainbow")
+        self.render_title()
 
         chat_msgs_container = st.container(height=600, border=False)
         with chat_msgs_container:
             self.render_chat_history()
-        self.render_chat_input_widgets()
-
-        with status_msg_container:
-            logger.debug("Waiting for user text prompt...")
-            self.play_chime()
-            with st.spinner(f"{self.chat_obj.assistant_name} is listening..."):
-                while True:
-                    with contextlib.suppress(queue.Empty):
-                        this_page_prompt_queue = filter_page_info_from_queue(
-                            app_page=self, the_queue=self.parent.text_prompt_queue
-                        )
-                        if prompt := this_page_prompt_queue.get_nowait()["text"]:
-                            break
-                    logger.trace("Still waiting for user text prompt...")
-                    time.sleep(0.1)
 
-        if prompt := prompt.strip():
+        direct_text_prompt = self.direct_text_prompt
+        continuous_stt_prompt = self.continuous_text_prompt
+        prompt = direct_text_prompt or continuous_stt_prompt
+        if prompt:
+            logger.opt(colors=True).debug("Received prompt: {}", prompt)
             self.parent.reply_ongoing.set()
 
+            # Interrupt any ongoing reply in this page
             with question_answer_chunks_queue.mutex:
                 question_answer_chunks_queue.queue.clear()
             with partial_audios_queue.mutex:
                 partial_audios_queue.queue.clear()
 
-            logger.opt(colors=True).debug("Recived prompt: {}", prompt)
-
-            self.play_chime("option-select")
-            status_msg_container.success("Got your message!")
-            time.sleep(0.5)
-        else:
-            status_msg_container.warning(
+            if continuous_stt_prompt:
+                self.play_chime("option-select")
+                self.status_msg_container.success("Got your message!")
+                time.sleep(0.5)
+        elif continuous_stt_prompt:
+            self.status_msg_container.warning(
                 "Could not understand your message. Please try again."
             )
             logger.opt(colors=True).debug("Received empty prompt")
             self.parent.reply_ongoing.clear()
 
-        with chat_msgs_container:
-            # Process user input
-            if prompt:
-                time_now = datetime.datetime.now().replace(microsecond=0)
-                self.state.update({"chat_started": True})
-                # Display user message in chat message container
-                with st.chat_message("user", avatar=self.avatars["user"]):
-                    st.caption(time_now)
-                    st.markdown(prompt)
-                self.chat_history.append(
-                    {
-                        "role": "user",
-                        "name": self.chat_obj.username,
-                        "content": prompt,
-                        "timestamp": time_now,
-                    }
-                )
-
-                # Display (stream) assistant response in chat message container
-                with st.chat_message("assistant", avatar=self.avatars["assistant"]):
-                    text_reply_container = st.empty()
-                    audio_reply_container = st.empty()
-
-                    # Create threads to process text and audio replies asynchronously
-                    answer_question_thread = threading.Thread(
-                        target=_put_chat_reply_chunks_in_queue,
-                        args=(self.chat_obj, prompt, question_answer_chunks_queue),
-                    )
-                    play_partial_audios_thread = threading.Thread(
-                        target=_play_queued_audios,
-                        args=(
-                            partial_audios_queue,
-                            self.render_custom_audio_player,
-                            status_msg_container,
-                        ),
-                        daemon=False,
-                    )
-                    for thread in (answer_question_thread, play_partial_audios_thread):
-                        add_script_run_ctx(thread)
-                        thread.start()
-
-                    # Render the reply
-                    chunk = ""
-                    full_response = ""
-                    current_audio = AudioSegment.empty()
-                    text_reply_container.markdown("▌")
-                    status_msg_container.empty()
-                    while (chunk is not None) or (current_audio is not None):
-                        logger.trace("Waiting for text or audio chunks...")
-                        # Render text
-                        with contextlib.suppress(queue.Empty):
-                            chunk = question_answer_chunks_queue.get_nowait()
-                            if chunk is not None:
-                                full_response += chunk
-                                text_reply_container.markdown(full_response + "▌")
-                            question_answer_chunks_queue.task_done()
-
-                        # Render audio (if any)
-                        with contextlib.suppress(queue.Empty):
-                            current_audio = self.chat_obj.play_speech_queue.get_nowait()
-                            self.chat_obj.play_speech_queue.task_done()
-                            if current_audio is None:
-                                partial_audios_queue.put(None)
-                            else:
-                                partial_audios_queue.put(current_audio.speech)
-
-                    logger.opt(colors=True).debug(
-                        "Replied to user prompt '{}': {}",
-                        prompt,
-                        full_response,
-                    )
-                    text_reply_container.caption(
-                        datetime.datetime.now().replace(microsecond=0)
-                    )
-                    text_reply_container.markdown(full_response)
-
-                    while play_partial_audios_thread.is_alive():
-                        logger.trace("Waiting for partial audios to finish playing...")
-                        time.sleep(0.1)
-
-                    logger.debug("Getting path to full audio file...")
-                    try:
-                        full_audio_fpath = self.chat_obj.last_answer_full_audio_fpath.get(
-                            timeout=2
-                        )
-                    except queue.Empty:
-                        full_audio_fpath = None
-                        logger.warning("Problem getting path to full audio file")
-                    else:
-                        logger.debug("Got path to full audio file: {}", full_audio_fpath)
-                        self.chat_obj.last_answer_full_audio_fpath.task_done()
-
+        if prompt:
+            with chat_msgs_container:
+                # Process user input
+                if prompt:
+                    time_now = datetime.datetime.now().replace(microsecond=0)
+                    self.state.update({"chat_started": True})
+                    # Display user message in chat message container
+                    with st.chat_message("user", avatar=self.avatars["user"]):
+                        st.caption(time_now)
+                        st.markdown(prompt)
                     self.chat_history.append(
                         {
-                            "role": "assistant",
-                            "name": self.chat_obj.assistant_name,
-                            "content": full_response,
-                            "assistant_reply_audio_file": full_audio_fpath,
+                            "role": "user",
+                            "name": self.chat_obj.username,
+                            "content": prompt,
+                            "timestamp": time_now,
                         }
                     )
 
-                    if full_audio_fpath:
-                        self.render_custom_audio_player(
-                            full_audio_fpath,
-                            parent_element=audio_reply_container,
-                            autoplay=False,
+                    # Display (stream) assistant response in chat message container
+                    with st.chat_message("assistant", avatar=self.avatars["assistant"]):
+                        text_reply_container = st.empty()
+                        audio_reply_container = st.empty()
+
+                        # Create threads to process text and audio replies asynchronously
+                        answer_question_thread = threading.Thread(
+                            target=_put_chat_reply_chunks_in_queue,
+                            args=(self.chat_obj, prompt, question_answer_chunks_queue),
+                        )
+                        play_partial_audios_thread = threading.Thread(
+                            target=_play_queued_audios,
+                            args=(
+                                partial_audios_queue,
+                                self.render_custom_audio_player,
+                                self.status_msg_container,
+                            ),
+                            daemon=False,
+                        )
+                        for thread in (
+                            answer_question_thread,
+                            play_partial_audios_thread,
+                        ):
+                            add_script_run_ctx(thread)
+                            thread.start()
+
+                        # Render the reply
+                        chunk = ""
+                        full_response = ""
+                        current_audio = AudioSegment.empty()
+                        text_reply_container.markdown("▌")
+                        self.status_msg_container.empty()
+                        while (chunk is not None) or (current_audio is not None):
+                            logger.trace("Waiting for text or audio chunks...")
+                            # Render text
+                            with contextlib.suppress(queue.Empty):
+                                chunk = question_answer_chunks_queue.get_nowait()
+                                if chunk is not None:
+                                    full_response += chunk
+                                    text_reply_container.markdown(full_response + "▌")
+                                question_answer_chunks_queue.task_done()
+
+                            # Render audio (if any)
+                            with contextlib.suppress(queue.Empty):
+                                current_audio = (
+                                    self.chat_obj.play_speech_queue.get_nowait()
+                                )
+                                self.chat_obj.play_speech_queue.task_done()
+                                if current_audio is None:
+                                    partial_audios_queue.put(None)
+                                else:
+                                    partial_audios_queue.put(current_audio.speech)
+
+                        logger.opt(colors=True).debug(
+                            "Replied to user prompt '{}': {}",
+                            prompt,
+                            full_response,
+                        )
+                        text_reply_container.caption(
+                            datetime.datetime.now().replace(microsecond=0)
+                        )
+                        text_reply_container.markdown(full_response)
+
+                        while play_partial_audios_thread.is_alive():
+                            logger.trace(
+                                "Waiting for partial audios to finish playing..."
+ ) + time.sleep(0.1) + + logger.debug("Getting path to full audio file...") + try: + full_audio_fpath = ( + self.chat_obj.last_answer_full_audio_fpath.get(timeout=2) + ) + except queue.Empty: + full_audio_fpath = None + logger.warning("Problem getting path to full audio file") + else: + logger.debug( + "Got path to full audio file: {}", full_audio_fpath + ) + self.chat_obj.last_answer_full_audio_fpath.task_done() + + self.chat_history.append( + { + "role": "assistant", + "name": self.chat_obj.assistant_name, + "content": full_response, + "assistant_reply_audio_file": full_audio_fpath, + } ) - # Reset title according to conversation initial contents - min_history_len_for_summary = 3 - if ( - "page_title" not in self.state - and len(self.chat_history) > min_history_len_for_summary - ): - logger.debug("Working out conversation topic...") - prompt = "Summarize the previous messages in max 4 words" - title = "".join(self.chat_obj.respond_system_prompt(prompt)) - self.chat_obj.metadata["page_title"] = title - self.chat_obj.metadata["sidebar_title"] = title - self.chat_obj.save_cache() - - self.title = title - self.sidebar_title = title - title_container.header(title, divider="rainbow") - - # Clear the prompt queue for this page, to remove old prompts - with self.parent.continuous_user_prompt_queue.mutex: - filter_page_info_from_queue( - app_page=self, the_queue=self.parent.continuous_user_prompt_queue - ) - with self.parent.text_prompt_queue.mutex: - filter_page_info_from_queue( - app_page=self, the_queue=self.parent.text_prompt_queue - ) + if full_audio_fpath: + self.render_custom_audio_player( + full_audio_fpath, + parent_element=audio_reply_container, + autoplay=False, + ) + + # Reset title according to conversation initial contents + min_history_len_for_summary = 3 + if ( + "page_title" not in self.state + and len(self.chat_history) > min_history_len_for_summary + ): + logger.debug("Working out conversation topic...") + prompt = "Summarize the previous messages in max 4 words" + title = "".join(self.chat_obj.respond_system_prompt(prompt)) + self.chat_obj.metadata["page_title"] = title + self.chat_obj.metadata["sidebar_title"] = title + self.chat_obj.save_cache() + + self.title = title + self.sidebar_title = title + self.title_container.header(title, divider="rainbow") + + # Clear the prompt queue for this page, to remove old prompts + with self.parent.continuous_user_prompt_queue.mutex: + filter_page_info_from_queue( + app_page=self, + the_queue=self.parent.continuous_user_prompt_queue, + ) + with self.parent.text_prompt_queue.mutex: + filter_page_info_from_queue( + app_page=self, the_queue=self.parent.text_prompt_queue + ) - self.parent.reply_ongoing.clear() + self.parent.reply_ongoing.clear() - if not self.parent.reply_ongoing.is_set(): - logger.debug("Rerunning the app") + if continuous_stt_prompt and not self.parent.reply_ongoing.is_set(): + logger.opt(colors=True).debug( + "Rerunning the app to wait for new input..." 
+            )
             st.rerun()
 
     def render(self):
@@ -555,6 +579,7 @@ def _trim_page_padding():
             self.render_cost_estimate_page()
         else:
             self._render_chatbot_page()
+        logger.debug("Reached the end of the chatbot page.")
 
 
 def filter_page_info_from_queue(app_page: AppPage, the_queue: queue.Queue):
diff --git a/pyrobbot/app/multipage.py b/pyrobbot/app/multipage.py
index 1539b7c..5f05c7c 100644
--- a/pyrobbot/app/multipage.py
+++ b/pyrobbot/app/multipage.py
@@ -254,6 +254,11 @@ def handle_stt():
             if audio.duration_seconds >= chat_obj.min_speech_duration_seconds:
                 recorded_prompt_as_txt = chat_obj.stt(audio).text
                 if recorded_prompt_as_txt:
+                    logger.debug(
+                        "Audio from page '{}' transcribed '{}'. Input ready to fetch.",
+                        info_for_stt["page"].title,
+                        recorded_prompt_as_txt,
+                    )
                     text_prompt_queue.put(
                         {"page": info_for_stt["page"], "text": recorded_prompt_as_txt}
                     )
@@ -285,9 +290,24 @@ def __init__(self, **kwargs) -> None:
         """Initialise streamlit page configs."""
         st.set_page_config(**kwargs)
 
+        hide_zero_height_elements = """
+
+        """
+        st.markdown(hide_zero_height_elements, unsafe_allow_html=True)
+
         self.listen_thread = listen_thread
         self.continuous_user_prompt_thread = continuous_user_prompt_thread
-        if not listen_thread.is_alive():
+        self.handle_stt_thread = handle_stt_thread
+        if (
+            st.session_state.get("toggle_continuous_voice_input")
+            and not self.continuous_audio_input_engine_is_running
+        ):
             for thread in [
                 listen_thread,
                 continuous_user_prompt_thread,
@@ -304,6 +324,15 @@ def __init__(self, **kwargs) -> None:
         self.text_prompt_queue = text_prompt_queue
         self.reply_ongoing = reply_ongoing
 
+    @property
+    def continuous_audio_input_engine_is_running(self):
+        """Return whether the continuous audio input engine is running."""
+        return (
+            self.listen_thread.is_alive()
+            and self.continuous_user_prompt_thread.is_alive()
+            and self.handle_stt_thread.is_alive()
+        )
+
     def render_continuous_audio_input_widget(self):
         """Render the continuous audio input widget."""
         # Definitions related to webrtc_streamer
@@ -346,19 +375,27 @@ def audio_frame_callback(frame):
 
         add_script_run_ctx(audio_frame_callback)
 
-        self.stream_audio_context = streamlit_webrtc.component.webrtc_streamer(
-            key="sendonly-audio",
-            mode=WebRtcMode.SENDONLY,
-            rtc_configuration=rtc_configuration,
-            media_stream_constraints={"audio": True, "video": False},
-            desired_playing_state=True,
-            audio_frame_callback=audio_frame_callback,
-        )
-
-        logger.debug("Waiting for the audio stream to start...")
-        while not self.stream_audio_context.state.playing:
-            time.sleep(1)
-        logger.debug("Audio stream started")
+        logger.debug("Initialising input audio stream...")
+        try:
+            self.hide_zero_height_elements()
+            with st.container(height=0, border=False):
+                self.stream_audio_context = streamlit_webrtc.component.webrtc_streamer(
+                    key="sendonly-audio",
+                    mode=WebRtcMode.SENDONLY,
+                    rtc_configuration=rtc_configuration,
+                    media_stream_constraints={"audio": True, "video": False},
+                    desired_playing_state=True,
+                    audio_frame_callback=audio_frame_callback,
+                )
+        except TypeError:
+            logger.opt(exception=True).error("Failed to initialise audio stream")
+            logger.error("Failed to initialise audio stream")
+            self.stream_audio_context = None
+        else:
+            logger.debug("Audio stream initialised. Waiting for it to start...")
+            while not self.stream_audio_context.state.playing:
+                time.sleep(1)
+            logger.debug("Audio stream started")
 
         return self.stream_audio_context
 
@@ -588,11 +625,18 @@ def render(self, **kwargs):
             divider="rainbow",
             help="https://github.com/paulovcmedeiros/pyRobBot",
         )
+        self.create_api_key_element()
+
         # Create a sidebar with tabs for chats and settings
         tab1, tab2 = st.tabs(["Chats", "Settings for Current Chat"])
-        self.sidebar_tabs = {"chats": tab1, "settings": tab2}
+        with tab1:
+            tab1_visible_container = st.container()
+            tab1_invisible_container = st.container(height=0, border=False)
+
+        self.sidebar_tabs = {"chats": tab1_visible_container, "settings": tab2}
+
+        with tab1_visible_container:
             left, center, right = st.columns(3)
             with left:
                 # Add button to show the costs table
@@ -616,13 +660,9 @@ def render(self, **kwargs):
                 key="toggle_continuous_voice_input",
                 label=":microphone:",
                 help="Toggle continuous voice input",
-                value=True,
+                value=False,
             )
 
-            # Create a container for the continuous audio input widget, which will
-            # be rendered only later
-            continuous_audio_input_widget_container = st.container()
-
             # Add button to create a new chat
             new_chat_button = st.button(label=":heavy_plus_sign: New Chat")
 
@@ -657,7 +697,10 @@ def render(self, **kwargs):
         if new_chat_button or not self.pages:
             self.add_page()
 
-        with continuous_audio_input_widget_container:
+        # We'll hide the webrtc input button because I don't know how to customise it.
+        # I'll use the component "toggle_continuous_voice_input" to toggle it
+        if st.session_state["toggle_continuous_voice_input"]:
+            with tab1_invisible_container:
                 self.render_continuous_audio_input_widget()
 
         return super().render(**kwargs)
diff --git a/pyrobbot/sst_and_tts.py b/pyrobbot/sst_and_tts.py
index caf0079..a0f741d 100644
--- a/pyrobbot/sst_and_tts.py
+++ b/pyrobbot/sst_and_tts.py
@@ -2,6 +2,7 @@
 
 import io
 import socket
+import uuid
 from dataclasses import dataclass, field
 from typing import Literal
 
@@ -69,7 +70,10 @@ def _stt(self) -> str:
             fallback_stt_function = self._stt_openai
             fallback_name = "openai"
 
-        logger.debug("Converting audio to text ({} STT)...", self.engine)
+        conversion_id = uuid.uuid4()
+        logger.debug(
+            "Converting audio to text ({} STT). Process {}.", self.engine, conversion_id
+        )
         try:
             rtn = stt_function()
         except (
@@ -79,16 +83,24 @@
         ) as error:
             logger.error(error)
             logger.error(
-                "Can't communicate with `{}` speech-to-text API right now",
+                "{}: Can't communicate with `{}` speech-to-text API right now",
+                conversion_id,
                 self.engine,
             )
-            logger.warning("Trying to use `{}` STT instead", fallback_name)
+            logger.warning(
+                "{}: Trying to use `{}` STT instead", conversion_id, fallback_name
+            )
             rtn = fallback_stt_function()
         except sr.exceptions.UnknownValueError:
-            logger.opt(colors=True).debug("Can't understand audio")
+            logger.opt(colors=True).debug(
+                "{}: Can't understand audio", conversion_id
+            )
             rtn = ""
 
         self._text = rtn.strip()
+        logger.opt(colors=True).debug(
+            "{}: Done with STT: {}", conversion_id, self._text
+        )
         return self._text
 
diff --git a/pyrobbot/voice_chat.py b/pyrobbot/voice_chat.py
index 1ced0a3..0b67f44 100644
--- a/pyrobbot/voice_chat.py
+++ b/pyrobbot/voice_chat.py
@@ -234,6 +234,9 @@ def handle_update_audio_history(self, current_answer_audios_queue: queue.Queue):
                 merged_audio.export(audio_file_path, format="mp3")
                 logger.debug("File {} stored", audio_file_path)
                 self.last_answer_full_audio_fpath.put(audio_file_path)
+                logger.debug(
+                    "File {} sent to last_answer_full_audio_fpath queue", audio_file_path
+                )
                 merged_audio = AudioSegment.empty()
 
             current_answer_audios_queue.task_done()