From 29d2f59d460d6871faa892bca8bd71d0c7dccde5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Augusto=20Bolina?=
Date: Sun, 28 Feb 2021 22:07:00 -0300
Subject: [PATCH] Polishing etcd backend. (#5)

Applying some fixes and simplifying the EtcdCoordinator. Added some tests
to verify that commands are delivered in the same order even when multiple
replicas exist.

After some digging, found out that there is no need to use the KV interface
to send requests; this can be done with the etcd client directly. So that
was removed, and there is also no need to send commands synchronously nor
to handle watch events asynchronously.

Added a load benchmark test to be verified with `pprof`. Any optimization
will be done later.
---
 Makefile                     |   5 +-
 README.md                    |  12 +--
 _examples/peer.go            |   8 +-
 internal/coordinator.go      | 141 +-----------------------
 internal/core.go             |  30 ++++--
 internal/etcd_coordinator.go |  92 ++++++++++++++++
 internal/event.go            |  23 +++-
 pkg/relt/configuration.go    |  22 ++--
 pkg/relt/relt.go             |  12 ++-
 pkg/relt/transport.go        |  32 ++----
 pkg/relt/util.go             |   1 +
 test/etcd_test.go            | 202 +++++++++++++++++++++++++++++++++++
 test/helper.go               |  11 ++
 test/load_test.go            | 109 +++++++++++++++++++
 test/synchronization_test.go |  99 +++++++++++++++++
 15 files changed, 599 insertions(+), 200 deletions(-)
 create mode 100644 internal/etcd_coordinator.go
 create mode 100644 test/etcd_test.go
 create mode 100644 test/load_test.go

diff --git a/Makefile b/Makefile
index 2ceba37..06fa9c1 100644
--- a/Makefile
+++ b/Makefile
@@ -29,6 +29,9 @@ deps: # @HELP install dependencies
 	@echo "getting dependencies"
 	go get -t -d -v ./...
 
+benchmark: # @HELP run benchmark and generate files for pprof
+	go test -bench=. -run=^$$ -benchmem -cpuprofile profile.out -memprofile memprofile.out ./test/...
+
 build: # @HELP build the packages
 	sh $(PWD)/scripts/build.sh
 
@@ -37,4 +40,4 @@ ci: deps test_rule dep-linter lint
 
 all: deps test_rule lint
 
-.PHONY: all build
\ No newline at end of file
+.PHONY: all build
diff --git a/README.md b/README.md
index a03cadc..87b3c46 100644
--- a/README.md
+++ b/README.md
@@ -2,13 +2,13 @@
 
 ![Go](https://github.com/jabolina/relt/workflows/Go/badge.svg)
 
-Primitives for Reliable Transport. At this point, the "reliable transport" is the AMQP protocol implemented
-by the RabbitMQ and that's it.
+A primitive for reliable communication, backed by an atomic broadcast protocol. An example can be found in the
+`_examples` folder; the current atomic broadcast implementation uses [etcd](https://github.com/etcd-io/etcd),
+which means that an etcd server is needed.
 
 Since this will be used primarily on the [Generic Atomic Multicast](https://github.com/jabolina/go-mcast), to create
-a simple structure, this Relt project will only by a RabbitMQ client with a high level API for sending messages to
-fan-out exchanges.
+a simple structure, with a high level API for sending messages to a group of processes.
 
 When the [Generic Atomic Multicast](https://github.com/jabolina/go-mcast) contains the basic structure, this reliable
-transport will turn into a new whole project where will be implemented a generic atomic broadcast, to be used as a
-communication primitive.
+transport will turn into a whole new project where a generic atomic broadcast will be implemented, to be used as the
+default reliable communication primitive.
 
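For quick reference, a minimal usage sketch of the API this patch converges on, distilled from `_examples/peer.go` and the `pkg/relt` surface in this diff. Error handling is shortened to panics, and it assumes the default configuration's Exchange is usable as a destination and an etcd server is reachable:

package main

import (
	"context"
	"fmt"

	"github.com/jabolina/relt/pkg/relt"
)

func main() {
	// Each Relt instance is one peer on the atomic broadcast protocol.
	conf := relt.DefaultReltConfiguration()
	conf.Name = "local-test"
	r, err := relt.NewRelt(*conf)
	if err != nil {
		panic(err)
	}
	defer r.Close()

	// Consume yields every message broadcast to the configured partition.
	messages, err := r.Consume()
	if err != nil {
		panic(err)
	}

	// Broadcast delivers the payload to every peer listening on the
	// exchange, including this one.
	send := relt.Send{Address: conf.Exchange, Data: []byte("hello")}
	if err := r.Broadcast(context.Background(), send); err != nil {
		panic(err)
	}

	fmt.Println(string((<-messages).Data))
}

Peers that configure the same Exchange behave as a single totally ordered group, which is exactly what the new tests in this patch assert.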
diff --git a/_examples/peer.go b/_examples/peer.go
index 027bad5..3efc836 100644
--- a/_examples/peer.go
+++ b/_examples/peer.go
@@ -49,15 +49,15 @@ func consume(r *relt.Relt, ctx context.Context) {
 func main() {
 	conf := relt.DefaultReltConfiguration()
 	conf.Name = "local-test"
-	relt, _ := relt.NewRelt(*conf)
+	r, _ := relt.NewRelt(*conf)
 
 	ctx, done := context.WithCancel(context.Background())
 
 	go func() {
-		produce(relt, os.Stdin, ctx)
+		produce(r, os.Stdin, ctx)
 	}()
 
 	go func() {
-		consume(relt, ctx)
+		consume(r, ctx)
 	}()
 
 	c := make(chan os.Signal, 1)
@@ -69,5 +69,5 @@ func main() {
 	}()
 
 	<-ctx.Done()
-	relt.Close()
+	r.Close()
 }
diff --git a/internal/coordinator.go b/internal/coordinator.go
index 9131229..41e5143 100644
--- a/internal/coordinator.go
+++ b/internal/coordinator.go
@@ -2,23 +2,9 @@ package internal
 
 import (
 	"context"
-	"github.com/coreos/etcd/clientv3"
 	"io"
-	"time"
 )
 
-// A single write requests to be applied to etcd.
-type request struct {
-	// Issuer writer context.
-	ctx context.Context
-
-	// Event to be sent to etcd.
-	event Event
-
-	// Channel to send response back.
-	response chan error
-}
-
 // Configuration for the coordinator.
 type CoordinatorConfiguration struct {
 	// Each Coordinator will handle only a single partition.
@@ -38,7 +24,7 @@ type CoordinatorConfiguration struct {
 // Coordinator interface that should be implemented by the
 // atomic broadcast handler.
 // Commands should be issued through the coordinator to be delivered
-// to other peers
+// to other peers.
 type Coordinator interface {
 	io.Closer
 
@@ -48,130 +34,13 @@ type Coordinator interface {
 	Watch(received chan<- Event) error
 
 	// Issues an Event.
-	Write(ctx context.Context, event Event) <-chan error
+	// This will have the same effect as broadcasting a message
+	// to every participant on the destination.
+	Write(ctx context.Context, event Event) error
 }
 
 // Create a new Coordinator using the given configuration.
 // The current implementation is the EtcdCoordinator, backed by etcd.
 func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) {
-	cli, err := clientv3.New(clientv3.Config{
-		DialTimeout: 30 * time.Second,
-		Endpoints:   []string{configuration.Server},
-	})
-	if err != nil {
-		return nil, err
-	}
-	kv := clientv3.NewKV(cli)
-	ctx, cancel := context.WithCancel(configuration.Ctx)
-	coord := &EtcdCoordinator{
-		configuration: configuration,
-		cli:           cli,
-		kv:            kv,
-		ctx:           ctx,
-		cancel:        cancel,
-		writeChan:     make(chan request),
-	}
-	configuration.Handler.Spawn(coord.writer)
-	return coord, nil
-}
-
-// EtcdCoordinator will use etcd for atomic broadcast.
-type EtcdCoordinator struct {
-	// Configuration parameters.
-	configuration CoordinatorConfiguration
-
-	// Current Coordinator context, created from the parent context.
-	ctx context.Context
-
-	// Function to cancel the current context.
-	cancel context.CancelFunc
-
-	// A client for the etcd server.
-	cli *clientv3.Client
-
-	// The key-value entry point for issuing requests.
-	kv clientv3.KV
-
-	// Channel to receive write requests.
-	writeChan chan request
-}
-
-// Listen and apply write requests.
-// This will keep running while the application context is available.
-// Receiving commands through the channel will ensure that they are
-// applied synchronously to the etcd.
-func (e *EtcdCoordinator) writer() { - for { - select { - case <-e.ctx.Done(): - return - case req := <-e.writeChan: - _, err := e.kv.Put(req.ctx, req.event.Key, string(req.event.Value)) - req.response <- err - } - } -} - -// Starts a new coroutine for watching the Coordinator partition. -// All received information will be published back through the channel -// received as parameter. -// -// After calling a routine will run bounded to the application lifetime. -func (e *EtcdCoordinator) Watch(received chan<- Event) error { - watchChan := e.cli.Watch(e.ctx, e.configuration.Partition) - watchChanges := func() { - for response := range watchChan { - select { - case <-e.ctx.Done(): - return - default: - e.handleResponse(response, received) - } - } - } - e.configuration.Handler.Spawn(watchChanges) - return nil -} - -// Write the given event using the KV interface. -func (e *EtcdCoordinator) Write(ctx context.Context, event Event) <-chan error { - res := make(chan error) - e.writeChan <- request{ - ctx: ctx, - event: event, - response: res, - } - return res -} - -// Stop the etcd client connection. -func (e *EtcdCoordinator) Close() error { - e.cancel() - return e.cli.Close() -} - -// This method is responsible for handling events from the etcd client. -// -// This method will transform each received event into Event object and -// publish it back using the given channel. A buffered channel will be created -// and a goroutine will be spawned, so we can publish the received messages -// asynchronously without blocking. This can cause the Close to hold, if there -// exists pending messages to be consumed by the channel, this method can cause a deadlock. -func (e *EtcdCoordinator) handleResponse(response clientv3.WatchResponse, received chan<- Event) { - buffered := make(chan Event, len(response.Events)) - defer close(buffered) - - e.configuration.Handler.Spawn(func() { - for ev := range buffered { - received <- ev - } - }) - - for _, event := range response.Events { - buffered <- Event{ - Key: string(event.Kv.Key), - Value: event.Kv.Value, - Error: nil, - } - } + return newEtcdCoordinator(configuration) } diff --git a/internal/core.go b/internal/core.go index 1ea1723..f37dc7b 100644 --- a/internal/core.go +++ b/internal/core.go @@ -16,6 +16,7 @@ var ( coreAlreadyWatching = errors.New("already watching partition") ) +// Configuration for the Core interface. type CoreConfiguration struct { // Partition the Coordinator will work with. Partition string @@ -23,11 +24,14 @@ type CoreConfiguration struct { // Server address for the Coordinator. Server string - // Default timeout to be applied when handling channels. + // Default timeout to be applied when handling channels and + // asynchronous operations. DefaultTimeout time.Duration } // Holds all flags used to manage the Core state. +// This is the same as an AtomicBoolean and is used internally +// to manage the core states. type CoreFlags struct { // Flag for the shutdown state. shutdown Flag @@ -36,13 +40,19 @@ type CoreFlags struct { watching Flag } -// Core is the interface that will hold the Relt connection to the Coordinator. +// Core is the interface that will create the link between Relt requests +// and the Coordinator. // Every command issued will be parsed here, and every command received should // be handled here before going back to the client. +// Everything after this stage should care only about the atomic broadcast protocol +// and everything before should be abstracted as a simple communication primitive. 
+// This means that any parsing or state handling for the client should be done here.
 type Core interface {
 	io.Closer
 
 	// Start listening for new messages.
+	// This will receive messages from the atomic broadcast protocol
+	// and parse them into an object the client can handle.
 	Listen() (<-chan Message, error)
 
 	// Send a message asynchronously for the given partition.
@@ -66,12 +76,14 @@ type ReltCore struct {
 	handler *GoRoutineHandler
 
 	// Coordinator to issues commands and receive Event.
+	// The coordinator is the interface to reach the atomic
+	// broadcast protocol.
 	coord Coordinator
 
 	// Channel for sending Message to the client.
 	output chan Message
 
-	// Flags for handling state.
+	// Flags for handling internal state.
 	flags CoreFlags
 
 	// Core configuration parameters.
@@ -79,8 +91,8 @@ type ReltCore struct {
 }
 
 // Create a new ReltCore using the given configuration.
-// As an effect, this will instantiate a Coordinator a failures
-// can happen while handling connections between the peers.
+// As an effect, this will instantiate a Coordinator, and a failure
+// can happen while handling the connection to the atomic broadcast server.
 func NewCore(configuration CoreConfiguration) (Core, error) {
 	ctx, cancel := context.WithCancel(context.TODO())
 	handler := NewRoutineHandler()
@@ -136,7 +148,7 @@ func (r *ReltCore) Listen() (<-chan Message, error) {
 	}
 
 	if r.flags.watching.Inactivate() {
-		events := make(chan Event, 100)
+		events := make(chan Event)
 		if err := r.coord.Watch(events); err != nil {
 			return nil, err
 		}
@@ -162,15 +174,14 @@ func (r *ReltCore) Listen() (<-chan Message, error) {
 // This is a broadcast message, which means that if _N_ nodes are
 // subscribed for a partition, every node will receive the message.
 func (r *ReltCore) Send(ctx context.Context, dest string, data []byte) <-chan error {
-	response := make(chan error, 1)
+	response := make(chan error)
 	writeRequest := func() {
 		if r.flags.shutdown.IsActive() {
 			event := Event{
 				Key:   dest,
 				Value: data,
 			}
-			err := <-r.coord.Write(ctx, event)
-			response <- err
+			response <- r.coord.Write(ctx, event)
 		} else {
 			response <- coreWasShutdown
 		}
@@ -192,6 +203,7 @@ func (r *ReltCore) Close() error {
 		}
 		r.finish()
 		r.handler.Close()
+		return nil
 	}
 	return coreWasShutdown
 }
diff --git a/internal/etcd_coordinator.go b/internal/etcd_coordinator.go
new file mode 100644
index 0000000..93885c4
--- /dev/null
+++ b/internal/etcd_coordinator.go
@@ -0,0 +1,92 @@
+package internal
+
+import (
+	"context"
+	"github.com/coreos/etcd/clientv3"
+	"time"
+)
+
+// EtcdCoordinator will use etcd for atomic broadcast.
+type EtcdCoordinator struct {
+	// Configuration parameters.
+	configuration CoordinatorConfiguration
+
+	// Current Coordinator context, created from the parent context.
+	ctx context.Context
+
+	// Function to cancel the current context.
+	cancel context.CancelFunc
+
+	// A client for interacting with the etcd server.
+	cli *clientv3.Client
+}
+
+// Creates a new coordinator that is backed by the etcd atomic broadcast.
+// This method will connect to the configured etcd server, so a chance of failure
+// exists at this step.
+// Only a simple configuration is available here.
+func newEtcdCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) {
+	cli, err := clientv3.New(clientv3.Config{
+		DialTimeout: 5 * time.Second,
+		Endpoints:   []string{configuration.Server},
+	})
+	if err != nil {
+		return nil, err
+	}
+	ctx, cancel := context.WithCancel(configuration.Ctx)
+	coord := &EtcdCoordinator{
+		configuration: configuration,
+		cli:           cli,
+		ctx:           ctx,
+		cancel:        cancel,
+	}
+	return coord, nil
+}
+
+// Starts a new goroutine for watching the Coordinator partition.
+// All received information will be published back through the channel
+// received as parameter.
+//
+// After calling, a goroutine will run bound to the application lifetime.
+func (e *EtcdCoordinator) Watch(received chan<- Event) error {
+	watchChan := e.cli.Watch(e.ctx, e.configuration.Partition)
+	watchChanges := func() {
+		for response := range watchChan {
+			select {
+			case <-e.ctx.Done():
+				return
+			default:
+				e.handleResponse(response, received)
+			}
+		}
+	}
+	e.configuration.Handler.Spawn(watchChanges)
+	return nil
+}
+
+// Write the given event issuing a PUT request through the client.
+func (e *EtcdCoordinator) Write(ctx context.Context, event Event) error {
+	_, err := e.cli.Put(ctx, event.Key, string(event.Value))
+	return err
+}
+
+// Stop the etcd client connection and stop the goroutines.
+func (e *EtcdCoordinator) Close() error {
+	e.cancel()
+	return e.cli.Close()
+}
+
+// This method is responsible for handling events from the etcd client.
+//
+// This method will transform each received event into an Event object and
+// publish it back using the given channel. This method will block until
+// the messages are consumed.
+func (e *EtcdCoordinator) handleResponse(response clientv3.WatchResponse, received chan<- Event) {
+	for _, event := range response.Events {
+		received <- Event{
+			Key:   string(event.Kv.Key),
+			Value: event.Kv.Value,
+			Error: nil,
+		}
+	}
+}
diff --git a/internal/event.go b/internal/event.go
index de917b1..e6bc96a 100644
--- a/internal/event.go
+++ b/internal/event.go
@@ -3,12 +3,20 @@ package internal
 // Event is a structure handled by the Coordinator.
 // Events are received and issued through the atomic broadcast.
 type Event struct {
-	Key   string
+	// Key affected by the event.
+	Key string
+
+	// Value to be applied when sending the event, or the
+	// applied value when the event was received.
 	Value []byte
+
+	// Only used when the event was received from the atomic broadcast;
+	// transports any error that happened.
 	Error error
 }
 
-func (e Event) isError() bool {
+// Verify if the event is an error event.
+func (e Event) IsError() bool {
 	return e.Error != nil
 }
 
@@ -24,7 +32,14 @@ func (e Event) toMessage() Message {
 // Message is the structure handled by the Core.
 // Messages are the available data sent to the client.
 type Message struct {
+	// Where the message originated.
+	// If a client `A` sends a message to client `B`,
+	// this value will be `B`.
 	Origin string
-	Data   []byte
-	Error  error
+
+	// Data transported.
+	Data []byte
+
+	// If an error happened.
+	Error error
 }
diff --git a/pkg/relt/configuration.go b/pkg/relt/configuration.go
index 964a9a3..2d7cc13 100644
--- a/pkg/relt/configuration.go
+++ b/pkg/relt/configuration.go
@@ -13,24 +13,22 @@ var (
 // Configuration used for creating a new instance
 // of the Relt.
 type Configuration struct {
-	// The Relt name. Is not required, if empty a
-	// random string will be generated to be used.
-	// This must be unique, since it will be used to declare
-	// the peer queue for consumption.
+	// The Relt name. It is not required; if empty, a random string
+	// will be generated to be used.
 	Name string
 
-	// Only plain auth is supported. The username + password
-	// will be passed in the connection URL.
+	// URL used for connecting to the atomic broadcast protocol.
 	Url string
 
-	// This will be used to create an exchange on the RabbitMQ
-	// broker. If the user wants to declare its own name for the
-	// exchange, if none is passed, the value 'relt' will be used.
+	// This is the partition where the transport will act. The client
+	// will listen only for messages where the destination is the
+	// configured partition.
 	//
 	// When declaring multiple partitions, this must be configured
 	// properly, since this will dictate which peers received the
 	// messages. If all peers are using the same exchange then
-	// is the same as all peers are a single partition.
+	// the clients will act as a single unit, where every peer
+	// will receive all messages in the same order.
 	Exchange GroupAddress
 
 	// Default timeout to be applied when handling asynchronous methods.
@@ -38,10 +36,6 @@ type Configuration struct {
 }
 
 // Creates the default configuration for the Relt.
-// The peer, thus the queue name will be randomly generated,
-// the connection Url will connect to a local broker using
-// the user `guest` and password `guest`.
-// The default exchange will fallback to `relt`.
 func DefaultReltConfiguration() *Configuration {
 	return &Configuration{
 		Name: GenerateUID(),
diff --git a/pkg/relt/relt.go b/pkg/relt/relt.go
index f0fa03b..db2855d 100644
--- a/pkg/relt/relt.go
+++ b/pkg/relt/relt.go
@@ -15,12 +15,17 @@ var (
 
 // The implementation for the Transport interface
 // providing reliable communication between hosts.
+//
+// Every command must be issued through this struct,
+// where a single object instance represents a peer that
+// participates in the atomic broadcast protocol.
 type Relt struct {
-	// Holds the configuration about the core
-	// and the Relt transport.
+	// Holds the configuration.
 	configuration Configuration
 
 	// Holds the Core structure.
+	// Every command received will be prepared and sent
+	// through the core structure.
 	core internal.Core
 }
 
@@ -30,6 +35,9 @@ func (r *Relt) Consume() (<-chan internal.Message, error) {
 }
 
 // Implements the Transport interface.
+// Will broadcast a message to all peers that listen to the destination.
+// This method is bounded by the given context lifetime and by the
+// configured timeout.
 func (r *Relt) Broadcast(ctx context.Context, message Send) error {
 	if len(message.Address) == 0 {
 		return ErrInvalidGroupAddress
diff --git a/pkg/relt/transport.go b/pkg/relt/transport.go
index 765aa54..ebcea92 100644
--- a/pkg/relt/transport.go
+++ b/pkg/relt/transport.go
@@ -6,28 +6,18 @@ import (
 	"io"
 )
 
-// When broadcasting or multicasting a message must provide
-// the group address.
+// When broadcasting, a message must provide the group address.
 type GroupAddress string
 
-// Denotes a received information or an error
-// that occurred in the channel.
-type Recv struct {
-	// Received data or nil if is an error event.
-	Data []byte
-
-	// Returns an error back to the listener.
-	Error error
-}
-
 // Denotes an information that will be sent.
-// By design, the group address cannot be empty
-// and the Data to be sent cannot be nil, must be at least
-// and empty slice.
+// By design, the group address cannot be empty and the Data to be
+// sent cannot be nil; it must be at least an empty slice.
 type Send struct {
 	// Which group the message will be sent.
- // This is the name of the exchange that must receive - // the message and not an actual IP address. + // This is the name of the exchange that must receive the message + // and not an actual IP address. + // If multiple peers are listening, all of them will receive the + // message in the same order. Address GroupAddress // Data to be sent to the group. @@ -35,11 +25,6 @@ type Send struct { } // A interface to offer a high level API for transport. -// This transport is backed by a RabbitMQ using the quorum queues, -// that uses the Raft Protocol for consistency. -// Everything sent by this transport will not receive a direct -// response back, everything is just message passing simulating -// events that can occur. type Transport interface { io.Closer @@ -56,7 +41,6 @@ type Transport interface { // // A goroutine will be spawned and the message will be sent // through a channel, this channel is only closed when the - // Relt transport is closed, so if the the transport is already - // closed this function will panic. + // Relt transport is closed. Broadcast(ctx context.Context, message Send) error } diff --git a/pkg/relt/util.go b/pkg/relt/util.go index ae9454e..bd1b544 100644 --- a/pkg/relt/util.go +++ b/pkg/relt/util.go @@ -33,6 +33,7 @@ func GenerateRandomIP() (string, error) { return listener.Addr().String(), nil } +// Verify if the given value is a valid URL. func IsUrl(value string) bool { parsed, err := url.Parse(value) return err == nil && parsed.Scheme != "" diff --git a/test/etcd_test.go b/test/etcd_test.go new file mode 100644 index 0000000..e76b62d --- /dev/null +++ b/test/etcd_test.go @@ -0,0 +1,202 @@ +package test + +import ( + "context" + "fmt" + "github.com/coreos/etcd/clientv3" + "github.com/jabolina/relt/internal" + "go.uber.org/goleak" + "sync" + "testing" + "time" +) + +func Test_ShouldReceiveAllCommands(t *testing.T) { + defer goleak.VerifyNone(t) + partition := "etcd-coordinator" + testSize := 100 + clusterSize := 30 + ctx, cancel := context.WithCancel(context.Background()) + listenersGroup := &sync.WaitGroup{} + writerGroup := &sync.WaitGroup{} + initializeReplica := func(c internal.Coordinator, history *MessageHist) { + msgChan := make(chan internal.Event) + err := c.Watch(msgChan) + if err != nil { + t.Fatalf("failed starting to listen. %#v", err) + } + + go func() { + defer listenersGroup.Done() + for { + select { + case recv := <-msgChan: + if recv.Value == nil || len(recv.Value) == 0 { + t.Errorf("received wrong data") + } + + if recv.Error != nil { + t.Errorf("error on consumed response. %v", recv.Error) + } + + history.insert(string(recv.Value)) + case <-ctx.Done(): + return + } + } + }() + } + initializeCluster := func(size int) ([]internal.Coordinator, []*MessageHist) { + var coordinators []internal.Coordinator + var history []*MessageHist + for i := 0; i < size; i++ { + conf := internal.CoordinatorConfiguration{ + Partition: partition, + Server: "localhost:2379", + Ctx: ctx, + Handler: internal.NewRoutineHandler(), + } + coord, err := internal.NewCoordinator(conf) + if err != nil { + t.Fatalf("failed starting coordinator. 
%#v", err) + } + h := NewHistory() + initializeReplica(coord, h) + + coordinators = append(coordinators, coord) + history = append(history, h) + } + return coordinators, history + } + + listenersGroup.Add(clusterSize) + replicas, history := initializeCluster(clusterSize) + + entry := replicas[0] + writerGroup.Add(testSize) + for i := 0; i < testSize; i++ { + write := func(data []byte) { + defer writerGroup.Done() + err := entry.Write(ctx, internal.Event{ + Key: partition, + Value: data, + Error: nil, + }) + + if err != nil { + t.Errorf("failed broadcasting. %#v", err) + } + } + go write([]byte(fmt.Sprintf("%d", i))) + } + + writerGroup.Wait() + time.Sleep(10 * time.Second) + cancel() + listenersGroup.Wait() + + truth := history[0] + if truth.size() != testSize { + t.Errorf("should have size %d, found %d", testSize, truth.size()) + } + + for i, messageHist := range history { + diff := truth.compare(*messageHist) + if diff != 0 { + t.Errorf("history differ at %d with %d different commands", i, diff) + } + } + + for _, replica := range replicas { + err := replica.Close() + if err != nil { + t.Errorf("failed closing replica. %#v", err) + } + } +} + +func Test_ShouldHaveEventsWhenDirectConnection(t *testing.T) { + defer goleak.VerifyNone(t) + partition := "etcd-coordinator" + testSize := 100 + clusterSize := 30 + ctx, cancel := context.WithCancel(context.Background()) + listenersGroup := &sync.WaitGroup{} + writerGroup := &sync.WaitGroup{} + initializeReplica := func(c *clientv3.Client, history *MessageHist) { + watchChan := c.Watch(ctx, partition) + go func() { + defer listenersGroup.Done() + for { + select { + case res := <-watchChan: + for _, event := range res.Events { + history.insert(string(event.Kv.Value)) + } + case <-ctx.Done(): + return + } + } + }() + } + + initializeCluster := func(size int) ([]*clientv3.Client, []*MessageHist) { + var replicas []*clientv3.Client + var history []*MessageHist + for i := 0; i < size; i++ { + e, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"localhost:2379"}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + t.Fatalf("failed connecting. %#v", err) + } + h := NewHistory() + initializeReplica(e, h) + + replicas = append(replicas, e) + history = append(history, h) + } + return replicas, history + } + + listenersGroup.Add(clusterSize) + replicas, history := initializeCluster(clusterSize) + + entry := replicas[0] + writerGroup.Add(testSize) + for i := 0; i < testSize; i++ { + write := func(data string) { + defer writerGroup.Done() + _, err := entry.Put(ctx, partition, data) + if err != nil { + t.Errorf("failed broadcasting. %v", err) + } + } + go write(fmt.Sprintf("%d", i)) + } + + writerGroup.Wait() + time.Sleep(5 * time.Second) + cancel() + listenersGroup.Wait() + + truth := history[0] + if truth.size() != testSize { + t.Errorf("should have size %d, found %d", testSize, truth.size()) + } + + for i, messageHist := range history { + diff := truth.compare(*messageHist) + if diff != 0 { + t.Errorf("history differ at %d with %d different commands", i, diff) + } + } + + for _, replica := range replicas { + err := replica.Close() + if err != nil { + t.Errorf("failed closing replica. 
%#v", err) + } + } +} diff --git a/test/helper.go b/test/helper.go index 473c63c..7e51647 100644 --- a/test/helper.go +++ b/test/helper.go @@ -30,7 +30,18 @@ func (m *MessageHist) values() []string { return copied } +func (m *MessageHist) size() int { + m.mutex.Lock() + defer m.mutex.Unlock() + return len(m.history) +} + func (m *MessageHist) compare(other MessageHist) int { + // if both objects hold the same mutex a deadlock will be created. + if m.mutex == other.mutex { + return 0 + } + m.mutex.Lock() defer m.mutex.Unlock() different := 0 diff --git a/test/load_test.go b/test/load_test.go new file mode 100644 index 0000000..638c960 --- /dev/null +++ b/test/load_test.go @@ -0,0 +1,109 @@ +package test + +import ( + "context" + "fmt" + "github.com/google/uuid" + "github.com/jabolina/relt/pkg/relt" + "sync" + "testing" + "time" +) + +func Benchmark_LoadTestMultipleReplicas(b *testing.B) { + partition := "bench-replicas-" + uuid.New().String() + testSize := 200 + clusterSize := 50 + ctx, cancel := context.WithCancel(context.TODO()) + listenersGroup := &sync.WaitGroup{} + writerGroup := &sync.WaitGroup{} + initializeReplica := func(r *relt.Relt, history *MessageHist) { + listener, err := r.Consume() + if err != nil { + b.Fatalf("failed starting consumer. %#v", err) + } + + go func() { + defer listenersGroup.Done() + for { + select { + case recv := <-listener: + if recv.Data == nil || len(recv.Data) == 0 { + b.Errorf("received wrong data") + } + + if recv.Error != nil { + b.Errorf("error on consumed response. %v", recv.Error) + } + + history.insert(string(recv.Data)) + case <-ctx.Done(): + return + } + } + }() + } + initializeCluster := func(size int) ([]*relt.Relt, []*MessageHist) { + var replicas []*relt.Relt + var history []*MessageHist + for i := 0; i < size; i++ { + conf := relt.DefaultReltConfiguration() + conf.Name = partition + fmt.Sprintf("%d", i) + conf.Exchange = relt.GroupAddress(partition) + r, err := relt.NewRelt(*conf) + if err != nil { + b.Fatalf("failed connecting. %v", err) + } + h := NewHistory() + initializeReplica(r, h) + + replicas = append(replicas, r) + history = append(history, h) + } + return replicas, history + } + + listenersGroup.Add(clusterSize) + replicas, history := initializeCluster(clusterSize) + + entry := replicas[0] + writerGroup.Add(testSize) + for i := 0; i < testSize; i++ { + write := func(data []byte) { + defer writerGroup.Done() + err := entry.Broadcast(ctx, relt.Send{ + Address: relt.GroupAddress(partition), + Data: data, + }) + + if err != nil { + b.Errorf("failed broadcasting. %v", err) + } + } + go write([]byte(fmt.Sprintf("%d", i))) + } + + writerGroup.Wait() + time.Sleep(10 * time.Second) + cancel() + listenersGroup.Wait() + + truth := history[0] + if truth.size() != testSize { + b.Errorf("should have size %d, found %d", testSize, truth.size()) + } + + for i, messageHist := range history { + diff := truth.compare(*messageHist) + if diff != 0 { + b.Errorf("history differ at %d with %d different commands", i, diff) + } + } + + for _, replica := range replicas { + err := replica.Close() + if err != nil { + b.Errorf("failed closing replica. %#v", err) + } + } +} diff --git a/test/synchronization_test.go b/test/synchronization_test.go index d3399b0..98b1a11 100644 --- a/test/synchronization_test.go +++ b/test/synchronization_test.go @@ -97,3 +97,102 @@ func Test_ReplicasShouldReceiveSameOrder(t *testing.T) { t.Fatalf("message history do not match. 
%d messages do not match", diff) } } + +func Test_MassiveNumberOfReplicasShouldBeSynchronized(t *testing.T) { + defer goleak.VerifyNone(t) + partition := "massive-synchronized-replicas-" + uuid.New().String() + testSize := 100 + clusterSize := 30 + ctx, cancel := context.WithCancel(context.TODO()) + listenersGroup := &sync.WaitGroup{} + writerGroup := &sync.WaitGroup{} + initializeReplica := func(r *relt.Relt, history *MessageHist) { + listener, err := r.Consume() + if err != nil { + t.Fatalf("failed starting consumer. %#v", err) + } + + go func() { + defer listenersGroup.Done() + for { + select { + case recv := <-listener: + if recv.Data == nil || len(recv.Data) == 0 { + t.Errorf("received wrong data") + } + + if recv.Error != nil { + t.Errorf("error on consumed response. %v", recv.Error) + } + + history.insert(string(recv.Data)) + case <-ctx.Done(): + return + } + } + }() + } + initializeCluster := func(size int) ([]*relt.Relt, []*MessageHist) { + var replicas []*relt.Relt + var history []*MessageHist + for i := 0; i < size; i++ { + conf := relt.DefaultReltConfiguration() + conf.Name = partition + fmt.Sprintf("%d", i) + conf.Exchange = relt.GroupAddress(partition) + r, err := relt.NewRelt(*conf) + if err != nil { + t.Fatalf("failed connecting. %v", err) + } + h := NewHistory() + initializeReplica(r, h) + + replicas = append(replicas, r) + history = append(history, h) + } + return replicas, history + } + + listenersGroup.Add(clusterSize) + replicas, history := initializeCluster(clusterSize) + + entry := replicas[0] + writerGroup.Add(testSize) + for i := 0; i < testSize; i++ { + write := func(data []byte) { + defer writerGroup.Done() + err := entry.Broadcast(ctx, relt.Send{ + Address: relt.GroupAddress(partition), + Data: data, + }) + + if err != nil { + t.Errorf("failed broadcasting. %v", err) + } + } + go write([]byte(fmt.Sprintf("%d", i))) + } + + writerGroup.Wait() + time.Sleep(10 * time.Second) + cancel() + listenersGroup.Wait() + + truth := history[0] + if truth.size() != testSize { + t.Errorf("should have size %d, found %d", testSize, truth.size()) + } + + for i, messageHist := range history { + diff := truth.compare(*messageHist) + if diff != 0 { + t.Errorf("history differ at %d with %d different commands", i, diff) + } + } + + for _, replica := range replicas { + err := replica.Close() + if err != nil { + t.Errorf("failed closing replica. %#v", err) + } + } +}