-
Notifications
You must be signed in to change notification settings - Fork 262
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
doc: sync Stream Error Handling (#1194)
- Loading branch information
1 parent
0865744
commit 21e2598
Showing
1 changed file
with
83 additions
and
0 deletions.
There are no files selected for viewing
83 changes: 83 additions & 0 deletions
83
.../en/docs/kitex/Tutorials/basic-feature/Kitex+StreamX+-+Stream+Error+Handling.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
--- | ||
title: "Stream Error Handling" | ||
date: 2025-01-13 | ||
weight: 1 | ||
keywords: ["Stream Error Handling"] | ||
description: "" | ||
--- | ||
|
||
## Preface | ||
|
||
Unlike RPC, stream errors can occur at any time during stream processing. For example, a server can return an error after sending multiple messages. However, once a stream has sent an error, it cannot send any more messages. | ||
|
||
## Error type | ||
|
||
### Business exception | ||
|
||
**Usage example** : For example, in the ChatGPT scenario, we need to constantly check whether the user account balance can continue to call the large model to generate returns. | ||
|
||
Server implementation: | ||
|
||
```go | ||
func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { | ||
// 检查用户账户余额 | ||
for isHasBalance (req.UserId) { | ||
stream.Send(ctx, res) | ||
} | ||
// 返回用户余额不足错误 | ||
bizErr := kerrors.NewBizStatusErrorWithExtra( | ||
10001, "insufficient user balance", map[string]string{"testKey": "testVal"}, | ||
) | ||
return bizErr | ||
} | ||
``` | ||
|
||
Client implementation: | ||
|
||
```go | ||
svrStream, err = streamClient.ServerStreamWithErr(ctx, req) | ||
|
||
var err error | ||
for { | ||
res, err = stream.Recv(ctx) | ||
if err != nil { | ||
break | ||
} | ||
} | ||
bizErr, ok := kerrors.FromBizStatusError(err) | ||
if ok { | ||
println(bizErr.BizStatusCode(), bizErr.BizMessage(), bizErr.BizExtra()) | ||
} | ||
``` | ||
|
||
### Other errors | ||
|
||
If the Error returned by the Server is a non-business exception, the framework will be uniformly encapsulated as `(* thrift.ApplicationException) `. At this time, only the error Message can be obtained. | ||
|
||
Server implementation: | ||
|
||
```go | ||
func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { | ||
// ... | ||
return errors.New("test error") | ||
} | ||
``` | ||
|
||
Client implementation: | ||
|
||
```go | ||
svrStream, err = streamClient.ServerStreamWithErr(ctx, req) | ||
test.Assert(t, err == nil, err) | ||
|
||
var err error | ||
for { | ||
res, err = stream.Recv(ctx) | ||
if err != nil { | ||
break | ||
} | ||
} | ||
ex, ok := err.(*thrift.ApplicationException) | ||
if ok { | ||
println(ex.TypeID(), ex.Msg()) | ||
} | ||
``` |