Skip to content

Commit

Permalink
[Go client] Recover from write stream failures (#521)
Browse files Browse the repository at this point in the history
Get rid of failed write-streams in Go client SDK. 

This is similar change to
streamnative/oxia-java#180 in Java SDK.
  • Loading branch information
merlimat committed Sep 20, 2024
1 parent f21df14 commit da3a76c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
2 changes: 1 addition & 1 deletion oxia/internal/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (e *executorImpl) writeStream(shardId *int64) (*streamWrapper, error) {
e.RLock()

sw, ok := e.writeStreams[*shardId]
if ok {
if ok && !sw.failed.Load() {
e.RUnlock()
return sw, nil
}
Expand Down
12 changes: 10 additions & 2 deletions oxia/internal/write_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"log/slog"
"sync"
"sync/atomic"

"github.com/streamnative/oxia/common"
"github.com/streamnative/oxia/proto"
Expand All @@ -29,6 +30,7 @@ type streamWrapper struct {

stream proto.OxiaClient_WriteStreamClient
pendingRequests []common.Future[*proto.WriteResponse]
failed atomic.Bool
}

func newStreamWrapper(stream proto.OxiaClient_WriteStreamClient) *streamWrapper {
Expand All @@ -48,6 +50,7 @@ func (sw *streamWrapper) Send(ctx context.Context, req *proto.WriteRequest) (*pr
sw.Lock()
sw.pendingRequests = append(sw.pendingRequests, f)
if err := sw.stream.Send(req); err != nil {
sw.failed.Store(true)
sw.Unlock()
return nil, err
}
Expand All @@ -62,17 +65,23 @@ func (sw *streamWrapper) handleStreamClosed() {

// Fail all pending requests
sw.Lock()
defer sw.Unlock()

for _, f := range sw.pendingRequests {
f.Fail(io.EOF)
}
sw.pendingRequests = nil
sw.Unlock()
sw.failed.Store(true)
}

func (sw *streamWrapper) handleResponses() {
for {
response, err := sw.stream.Recv()
sw.Lock()

if err != nil {
sw.failed.Store(true)
sw.Unlock()
return
}

Expand All @@ -81,7 +90,6 @@ func (sw *streamWrapper) handleResponses() {
slog.Any("err", err),
)

sw.Lock()
var f common.Future[*proto.WriteResponse]
f, sw.pendingRequests = sw.pendingRequests[0], sw.pendingRequests[1:]
sw.Unlock()
Expand Down

0 comments on commit da3a76c

Please sign in to comment.