diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index b57ee3c..c09cfd7 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -49,7 +49,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { RPCHandlers: make(map[string]gbus.MessageHandler), Serializer: builder.serializer, DLX: builder.dlx, - DefaultPolicies: make([]gbus.MessagePolicy, 0), + DefaultPolicies: builder.defaultPolicies, DbPingTimeout: 3} gb.Confirm = builder.confirm diff --git a/gbus/bus.go b/gbus/bus.go index 54a2368..d8a3e01 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -57,8 +57,8 @@ type DefaultBus struct { Confirm bool healthChan chan error backpreasure bool - rabbitFailure bool DbPingTimeout time.Duration + amqpConnected bool } var ( @@ -257,7 +257,7 @@ func (b *DefaultBus) Start() error { //start monitoring on amqp related errors go b.monitorAMQPErrors() //start consuming messags from service queue - + b.amqpConnected = true return nil } @@ -293,12 +293,11 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { serializer: b.Serializer, b: b, amqpErrors: b.amqpErrors} - go func() { - err := w.Start() - if err != nil { - log.WithError(err) - } - }() + + err := w.Start() + if err != nil { + log.WithError(err).Error("failed to start worker") + } workers = append(workers, w) } @@ -321,6 +320,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { err := worker.Stop() if err != nil { b.log().WithError(err).Error("could not stop worker") + return err } } b.Outgoing.shutdown() @@ -359,7 +359,7 @@ func (b *DefaultBus) GetHealth() HealthCard { return HealthCard{ DbConnected: dbConnected, RabbitBackPressure: b.backpreasure, - RabbitConnected: !b.rabbitFailure, + RabbitConnected: b.amqpConnected, } } @@ -577,7 +577,7 @@ func (b *DefaultBus) monitorAMQPErrors() { } b.backpreasure = blocked.Active case amqpErr := <-b.amqpErrors: - b.rabbitFailure = true + b.amqpConnected = false b.log().WithField("amqp_error", amqpErr).Error("amqp error") if b.healthChan != nil { b.healthChan <- amqpErr diff --git a/gbus/worker.go b/gbus/worker.go index d38cc64..0f798c3 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -47,6 +47,7 @@ type worker struct { func (worker *worker) Start() error { worker.log().Info("starting worker") + worker.stop = make(chan bool) worker.channel.NotifyClose(worker.amqpErrors) var ( @@ -62,7 +63,7 @@ func (worker *worker) Start() error { } worker.messages = messages worker.rpcMessages = rpcmsgs - worker.stop = make(chan bool) + go worker.consumeMessages() return nil diff --git a/tests/bus_test.go b/tests/bus_test.go index ff0fb51..eb21c5b 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -3,8 +3,10 @@ package tests import ( "context" "database/sql" + "errors" "fmt" "reflect" + "sync" "testing" "time" @@ -209,28 +211,34 @@ func TestRPC(t *testing.T) { } func TestDeadlettering(t *testing.T) { + + var waitgroup sync.WaitGroup + waitgroup.Add(2) poision := gbus.NewBusMessage(PoisionMessage{}) service1 := createBusWithOptions(testSvc1, "grabbit-dead", true, true) deadletterSvc := createBusWithOptions("deadletterSvc", "grabbit-dead", true, true) - proceed := make(chan bool) - handler := func(tx *sql.Tx, poision amqp.Delivery) error { - proceed <- true + + deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error { + waitgroup.Done() return nil } - deadletterSvc.HandleDeadletter(handler) + faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + return errors.New("fail") + } + + deadletterSvc.HandleDeadletter(deadMessageHandler) + service1.HandleMessage(Command1{}, faultyHandler) deadletterSvc.Start() defer deadletterSvc.Shutdown() service1.Start() defer service1.Shutdown() - e := service1.Send(context.Background(), testSvc1, poision) - if e != nil { - log.Printf("send error: %v", e) - } + service1.Send(context.Background(), testSvc1, poision) + service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) - <-proceed + waitgroup.Wait() } func TestRegistrationAfterBusStarts(t *testing.T) { @@ -323,6 +331,21 @@ func TestSendingPanic(t *testing.T) { } } +func TestHealthCheck(t *testing.T) { + svc1 := createNamedBusForTest(testSvc1) + err := svc1.Start() + if err != nil { + t.Error(err.Error()) + } + defer svc1.Shutdown() + health := svc1.GetHealth() + + fmt.Printf("%v", health) + if !health.DbConnected || !health.RabbitConnected || health.RabbitBackPressure { + t.Error("bus expected to be healthy but failed health check") + } +} + func noopTraceContext() context.Context { return context.Background() // tracer := opentracing.NoopTracer{} diff --git a/tests/consts.go b/tests/consts.go index 3234a4a..365ef06 100644 --- a/tests/consts.go +++ b/tests/consts.go @@ -1,6 +1,8 @@ package tests import ( + "time" + "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/builder" "github.com/wework/grabbit/gbus/policy" @@ -26,7 +28,8 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu busBuilder := builder. New(). Bus(connStr). - WithPolicies(&policy.Durable{}). + WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}). + WorkerNum(3, 1). WithConfirms() if txnl {