-
Notifications
You must be signed in to change notification settings - Fork 253
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6d5da26
commit 976a2b7
Showing
1 changed file
with
77 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
--- | ||
title: "流元消息透传最佳实践" | ||
date: 2025-01-10 | ||
weight: 1 | ||
keywords: ["流元消息透传最佳实践"] | ||
description: "" | ||
--- | ||
|
||
## 前言 | ||
|
||
消息透传总体用法与 [Kitex - 元信息透传](https://bytedance.larkoffice.com/wiki/Y3ChwldJzihF4Vkb6Ekcie38no4)相似,区别在于每一个 stream 只有在**创建时才能透传元信息**,发送消息无法透传。 | ||
|
||
## 使用指南 | ||
|
||
### Client 透传元消息 | ||
|
||
```go | ||
ctx = metainfo.WithPersistentValue(ctx, "k1", "v1") | ||
ctx = metainfo.WithValue(ctx, "k2", "v2") | ||
|
||
s, err := streamClient.ClientStream(ctx) | ||
``` | ||
|
||
### Server 接收元消息 | ||
|
||
```go | ||
func (s *streamingService) ClientStream(ctx context.Context, | ||
stream streamx.ClientStreamingServer[Request, Response]) (*Response, error) { | ||
|
||
v, ok := metainfo.GetPersistentValue(ctx, "k1") | ||
// v == "v1" | ||
v, ok = metainfo.GetValue(ctx, "k2") | ||
// v == "v2" | ||
} | ||
``` | ||
|
||
### Server 反向透传元消息 | ||
|
||
反向透传引入了一个新的概念,Header 和 Trailer ,任何完整数据流必定包含 Header(头包) 和 Trailer(尾包),使用这两个帧来反向透传信息。 | ||
|
||
```go | ||
func (s *streamingService) ClientStream(ctx context.Context, | ||
stream streamx.ClientStreamingServer[Request, Response]) (*Response, error) { | ||
|
||
// SetTrailer set 的 trailer 会在 server handler 结束后发送 | ||
err := stream.SetTrailer(streamx.Trailer{"t1": "v1"}) | ||
// 立刻发送 Header | ||
err = stream.SendHeader(streamx.Header{"h1": "v1"}) | ||
if err != nil { | ||
return err | ||
} | ||
// 发送正常数据 | ||
err = stream.Send(req) | ||
} | ||
``` | ||
|
||
### Client 接收反向透传的元消息 | ||
|
||
```go | ||
s, err := streamClient.ClientStream(ctx) | ||
|
||
// Header/Trailer 函数会一直阻塞到对端发送了 Header/Trailer 为止,或中间发生了错误 | ||
hd, err := s.Header() | ||
// hd["h1"] == "v1" | ||
tl, err := s.Trailer() | ||
// tl["t1"] == "v1" | ||
``` | ||
|
||
## FAQ | ||
|
||
### 为什么反向透传接口与 PingPong 的反向透传不一致 | ||
|
||
因为流的概念与 PingPong 不同,流下,Header 是可以独立单独发送的,也就是说我 Server 可以在第一秒发送 Header, 过 10s 后发送 Data, 再过 1s 后发送 Trailer。 | ||
|
||
与此同时,客户端也可以选择是否要调用 `.Header()` 或 `.Trailer()` 来阻塞。 | ||
|
||
所以传统 ctx 的语义无法满足流的反向透传功能了。正向透传依然与原先的 PingPong 保持一致。 |