Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: sync 基础流编程 #1183

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 253 additions & 0 deletions content/zh/docs/kitex/Kitex+StreamX+-+基础流编程.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
---
title: "基础流编程"
date: 2025-01-10
weight: 1
keywords: ["基础流编程"]
description: ""
---

## 选择协议

当前支持:

- **TTHeader Streaming**

- 传输协议:TTHeader
- IDL 定义语言 与 序列化协议:Thrift
- **gRPC Streaming**:~~~~ (计划实现)

- ~~传输协议:gRPC~~
- ~~IDL 定义语言 与 序列化协议:Protobuf 编码~~

此处选定的协议只影响从 IDL 生成代码,无论哪种协议,以下用法均一致。

## 使用方法

### 生成代码

#### 定义 IDL

##### Thrift

文件 `echo.thrift`:

```go
namespace go echo

service TestService {
Response PingPong(1: Request req) // PingPong 非流式接口

Response Echo (1: Request req) (streaming.mode="bidirectional"),
Response EchoClient (1: Request req) (streaming.mode="client"),
Response EchoServer (1: Request req) (streaming.mode="server"),
}
```

#### 生成代码

为保持与旧流式生成代码的兼容,命令行需加上 `-streamx` flag。

```
kitex -streamx -module <go module> -service P.S.M echo.thrift
```

##### 初始化

#### 创建 Client

```go
// 生成代码目录,streamserver 为 IDL 定义的 service name
import ".../kitex_gen/echo/testservice"
import "github.com/cloudwego/kitex/client/streamxclient"

cli, err := testservice.NewClient(
"a.b.c",
streamxclient.WithStreamRecvMiddleware(...),
streamxclient.WithStreamSendMiddleware(...),
)
```

#### 创建 Server

```go
import ".../kitex_gen/echo/streamserver"
import "github.com/cloudwego/kitex/server/streamxserver"

svr := streamserver.NewServer(
new(serviceImpl),
streamxserver.WithStreamRecvMiddleware(...),
streamxserver.WithStreamSendMiddleware(...),
)
```

### Client Streaming

#### 使用场景

Client 需要发送多份数据给 Server 端,Server 端可以发送一条消息给 Client:

```go
------------------- [Client Streaming] -------------------
--------------- (stream Req) returns (Res) ---------------
client.Send(req) === req ==> server.Recv(req)
...
client.Send(req) === req ==> server.Recv(req)

client.CloseSend() === EOF ==> server.Recv(EOF)
client.Recv(res) <== res === server.SendAndClose(res)
** OR
client.CloseAndRecv(res) === EOF ==> server.Recv(EOF)
<== res === server.SendAndClose(res)
```

#### Client 用法

- [**必须**]: client 必须调用 CloseAndRecv() 或者 (CloseSend + Recv)方法,告知 Server 不再有新数据发送。

```go
ctx, cs, err := cli.ClientStream(ctx)
for i := 0; i < 3; i++ {
err = cs.Send(ctx, req)
}
res, err = cs.CloseAndRecv(ctx)
```

#### Server 用法

- [**必须**]: server 必须在 handler 结束时返回一个 Response,告知 Client 最终结果。

```go

func (si *serviceImpl) ClientStream(
ctx context.Context, stream streamx.ClientStreamingServer[Request, Response]
) (res *Response, err error) {
for {
req, err := stream.Recv(ctx)
if err == io.EOF {
res := new(Response)
return res, nil
}
if err != nil {
return nil, err
}
}
}
```

### Server Streaming

#### 使用场景

典型场景:ChatGPT 类型业务

Client 发送一个请求给 Server,Server 发送多个返回给 Client:

```go
------------------- [Server Streaming] -------------------
---------- (Request) returns (stream Response) ----------
client.Send(req) === req ==> server.Recv(req)
client.Recv(res) <== res === server.Send(req)
...
client.Recv(res) <== res === server.Send(req)
client.Recv(EOF) <== EOF === server handler return
```

#### Client 用法

- [**必须**]: client 必须判断 io.EOF 错误,并结束循环

```go
ctx, ss, err := cli.ServerStream(ctx, req)
for {
res, err := ss.Recv(ctx)
if errors.Is(err, io.EOF) {
break
}
}
```

#### Server 用法

```go
func (si *serviceImpl) ServerStream(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error {
for i := 0; i < 3; i++ {
err := stream.Send(ctx, resp)
if err != nil {
return err
}
}
return nil
}
```

### Bidirectional Streaming

#### 使用场景

Client 与 Server 可能需要或者未来有可能需要发送多条消息:

```go
----------- [Bidirectional Streaming] -----------
--- (stream Request) returns (stream Response) ---
* goroutine 1 *
client.Send(req) === req ==> server.Recv(req)
...
client.Send(req) === req ==> server.Recv(req)
client.CloseSend() === EOF ==> server.Recv(EOF)

* goroutine 2 *
client.Recv(res) <== res === server.Send(req)
...
client.Recv(res) <== res === server.Send(req)
client.Recv(EOF) <== EOF === server handler return
```

#### Client 用法

- [**必须**]: client 必须在发送结束后调用 CloseSend
- [**必须**]: client 必须在 Recv 时,判断 io.EOF 并结束循环

```go
ctx, bs, err := cli.BidiStream(ctx)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < round; i++ {
err := bs.Send(ctx, req)
}
err = bs.CloseSend(ctx)
}()
go func() {
defer wg.Done()
for {
res, err := bs.Recv(ctx)
if errors.Is(err, io.EOF) {
break
}
}
}()
wg.Wait()
```

#### Server 用法

- [**必须**]: server 必须在 Recv 时,判断 io.EOF 并结束循环

```go
func (si *serviceImpl) BidiStream(ctx context.Context, stream streamx.BidiStreamingServer[Request, Response]) error {
for {
req, err := stream.Recv(ctx)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
err = stream.Send(ctx, resp)
if err != nil {
return err
}
}
}
```
Loading