Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: sync 流中间件最佳实践 #1184

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions content/zh/docs/kitex/Kitex+StreamX+-+流中间件最佳实践.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
---
title: "流中间件最佳实践"
date: 2025-01-10
weight: 1
keywords: ["流中间件最佳实践"]
description: ""
---

## 中间件类型

### Stream Recv/Send Middleware

**触发时机**:流收发消息时调用

#### 类型定义

```go
type StreamRecvEndpoint func(ctx context.Context, stream Stream, res any) (err error)
type StreamSendEndpoint func(ctx context.Context, stream Stream, req any) (err error)

type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint
type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint
```

**参数说明**:

- stream 直接获取当前的流对象
- res/req 均代表真实请求和响应。
- Next 函数调用前后的行为:

| 中间件类型 | Next 调用前 | Next 调用后 |
| -------------------- | ----------------------------------------------------------------------- | ---------------------------------------------------- |
| StreamRecvMiddleware | - 数据未真正收,刚调用 stream.Recv() 函数<br><br>- res 参数为空 | - 数据已收到或遇到错误<br><br>- res 参数有真实值 |
| StreamSendMiddleware | - 数据未真正发送,刚调用 stream.Send() 函数<br><br>- req 参数为真实请求 | - 数据发送完成或遇到错误<br><br>- req 参数为真实请求 |

#### 使用范例

**使用场景**:流收/发消息时,注入相关业务逻辑。

```go
svr, err := xxx.NewServer(
//...
streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
// ctx 依然含有用户透传的 token
token, ok := metainfo.GetPersistentValue(ctx, "user_token")
// 检查 token 是否有账户余额继续维持会话
if !hasBalance(token) {
return fmt.Errorf("user dont have enough balance: token=%s", token)
}
return next(ctx, stream, res)
}
}),
)
```

## 注入中间件

#### 注入 Client Middleware

```go
cli, err := xxx.NewClient(
"a.b.c",
streamxclient.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
return next(ctx, stream, res)
}
}),
streamxclient.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint {
return func(ctx context.Context, stream streamx.Stream, req any) (err error) {
return next(ctx, stream, req)
}
}),
)
```

#### 注入 Server Middleware

```go
server, err := xxx.NewServer(
// ....
streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
return next(ctx, stream, res)
}
}),
streamxserver.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint {
return func(ctx context.Context, stream streamx.Stream, req any) (err error) {
return next(ctx, stream, req)
}
}),
)
```
Loading