Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamingAdapterObserver seems to get automatically completed #143

Open
tb opened this issue Sep 4, 2024 · 3 comments
Open

StreamingAdapterObserver seems to get automatically completed #143

tb opened this issue Sep 4, 2024 · 3 comments

Comments

@tb
Copy link

tb commented Sep 4, 2024

I am connecting AiChat to AI agent via LangServe with:

 const chatAdapter = useAsStreamAdapter(streamText);
 <AiChat adapter={chatAdapter} />

and default streamText code:

import {StreamingAdapterObserver} from '@nlux/react';

export const streamText = async (prompt: string, observer: StreamingAdapterObserver) => {
    const response = await fetch('http://localhost:8000/stream/', {
        method: 'POST',
        body: JSON.stringify({input: {messages: [prompt]}}),
        headers: {'Content-Type': 'application/json'},
    });

    if (response.status !== 200) {
        console.log("--- response.status !== 200");
        observer.error(new Error('Failed to connect to the server'));
        return;
    }

    if (!response.body) {
        console.log("--- !response.body !!!!!!!!!!!!!!!!!!!!!!!!!");
        return;
    }

    const reader = response.body.getReader();
    const textDecoder = new TextDecoder();

    while (true) {
        const { value, done } = await reader.read();
        const {type, messages} = parse(textDecoder.decode(value));
        const content = messages[0] ? messages[0]['content'] : '';

        console.log(JSON.stringify({type: type, content: content}, null, 2));

        if (type == "tools") {
            observer.next(`Running (${type})\n`);
        }
        
        if (type == "agent" && content) {
            observer.next(content);
        }
        
        if (done) {
            console.log('--- await reader.read() done')
            break;
        }
    }

    console.log('--- complete')
    observer.complete();
};

This code works well for queries that take up to about 8-10s, but if AI agent uses more tools the code crashes with error: stream.tsx:40 [nlux] Stream is already complete. No more chunks can be added

Here is what I see in the console log for short task that works correctly:

{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "[(93,)]"
}
{
  "type": "agent",
  "content": "We have a total of 93 clients."
}
{}
{}
--- await reader.read() done
--- complete

and here is how it looks for prompt that uses more tools and more time on the backend:

{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "...."
}
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
[nlux] Stream is already complete. No more chunks can be added**
{
  "type": "agent",
  "content": ""
}
{
  "type": "tools",
  "content": "..."
}
**[nlux] Stream is already complete. No more chunks can be added**
{
  "type": "agent",
  "content": "Here are some European clients' names from the database:\n\n1. Alfreds Futterkiste\n2. Berglunds snabbköp\n3. Blauer See Delikatessen\n4. Blondesddsl père et fils\n5. Bólido Comidas preparadas\n6. Bon app'\n7. Chop-suey Chinese\n8. Drachenblut Delikatessen\n9. Du monde entier\n10. Ernst Handel\n11. FISSA Fabrica Inter. Salchichas S.A.\n12. Folies gourmandes\n13. Folk och fä HB\n14. Frankenversand\n15. France restauration\n16. Franchi S.p.A.\n17. Furia Bacalhau e Frutos do Mar\n18. Galería del gastrónomo\n19. Godos Cocina Típica\n20. Hungry Owl All-Night Grocers\n21. Königlich Essen\n22. La corne d'abondance\n23. La maison d'Asie\n24. Lehmanns Marktstand\n25. Magazzini Alimentari Riuniti\n26. Maison Dewey\n27. Morgenstern Gesundkost\n28. Ottilies Käseladen\n29. Paris spécialités\n30. Piccolo und mehr\n31. Princesa Isabel Vinhos\n32. QUICK-Stop\n33. Reggiani Caseifici\n34. Richter Supermarkt\n35. Romero y tomillo\n36. Santé Gourmet\n37. Simons bistro\n38. Spécialités du monde\n39. Suprêmes délices\n40. Toms Spezialitäten\n41. Vaffeljernet\n42. Victuailles en stock\n43. Vins et alcools Chevalier\n44. Die Wandernde Kuh\n45. Wartian Herkku\n46. Wilman Kala\n47. Wolski Zajazd\n\nIf you need more information or specific details, feel free to ask!"
}
{}
{}
--- await reader.read() done
--- complete

It looks like that my streamText code is not completed because of steanText code... but rather somerhere else in codebase which is hard for me to debug.

Is there some kind of timeout in AiChat that could be responsible for this?

@tb
Copy link
Author

tb commented Sep 4, 2024

I have noticed now that if I skip:

        if (type == "tools") {
            observer.next(`Running (${type})\n`);
        }

even short AI requests fail with the error:
[nlux] Stream is already complete. No more chunks can be added

@lvmaoya
Copy link

lvmaoya commented Oct 31, 2024

I met the same issue;

@lvmaoya
Copy link

lvmaoya commented Oct 31, 2024

@tb Have you solved the problem?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants