doc: eino doc update (#1214)
kuhahalong authored Jan 20, 2025
1 parent eea25ec commit b324caa
Showing 108 changed files with 1,253 additions and 1,062 deletions.
content/zh/docs/eino/_index.md
---
Description: Eino is an AI application development framework based on Golang
date: "2025-01-20"
lastmod: ""
linktitle: Eino
menu:
---
Description: ""
date: "2025-01-20"
lastmod: ""
tags: []
title: 'Eino: CallOption Capabilities and Conventions'
weight: 0
---
**CallOption**: when invoking the compiled artifact of a Graph, a channel for passing data directly to a specific set of nodes (Component, Implementation, Node)
- Difference from node Config: a node Config is instance-granularity configuration; its values, once set, do not need to change for the lifetime of the instance (from creation to destruction)
- CallOption: request-granularity configuration whose values differ from request to request. It behaves more like a node input parameter, except that it is passed in directly at the Graph entry rather than from an upstream node.
- Example: passing a Temperature config to a ChatModel node; passing a custom option to a Lambda node.
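The config-vs-CallOption distinction above can be sketched in a few lines of self-contained Go (the `nodeConfig` and `callOption` names are illustrative only, not Eino APIs):

```go
package main

import "fmt"

// nodeConfig is instance-granularity: fixed when the node instance is created.
type nodeConfig struct {
	baseURL string // set once at construction, never changes per request
}

// callOption is request-granularity: a fresh value may be passed on every call.
type callOption struct {
	temperature *float32
}

type fakeModel struct{ conf nodeConfig }

// generate receives per-request options directly from the graph entry.
func (m *fakeModel) generate(prompt string, opts ...func(*callOption)) string {
	o := &callOption{}
	for _, fn := range opts {
		fn(o)
	}
	t := float32(1.0) // default when no per-request option is given
	if o.temperature != nil {
		t = *o.temperature
	}
	return fmt.Sprintf("url=%s temp=%.1f prompt=%s", m.conf.baseURL, t, prompt)
}

func withTemperature(t float32) func(*callOption) {
	return func(o *callOption) { o.temperature = &t }
}

func main() {
	m := &fakeModel{conf: nodeConfig{baseURL: "https://example.com"}}
	// Same instance, different per-request option values.
	fmt.Println(m.generate("hi", withTemperature(0.2)))
	fmt.Println(m.generate("hi"))
}
```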

## Component CallOption Forms

```
// location of the component abstraction
eino/components/model

// location of the abstraction's implementations
eino-ext/components/model
├── claude
│   ├── option.go // CallOption params of one implementation of the Component
│   └── chatmodel.go
├── ollama
│   ├── chatmodel.go
```

### Model Abstraction

```go
type ChatModel interface {
	Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
	Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
		*schema.StreamReader[*schema.Message], error)

	BindTools(tools []*schema.ToolInfo) error
}

// This struct is the unified definition of the [component-abstraction CallOption]. Implementations can pick what they need from it
// Options is the common options for the model.
type Options struct {
	// Temperature is the temperature for the model, which controls the randomness of the model.
	Temperature *float32
	// MaxTokens is the max number of tokens; when the max is reached, the model stops generating, mostly returning a finish reason of "length".
	MaxTokens *int
	// Model is the model name.
	Model *string
	// TopP is the top p for the model, which controls the diversity of the model.
	TopP *float32
	// Stop is the stop words for the model, which controls the stopping condition of the model.
	Stop []string
}

// Option is the call option for ChatModel component.
type Option struct {
	apply func(opts *Options)

	implSpecificOptFn any
}

// WithTemperature is the option to set the temperature for the model.
func WithTemperature(temperature float32) Option {
	return Option{
		apply: func(opts *Options) {
			opts.Temperature = &temperature
		},
	}
}

// WithMaxTokens is the option to set the max tokens for the model.
func WithMaxTokens(maxTokens int) Option {
	return Option{
		apply: func(opts *Options) {
			opts.MaxTokens = &maxTokens
		},
	}
}

// WithModel is the option to set the model name.
func WithModel(name string) Option {
	return Option{
		apply: func(opts *Options) {
			opts.Model = &name
		},
	}
}

// WithTopP is the option to set the top p for the model.
func WithTopP(topP float32) Option {
	return Option{
		apply: func(opts *Options) {
			opts.TopP = &topP
		},
	}
}

// WithStop is the option to set the stop words for the model.
func WithStop(stop []string) Option {
	return Option{
		apply: func(opts *Options) {
			opts.Stop = stop
		},
	}
}
func GetImplSpecificOptions[T any](base *T, opts ...Option) *T {
	if base == nil {
		base = new(T)
	}

	for i := range opts {
		opt := opts[i]
		if opt.implSpecificOptFn != nil {
			optFn, ok := opt.implSpecificOptFn.(func(*T))
			if ok {
				optFn(base)
			}
		}
	}

	return base
}
```
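How the common and implementation-specific extraction helpers cooperate at runtime can be shown with a stripped-down, self-contained re-implementation (illustrative only; it mirrors, but is not, the Eino source):

```go
package main

import "fmt"

// Options mirrors the common options shared by all implementations.
type Options struct {
	Model *string
}

// Option carries either a common-option setter or an implementation-specific one.
type Option struct {
	apply             func(*Options)
	implSpecificOptFn any
}

func WithModel(name string) Option {
	return Option{apply: func(o *Options) { o.Model = &name }}
}

// wrapImplSpecificOptFn plays the role of model.WrapImplSpecificOptFn.
func wrapImplSpecificOptFn[T any](fn func(*T)) Option {
	return Option{implSpecificOptFn: fn}
}

// getCommonOptions applies only the common setters on top of the defaults.
func getCommonOptions(base *Options, opts ...Option) *Options {
	for _, opt := range opts {
		if opt.apply != nil {
			opt.apply(base)
		}
	}
	return base
}

// getImplSpecificOptions applies only the setters whose type matches *T.
func getImplSpecificOptions[T any](base *T, opts ...Option) *T {
	for _, opt := range opts {
		if fn, ok := opt.implSpecificOptFn.(func(*T)); ok {
			fn(base)
		}
	}
	return base
}

// claudeOptions stands in for one implementation's private options.
type claudeOptions struct{ TopK *int32 }

func withTopK(k int32) Option {
	return wrapImplSpecificOptFn(func(o *claudeOptions) { o.TopK = &k })
}

func main() {
	opts := []Option{WithModel("claude-3"), withTopK(5)}
	common := getCommonOptions(&Options{}, opts...)
	impl := getImplSpecificOptions(&claudeOptions{}, opts...)
	// Each extractor sees only the options addressed to it.
	fmt.Println(*common.Model, *impl.TopK)
}
```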

### Claude 实现

[https://github.com/cloudwego/eino-ext/blob/main/components/model/claude/option.go](https://github.com/cloudwego/eino-ext/blob/main/components/model/claude/option.go)

```go
package claude

import (
	"github.com/cloudwego/eino/components/model"
)

type options struct {
	TopK *int32
}

func WithTopK(k int32) model.Option {
	return model.WrapImplSpecificOptFn(func(o *options) {
		o.TopK = &k
	})
}
```

[https://github.com/cloudwego/eino-ext/blob/main/components/model/claude/claude.go](https://github.com/cloudwego/eino-ext/blob/main/components/model/claude/claude.go)

```go
func (c *claude) genMessageNewParams(input []*schema.Message, opts ...model.Option) (anthropic.MessageNewParams, error) {
	if len(input) == 0 {
		return anthropic.MessageNewParams{}, fmt.Errorf("input is empty")
	}

	commonOptions := model.GetCommonOptions(&model.Options{
		Model:       &c.model,
		Temperature: c.temperature,
		MaxTokens:   &c.maxTokens,
		TopP:        c.topP,
		Stop:        c.stopSequences,
	}, opts...)
	claudeOptions := model.GetImplSpecificOptions(&options{TopK: c.topK}, opts...)

	// omit multiple lines...
}
```

## CallOption in Orchestration

[https://github.com/cloudwego/eino/blob/main/compose/runnable.go](https://github.com/cloudwego/eino/blob/main/compose/runnable.go)

The compiled artifact of a Graph is a Runnable:

```go
type Runnable[I, O any] interface {
	Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
	Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
	Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error)
	Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error)
}
```
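The four methods correspond to the four streaming paradigms (non-stream in/out, stream out, stream in, stream both ways). A toy, self-contained illustration, using channels in place of schema.StreamReader (names are illustrative, not Eino APIs):

```go
package main

import (
	"fmt"
	"strings"
)

// toyRunnable upper-cases its input; channels stand in for StreamReader.
type toyRunnable struct{}

// invoke: non-stream in, non-stream out.
func (toyRunnable) invoke(input string) string { return strings.ToUpper(input) }

// stream: non-stream in, stream out (the output is split into chunks).
func (r toyRunnable) stream(input string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, c := range r.invoke(input) {
			out <- string(c)
		}
	}()
	return out
}

// collect: stream in, non-stream out (concatenate chunks, then invoke).
// transform (stream in, stream out) would combine both patterns.
func (r toyRunnable) collect(input <-chan string) string {
	var b strings.Builder
	for chunk := range input {
		b.WriteString(chunk)
	}
	return r.invoke(b.String())
}

func main() {
	r := toyRunnable{}
	fmt.Println(r.invoke("abc"))

	for chunk := range r.stream("ab") {
		fmt.Print(chunk, " ")
	}
	fmt.Println()

	in := make(chan string, 2)
	in <- "ab"
	in <- "c"
	close(in)
	fmt.Println(r.collect(in))
}
```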

Each of Runnable's methods accepts a list of compose.Option.

[https://github.com/cloudwego/eino/blob/main/compose/graph_call_options.go](https://github.com/cloudwego/eino/blob/main/compose/graph_call_options.go)

These include configuration for the graph run as a whole, for each category of component, for specific Lambdas, and so on.

```go
// Option is a functional option type for calling a graph.
type Option struct {
	options []any
	handler []callbacks.Handler

	paths []*NodePath

	maxRunSteps int
}

// DesignateNode sets the key of the node to which the option will be applied.
// notice: only effective at the top graph.
// e.g.
//
// embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small"))
// runnable.Invoke(ctx, "input", embeddingOption.DesignateNode("my_embedding_node"))
func (o Option) DesignateNode(key ...string) Option {
	nKeys := make([]*NodePath, len(key))
	for i, k := range key {
		nKeys[i] = NewNodePath(k)
	}
	return o.DesignateNodeWithPath(nKeys...)
}
// DesignateNodeWithPath sets the path of the node(s) to which the option will be applied.
// You can make the option take effect in a subgraph by specifying the key of the subgraph.
// e.g.
// DesignateNodeWithPath({"sub graph node key", "node key within sub graph"})
func (o Option) DesignateNodeWithPath(path ...*NodePath) Option {
	o.paths = append(o.paths, path...)
	return o
}
```


```go
// WithEmbeddingOption is a functional option type for embedding component.
// e.g.
//
// embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small"))
// runnable.Invoke(ctx, "input", embeddingOption)
func WithEmbeddingOption(opts ...embedding.Option) Option {
	return withComponentOption(opts...)
}
```
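How a designated option reaches only its target node can be sketched with a toy dispatcher (illustrative only; Eino's real routing lives inside the graph runtime, and these names are not Eino APIs):

```go
package main

import "fmt"

// toyOption mimics compose.Option: a payload plus optional designated node keys.
type toyOption struct {
	payload string
	keys    []string // empty means "apply to every node of the matching type"
}

func (o toyOption) designateNode(keys ...string) toyOption {
	o.keys = append(o.keys, keys...)
	return o
}

// optionsForNode returns the option payloads a given node should receive.
func optionsForNode(nodeKey string, opts []toyOption) []string {
	var out []string
	for _, o := range opts {
		if len(o.keys) == 0 {
			out = append(out, o.payload) // undesignated: broadcast to all nodes
			continue
		}
		for _, k := range o.keys {
			if k == nodeKey {
				out = append(out, o.payload)
			}
		}
	}
	return out
}

func main() {
	opts := []toyOption{
		{payload: "temperature=0.7"}, // applies everywhere
		toyOption{payload: "top_k=5"}.designateNode("claude_node"),
	}
	fmt.Println(optionsForNode("claude_node", opts))
	fmt.Println(optionsForNode("other_node", opts))
}
```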

compose.Option can be distributed, as needed, to different nodes in the Graph.

![](/img/eino/graph_runnable_after_compile.png)

---
Description: ""
date: "2025-01-20"
lastmod: ""
tags: []
title: 'Eino: Callback User Manual'
weight: 0
---

## Problems Solved

Component (including Lambda) and Graph orchestration together solve the problem of "defining the business logic". Cross-cutting functionality such as logging, tracing, metrics, and on-screen display needs a mechanism for injecting it into Components (including Lambda) and Graphs.

```go
type CallbackInput struct {
	Messages []*schema.Message
	// Tools is the tools to be used in the model.
	Tools []*schema.ToolInfo
	// Config is the config for the model.
	Config *Config
	// Extra is the extra information for the callback.
	Extra map[string]any
}
```

Graph automatically injects RunInfo for every internal Node.

## Trigger Methods

![](/img/eino/graph_node_callback_run_place.png)

### Triggering Inside the Component Implementation (Component Callback)

In the component implementation code, call `OnStart(), OnEnd(), OnError(), OnStartWithStreamInput(), OnEndWithStreamInput()` from the callbacks package. Taking Ark's ChatModel implementation as an example, in the Generate method:
Modifying input / output inside a Handler is not recommended.
There is no ordering guarantee between different Handlers, so passing information between Handlers through the mechanism above is not recommended. Fundamentally, there is no guarantee that the context returned by one Handler will flow into the execution of the next Handler.

If you need to pass information between different Handlers, the recommended approach is to set a global, request-scoped variable in the outermost context (e.g. the context passed in when the graph is executed) as a shared space for storing common information, and to read and update this shared variable as needed in each Handler. When streams are involved, take particular care to guarantee the concurrency safety of this shared variable.

### Always Remember to Close Streams

Take a node with genuinely streaming output, such as ChatModel, as an example. When Callback aspects are present, the ChatModel's output stream:
- must be consumed both by the downstream node (as its input) and by the Callback aspects
- a given frame (Chunk) in a stream can be consumed by only one consumer, i.e. a stream is not a broadcast model

The stream therefore has to be copied, with the following copy relationships:

![](/img/eino/graph_stream_chunk_copy.png)

- If any one Callback n fails to Close its stream, the original Stream may be unable to Close and release its resources.
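The close-and-release discipline can be sketched with a toy refcounted stream copy (illustrative only; this is not how eino's schema.StreamReader is implemented, just the contract the bullet above describes):

```go
package main

import "fmt"

// parentStream tracks how many copies are still open.
type parentStream struct {
	open     int
	released bool
}

type streamCopy struct{ p *parentStream }

// close releases one copy; the parent frees resources only when all copies close.
func (c *streamCopy) close() {
	c.p.open--
	if c.p.open == 0 {
		c.p.released = true
	}
}

// copyStream hands the stream to n consumers (e.g. downstream node + callback).
func copyStream(n int) []*streamCopy {
	p := &parentStream{open: n}
	out := make([]*streamCopy, n)
	for i := range out {
		out[i] = &streamCopy{p: p}
	}
	return out
}

func main() {
	copies := copyStream(2)
	copies[0].close() // downstream node is done
	fmt.Println("released after one close:", copies[0].p.released)
	copies[1].close() // callback is done; only now can the parent release
	fmt.Println("released after both close:", copies[0].p.released)
}
```

If the callback copy is never closed, `released` stays false forever, which is exactly the resource leak the section warns about.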
