Skip to content

Commit

Permalink
fix: invoke reconnect when readInternal return error (#188)
Browse files Browse the repository at this point in the history
Co-authored-by: Yo Eight <[email protected]>
  • Loading branch information
itgram and YoEight authored Nov 12, 2024
1 parent 88d8c74 commit c29aa7d
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ clusterNode: ## Run tests against a cluster node.
@$(DOCKER_COMPOSE_CMD) -f cluster-docker-compose.yml up -d
@echo "Waiting for services to be fully ready..."
@sleep 5
@EVENTSTORE_INSECURE=false CLUSTER=true go test -count=1 -v ./esdb -run 'TestStreams|TestPersistentSubscriptions|TestProjections'
@EVENTSTORE_INSECURE=false CLUSTER=true go test -count=1 -v ./esdb -run 'TestStreams|TestPersistentSubscriptions|TestProjections|TestClusterRebalance'
@$(DOCKER_COMPOSE_CMD) -f cluster-docker-compose.yml down --remove-orphans

.PHONY: misc
Expand Down
23 changes: 22 additions & 1 deletion esdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"io"
"strconv"

"github.com/EventStore/EventStore-Client-Go/v4/protos/gossip"
persistentProto "github.com/EventStore/EventStore-Client-Go/v4/protos/persistent"
"github.com/EventStore/EventStore-Client-Go/v4/protos/shared"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -759,7 +761,9 @@ func readInternal(
result, err := streamsClient.Read(ctx, readRequest, callOptions...)
if err != nil {
defer cancel()
return nil, err

err = client.grpcClient.handleError(handle, trailers, err)
return nil, fmt.Errorf("could not construct read operation. Reason: %w", err)
}

params := readStreamParams{
Expand All @@ -773,3 +777,20 @@ func readInternal(

return newReadStream(params), nil
}

// Gossip reads the cluster membership through the gossip gRPC service, using
// the client's current connection handle.
//
// It returns the members reported by the node the client is connected to. Any
// error from acquiring the connection handle or from the gossip read itself is
// returned unchanged.
func (client *Client) Gossip(ctx context.Context) ([]*gossip.MemberInfo, error) {
	connHandle, handleErr := client.grpcClient.getConnectionHandle()
	if handleErr != nil {
		return nil, handleErr
	}

	// Build the gossip client on top of the already established connection and
	// issue the read in one go.
	info, readErr := gossip.NewGossipClient(connHandle.Connection()).Read(ctx, &shared.Empty{})
	if readErr != nil {
		return nil, readErr
	}

	return info.Members, nil
}
4 changes: 4 additions & 0 deletions esdb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,7 @@ func TestMisc(t *testing.T) {
TestPositionParsing(t)
UUIDParsingTests(t)
}

// TestClusterRebalance is the `go test` entry point for the cluster rebalance
// scenarios; it simply delegates to ClusterRebalanceTests.
func TestClusterRebalance(t *testing.T) {
ClusterRebalanceTests(t)
}
79 changes: 79 additions & 0 deletions esdb/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package esdb_test

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"testing"
"time"

"github.com/EventStore/EventStore-Client-Go/v4/esdb"
"github.com/EventStore/EventStore-Client-Go/v4/protos/gossip"

"github.com/stretchr/testify/assert"
)
Expand All @@ -16,6 +20,12 @@ func ClusterTests(t *testing.T) {
})
}

// ClusterRebalanceTests runs every cluster rebalance scenario as a subtest
// grouped under the "ClusterRebalanceTests" parent test.
func ClusterRebalanceTests(t *testing.T) {
	scenarios := func(t *testing.T) {
		t.Run("readStreamAfterClusterRebalance", readStreamAfterClusterRebalance)
	}

	t.Run("ClusterRebalanceTests", scenarios)
}

func notLeaderExceptionButWorkAfterRetry(t *testing.T) {
// It seems that on GHA we need to try more than once because the name generator is not random enough.
for count := 0; count < 10; count++ {
Expand Down Expand Up @@ -57,3 +67,72 @@ func notLeaderExceptionButWorkAfterRetry(t *testing.T) {

t.Fatalf("we retried long enough but the test is still failing")
}

// readStreamAfterClusterRebalance checks that a client which was connected to
// the leader node can read a stream again after that leader has been shut down.
//
// Steps:
//  1. connect with a leader node preference and perform an initial read,
//  2. locate the live leader through Gossip and shut it down over its HTTP
//     admin endpoint,
//  3. wait for the cluster to rebalance, then retry the read a bounded number
//     of times.
//
// Every setup failure aborts the test immediately: continuing with a nil
// request or nil response would only end in a nil-pointer panic that hides the
// real cause.
func readStreamAfterClusterRebalance(t *testing.T) {
	// We purposely connect to a leader node.
	db := CreateClient("esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?nodepreference=leader&tlsverifycert=false", t)
	defer db.Close()

	ctx := context.Background()
	streamID := NAME_GENERATOR.Generate()

	// Start reading the stream
	options := esdb.ReadStreamOptions{From: esdb.Start{}}

	stream, err := db.ReadStream(ctx, streamID, options, 10)
	if err != nil {
		t.Fatalf("failed to read stream: %v", err)
	}

	stream.Close()

	// Simulate leader node failure
	members, err := db.Gossip(ctx)
	if err != nil {
		t.Fatalf("failed to read gossip information: %v", err)
	}

	// The test cluster uses self-signed certificates, hence InsecureSkipVerify.
	// The timeout keeps the test from hanging if the node never answers.
	httpClient := &http.Client{
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		},
	}

	leaderShutDown := false

	for _, member := range members {
		if member.State != gossip.MemberInfo_Leader || !member.GetIsAlive() {
			continue
		}

		if member.HttpEndPoint == nil {
			t.Fatalf("leader node did not advertise an HTTP endpoint")
		}

		// Shutdown the leader node
		url := fmt.Sprintf("https://%s:%d/admin/shutdown", member.HttpEndPoint.Address, member.HttpEndPoint.Port)
		t.Log("Shutting down leader node: ", url)

		req, err := http.NewRequest("POST", url, nil)
		if err != nil {
			t.Fatalf("failed to build shutdown request: %v", err)
		}

		req.SetBasicAuth("admin", "changeit")

		resp, err := httpClient.Do(req)
		if err != nil {
			// resp is nil when Do fails, so it must not be touched here.
			t.Fatalf("failed to shut down leader node: %v", err)
		}

		t.Logf("Shutdown request answered with status %s", resp.Status)
		resp.Body.Close()

		leaderShutDown = true
		break
	}

	// Without a leader shutdown the rest of the test would pass trivially.
	if !leaderShutDown {
		t.Fatalf("no alive leader node found in gossip information")
	}

	// Wait for the cluster to rebalance
	time.Sleep(5 * time.Second)

	// Try reading the stream again
	var lastErr error
	for count := 0; count < 10; count++ {
		stream, err = db.ReadStream(ctx, streamID, options, 10)
		if err != nil {
			lastErr = err
			// Give the client a moment to reconnect before the next attempt
			// instead of burning all retries back to back.
			time.Sleep(time.Second)
			continue
		}

		stream.Close()

		t.Logf("Successfully read stream after %d retries", count+1)
		return
	}

	t.Fatalf("we retried long enough but the test is still failing: %v", lastErr)
}

0 comments on commit c29aa7d

Please sign in to comment.