Skip to content

Commit

Permalink
Merge pull request #71 from wework/coverage
Browse files Browse the repository at this point in the history
increased test coverage
  • Loading branch information
Guy Baron authored May 8, 2019
2 parents bbbeac7 + a3069de commit 665d2d1
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 22 deletions.
2 changes: 1 addition & 1 deletion gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
RPCHandlers: make(map[string]gbus.MessageHandler),
Serializer: builder.serializer,
DLX: builder.dlx,
DefaultPolicies: make([]gbus.MessagePolicy, 0),
DefaultPolicies: builder.defaultPolicies,
DbPingTimeout: 3}

gb.Confirm = builder.confirm
Expand Down
20 changes: 10 additions & 10 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ type DefaultBus struct {
Confirm bool
healthChan chan error
backpreasure bool
rabbitFailure bool
DbPingTimeout time.Duration
amqpConnected bool
}

var (
Expand Down Expand Up @@ -257,7 +257,7 @@ func (b *DefaultBus) Start() error {
//start monitoring on amqp related errors
go b.monitorAMQPErrors()
//start consuming messags from service queue

b.amqpConnected = true
return nil
}

Expand Down Expand Up @@ -293,12 +293,11 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
serializer: b.Serializer,
b: b,
amqpErrors: b.amqpErrors}
go func() {
err := w.Start()
if err != nil {
log.WithError(err)
}
}()

err := w.Start()
if err != nil {
log.WithError(err).Error("failed to start worker")
}

workers = append(workers, w)
}
Expand All @@ -321,6 +320,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
err := worker.Stop()
if err != nil {
b.log().WithError(err).Error("could not stop worker")
return err
}
}
b.Outgoing.shutdown()
Expand Down Expand Up @@ -359,7 +359,7 @@ func (b *DefaultBus) GetHealth() HealthCard {
return HealthCard{
DbConnected: dbConnected,
RabbitBackPressure: b.backpreasure,
RabbitConnected: !b.rabbitFailure,
RabbitConnected: b.amqpConnected,
}
}

Expand Down Expand Up @@ -577,7 +577,7 @@ func (b *DefaultBus) monitorAMQPErrors() {
}
b.backpreasure = blocked.Active
case amqpErr := <-b.amqpErrors:
b.rabbitFailure = true
b.amqpConnected = false
b.log().WithField("amqp_error", amqpErr).Error("amqp error")
if b.healthChan != nil {
b.healthChan <- amqpErr
Expand Down
3 changes: 2 additions & 1 deletion gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type worker struct {
func (worker *worker) Start() error {

worker.log().Info("starting worker")
worker.stop = make(chan bool)
worker.channel.NotifyClose(worker.amqpErrors)

var (
Expand All @@ -62,7 +63,7 @@ func (worker *worker) Start() error {
}
worker.messages = messages
worker.rpcMessages = rpcmsgs
worker.stop = make(chan bool)

go worker.consumeMessages()

return nil
Expand Down
41 changes: 32 additions & 9 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package tests
import (
"context"
"database/sql"
"errors"
"fmt"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -209,28 +211,34 @@ func TestRPC(t *testing.T) {
}

func TestDeadlettering(t *testing.T) {

var waitgroup sync.WaitGroup
waitgroup.Add(2)
poision := gbus.NewBusMessage(PoisionMessage{})
service1 := createBusWithOptions(testSvc1, "grabbit-dead", true, true)
deadletterSvc := createBusWithOptions("deadletterSvc", "grabbit-dead", true, true)
proceed := make(chan bool)
handler := func(tx *sql.Tx, poision amqp.Delivery) error {
proceed <- true

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
waitgroup.Done()
return nil
}

deadletterSvc.HandleDeadletter(handler)
faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
return errors.New("fail")
}

deadletterSvc.HandleDeadletter(deadMessageHandler)
service1.HandleMessage(Command1{}, faultyHandler)

deadletterSvc.Start()
defer deadletterSvc.Shutdown()
service1.Start()
defer service1.Shutdown()

e := service1.Send(context.Background(), testSvc1, poision)
if e != nil {
log.Printf("send error: %v", e)
}
service1.Send(context.Background(), testSvc1, poision)
service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{}))

<-proceed
waitgroup.Wait()
}

func TestRegistrationAfterBusStarts(t *testing.T) {
Expand Down Expand Up @@ -323,6 +331,21 @@ func TestSendingPanic(t *testing.T) {
}
}

func TestHealthCheck(t *testing.T) {
svc1 := createNamedBusForTest(testSvc1)
err := svc1.Start()
if err != nil {
t.Error(err.Error())
}
defer svc1.Shutdown()
health := svc1.GetHealth()

fmt.Printf("%v", health)
if !health.DbConnected || !health.RabbitConnected || health.RabbitBackPressure {
t.Error("bus expected to be healthy but failed health check")
}
}

func noopTraceContext() context.Context {
return context.Background()
// tracer := opentracing.NoopTracer{}
Expand Down
5 changes: 4 additions & 1 deletion tests/consts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package tests

import (
"time"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/builder"
"github.com/wework/grabbit/gbus/policy"
Expand All @@ -26,7 +28,8 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu
busBuilder := builder.
New().
Bus(connStr).
WithPolicies(&policy.Durable{}).
WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}).
WorkerNum(3, 1).
WithConfirms()

if txnl {
Expand Down

0 comments on commit 665d2d1

Please sign in to comment.