-
Notifications
You must be signed in to change notification settings - Fork 253
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
976a2b7
commit c30f237
Showing
1 changed file
with
83 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
--- | ||
title: "流错误处理最佳实践" | ||
date: 2025-01-10 | ||
weight: 1 | ||
keywords: ["流错误处理最佳实践"] | ||
description: "" | ||
--- | ||
|
||
## 前言 | ||
|
||
与 PingPong RPC 不同,流的错误可以发生在一个流处理的任何时候,例如 server 可以在发送多条消息后,再返回一个错误。但是一旦一个流发送完错误后,就不能再发送任何消息。 | ||
|
||
## 错误类型 | ||
|
||
### 业务异常 | ||
|
||
**使用范例**:例如 ChatGPT 场景,我们需要不停检查用户账户余额是否能继续调用大模型生成返回。 | ||
|
||
Server 实现: | ||
|
||
```go | ||
func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { | ||
// 检查用户账户余额 | ||
for isHasBalance (req.UserId) { | ||
stream.Send(ctx, res) | ||
} | ||
// 返回用户余额不足错误 | ||
bizErr := kerrors.NewBizStatusErrorWithExtra( | ||
10001, "insufficient user balance", map[string]string{"testKey": "testVal"}, | ||
) | ||
return bizErr | ||
} | ||
``` | ||
|
||
Client 实现: | ||
|
||
```go | ||
svrStream, err = streamClient.ServerStreamWithErr(ctx, req) | ||
|
||
var err error | ||
for { | ||
res, err = stream.Recv(ctx) | ||
if err != nil { | ||
break | ||
} | ||
} | ||
bizErr, ok := kerrors.FromBizStatusError(err) | ||
if ok { | ||
println(bizErr.BizStatusCode(), bizErr.BizMessage(), bizErr.BizExtra()) | ||
} | ||
``` | ||
|
||
### 其他错误 | ||
|
||
如果 Server 返回的 Error 为非业务异常,框架会统一封装为 `(*thrift.ApplicationException)` 。此时只能拿到错误的 Message 。 | ||
|
||
Server 实现: | ||
|
||
```go | ||
func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { | ||
// ... | ||
return errors.New("test error") | ||
} | ||
``` | ||
|
||
Client 实现: | ||
|
||
```go | ||
svrStream, err = streamClient.ServerStreamWithErr(ctx, req) | ||
test.Assert(t, err == nil, err) | ||
|
||
var err error | ||
for { | ||
res, err = stream.Recv(ctx) | ||
if err != nil { | ||
break | ||
} | ||
} | ||
ex, ok := err.(*thrift.ApplicationException) | ||
if ok { | ||
println(ex.TypeID(), ex.Msg()) | ||
} | ||
``` |