
Connection error in OpenAILLMService #1108

Closed

KevGTL opened this issue Jan 29, 2025 · 12 comments

@KevGTL (Contributor) commented Jan 29, 2025

Description

When running pipecat dynamic flows with a LiveKit transport for a voice call over WebRTC, I get errors on LLM calls to OpenAI. It started happening today, but reverting to pipecat-ai 0.0.53 doesn't fix it.

Environment

pipecat-ai version: 0.0.54
Python version: 3.11
OS: macOS 15.0.1

Repro steps

It doesn't happen on every single LLM call, but I usually get the error very quickly during a conversation (within the first 3 or 4 messages).

Logs

httpcore.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)


The above exception was the direct cause of the following exception:


Traceback (most recent call last):

  File "/server.py", line 95, in <module>
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8765)))
    │       │   │                             │  │       └ <function Mapping.get at 0x100e8e700>
    │       │   │                             │  └ environ({'NX_CLI_SET': 'true', '_VOLTA_TOOL_RECURSION': '1', 'NX_LOAD_DOT_ENV_FILES': 'true', 'TERM_PROGRAM': 'iTerm.app', 'L...
    │       │   │                             └ <module 'os' (frozen)>
    │       │   └ <fastapi.applications.FastAPI object at 0x16fcbdc50>
    │       └ <function run at 0x101f38d60>
    └ <module 'uvicorn' from '/venv/lib/python3.11/site-packages...

  File "/venv/lib/python3.11/site-packages/uvicorn/main.py", line 579, in run
    server.run()
    │      └ <function Server.run at 0x102604e00>
    └ <uvicorn.server.Server object at 0x17bb3a490>
  File "/venv/lib/python3.11/site-packages/uvicorn/server.py", line 66, in run
    return asyncio.run(self.serve(sockets=sockets))
           │       │   │    │             └ None
           │       │   │    └ <function Server.serve at 0x102604ea0>
           │       │   └ <uvicorn.server.Server object at 0x17bb3a490>
           │       └ <function run at 0x101bf1120>
           └ <module 'asyncio' from '/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyn...

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           │      │   └ <coroutine object Server.serve at 0x17c3dd300>
           │      └ <function Runner.run at 0x101caff60>
           └ <asyncio.runners.Runner object at 0x17b714850>

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           │    │     │                  └ <Task pending name='Task-1' coro=<Server.serve() running at /Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platfo...
           │    │     └ <cyfunction Loop.run_until_complete at 0x17c471560>
           │    └ <uvloop.Loop running=True closed=False debug=False>
           └ <asyncio.runners.Runner object at 0x17b714850>

> File "/venv/lib/python3.11/site-packages/pipecat/processors/frame_processor.py", line 308, in __input_frame_task_handler
    await self.process_frame(frame, direction)
          │    │             │      └ <FrameDirection.DOWNSTREAM: 1>
          │    │             └ OpenAILLMContextFrame(id=19, name='OpenAILLMContextFrame#0', pts=None, context=<pipecat.processors.aggregators.openai_llm_con...
          │    └ <function BaseOpenAILLMService.process_frame at 0x17a905760>
          └ <pipecat.services.openai.OpenAILLMService object at 0x17c4c2450>
  File "/venv/lib/python3.11/site-packages/pipecat/services/openai.py", line 312, in process_frame
    await self._process_context(context)
          │    │                └ <pipecat.processors.aggregators.openai_llm_context.OpenAILLMContext object at 0x16fcbd4d0>
          │    └ <function BaseOpenAILLMService._process_context at 0x17a905440>
          └ <pipecat.services.openai.OpenAILLMService object at 0x17c4c2450>
  File "/venv/lib/python3.11/site-packages/pipecat/services/openai.py", line 215, in _process_context
    async for chunk in chunk_stream:
              │        └ <openai.AsyncStream object at 0x317821ad0>
              └ ChatCompletionChunk(id='chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv', choices=[Choice(delta=ChoiceDelta(content=' vous', function_...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 147, in __aiter__
    async for item in self._iterator:
              │       │    └ <async_generator object AsyncStream.__stream__ at 0x317bb16d0>
              │       └ <openai.AsyncStream object at 0x317821ad0>
              └ ChatCompletionChunk(id='chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv', choices=[Choice(delta=ChoiceDelta(content=' vous', function_...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 160, in __stream__
    async for sse in iterator:
              │      └ <async_generator object AsyncStream._iter_events at 0x17c6499a0>
              └ ServerSentEvent(event=None, data={"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":17...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 151, in _iter_events
    async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()):
              │      │    │        │           │    │        └ <function Response.aiter_bytes at 0x11079a700>
              │      │    │        │           │    └ <Response [200 OK]>
              │      │    │        │           └ <openai.AsyncStream object at 0x317821ad0>
              │      │    │        └ <function SSEDecoder.aiter_bytes at 0x17a05ea20>
              │      │    └ <openai._streaming.SSEDecoder object at 0x317864e50>
              │      └ <openai.AsyncStream object at 0x317821ad0>
              └ ServerSentEvent(event=None, data={"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":17...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 302, in aiter_bytes
    async for chunk in self._aiter_chunks(iterator):
              │        │    │             └ <async_generator object Response.aiter_bytes at 0x317bb1b90>
              │        │    └ <function SSEDecoder._aiter_chunks at 0x17a05eac0>
              │        └ <openai._streaming.SSEDecoder object at 0x317864e50>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/openai/_streaming.py", line 313, in _aiter_chunks
    async for chunk in iterator:
              │        └ <async_generator object Response.aiter_bytes at 0x317bb1b90>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/httpx/_models.py", line 931, in aiter_bytes
    async for raw_bytes in self.aiter_raw():
              │            │    └ <function Response.aiter_raw at 0x11079a8e0>
              │            └ <Response [200 OK]>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/httpx/_models.py", line 989, in aiter_raw
    async for raw_stream_bytes in self.stream:
              │                   │    └ <httpx._client.BoundAsyncStream object at 0x317ec25d0>
              │                   └ <Response [200 OK]>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/httpx/_client.py", line 150, in __aiter__
    async for chunk in self._stream:
              │        │    └ <httpx._transports.default.AsyncResponseStream object at 0x16fcbeb90>
              │        └ <httpx._client.BoundAsyncStream object at 0x317ec25d0>
              └ b'data: {"id":"chatcmpl-Av3F2MebQjUCxMcskoz7TynBNh3Dv","object":"chat.completion.chunk","created":1738160520,"model":"gpt-4o-...
  File "/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 256, in __aiter__
    with map_httpcore_exceptions():
         └ <function map_httpcore_exceptions at 0x11079ede0>

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
    │    │   │     │    │      └ <traceback object at 0x317e73d40>
    │    │   │     │    └ RemoteProtocolError(RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked rea...
    │    │   │     └ <class 'httpcore.RemoteProtocolError'>
    │    │   └ <method 'throw' of 'generator' objects>
    │    └ <generator object map_httpcore_exceptions at 0x3178b6140>
    └ <contextlib._GeneratorContextManager object at 0x3177d8c50>

  File "/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
          │          └ 'peer closed connection without sending complete message body (incomplete chunked read)'
          └ <class 'httpx.RemoteProtocolError'>
@KevGTL (Contributor, Author) commented Jan 29, 2025

@markbackman Can this be linked to 8c0ecb8?

@markbackman (Contributor) commented

> @markbackman Can this be linked to 8c0ecb8?

I don't think so. We've tested this extensively since that change.

Is this issue with OpenAI or OpenAI Realtime?

Can you repro using one of the dynamic examples?

@KevGTL (Contributor, Author) commented Jan 30, 2025

@markbackman Yes, I just reproduced with https://github.com/pipecat-ai/pipecat-flows/blob/main/examples/dynamic/insurance_openai.py. Same as before: sometimes it works for a couple of messages, sometimes I get the error right away.

Btw, I had to comment out "handler": collect_age, and "transition_callback": handle_age_collection, in create_initial_node; otherwise I was getting TypeError: Object of type function is not JSON serializable when pipecat tried to serialize the node function.
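For context, that TypeError is simply what json.dumps raises when the config dict still holds a function object. A minimal sketch of the failure, plus a hypothetical default= hook for debug dumps (collect_age_stub and the hook are illustrative, not pipecat API):

import json

def collect_age_stub(args):
    # Hypothetical stand-in for the real collect_age handler.
    return {"age": args["age"]}

node = {"functions": [{"function": {"name": "collect_age", "handler": collect_age_stub}}]}

# json.dumps(node) raises:
#   TypeError: Object of type function is not JSON serializable
# A default= hook that renders callables as placeholder strings keeps debug dumps working:
print(json.dumps(node, default=lambda o: f"<function {getattr(o, '__name__', 'anonymous')}>"))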

Full code
    async def run_bot_livekit(self, room):
        (url, token) = await configure_livekit(room["name"])

        self.piplet_call_id = room["sid"]
        logger.info(f"Running new bot for call: piplet_call_id - {room['sid']}")

        transport = LiveKitTransport(
            url=url,
            token=token,
            room_name=room["name"],
            params=LiveKitParams(
                audio_in_channels=1,
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                vad_enabled=True,
                vad_audio_passthrough=True,
            ),
        )

        llm = OpenAILLMService(api_key=self.openaiApiKey, model="gpt-4o")

        stt = DeepgramSTTService(
            api_key=self.deepgramApiKey,
            audio_passthrough=True,
            live_options=LiveOptions(
                language=Language.FR,
            ),
        )

        tts = CartesiaTTSService(
            api_key=Config.CARTESIA_API_KEY,
            voice_id="34a34fd0-157b-4470-aaea-db59a8efcfb9",
            model="sonic",
            params=CartesiaTTSService.InputParams(language=Language.FR),
        )

        context = OpenAILLMContext()
        context_aggregator = llm.create_context_aggregator(context)

        pipeline = Pipeline(
            [
                transport.input(),  # Audio input from LiveKit
                stt,  # Speech-To-Text
                context_aggregator.user(),
                llm,  # LLM
                tts,  # Text-To-Speech
                transport.output(),  # Audio output to LiveKit
                context_aggregator.assistant(),
            ]
        )

        class InsuranceQuote(TypedDict):
            monthly_premium: float
            coverage_amount: int
            deductible: int

        class AgeCollectionResult(FlowResult):
            age: int

        class MaritalStatusResult(FlowResult):
            marital_status: str

        class QuoteCalculationResult(FlowResult, InsuranceQuote):
            pass

        class CoverageUpdateResult(FlowResult, InsuranceQuote):
            pass

        # Simulated insurance data
        INSURANCE_RATES = {
            "young_single": {"base_rate": 150, "risk_multiplier": 1.5},
            "young_married": {"base_rate": 130, "risk_multiplier": 1.3},
            "adult_single": {"base_rate": 100, "risk_multiplier": 1.0},
            "adult_married": {"base_rate": 90, "risk_multiplier": 0.9},
        }

        # Function handlers
        async def collect_age(args: FlowArgs) -> AgeCollectionResult:
            """Process age collection."""
            age = args["age"]
            logger.debug(f"collect_age handler executing with age: {age}")
            return AgeCollectionResult(age=age)

        async def collect_marital_status(args: FlowArgs) -> MaritalStatusResult:
            """Process marital status collection."""
            status = args["marital_status"]
            logger.debug(
                f"collect_marital_status handler executing with status: {status}"
            )
            return MaritalStatusResult(marital_status=status)

        async def calculate_quote(args: FlowArgs) -> QuoteCalculationResult:
            """Calculate insurance quote based on age and marital status."""
            age = args["age"]
            marital_status = args["marital_status"]
            logger.debug(
                f"calculate_quote handler executing with age: {age}, status: {marital_status}"
            )

            # Determine rate category
            age_category = "young" if age < 25 else "adult"
            rate_key = f"{age_category}_{marital_status}"
            rates = INSURANCE_RATES.get(rate_key, INSURANCE_RATES["adult_single"])

            # Calculate quote
            monthly_premium = rates["base_rate"] * rates["risk_multiplier"]

            return {
                "monthly_premium": monthly_premium,
                "coverage_amount": 250000,
                "deductible": 1000,
            }

        async def update_coverage(args: FlowArgs) -> CoverageUpdateResult:
            """Update coverage options and recalculate premium."""
            coverage_amount = args["coverage_amount"]
            deductible = args["deductible"]
            logger.debug(
                f"update_coverage handler executing with amount: {coverage_amount}, deductible: {deductible}"
            )

            # Calculate adjusted quote
            monthly_premium = (coverage_amount / 250000) * 100
            if deductible > 1000:
                monthly_premium *= 0.9  # 10% discount for higher deductible

            return {
                "monthly_premium": monthly_premium,
                "coverage_amount": coverage_amount,
                "deductible": deductible,
            }

        async def end_quote() -> FlowResult:
            """Handle quote completion."""
            logger.debug("end_quote handler executing")
            return {"status": "completed"}

        # Transition callbacks and handlers
        async def handle_age_collection(args: Dict, flow_manager: FlowManager):
            flow_manager.state["age"] = args["age"]
            await flow_manager.set_node("marital_status", create_marital_status_node())

        async def handle_marital_status_collection(
            args: Dict, flow_manager: FlowManager
        ):
            flow_manager.state["marital_status"] = args["marital_status"]
            await flow_manager.set_node(
                "quote_calculation",
                create_quote_calculation_node(
                    flow_manager.state["age"], flow_manager.state["marital_status"]
                ),
            )

        async def handle_quote_calculation(args: Dict, flow_manager: FlowManager):
            quote = await calculate_quote(args)
            flow_manager.state["quote"] = quote
            await flow_manager.set_node(
                "quote_results", create_quote_results_node(quote)
            )

        async def handle_end_quote(_: Dict, flow_manager: FlowManager):
            await flow_manager.set_node("end", create_end_node())

        # Node configurations
        def create_initial_node() -> NodeConfig:
            """Create the initial node asking for age."""
            return {
                "role_messages": [
                    {
                        "role": "system",
                        "content": """You are a friendly insurance agent. Your responses will be
                            converted to audio, so avoid special characters. Always use
                            the available functions to progress the conversation naturally.""",
                    }
                ],
                "task_messages": [
                    {
                        "role": "system",
                        "content": "Start by asking for the customer's age.",
                    }
                ],
                "functions": [
                    {
                        "type": "function",
                        "function": {
                            "name": "collect_age",
                            # "handler": collect_age,
                            "description": "Record customer's age",
                            "parameters": {
                                "type": "object",
                                "properties": {"age": {"type": "integer"}},
                                "required": ["age"],
                            },
                            # "transition_callback": handle_age_collection,
                        },
                    }
                ],
            }

        def create_marital_status_node() -> NodeConfig:
            """Create node for collecting marital status."""
            return {
                "task_messages": [
                    {
                        "role": "system",
                        "content": "Ask about the customer's marital status for premium calculation.",
                    }
                ],
                "functions": [
                    {
                        "type": "function",
                        "function": {
                            "name": "collect_marital_status",
                            "handler": collect_marital_status,
                            "description": "Record marital status",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "marital_status": {
                                        "type": "string",
                                        "enum": ["single", "married"],
                                    }
                                },
                                "required": ["marital_status"],
                            },
                            "transition_callback": handle_marital_status_collection,
                        },
                    }
                ],
            }

        def create_quote_calculation_node(age: int, marital_status: str) -> NodeConfig:
            """Create node for calculating initial quote."""
            return {
                "task_messages": [
                    {
                        "role": "system",
                        "content": (
                            f"Calculate a quote for {age} year old {marital_status} customer. "
                            "First, call calculate_quote with their information. "
                            "Then explain the quote details and ask if they'd like to adjust coverage."
                        ),
                    }
                ],
                "functions": [
                    {
                        "type": "function",
                        "function": {
                            "name": "calculate_quote",
                            "handler": calculate_quote,
                            "description": "Calculate initial insurance quote",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "age": {"type": "integer"},
                                    "marital_status": {
                                        "type": "string",
                                        "enum": ["single", "married"],
                                    },
                                },
                                "required": ["age", "marital_status"],
                            },
                            "transition_callback": handle_quote_calculation,
                        },
                    }
                ],
            }

        def create_quote_results_node(
            quote: Union[QuoteCalculationResult, CoverageUpdateResult],
        ) -> NodeConfig:
            """Create node for showing quote and adjustment options."""
            return {
                "task_messages": [
                    {
                        "role": "system",
                        "content": (
                            f"Quote details:\n"
                            f"Monthly Premium: ${quote['monthly_premium']:.2f}\n"
                            f"Coverage Amount: ${quote['coverage_amount']:,}\n"
                            f"Deductible: ${quote['deductible']:,}\n\n"
                            "Explain these quote details to the customer. When they request changes, "
                            "use update_coverage to recalculate their quote. Explain how their "
                            "changes affected the premium and compare it to their previous quote. "
                            "Ask if they'd like to make any other adjustments or if they're ready "
                            "to end the quote process."
                        ),
                    }
                ],
                "functions": [
                    {
                        "type": "function",
                        "function": {
                            "name": "update_coverage",
                            "handler": update_coverage,
                            "description": "Recalculate quote with new coverage options",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "coverage_amount": {"type": "integer"},
                                    "deductible": {"type": "integer"},
                                },
                                "required": ["coverage_amount", "deductible"],
                            },
                        },
                    },
                    {
                        "type": "function",
                        "function": {
                            "name": "end_quote",
                            "handler": end_quote,
                            "description": "Complete the quote process",
                            "parameters": {"type": "object", "properties": {}},
                            "transition_callback": handle_end_quote,
                        },
                    },
                ],
            }

        def create_end_node() -> NodeConfig:
            """Create the final node."""
            return {
                "task_messages": [
                    {
                        "role": "system",
                        "content": (
                            "Thank the customer for their time and end the conversation. "
                            "Mention that a representative will contact them about the quote."
                        ),
                    }
                ],
                "functions": [],
                "post_actions": [{"type": "end_conversation"}],
            }

        task = PipelineTask(
            pipeline,
            params=PipelineParams(allow_interruptions=True),
        )

        flow_manager = FlowManager(
            task=task, llm=llm, context_aggregator=context_aggregator, tts=tts
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            # await transport.capture_participant_transcription(participant["id"])
            logger.debug("Initializing flow")
            await flow_manager.initialize()
            logger.debug("Setting initial node")
            await flow_manager.set_node("initial", create_initial_node())

        runner = PipelineRunner(handle_sigint=False)
        await runner.run(task)
Error trace

File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/server.py", line 95, in <module>
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8765)))
    │       │   │                             │  │       └ <function Mapping.get at 0x102ff6700>
    │       │   │                             │  └ environ({'NX_CLI_SET': 'true', '_VOLTA_TOOL_RECURSION': '1', 'NX_LOAD_DOT_ENV_FILES': 'true', 'TERM_PROGRAM': 'iTerm.app', 'L...
    │       │   │                             └ <module 'os' (frozen)>
    │       │   └ <fastapi.applications.FastAPI object at 0x107e921d0>
    │       └ <function run at 0x105998d60>
    └ <module 'uvicorn' from '/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages...

  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/uvicorn/main.py", line 579, in run
    server.run()
    │      └ <function Server.run at 0x105a04e00>
    └ <uvicorn.server.Server object at 0x1624d5010>
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/uvicorn/server.py", line 66, in run
    return asyncio.run(self.serve(sockets=sockets))
           │       │   │    │             └ None
           │       │   │    └ <function Server.serve at 0x105a04ea0>
           │       │   └ <uvicorn.server.Server object at 0x1624d5010>
           │       └ <function run at 0x103d11120>
           └ <module 'asyncio' from '/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyn...

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           │      │   └ <coroutine object Server.serve at 0x164024f40>
           │      └ <function Runner.run at 0x103dcff60>
           └ <asyncio.runners.Runner object at 0x164051fd0>

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           │    │     │                  └ <Task pending name='Task-1' coro=<Server.serve() running at /Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platfo...
           │    │     └ <cyfunction Loop.run_until_complete at 0x16408d560>
           │    └ <uvloop.Loop running=True closed=False debug=False>
           └ <asyncio.runners.Runner object at 0x164051fd0>

> File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/pipecat/utils/asyncio.py", line 32, in run_coroutine
    await coroutine
          └ <coroutine object FrameProcessor.__input_frame_task_handler at 0x1648086d0>
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/pipecat/processors/frame_processor.py", line 322, in __input_frame_task_handler
    await self.process_frame(frame, direction)
          │    │             │      └ <FrameDirection.DOWNSTREAM: 1>
          │    │             └ OpenAILLMContextFrame(id=6245, name='OpenAILLMContextFrame#8', pts=None, context=<pipecat.processors.aggregators.openai_llm_c...
          │    └ <function BaseOpenAILLMService.process_frame at 0x1624bfb00>
          └ <pipecat.services.openai.OpenAILLMService object at 0x1640f76d0>
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/pipecat/services/openai.py", line 312, in process_frame
    await self._process_context(context)
          │    │                └ <pipecat.processors.aggregators.openai_llm_context.OpenAILLMContext object at 0x1641d7b90>
          │    └ <function BaseOpenAILLMService._process_context at 0x1624bf9c0>
          └ <pipecat.services.openai.OpenAILLMService object at 0x1640f76d0>
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/pipecat/services/openai.py", line 215, in _process_context
    async for chunk in chunk_stream:
              │        └ <openai.AsyncStream object at 0x1624d52d0>
              └ ChatCompletionChunk(id='chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA', choices=[Choice(delta=ChoiceDelta(content=' your', function_...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 147, in __aiter__
    async for item in self._iterator:
              │       │    └ <async_generator object AsyncStream.__stream__ at 0x16400b350>
              │       └ <openai.AsyncStream object at 0x1624d52d0>
              └ ChatCompletionChunk(id='chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA', choices=[Choice(delta=ChoiceDelta(content=' your', function_...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 160, in __stream__
    async for sse in iterator:
              │      └ <async_generator object AsyncStream._iter_events at 0x164811d20>
              └ ServerSentEvent(event=None, data={"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":17...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 151, in _iter_events
    async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()):
              │      │    │        │           │    │        └ <function Response.aiter_bytes at 0x11058c720>
              │      │    │        │           │    └ <Response [200 OK]>
              │      │    │        │           └ <openai.AsyncStream object at 0x1624d52d0>
              │      │    │        └ <function SSEDecoder.aiter_bytes at 0x137e75b20>
              │      │    └ <openai._streaming.SSEDecoder object at 0x164109550>
              │      └ <openai.AsyncStream object at 0x1624d52d0>
              └ ServerSentEvent(event=None, data={"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":17...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 302, in aiter_bytes
    async for chunk in self._aiter_chunks(iterator):
              │        │    │             └ <async_generator object Response.aiter_bytes at 0x16400a640>
              │        │    └ <function SSEDecoder._aiter_chunks at 0x137e75bc0>
              │        └ <openai._streaming.SSEDecoder object at 0x164109550>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/openai/_streaming.py", line 313, in _aiter_chunks
    async for chunk in iterator:
              │        └ <async_generator object Response.aiter_bytes at 0x16400a640>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_models.py", line 931, in aiter_bytes
    async for raw_bytes in self.aiter_raw():
              │            │    └ <function Response.aiter_raw at 0x11058c900>
              │            └ <Response [200 OK]>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_models.py", line 989, in aiter_raw
    async for raw_stream_bytes in self.stream:
              │                   │    └ <httpx._client.BoundAsyncStream object at 0x1648ad090>
              │                   └ <Response [200 OK]>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_client.py", line 150, in __aiter__
    async for chunk in self._stream:
              │        │    └ <httpx._transports.default.AsyncResponseStream object at 0x1641c2e50>
              │        └ <httpx._client.BoundAsyncStream object at 0x1648ad090>
              └ b'data: {"id":"chatcmpl-AvMDcD6c1wvzKuYJvWHIKxFsProGA","object":"chat.completion.chunk","created":1738233468,"model":"gpt-4o-...
  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 256, in __aiter__
    with map_httpcore_exceptions():
         └ <function map_httpcore_exceptions at 0x1105a0e00>

  File "/opt/homebrew/Cellar/[email protected]/3.11.9/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
    │    │   │     │    │      └ <traceback object at 0x164818200>
    │    │   │     │    └ RemoteProtocolError(RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked rea...
    │    │   │     └ <class 'httpcore.RemoteProtocolError'>
    │    │   └ <method 'throw' of 'generator' objects>
    │    └ <generator object map_httpcore_exceptions at 0x16482c440>
    └ <contextlib._GeneratorContextManager object at 0x1648ae2d0>

  File "/Users/kevin/Documents/dev/piplet-dashboard/packages/voice-platform/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
          │          └ 'peer closed connection without sending complete message body (incomplete chunked read)'
          └ <class 'httpx.RemoteProtocolError'>

httpx.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)

@markbackman (Contributor) commented

What version of Flows are you using? On main, the examples have changed to match the current code; there was a breaking change, which could explain the issue. I have no issue running the code you shared (or the examples in the examples directory on main).

markbackman self-assigned this Jan 30, 2025
@Raphyo commented Jan 30, 2025

@markbackman On my side, I am not using Flows. What is the breaking change you mentioned above? Thanks

@markbackman (Contributor) commented

@KevGTL I just released Flows 0.0.12: https://github.com/pipecat-ai/pipecat-flows/releases/tag/v0.0.12. Try updating to see if it solves your issue. Also, you need to include the handlers, and especially the transition_callback, in your NodeConfig, or else you can't move through the graph (see the sketch below).
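For reference, with those lines uncommented, the collect_age entry in create_initial_node (from the code posted above) looks like this:

{
    "type": "function",
    "function": {
        "name": "collect_age",
        "handler": collect_age,  # runs when the LLM calls the function
        "description": "Record customer's age",
        "parameters": {
            "type": "object",
            "properties": {"age": {"type": "integer"}},
            "required": ["age"],
        },
        "transition_callback": handle_age_collection,  # advances the flow to the next node
    },
}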

@markbackman (Contributor) commented

> @markbackman On my side, I am not using Flows. What is the breaking change you mentioned above? Thanks

There's a specific breaking change in the Flows code. If you're not using Flows, it won't affect you.

I see no issues in connecting to OpenAI via the OpenAILLMService.

@Raphyo commented Jan 30, 2025

I reverted to version 0.0.53 and I'm still seeing the issue.
@markbackman Looking at the trace below, do you have any idea what could cause this issue? Thanks

15:04:42.965 | ERROR | pipecat.processors.frame_processor:__input_frame_task_handler:321 - OpenAILLMService#0: Uncaught exception peer closed connection without sending complete message body (incomplete chunked read)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info]Traceback (most recent call last):
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 72, in map_httpcore_exceptions
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] yield
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 257, in __aiter__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for part in self._httpcore_stream:
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <httpcore._async.connection_pool.PoolByteStream object at 0x7fbef7730c90>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <httpx._transports.default.AsyncResponseStream object at 0x7fbef77491d0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ b'data: {"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":1738249479,"model":"gpt-4o-...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 407, in __aiter__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] raise exc from None
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 403, in __aiter__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for part in self._stream:
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <httpcore._async.http11.HTTP11ConnectionByteStream object at 0x7fbef8105dd0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <httpcore._async.connection_pool.PoolByteStream object at 0x7fbef7730c90>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ b'data: {"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":1738249479,"model":"gpt-4o-...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 342, in __aiter__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] raise exc
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 334, in __aiter__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for chunk in self._connection._receive_response_body(**kwargs):
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ └ {'request': <Request [b'POST']>}
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <function AsyncHTTP11Connection._receive_response_body at 0x7fbefb6759e0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <AsyncHTTP11Connection ['https://api.openai.com:443', CLOSED, Request Count: 4]>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <httpcore._async.http11.HTTP11ConnectionByteStream object at 0x7fbef8105dd0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ b'data: {"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":1738249479,"model":"gpt-4o-...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 203, in _receive_response_body
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] event = await self._receive_event(timeout=timeout)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ 600.0
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function AsyncHTTP11Connection._receive_event at 0x7fbefb675a80>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <AsyncHTTP11Connection ['https://api.openai.com:443', CLOSED, Request Count: 4]>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 213, in _receive_event
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <class 'httpcore.RemoteProtocolError'>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <class 'h11._util.RemoteProtocolError'>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <module 'h11' from '/usr/local/lib/python3.11/site-packages/h11/__init__.py'>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <function map_exceptions at 0x7fbefb7e6160>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] self.gen.throw(typ, value, traceback)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ │ └ <traceback object at 0x7fbef7773e80>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ └ RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked read)')
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <class 'h11._util.RemoteProtocolError'>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <method 'throw' of 'generator' objects>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <generator object map_exceptions at 0x7fbef76ede40>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <contextlib._GeneratorContextManager object at 0x7fbef80d7190>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] raise to_exc(exc) from exc
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <class 'httpcore.RemoteProtocolError'>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info]httpcore.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info]The above exception was the direct cause of the following exception:
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info]Traceback (most recent call last):
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/zecall/agent/main.py", line 190, in <module>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] uvicorn.run("agent.main:app", host='0.0.0.0', port=5000, reload=False)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function run at 0x7fbf09c53740>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <module 'uvicorn' from '/usr/local/lib/python3.11/site-packages/uvicorn/__init__.py'>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/uvicorn/main.py", line 579, in run
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] server.run()
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function Server.run at 0x7fbf09ac4d60>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <uvicorn.server.Server object at 0x7fbefaeec550>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/uvicorn/server.py", line 65, in run
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] return asyncio.run(self.serve(sockets=sockets))
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ └ None
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <function Server.serve at 0x7fbf09ac4e00>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <uvicorn.server.Server object at 0x7fbefaeec550>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function run at 0x7fbf09fcede0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <module 'asyncio' from '/usr/local/lib/python3.11/asyncio/__init__.py'>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/asyncio/runners.py", line 190, in run
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] return runner.run(main)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <coroutine object Server.serve at 0x7fbef820f6a0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function Runner.run at 0x7fbf09e72980>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <asyncio.runners.Runner object at 0x7fbf09c552d0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] return self._loop.run_until_complete(task)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <Task pending name='Task-1' coro=<Server.serve() running at /usr/local/lib/python3.11/site-packages/uvicorn/server.py:69> wai...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <function BaseEventLoop.run_until_complete at 0x7fbf09e705e0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <_UnixSelectorEventLoop running=True closed=False debug=False>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <asyncio.runners.Runner object at 0x7fbf09c552d0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/asyncio/base_events.py", line 641, in run_until_complete
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] self.run_forever()
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function BaseEventLoop.run_forever at 0x7fbf09e70540>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <_UnixSelectorEventLoop running=True closed=False debug=False>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/asyncio/base_events.py", line 608, in run_forever
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] self._run_once()
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function BaseEventLoop._run_once at 0x7fbf09e72340>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <_UnixSelectorEventLoop running=True closed=False debug=False>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/asyncio/base_events.py", line 1936, in _run_once
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] handle._run()
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function Handle._run at 0x7fbf09fc8680>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <Handle <TaskStepMethWrapper object at 0x7fbef77b45e0>()>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/asyncio/events.py", line 84, in _run
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] self._context.run(self._callback, *self._args)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ │ └ <member '_args' of 'Handle' objects>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ └ <Handle <TaskStepMethWrapper object at 0x7fbef77b45e0>()>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <member '_callback' of 'Handle' objects>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <Handle <TaskStepMethWrapper object at 0x7fbef77b45e0>()>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <member '_context' of 'Handle' objects>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <Handle <TaskStepMethWrapper object at 0x7fbef77b45e0>()>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info]> File "/usr/local/lib/python3.11/site-packages/pipecat/processors/frame_processor.py", line 308, in __input_frame_task_handler
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] await self.process_frame(frame, direction)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <FrameDirection.DOWNSTREAM: 1>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ OpenAILLMContextFrame(id=6820, name='OpenAILLMContextFrame#6', pts=None, context=<pipecat.processors.aggregators.openai_llm_c...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function BaseOpenAILLMService.process_frame at 0x7fbefb08c900>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <pipecat.services.openai.OpenAILLMService object at 0x7fbefb09dc50>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/pipecat/services/openai.py", line 312, in process_frame
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] await self._process_context(context)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <pipecat.processors.aggregators.openai_llm_context.OpenAILLMContext object at 0x7fbef8104710>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <function BaseOpenAILLMService._process_context at 0x7fbefb08c860>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <pipecat.services.openai.OpenAILLMService object at 0x7fbefb09dc50>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/pipecat/services/openai.py", line 215, in _process_context
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for chunk in chunk_stream:
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <openai.AsyncStream object at 0x7fbef7702590>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ ChatCompletionChunk(id='chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX', choices=[Choice(delta=ChoiceDelta(content=' plus', function
...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/openai/_streaming.py", line 147, in __aiter__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for item in self._iterator:
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <async_generator object AsyncStream.__stream__ at 0x7fbef8074af0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <openai.AsyncStream object at 0x7fbef7702590>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ ChatCompletionChunk(id='chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX', choices=[Choice(delta=ChoiceDelta(content=' plus', function
...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/openai/_streaming.py", line 160, in __stream__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for sse in iterator:
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <async_generator object AsyncStream._iter_events at 0x7fbef80f97e0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ ServerSentEvent(event=None, data={"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":17...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/openai/_streaming.py", line 151, in _iter_events
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()):
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ │ │ └ <function Response.aiter_bytes at 0x7fbefb7bf920>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ │ └ <Response [200 OK]>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ └ <openai.AsyncStream object at 0x7fbef7702590>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <function SSEDecoder.aiter_bytes at 0x7fbefb49b100>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <openai._streaming.SSEDecoder object at 0x7fbef7725750>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <openai.AsyncStream object at 0x7fbef7702590>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ ServerSentEvent(event=None, data={"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":17...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/openai/_streaming.py", line 302, in aiter_bytes
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for chunk in self._aiter_chunks(iterator):
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <async_generator object Response.aiter_bytes at 0x7fbef80755a0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <function SSEDecoder._aiter_chunks at 0x7fbefb49b1a0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <openai._streaming.SSEDecoder object at 0x7fbef7725750>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ b'data: {"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":1738249479,"model":"gpt-4o-...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/openai/_streaming.py", line 313, in _aiter_chunks
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for chunk in iterator:
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <async_generator object Response.aiter_bytes at 0x7fbef80755a0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ b'data: {"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":1738249479,"model":"gpt-4o-...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpx/_models.py", line 931, in aiter_bytes
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for raw_bytes in self.aiter_raw():
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <function Response.aiter_raw at 0x7fbefb7bfb00>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <Response [200 OK]>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ b'data: {"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":1738249479,"model":"gpt-4o-...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpx/_models.py", line 989, in aiter_raw
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for raw_stream_bytes in self.stream:
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <httpx._client.BoundAsyncStream object at 0x7fbef7702650>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <Response [200 OK]>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ b'data: {"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":1738249479,"model":"gpt-4o-...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 150, in __aiter__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] async for chunk in self._stream:
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <httpx._transports.default.AsyncResponseStream object at 0x7fbef77491d0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <httpx._client.BoundAsyncStream object at 0x7fbef7702650>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ b'data: {"id":"chatcmpl-AvQNr8grPHRSdX3WYt0FFxf039KvX","object":"chat.completion.chunk","created":1738249479,"model":"gpt-4o-...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 256, in __aiter__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] with map_httpcore_exceptions():
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <function map_httpcore_exceptions at 0x7fbefbe432e0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] self.gen.throw(typ, value, traceback)
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ │ └ <traceback object at 0x7fbef873ef40>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ │ └ RemoteProtocolError(RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked rea...
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ │ └ <class 'httpcore.RemoteProtocolError'>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ │ └ <method 'throw' of 'generator' objects>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ <generator object map_httpcore_exceptions at 0x7fbef76ec040>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <contextlib._GeneratorContextManager object at 0x7fbef777fed0>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 89, in map_httpcore_exceptions
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] raise mapped_exc(message) from exc
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] │ └ 'peer closed connection without sending complete message body (incomplete chunked read)'
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info] └ <class 'httpx.RemoteProtocolError'>
2025-01-30T15:04:42Z app[2865135bd93e58] cdg [info]httpx.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)

@markbackman (Contributor) commented

@Raphyo What's your repro case? Can you hit this with the 14-function-calling.py example?

If so, how frequently does this occur?

@Raphyo commented Jan 30, 2025

I am currently using FastAPIWebsocketTransport, so it would require some work to try the 14-function-calling.py example.

@markbackman (Contributor) commented

Is this issue still impacting anyone? I still can't repro. OpenAI works fine, and I've tested it a lot over the last week.

@KevGTL (Contributor, Author) commented Feb 6, 2025

@markbackman Yes, I still get errors with GPT-4o, but not with other models. I think it's an OpenAI infra problem; we can close this issue, I guess.
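For anyone who hits this in the meantime, a minimal retry sketch; it assumes direct use of the openai SDK (with httpx underneath) rather than pipecat's internals:

import httpx
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_with_retry(messages, retries=2):
    """Re-issue the streaming completion if the server drops the connection mid-body."""
    for attempt in range(retries + 1):
        try:
            stream = await client.chat.completions.create(
                model="gpt-4o", messages=messages, stream=True
            )
            # Collect chunks; a real pipeline would forward each chunk downstream instead.
            return [chunk async for chunk in stream]
        except httpx.RemoteProtocolError:
            # "peer closed connection without sending complete message body"
            if attempt == retries:
                raise

Note that a retry replays the whole completion from the start, so any chunks already forwarded (e.g., already spoken by TTS) would repeat; dropping the partial response before retrying is the safer default.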

KevGTL closed this as completed Feb 6, 2025