Skip to content

Commit

Permalink
upadte eino docs
Browse files Browse the repository at this point in the history
  • Loading branch information
kuhahalong committed Jan 20, 2025
1 parent 7c96f4a commit 18ba065
Show file tree
Hide file tree
Showing 38 changed files with 360 additions and 234 deletions.
2 changes: 1 addition & 1 deletion content/zh/docs/eino/_index.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
Description: Eino 是基于 Golang 的 AI 应用开发框架
date: "2025-01-15"
date: "2025-01-20"
lastmod: ""
linktitle: Eino
menu:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
Description: ""
date: "2025-01-15"
date: "2025-01-20"
lastmod: ""
tags: []
title: 'Eino: Callback 用户手册'
Expand Down Expand Up @@ -194,6 +194,8 @@ Graph 会为内部所有的 Node 自动注入 RunInfo。机制是每个 Node 的

## 触发方式

![](/img/eino/graph_node_callback_run_place.png)

### 组件实现内部触发(Component Callback)

在组件实现的代码中,调用 callbacks 包中的 `OnStart(), OnEnd(), OnError(), OnStartWithStreamInput(), OnEndWithStreamInput()`。以 Ark 的 ChatModel 实现为例,在 Generate 方法中:
Expand Down Expand Up @@ -444,3 +446,15 @@ Handler 内不建议修改 input / output。原因是:
不同 Handler 之间,没有执行顺序的保证,因此不建议通过上面的机制在不同 Handler 间传递信息。本质上是无法保证某一个 Handler 返回的 context,一定会进入下一个 Handler 的函数执行中。

如果需要在不同 Handler 之间传递信息,建议的方式是在最外层的 context(如 graph 执行时传入的 context)中,设置一个全局的、请求维度的变量作为公共信息的存取空间,在各个 Handler 中按需读取和更新这个公共变量。在有 stream 的情况下,可能需要格外注意和保证这个公共变量的并发安全。

### 流切记要 Close

以存在 ChatModel 这种具有真流输出的节点为例,当存在 Callback 切面时,ChatModel 的输出流:
- 既要被下游节点作为输入来消费,又要被 Callback 切面来消费
- 一个流中的一个帧(Chunk),只能被一个消费方消费到,即流不是广播模型

所以此时需要将流进行复制,其复制关系如下:

![](/img/eino/graph_stream_chunk_copy.png)

- 如果其中一个 Callback n 没有 Close 对应的流,可能导致原始 Stream 无法 Close 和释放资源。
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
Description: ""
date: "2025-01-15"
date: "2025-01-20"
lastmod: ""
tags: []
title: 'Eino: 编排的设计理念'
Expand Down Expand Up @@ -127,7 +127,6 @@ parallel 节点的输出一定是一个 `map[string]any`,其中的 key 则是
```go
func TestParallel() {
chain := compose.NewChain[map[string]any, map[string]*schema.Message]()
templateNode := &fakeTemplateNode{} // input: map[string]any, output: []*schema.Message

parallel := compose.NewParallel()
model01 := &fakeChatModel{} // input: []*schema.Message, output: *schema.Message
Expand Down Expand Up @@ -171,58 +170,72 @@ Workflow 的类型对齐的维度,由整体的 Input & Output 改成了字段

### invoke 和 stream 下的类型对齐方式

eino 中,编排的结果是 graph 或 chain,若要运行,则需要使用 `Compile()` 来生成一个 `Runnable` 接口。
Eino 中,编排的结果是 graph 或 chain,若要运行,则需要使用 `Compile()` 来生成一个 `Runnable` 接口。

Runnable 的一个重要作用就是提供了 `invoke``stream``collect``transform` 几种调用方式的降级兼容
Runnable 的一个重要作用就是提供了 `I``nvoke``S``tream``C``ollect``T``ransform` 四种调用方式

> 上述几种调用方式的介绍以及详细的 Runnable 介绍可以查看: [Eino: 基础概念介绍](/zh/docs/eino/overview)
以我们最常见的 invoke 和 stream 模式为例,其接口签名如下:
假设我们有一个 `Graph[[]*schema.Message, []*schema.Message]`,里面有一个 ChatModel 节点,一个 Lambda 节点,Compile 之后是一个 `Runnable[[]*schema.Message, []*schema.Message]`

```go
type Runnable[I, O any] interface {
Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
}
```

以 chat model 的场景为例,Runnable 加上 I,O 类型后签名如下:

```go
type Runnable interface {
Invoke(ctx context.Context, input []*schema.Message, opts ...Option) (output *schema.Message, err error)
Stream(ctx context.Context, input []*schema.Message, opts ...Option) (output *schema.StreamReader[*schema.Message], err error)
}
```

在 Invoke 模式下,返回的 output 的类型为 `*schema.Message`; 在 Stream 模式下,其返回的 output 类型必须为 `*schema.StreamReader[*schema.Message]`。也就是 stream 的每一帧的类型和 invoke 的结果类型是相同的。

一般来说,Stream 得到的每一帧合并起来应当和 invoke 的结果相同,在上面这个场景中,也即要求:

```go
func TestInvokeAndStream() {
var r Runnable[[]*schema.Message, *schema.Message]

reader, err := r.Stream(...)
allFrames := make([]*schema.Message, 0)
package main

import (
"context"
"io"
"testing"

"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/stretchr/testify/assert"
)

func TestTypeMatch(t *testing.T) {
ctx := context.Background()

g1 := compose.NewGraph[[]*schema.Message, string]()
_ = g1.AddChatModelNode("model", &mockChatModel{})
_ = g1.AddLambdaNode("lambda", compose.InvokableLambda(func(_ context.Context, msg *schema.Message) (string, error) {
return msg.Content, nil
}))
_ = g1.AddEdge(compose.START, "model")
_ = g1.AddEdge("model", "lambda")
_ = g1.AddEdge("lambda", compose.END)

runner, err := g1.Compile(ctx)
assert.NoError(t, err)

c, err := runner.Invoke(ctx, []*schema.Message{
schema.UserMessage("what's the weather in beijing?"),
})
assert.NoError(t, err)
assert.Equal(t, "the weather is good", c)

s, err := runner.Stream(ctx, []*schema.Message{
schema.UserMessage("what's the weather in beijing?"),
})
assert.NoError(t, err)

var fullStr string
for {
frame, err := reader.Recev()
...
allFrames = append(allFrames, frame)
...
chunk, err := s.Recv()
if err != nil {
if err == io.EOF {
break
}
panic(err)
}

fullStr += chunk
}

invokeRes, err := r.Invoke(...)

// allFrames 合并后需要和 invokeRes 相同
assert.Equal(t, c, fullStr)
}
```

在 stream 模式下,`合并帧` 是一个非常常见的操作,例如在和大模型的交互中,可以把已经接收到的所有帧拼接起来(Concatenate),得到一个完整的输出。

另外,在框架中,当一个仅提供了 Stream 接口的节点,被编排后使用 Invoke 调用,框架则会把 Stream 降级为 Invoke,此时的操作是 底层调用开发者提供的 Stream 接口,获取完整的帧后,把所有帧合并,得到的结果再流转到下一节点。 这个过程中,也是使用的 `拼接``帧`
当我们以 Stream 方式调用上面编译好的 Runnable 时,model 节点会输出 `*schema.StreamReader[*Message]`,但是 lambda 节点是 InvokableLambda,只接收非流式的 `*schema.Message` 作为输入。这也符合类型对齐规则,因为 Eino 框架会自动把流式的 Message 拼接成完整的 Message。

拼接时,会先把 `*StreamReader[T] ` 中的所有元素取出来转成 `[]T`。框架内已经内置支持了如下类型的拼接:
在 stream 模式下,`拼接``帧` 是一个非常常见的操作,拼接时,会先把 `*StreamReader[T] ` 中的所有元素取出来转成 `[]T`,再尝试把 `[]T` 拼接成一个完整的 `T`。框架内已经内置支持了如下类型的拼接:

- `*schema.Message`: 详情见 `schema.ConcatMessages()`
- `string`: 实现逻辑等同于 `+=`
Expand Down Expand Up @@ -251,7 +264,7 @@ func concatTStreamForTest(items []*tStreamConcatItemForTest) (*tStreamConcatItem
return &tStreamConcatItemForTest{s: s}, nil
}

func init() {
func Init() {
// 注册到全局的拼接方法中
compose.RegisterStreamChunkConcatFunc(concatTStreamForTest)
}
Expand All @@ -269,16 +282,6 @@ eino 的 Graph 类型对齐检查,会在 `err = graph.AddEdge("node1", "node2"

这种场景适用于开发者能自行处理好上下游类型对齐的情况,可根据不同类型选择下游执行节点。

## 大模型场景的编排

eino 在编排中的是以大模型应用为核心场景的编排系统,因此在 eino 的编排设计中,是直接把 `component` 作为了编排的直接主体,封装了在大模型应用中最常用的组件,详细的 API 查看: [Eino: 基础概念介绍](/zh/docs/eino/overview)

大多数情况下,业务的实现应当把自己的组件实现为上述组件中的一种,或者直接使用 eino-ext 中已经封装好的组件。

当然,除了上述标准的组件外,还有很多场景我们需要实现一些自定义的代码逻辑,在 eino 中,这就是 `Lambda` 组件。这是一个很泛化的组件,可以基于这个基础组件实现几乎所有的需求,实际上,在 eino 内部,上述的组件也都是使用 lambda 来实现的。

> 更多信息可以参考: [Eino: Components 抽象&实现](/zh/docs/eino/core_modules/components)
## 带有明确倾向性的设计选择

### 扇入与合并
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
Description: ""
date: "2025-01-15"
date: "2025-01-20"
lastmod: ""
tags: []
title: Eino 流式编程要点
Expand Down Expand Up @@ -44,6 +44,7 @@ type ChatModel interface {
Generate(ctx context.Context**, **input []*schema.Message**, **opts ...Option) (*schema.Message**, **error)
Stream(ctx context.Context**, **input []*schema.Message**, **opts ...Option) (
*schema.StreamReader[*schema.Message]**, **error)
// other methods omitted...
}
```

Expand Down Expand Up @@ -101,7 +102,7 @@ Collect 和 Transform 两种流式范式,目前只在编排场景有用到。
上面的 Concat message stream 是 Eino 框架自动提供的能力,即使不是 message,是任意的 T,只要满足特定的条件,Eino 框架都会自动去做这个 StreamReader[T] 到 T 的转化,这个条件是:**在编排中,当一个组件的上游输出是 StreamReader[T],但是组件只提供了 T 作为输入的业务接口时,框架会自动将 StreamReader[T] concat 成 T,再输入给这个组件。**

> 💡
> 框架自动将 StreamReader[T] concat 成 T 的过程,可能需要用户提供一个 Concat function。详见 [Eino: 编排的设计理念](/zh/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles) 中关于“合并帧”的章节。
> 框架自动将 StreamReader[T] concat 成 T 的过程,可能需要用户提供一个 Concat function。详见 [Eino: 编排的设计理念](/zh/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles#share-FaVnd9E2foy4fAxtbTqcsgq3n5f) 中关于“合并帧”的章节。
另一方面,考虑一个相反的例子。还是 React Agent,这次是一个更完整的编排示意图:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Options struct {

最基础的文本解析器,将输入内容直接作为文档内容:

> 代码位置:eino-ext/components/document/parser/textparser
> 代码位置:eino-examples/components/document/parser/textparser
```go
import "github.com/cloudwego/eino/components/document/parser"
Expand All @@ -81,31 +81,61 @@ logs.Infof("text content: %v", docs[0].Content)

基于文件扩展名的解析器,可以根据文件扩展名自动选择合适的解析器:

> 代码位置:eino-examples/components/document/parser/extparser
```go
// 创建扩展解析器
parser, err := NewExtParser(ctx, &ExtParserConfig{
// 注册特定扩展名的解析器
Parsers: map[string]Parser{
".html": html.NewParser(&html.ParserConfig{
Selector: ".body"
}),
".pdf": pdf.NewParser(&pdf.ParserConfig{}),
},
// 设置默认解析器,用于处理未知格式
FallbackParser: TextParser{},
})
if err != nil {
return err
}
package main

// 使用解析器
file, _ := os.Open("./document.html")
docs, err := parser.Parse(ctx, file,
WithURI("./document.html"), // 必须提供 URI 以便选��正确的解析器
WithExtraMeta(map[string]any{
"source": "local",
}),
import (
"context"
"os"

"github.com/cloudwego/eino-ext/components/document/parser/html"
"github.com/cloudwego/eino-ext/components/document/parser/pdf"
"github.com/cloudwego/eino/components/document/parser"

"github.com/cloudwego/eino-examples/internal/gptr"
"github.com/cloudwego/eino-examples/internal/logs"
)

func main() {
ctx := context.Background()

textParser := parser.TextParser{}

htmlParser, _ := html.NewParser(ctx, &html.Config{
Selector: gptr.Of("body"),
})

pdfParser, _ := pdf.NewPDFParser(ctx, &pdf.Config{})

// 创建扩展解析器
extParser, _ := parser.NewExtParser(ctx, &parser.ExtParserConfig{
// 注册特定扩展名的解析器
Parsers: map[string]parser.Parser{
".html": htmlParser,
".pdf": pdfParser,
},
// 设置默认解析器,用于处理未知格式
FallbackParser: textParser,
})

// 使用解析器
filePath := "./testdata/test.html"
file, _ := os.Open(filePath)

docs, _ := extParser.Parse(ctx, file,
// 必须提供 URI ExtParser 选择正确的解析器进行解析
parser.WithURI(filePath),
parser.WithExtraMeta(map[string]any{
"source": "local",
}),
)

for idx, doc := range docs {
logs.Infof("doc_%v content: %v", idx, doc.Content)
}
}
```

### 其他实现
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ weight: 1
</td>
<td>
1. <strong>通过Marketplace,搜索Eino Dev 插件并按照</strong>
<img src="/img/eino/S0yAbpcV8oauOpxgGKlcn6lVn6g.png" />
<img src="/img/eino/eino_install_page_2_page.png" />

</td>
</tr></tbody></table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ go mod tidy
<td>

2.点击配置调试地址
<img src="/img/eino/Wy7abH4QZoJzLQxAlLscmnnDnHh.png" />
<img src="/img/eino/eino_debug_config_3_page.png" />

</td>
</tr></tbody></table>
Expand All @@ -67,13 +67,13 @@ go mod tidy
<td>

3.填入 127.0.0.1:52538
<img src="/img/eino/SHvXbcIRko3tA0xUgFQcMH6Vned.png" />
<img src="/img/eino/eino_debug_config_2_page.png" />

</td>
<td>

4.点击确认进入调试界面,选择要调试的 Graph
<img src="/img/eino/NAQIbC4yxoKcsRx3tmkc9ZjEnAg.png" />
<img src="/img/eino/eino_orchestration_index_2_page.png" />

</td>
</tr></tbody></table>
Expand Down Expand Up @@ -120,7 +120,7 @@ go mod tidy
## 编排拓扑可视化

支持 Graph 和 Chain 编排拓扑可视化。
![](/img/eino/R8EYbfenDoeMnfxGjJ9cZ6Hjnff.png)
![](/img/eino/eino_debug_list_nodes_page.png)

## 从任意节点开始调试

Expand Down Expand Up @@ -245,7 +245,7 @@ func main() {
> - 远程服务器调试:需要你保证端口可访问。
IP 和 Port 配置完成后,点击确认,调试插件会自动连接到目标调试服务器。如果成功连接,连接状态指示器会变成绿色。
![](/img/eino/XnWRblIfroE1f2xENJ0cafzknKe.png)
![](/img/eino/eino_debug_ip_port_show_page.png)

## 选择目标调试编排产物

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ weight: 2
<img src="/img/eino/eino_orchestration_add_nodes_page.png" />
</td>
<td>
<img src="/img/eino/G6oabw0FNoFTeSxoAxWcrhNJnVd.png" />
<img src="/img/eino/eino_dev_chat_model_config2.png" />
</td>
</tr></tbody></table>

Expand All @@ -64,7 +64,7 @@ weight: 2
<img src="/img/eino/eino_orchestration_add_nodes_3_page.png" />
</td>
<td>
<img src="/img/eino/FQHfbUjLJofszaxeEtncYa47nnf.png" />
<img src="/img/eino/eino_dev_chat_model_config.png" />
</td>
</tr></tbody></table>

Expand Down Expand Up @@ -100,7 +100,7 @@ weight: 2
<img src="/img/eino/eino_orchestration_add_nodes_2_page.png" />
</td>
<td>
<img src="/img/eino/TntYbJKbMofcs3xlU7icemXsnbe.png" />
<img src="/img/eino/eino_orchestration_node_config_2_page.png" />
</td>
<td>
<img src="/img/eino/eino_orchestration_add_edges_page.png" />
Expand Down
Loading

0 comments on commit 18ba065

Please sign in to comment.