Skip to content

Commit

Permalink
Merge pull request #342 from maxekman/fix/add-closers
Browse files Browse the repository at this point in the history
Fix / Add closers for graceful shutdown
  • Loading branch information
maxekman authored Oct 5, 2021
2 parents 80c74c2 + 64e07ba commit 19944c0
Show file tree
Hide file tree
Showing 28 changed files with 324 additions and 193 deletions.
4 changes: 2 additions & 2 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type EventBus interface {
// Errors returns an error channel where async handling errors are sent.
Errors() <-chan EventBusError

// Wait wait for all handlers to be cancelled by their context.
Wait()
// Close closes the EventBus and waits for all handlers to finish.
Close() error
}

// EventBusError is an async error containing the error returned from a handler
Expand Down
21 changes: 13 additions & 8 deletions eventbus/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestAddHandler(t *testing.T, bus1 eh.EventBus) {
// }
//
func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration) {
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
ctx = mocks.WithContextOne(ctx, "testval")

// Without handler.
Expand Down Expand Up @@ -245,10 +245,14 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)
}
}

// Cancel all handlers and wait.
cancel()
bus1.Wait()
bus2.Wait()
// TODO: Test context cancellation.

if err := bus1.Close(); err != nil {
t.Error("there should be no error:", err)
}
if err := bus2.Close(); err != nil {
t.Error("there should be no error:", err)
}
}

func checkBusErrors(t *testing.T, bus eh.EventBus) {
Expand Down Expand Up @@ -357,8 +361,9 @@ func benchmark(t bench, bus eh.EventBus, numAggregates, numHandlers, numEvents i

wg.Wait() // Wait for all events to be received.
t.StopTimer()
cancel() // Stop receiving goroutines.

// Cancel handler and wait.
cancel()
bus.Wait()
if err := bus.Close(); err != nil {
t.Error("there should be no error:", err)
}
}
35 changes: 24 additions & 11 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,22 @@ type EventBus struct {
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan eh.EventBusError
cctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
codec eh.EventCodec
}

// NewEventBus creates an EventBus, with optional GCP connection settings.
func NewEventBus(projectID, appID string, options ...Option) (*EventBus, error) {
ctx, cancel := context.WithCancel(context.Background())

b := &EventBus{
appID: appID,
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan eh.EventBusError, 100),
cctx: ctx,
cancel: cancel,
codec: &json.EventCodec{},
}

Expand All @@ -63,20 +69,19 @@ func NewEventBus(projectID, appID string, options ...Option) (*EventBus, error)
}

// Create the GCP pubsub client.
ctx := context.Background()
var err error
b.client, err = pubsub.NewClient(ctx, projectID, b.clientOpts...)
b.client, err = pubsub.NewClient(b.cctx, projectID, b.clientOpts...)
if err != nil {
return nil, err
}

// Get or create the topic.
name := appID + "_events"
b.topic = b.client.Topic(name)
if ok, err := b.topic.Exists(ctx); err != nil {
if ok, err := b.topic.Exists(b.cctx); err != nil {
return nil, err
} else if !ok {
if b.topic, err = b.client.CreateTopic(ctx, name); err != nil {
if b.topic, err = b.client.CreateTopic(b.cctx, name); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -195,8 +200,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
b.registered[h.HandlerType()] = struct{}{}

// Handle until context is cancelled.
b.wg.Add(1)
go b.handle(ctx, m, h, sub)
go b.handle(m, h, sub)

return nil
}
Expand All @@ -206,27 +210,36 @@ func (b *EventBus) Errors() <-chan eh.EventBusError {
return b.errCh
}

// Wait for all channels to close in the event bus group
func (b *EventBus) Wait() {
// Close implements the Close method of the eventhorizon.EventBus interface.
func (b *EventBus) Close() error {
// Stop publishing.
b.topic.Stop()

// Stop handling.
b.cancel()
b.wg.Wait()

return b.client.Close()
}

// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, sub *pubsub.Subscription) {
func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, sub *pubsub.Subscription) {
b.wg.Add(1)
defer b.wg.Done()

for {
if err := sub.Receive(ctx, b.handler(m, h)); err != nil {
if err := sub.Receive(b.cctx, b.handler(m, h)); err != nil {
err = fmt.Errorf("could not receive: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
case b.errCh <- eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
// Retry the receive loop if there was an error.
time.Sleep(time.Second)
continue
}

return
}
}
Expand Down
45 changes: 25 additions & 20 deletions eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,24 @@ type EventBus struct {
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan eh.EventBusError
cctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
codec eh.EventCodec
}

// NewEventBus creates an EventBus, with optional GCP connection settings.
func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {
topic := appID + "_events"
ctx, cancel := context.WithCancel(context.Background())

b := &EventBus{
addr: addr,
appID: appID,
topic: topic,
topic: appID + "_events",
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan eh.EventBusError, 100),
cctx: ctx,
cancel: cancel,
codec: &json.EventCodec{},
}

Expand All @@ -75,7 +80,7 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {
for i := 0; i < 10; i++ {
resp, err = client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
Topics: []kafka.TopicConfig{{
Topic: topic,
Topic: b.topic,
NumPartitions: 5,
ReplicationFactor: 1,
}},
Expand All @@ -92,15 +97,15 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {
if resp == nil {
return nil, fmt.Errorf("could not get/create Kafka topic in time: %w", err)
}
if topicErr, ok := resp.Errors[topic]; ok && topicErr != nil {
if topicErr, ok := resp.Errors[b.topic]; ok && topicErr != nil {
if !errors.Is(topicErr, kafka.TopicAlreadyExists) {
return nil, fmt.Errorf("invalid Kafka topic: %w", topicErr)
}
}

b.writer = &kafka.Writer{
Addr: kafka.TCP(addr),
Topic: topic,
Topic: b.topic,
BatchSize: 1, // Write every event to the bus without delay.
RequiredAcks: kafka.RequireOne, // Stronger consistency.
Balancer: &kafka.Hash{}, // Hash by aggregate ID.
Expand Down Expand Up @@ -206,8 +211,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
b.registered[h.HandlerType()] = struct{}{}

// Handle until context is cancelled.
b.wg.Add(1)
go b.handle(ctx, m, h, r)
go b.handle(m, h, r)

return nil
}
Expand All @@ -217,28 +221,29 @@ func (b *EventBus) Errors() <-chan eh.EventBusError {
return b.errCh
}

// Wait for all channels to close in the event bus group
func (b *EventBus) Wait() {
// Close implements the Close method of the eventhorizon.EventBus interface.
func (b *EventBus) Close() error {
b.cancel()
b.wg.Wait()
if err := b.writer.Close(); err != nil {
log.Printf("eventhorizon: failed to close Kafka writer: %s", err)
}

return b.writer.Close()
}

// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) {
func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) {
b.wg.Add(1)
defer b.wg.Done()

handler := b.handler(m, h, r)

for {
msg, err := r.FetchMessage(ctx)
msg, err := r.FetchMessage(b.cctx)
if errors.Is(err, context.Canceled) {
break
}
if err != nil {
} else if err != nil {
err = fmt.Errorf("could not fetch message: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
case b.errCh <- eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
Expand All @@ -248,17 +253,17 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand
}

var noBusError eh.EventBusError
if err := handler(ctx, msg); err != noBusError {
if err := handler(b.cctx, msg); err != noBusError {
select {
case b.errCh <- err:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
} else {
if err := r.CommitMessages(ctx, msg); err != nil {
if err := r.CommitMessages(b.cctx, msg); err != nil {
err = fmt.Errorf("could not commit message: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
case b.errCh <- eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
Expand Down
23 changes: 16 additions & 7 deletions eventbus/local/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,22 @@ type EventBus struct {
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan eh.EventBusError
cctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
codec eh.EventCodec
}

// NewEventBus creates a EventBus.
func NewEventBus(options ...Option) *EventBus {
ctx, cancel := context.WithCancel(context.Background())

b := &EventBus{
group: NewGroup(),
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan eh.EventBusError, 100),
cctx: ctx,
cancel: cancel,
codec: &json.EventCodec{},
}

Expand Down Expand Up @@ -115,8 +121,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
b.registered[h.HandlerType()] = struct{}{}

// Handle until context is cancelled.
b.wg.Add(1)
go b.handle(ctx, m, h, ch)
go b.handle(m, h, ch)

return nil
}
Expand All @@ -126,10 +131,13 @@ func (b *EventBus) Errors() <-chan eh.EventBusError {
return b.errCh
}

// Wait for all channels to close in the event bus group
func (b *EventBus) Wait() {
// Close implements the Close method of the eventhorizon.EventBus interface.
func (b *EventBus) Close() error {
b.cancel()
b.wg.Wait()
b.group.close()

return nil
}

type evt struct {
Expand All @@ -138,7 +146,8 @@ type evt struct {
}

// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
b.wg.Add(1)
defer b.wg.Done()

for {
Expand All @@ -147,7 +156,7 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand
// Artificial delay to simulate network.
time.Sleep(time.Millisecond)

event, ctx, err := b.codec.UnmarshalEvent(ctx, data)
event, ctx, err := b.codec.UnmarshalEvent(b.cctx, data)
if err != nil {
err = fmt.Errorf("could not unmarshal event: %w", err)
select {
Expand All @@ -172,7 +181,7 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand
log.Printf("eventhorizon: missed error in local event bus: %s", err)
}
}
case <-ctx.Done():
case <-b.cctx.Done():
return
}
}
Expand Down
Loading

0 comments on commit 19944c0

Please sign in to comment.