Polishing etcd backend. (#5)
Applying some fixes and simplifying the EtcdCoordinator.
Added some tests to verify that commands are delivered in
the same order even when multiple replicas exist.

After some digging, it turned out that there is no need to
use the KV interface to send requests; this can be done with
the etcd client directly. So that was removed, and there is
also no need to send commands synchronously through a writer
goroutine, nor to handle watch events asynchronously.

Added a load benchmark test to be inspected with `pprof`. Any
optimization will be done later.
jabolina authored Mar 1, 2021
1 parent 0e7cc81 commit 29d2f59
Show file tree
Hide file tree
Showing 15 changed files with 599 additions and 200 deletions.
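
In practical terms, the simplification relies on the fact that the etcd client is already safe for concurrent use and embeds the KV interface, so the dedicated writer goroutine and request channel can be replaced by direct calls. A minimal sketch of that write path, assuming the coreos/etcd clientv3 API and an illustrative endpoint on localhost:2379:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		DialTimeout: 30 * time.Second,
		Endpoints:   []string{"localhost:2379"},
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// clientv3.Client embeds KV, so no clientv3.NewKV wrapper is needed and
	// Put already returns its result synchronously to the caller.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := cli.Put(ctx, "partition-key", "payload"); err != nil {
		fmt.Println("write failed:", err)
	}
}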
5 changes: 4 additions & 1 deletion Makefile
@@ -29,6 +29,9 @@ deps: # @HELP install dependencies
@echo "getting dependencies"
go get -t -d -v ./...

+benchmark: # @HELP run benchmark and generate files for pprof
+	go test -bench=. -run=^$ -benchmem -cpuprofile profile.out -memprofile memprofile.out ./test/...

build: # @HELP build the packages
sh $(PWD)/scripts/build.sh

@@ -37,4 +40,4 @@ ci: deps test_rule dep-linter lint

all: deps test_rule lint

-.PHONY: all build
\ No newline at end of file
+.PHONY: all build
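
The benchmark target runs only benchmarks (-run=^$ matches no tests) under ./test/ and writes CPU and memory profiles that can be inspected afterwards with go tool pprof profile.out and go tool pprof memprofile.out. A load benchmark in the shape this target picks up might look like the sketch below; the actual tests under test/ are not shown in this diff, and newTestCore is a hypothetical helper:

package test

import (
	"context"
	"testing"
)

// BenchmarkBroadcast is an illustrative load benchmark; newTestCore is an
// assumed helper that wires a Core against a locally running etcd server.
func BenchmarkBroadcast(b *testing.B) {
	core := newTestCore(b)
	ctx := context.Background()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Broadcast one command per iteration and wait for the write result.
		if err := <-core.Send(ctx, "bench-partition", []byte("payload")); err != nil {
			b.Fatal(err)
		}
	}
}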
12 changes: 6 additions & 6 deletions README.md
@@ -2,13 +2,13 @@

![Go](https://github.com/jabolina/relt/workflows/Go/badge.svg)

-Primitives for Reliable Transport. At this point, the "reliable transport" is the AMQP protocol implemented
-by the RabbitMQ and that's it.
+A primitive for reliable communication, backed by an atomic broadcast protocol. An example can be found in the
+`_examples` folder. The current atomic broadcast implementation is the [etcd](https://github.com/etcd-io/etcd)
+version, which means that an etcd server is needed.

Since this will be used primarily on the [Generic Atomic Multicast](https://github.com/jabolina/go-mcast), to create
-a simple structure, this Relt project will only by a RabbitMQ client with a high level API for sending messages to
-fan-out exchanges.
+a simple structure, with a high-level API for sending messages to a group of processes.

When the [Generic Atomic Multicast](https://github.com/jabolina/go-mcast) contains the basic structure, this reliable
-transport will turn into a new whole project where will be implemented a generic atomic broadcast, to be used as a
-communication primitive.
+transport will turn into a whole new project where a generic atomic broadcast will be implemented, to be used as the
+default reliable communication primitive.
8 changes: 4 additions & 4 deletions _examples/peer.go
@@ -49,15 +49,15 @@ func consume(r *relt.Relt, ctx context.Context) {
func main() {
conf := relt.DefaultReltConfiguration()
conf.Name = "local-test"
-	relt, _ := relt.NewRelt(*conf)
+	r, _ := relt.NewRelt(*conf)
ctx, done := context.WithCancel(context.Background())

go func() {
-		produce(relt, os.Stdin, ctx)
+		produce(r, os.Stdin, ctx)
}()

go func() {
-		consume(relt, ctx)
+		consume(r, ctx)
}()

c := make(chan os.Signal, 1)
@@ -69,5 +69,5 @@ func main() {
}()

<-ctx.Done()
-	relt.Close()
+	r.Close()
}
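
The only substantive change in this example is the rename of the local variable from relt to r: the old declaration compiled, but after it the identifier relt referred to the variable rather than the imported package, shadowing it for the rest of main. A small fragment (not a full program) illustrating the pitfall:

// Before: legal, but from here on "relt" is the variable, so a later package
// reference such as relt.DefaultReltConfiguration() would no longer compile.
relt, _ := relt.NewRelt(*conf)

// After: the package name stays usable and the handle is unambiguous.
r, _ := relt.NewRelt(*conf)
defer r.Close()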
141 changes: 5 additions & 136 deletions internal/coordinator.go
@@ -2,23 +2,9 @@ package internal

import (
"context"
"github.com/coreos/etcd/clientv3"
"io"
"time"
)

-// A single write requests to be applied to etcd.
-type request struct {
-	// Issuer writer context.
-	ctx context.Context
-
-	// Event to be sent to etcd.
-	event Event
-
-	// Channel to send response back.
-	response chan error
-}
-
// Configuration for the coordinator.
type CoordinatorConfiguration struct {
// Each Coordinator will handle only a single partition.
@@ -38,7 +24,7 @@ type CoordinatorConfiguration struct {
// Coordinator interface that should be implemented by the
// atomic broadcast handler.
// Commands should be issued through the coordinator to be delivered
-// to other peers
+// to other peers.
type Coordinator interface {
io.Closer

@@ -48,130 +34,13 @@ type Coordinator interface {
Watch(received chan<- Event) error

// Issues an Event.
-	Write(ctx context.Context, event Event) <-chan error
+	// This will have the same effect as broadcasting a message
+	// for every participant on the destination.
+	Write(ctx context.Context, event Event) error
}

// Create a new Coordinator using the given configuration.
// The current implementation is the EtcdCoordinator, backed by etcd.
func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) {
-	cli, err := clientv3.New(clientv3.Config{
-		DialTimeout: 30 * time.Second,
-		Endpoints:   []string{configuration.Server},
-	})
-	if err != nil {
-		return nil, err
-	}
-	kv := clientv3.NewKV(cli)
-	ctx, cancel := context.WithCancel(configuration.Ctx)
-	coord := &EtcdCoordinator{
-		configuration: configuration,
-		cli:           cli,
-		kv:            kv,
-		ctx:           ctx,
-		cancel:        cancel,
-		writeChan:     make(chan request),
-	}
-	configuration.Handler.Spawn(coord.writer)
-	return coord, nil
-}
-
-// EtcdCoordinator will use etcd for atomic broadcast.
-type EtcdCoordinator struct {
-	// Configuration parameters.
-	configuration CoordinatorConfiguration
-
-	// Current Coordinator context, created from the parent context.
-	ctx context.Context
-
-	// Function to cancel the current context.
-	cancel context.CancelFunc
-
-	// A client for the etcd server.
-	cli *clientv3.Client
-
-	// The key-value entry point for issuing requests.
-	kv clientv3.KV
-
-	// Channel to receive write requests.
-	writeChan chan request
-}
-
-// Listen and apply write requests.
-// This will keep running while the application context is available.
-// Receiving commands through the channel will ensure that they are
-// applied synchronously to the etcd.
-func (e *EtcdCoordinator) writer() {
-	for {
-		select {
-		case <-e.ctx.Done():
-			return
-		case req := <-e.writeChan:
-			_, err := e.kv.Put(req.ctx, req.event.Key, string(req.event.Value))
-			req.response <- err
-		}
-	}
-}
-
-// Starts a new coroutine for watching the Coordinator partition.
-// All received information will be published back through the channel
-// received as parameter.
-//
-// After calling a routine will run bounded to the application lifetime.
-func (e *EtcdCoordinator) Watch(received chan<- Event) error {
-	watchChan := e.cli.Watch(e.ctx, e.configuration.Partition)
-	watchChanges := func() {
-		for response := range watchChan {
-			select {
-			case <-e.ctx.Done():
-				return
-			default:
-				e.handleResponse(response, received)
-			}
-		}
-	}
-	e.configuration.Handler.Spawn(watchChanges)
-	return nil
-}
-
-// Write the given event using the KV interface.
-func (e *EtcdCoordinator) Write(ctx context.Context, event Event) <-chan error {
-	res := make(chan error)
-	e.writeChan <- request{
-		ctx:      ctx,
-		event:    event,
-		response: res,
-	}
-	return res
-}
-
-// Stop the etcd client connection.
-func (e *EtcdCoordinator) Close() error {
-	e.cancel()
-	return e.cli.Close()
-}
-
-// This method is responsible for handling events from the etcd client.
-//
-// This method will transform each received event into Event object and
-// publish it back using the given channel. A buffered channel will be created
-// and a goroutine will be spawned, so we can publish the received messages
-// asynchronously without blocking. This can cause the Close to hold, if there
-// exists pending messages to be consumed by the channel, this method can cause a deadlock.
-func (e *EtcdCoordinator) handleResponse(response clientv3.WatchResponse, received chan<- Event) {
-	buffered := make(chan Event, len(response.Events))
-	defer close(buffered)
-
-	e.configuration.Handler.Spawn(func() {
-		for ev := range buffered {
-			received <- ev
-		}
-	})
-
-	for _, event := range response.Events {
-		buffered <- Event{
-			Key:   string(event.Kv.Key),
-			Value: event.Kv.Value,
-			Error: nil,
-		}
-	}
+	return newEtcdCoordinator(configuration)
}
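
NewCoordinator now only delegates to newEtcdCoordinator, which lives in a file not shown in this diff. Based on the interface above, a plausible shape for the simplified coordinator is sketched below; every name and detail is an assumption rather than the repository's actual code:

package internal

import (
	"context"

	"github.com/coreos/etcd/clientv3"
)

// etcdCoordinator is a hypothetical reconstruction of the simplified backend.
type etcdCoordinator struct {
	configuration CoordinatorConfiguration
	ctx           context.Context
	cancel        context.CancelFunc
	cli           *clientv3.Client
}

func newEtcdCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{configuration.Server}})
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancel(configuration.Ctx)
	return &etcdCoordinator{configuration: configuration, ctx: ctx, cancel: cancel, cli: cli}, nil
}

// Write is synchronous now: the client is safe for concurrent use, so the old
// request channel and writer goroutine are unnecessary.
func (e *etcdCoordinator) Write(ctx context.Context, event Event) error {
	_, err := e.cli.Put(ctx, event.Key, string(event.Value))
	return err
}

// Watch forwards events in the order etcd delivers them; handling each
// response on the watching goroutine (instead of the old buffered fan-out)
// preserves that order for every replica of the partition.
func (e *etcdCoordinator) Watch(received chan<- Event) error {
	watchChan := e.cli.Watch(e.ctx, e.configuration.Partition)
	e.configuration.Handler.Spawn(func() {
		for response := range watchChan {
			for _, ev := range response.Events {
				received <- Event{Key: string(ev.Kv.Key), Value: ev.Kv.Value}
			}
		}
	})
	return nil
}

func (e *etcdCoordinator) Close() error {
	e.cancel()
	return e.cli.Close()
}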
30 changes: 21 additions & 9 deletions internal/core.go
@@ -16,18 +16,22 @@ var (
coreAlreadyWatching = errors.New("already watching partition")
)

+// Configuration for the Core interface.
type CoreConfiguration struct {
// Partition the Coordinator will work with.
Partition string

// Server address for the Coordinator.
Server string

-	// Default timeout to be applied when handling channels.
+	// Default timeout to be applied when handling channels and
+	// asynchronous operations.
DefaultTimeout time.Duration
}

// Holds all flags used to manage the Core state.
+// This is the same as an AtomicBoolean and is used internally
+// to manage the core states.
type CoreFlags struct {
// Flag for the shutdown state.
shutdown Flag
@@ -36,13 +40,19 @@ type CoreFlags struct {
watching Flag
}

-// Core is the interface that will hold the Relt connection to the Coordinator.
+// Core is the interface that will create the link between Relt requests
+// and the Coordinator.
+// Every command issued will be parsed here, and every command received should
+// be handled here before going back to the client.
+// Everything after this stage should care only about the atomic broadcast protocol
+// and everything before should be abstracted as a simple communication primitive.
+// This means that any parsing or state handling for the client should be done here.
type Core interface {
io.Closer

// Start listening for new messages.
+	// This will receive messages from the atomic broadcast protocol
+	// and parse them into an object the client can handle.
Listen() (<-chan Message, error)

// Send a message asynchronously for the given partition.
@@ -66,21 +76,23 @@ type ReltCore struct {
handler *GoRoutineHandler

// Coordinator to issues commands and receive Event.
+	// The coordinator is the interface to reach the atomic
+	// broadcast protocol.
coord Coordinator

// Channel for sending Message to the client.
output chan Message

-	// Flags for handling state.
+	// Flags for handling internal state.
flags CoreFlags

// Core configuration parameters.
configuration CoreConfiguration
}

// Create a new ReltCore using the given configuration.
-// As an effect, this will instantiate a Coordinator a failures
-// can happen while handling connections between the peers.
+// As an effect, this will instantiate a Coordinator; a failure
+// can happen while handling the connection to the atomic broadcast server.
func NewCore(configuration CoreConfiguration) (Core, error) {
ctx, cancel := context.WithCancel(context.TODO())
handler := NewRoutineHandler()
@@ -136,7 +148,7 @@ func (r *ReltCore) Listen() (<-chan Message, error) {
}

if r.flags.watching.Inactivate() {
-		events := make(chan Event, 100)
+		events := make(chan Event)
if err := r.coord.Watch(events); err != nil {
return nil, err
}
@@ -162,15 +174,14 @@ func (r *ReltCore) Listen() (<-chan Message, error) {
// This is a broadcast message, which means that if _N_ nodes are
// subscribed for a partition, every node will receive the message.
func (r *ReltCore) Send(ctx context.Context, dest string, data []byte) <-chan error {
-	response := make(chan error, 1)
+	response := make(chan error)
writeRequest := func() {
if r.flags.shutdown.IsActive() {
event := Event{
Key: dest,
Value: data,
}
-			err := <-r.coord.Write(ctx, event)
-			response <- err
+			response <- r.coord.Write(ctx, event)
} else {
response <- coreWasShutdown
}
@@ -192,6 +203,7 @@ func (r *ReltCore) Close() error {
}
r.finish()
r.handler.Close()
+		return nil
}
return coreWasShutdown
}
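
Note the two channel changes above: the Listen events channel and the Send response channel both became unbuffered. For Listen this applies backpressure instead of queueing up to 100 watch events; for Send it tightens the contract, since the spawned writer only completes once the caller receives the result. A usage sketch, written as if from inside the package (sendOne and the partition name are illustrative):

package internal

import (
	"context"
	"log"
	"time"
)

// sendOne illustrates the Send calling convention; the Core value is assumed
// to come from NewCore with a valid CoreConfiguration.
func sendOne(core Core) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	errCh := core.Send(ctx, "my-partition", []byte("command"))

	// The response channel is unbuffered, so the result is handed off
	// synchronously: always receive from it, or the writer goroutine blocks.
	if err := <-errCh; err != nil {
		log.Printf("broadcast failed: %v", err)
	}
}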