From 272eb220a7d456b1ad058f551db3fdec65875298 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Wed, 2 Aug 2023 22:15:08 -0700 Subject: [PATCH 1/5] Fix deadlock on subscribers leaving stream --- lake/lake.go | 88 +++++++++++++++++++++++++++++------------ msg/box.go | 110 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 134 insertions(+), 64 deletions(-) diff --git a/lake/lake.go b/lake/lake.go index 6a3ab34..a1e14fa 100644 --- a/lake/lake.go +++ b/lake/lake.go @@ -3,6 +3,7 @@ package lake import ( "context" "fmt" + "sync" "github.com/rs/zerolog" @@ -86,7 +87,10 @@ func (service *Service) Publish(ctx context.Context, req *pb.PublishReq) (*pb.Pu return &publishRes, err } - service.logger.Debug().Str("addr", string(pubKey.Address())).Msg("published") + service.logger.Debug(). + Str("topic-id", req.GetTopicId()). + Str("addr", string(pubKey.Address())). + Msg("published") // update publish res publishRes.Ok = true @@ -94,8 +98,13 @@ func (service *Service) Publish(ctx context.Context, req *pb.PublishReq) (*pb.Pu return &publishRes, nil } func (service *Service) Subscribe(req *pb.SubscribeReq, stream pb.Lake_SubscribeServer) error { - service.logger.Debug().Msg("begin of subscribe stream") - defer service.logger.Debug().Msg("end of subscribe stream") + service.logger.Debug(). + Str("topic-id", req.GetTopicId()). + Msg("begin of subscribe stream") + defer service.logger.Debug(). + Str("topic-id", req.GetTopicId()). + Msg("end of subscribe stream") + // set subscribe res res := pb.SubscribeRes{ Type: pb.SubscribeResType_SUBSCRIBE_RES_TYPE_ACK, @@ -159,7 +168,10 @@ func (service *Service) Subscribe(req *pb.SubscribeReq, stream pb.Lake_Subscribe return nil } - service.logger.Debug().Str("subscriber-id", subscriberID).Msg("registered") + service.logger.Info(). + Str("topic-id", req.GetTopicId()). + Str("subscriber-id", subscriberID). + Msg("joined subscriber") // update subscriber res res.SubscriberId = subscriberID @@ -171,29 +183,53 @@ func (service *Service) Subscribe(req *pb.SubscribeReq, stream pb.Lake_Subscribe return err } - // update subscriber res - res.Type = pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY - - // relay msgs to susbscriber - for { - select { - case <-stream.Context().Done(): - subSize, err := msgBox.LeaveSub(subscriberID) - if err != nil { - return err - } - if subSize > 0 { - return nil + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stream.Context().Done(): + return + case msgCapsule := <-subscriberCh: + err := stream.Send(&pb.SubscribeRes{ + Type: pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY, + Res: &pb.SubscribeRes_MsgCapsule{ + MsgCapsule: msgCapsule, + }, + }) + if err != nil { + return + } + msgCapsule = nil // explicitly free } - msgBox.StopSub() - return nil - case msgCapsule := <-subscriberCh: - res.Res = &pb.SubscribeRes_MsgCapsule{MsgCapsule: msgCapsule} - err := stream.Send(&res) - if err != nil { - service.logger.Err(err).Msg("") - } - msgCapsule = nil // explicitly free } + }() + + wg.Wait() + + go func() { + for len(subscriberCh) > 0 { + <-subscriberCh + } + service.logger.Debug(). + Str("topic-id", req.GetTopicId()). + Str("subscriber-id", subscriberID). + Msg("drained subscriber ch") + }() + + err = msgBox.LeaveSub(subscriberID) + if err != nil { + service.logger.Err(err). + Str("topic-id", req.GetTopicId()). + Str("subscriber-id", subscriberID). + Msg("") } + service.logger.Info(). + Str("topic-id", req.GetTopicId()). + Str("subscriber-id", subscriberID). + Msg("left subscriber") + + return nil } diff --git a/msg/box.go b/msg/box.go index be2fc72..1db7e46 100644 --- a/msg/box.go +++ b/msg/box.go @@ -14,6 +14,10 @@ import ( pb "github.com/h0n9/msg-lake/proto" ) +const ( + ChanBufferSize = 1000 +) + type Box struct { ctx context.Context cancel context.CancelFunc @@ -52,7 +56,7 @@ func NewBox(logger *zerolog.Logger, topicID string, topic *pubsub.Topic) (*Box, setSubscriberCh: make(setSubscriberCh), deleteSubscriberCh: make(deleteSubscriberCh), - subCh: make(SubscriberCh, 10), + subCh: make(SubscriberCh, 5000), sub: nil, subCtx: nil, subCancel: nil, @@ -61,24 +65,16 @@ func NewBox(logger *zerolog.Logger, topicID string, topic *pubsub.Topic) (*Box, } go func() { - var ( - msgCapsule *pb.MsgCapsule - - setSubscriber setSubscriber - deleteSubscriber deleteSubscriber - ) for { select { case <-ctx.Done(): return - case msgCapsule = <-box.subCh: - for subscriberID, subscriberCh := range box.subscribers { - subLogger.Debug().Str("subscriber-id", subscriberID).Msg("relaying") + case msgCapsule := <-box.subCh: + for _, subscriberCh := range box.subscribers { subscriberCh <- msgCapsule - subLogger.Debug().Str("subscriber-id", subscriberID).Msg("relayed") } msgCapsule = nil // explicitly free - case setSubscriber = <-box.setSubscriberCh: + case setSubscriber := <-box.setSubscriberCh: _, exist := box.subscribers[setSubscriber.subscriberID] if exist { setSubscriber.errCh <- fmt.Errorf("%s is already subscribing", setSubscriber.subscriberID) @@ -89,16 +85,29 @@ func NewBox(logger *zerolog.Logger, topicID string, topic *pubsub.Topic) (*Box, go box.startSub() } setSubscriber.errCh <- nil - case deleteSubscriber = <-box.deleteSubscriberCh: + case deleteSubscriber := <-box.deleteSubscriberCh: subscriberCh, exist := box.subscribers[deleteSubscriber.subscriberID] if !exist { deleteSubscriber.errCh <- fmt.Errorf("%s is not subscribing", <-deleteSubscriber.errCh) continue } close(subscriberCh) - subLogger.Debug().Str("subscriber-id", deleteSubscriber.subscriberID).Msg("closed channel") + subLogger.Debug(). + Str("topic-id", topicID). + Str("subscriber-id", deleteSubscriber.subscriberID). + Msg("closed channel") delete(box.subscribers, deleteSubscriber.subscriberID) - subLogger.Debug().Str("subscriber-id", deleteSubscriber.subscriberID).Msg("deleted channel") + subLogger.Debug(). + Str("topic-id", topicID). + Str("subscriber-id", deleteSubscriber.subscriberID). + Msg("deleted channel") + box.logger.Debug(). + Str("topic-id", box.topicID). + Int("num-of-subscribers", len(box.subscribers)). + Msg("") + if len(box.subscribers) == 0 { + box.StopSub() + } deleteSubscriber.errCh <- nil } } @@ -117,32 +126,54 @@ func (box *Box) startSub() { box.subCtx = ctx box.subCancel = cancel box.sub = sub - box.logger.Debug().Str("topic-id", box.topicID).Msg("started subscription") - for { - pubSubMsg, err := box.sub.Next(box.subCtx) - if err != nil { - if errors.Is(context.Canceled, err) { - sub.Cancel() - box.subCtx = nil - box.subCancel = nil - box.sub = nil - box.logger.Debug().Str("topic-id", box.topicID).Msg("stopped subscription") + box.logger.Info(). + Str("topic-id", box.topicID). + Msg("started subscription") + + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): return + default: + pubSubMsg, err := box.sub.Next(box.subCtx) + if err != nil { + if errors.Is(context.Canceled, err) { + return + } + box.logger.Err(err).Msg("") + continue + } + msgCapsule := pb.MsgCapsule{} + err = proto.Unmarshal(pubSubMsg.GetData(), &msgCapsule) + if err != nil { + box.logger.Err(err).Msg("") + continue + } + box.subCh <- &msgCapsule } - box.logger.Err(err).Msg("") - continue - } - msgCapsule := pb.MsgCapsule{} - err = proto.Unmarshal(pubSubMsg.GetData(), &msgCapsule) - if err != nil { - box.logger.Err(err).Msg("") - continue } - box.subCh <- &msgCapsule - } + }() + + wg.Wait() + + sub.Cancel() + box.subCtx = nil + box.subCancel = nil + box.sub = nil + box.logger.Info(). + Str("topic-id", box.topicID). + Msg("stopped subscription") } func (box *Box) StopSub() { + if box.subCancel == nil { + return + } box.subCancel() } @@ -178,7 +209,7 @@ func (box *Box) Publish(msgCapsule *pb.MsgCapsule) error { func (box *Box) JoinSub(subscriberID string) (SubscriberCh, error) { var ( - subscriberCh = make(SubscriberCh, 10) + subscriberCh = make(SubscriberCh, ChanBufferSize) errCh = make(chan error) ) defer close(errCh) @@ -198,7 +229,7 @@ func (box *Box) JoinSub(subscriberID string) (SubscriberCh, error) { return subscriberCh, nil } -func (box *Box) LeaveSub(subscriberID string) (int, error) { +func (box *Box) LeaveSub(subscriberID string) error { var ( errCh = make(chan error) ) @@ -210,5 +241,8 @@ func (box *Box) LeaveSub(subscriberID string) (int, error) { errCh: errCh, } err := <-errCh - return len(box.subscribers), err + if err != nil { + return err + } + return nil } From e513bd92ba7fd50d8f01e84dee7cde079ef95ce2 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Wed, 2 Aug 2023 22:39:57 -0700 Subject: [PATCH 2/5] Make int, ext chan buffer size customizable --- msg/box.go | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/msg/box.go b/msg/box.go index 1db7e46..f3e096e 100644 --- a/msg/box.go +++ b/msg/box.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strconv" "sync" "time" @@ -12,10 +13,17 @@ import ( "google.golang.org/protobuf/proto" pb "github.com/h0n9/msg-lake/proto" + "github.com/h0n9/msg-lake/util" ) const ( - ChanBufferSize = 1000 + DefaultInternalChanBufferSize = 5000 + DefaultExternalChanBufferSize = 1000 +) + +var ( + internalChanBufferSize int + externalChanBufferSize int ) type Box struct { @@ -56,7 +64,7 @@ func NewBox(logger *zerolog.Logger, topicID string, topic *pubsub.Topic) (*Box, setSubscriberCh: make(setSubscriberCh), deleteSubscriberCh: make(deleteSubscriberCh), - subCh: make(SubscriberCh, 5000), + subCh: make(SubscriberCh, internalChanBufferSize), sub: nil, subCtx: nil, subCancel: nil, @@ -209,7 +217,7 @@ func (box *Box) Publish(msgCapsule *pb.MsgCapsule) error { func (box *Box) JoinSub(subscriberID string) (SubscriberCh, error) { var ( - subscriberCh = make(SubscriberCh, ChanBufferSize) + subscriberCh = make(SubscriberCh, externalChanBufferSize) errCh = make(chan error) ) defer close(errCh) @@ -246,3 +254,23 @@ func (box *Box) LeaveSub(subscriberID string) error { } return nil } + +func init() { + tmp, err := getEnvInt("INTERNAL_CHAN_BUFFER_SIZE", DefaultInternalChanBufferSize) + if err != nil { + panic(err) + } + internalChanBufferSize = tmp + + tmp, err = getEnvInt("EXTERNAL_CHAN_BUFFER_SIZE", DefaultExternalChanBufferSize) + if err != nil { + panic(err) + } + externalChanBufferSize = tmp +} + +func getEnvInt(key string, fallback int) (int, error) { + tmpStr := strconv.Itoa(fallback) + tmpStr = util.GetEnv(key, tmpStr) + return strconv.Atoi(tmpStr) +} From dccc57928857872364736e003618c76d503f61a8 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Thu, 3 Aug 2023 09:10:26 -0700 Subject: [PATCH 3/5] Gracefully shutdown agent --- cli/agent/agent.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/cli/agent/agent.go b/cli/agent/agent.go index 773db63..9c0ef23 100644 --- a/cli/agent/agent.go +++ b/cli/agent/agent.go @@ -7,6 +7,9 @@ import ( "math/big" "net" "os" + "os/signal" + "sync" + "syscall" "github.com/rs/zerolog" "github.com/spf13/cobra" @@ -46,10 +49,27 @@ var Cmd = &cobra.Command{ zerolog.SetGlobalLevel(logLevel) logger.Info().Msg("initalized logger") - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) logger.Info().Msg("initalized context") - grpcServer := grpc.NewServer() + var grpcServer *grpc.Server = nil + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + s := <-sigCh + logger.Info().Msgf("got signal %v, attempting graceful shutdown", s) + cancel() + if grpcServer != nil { + grpcServer.GracefulStop() + } + wg.Done() + }() + logger.Info().Msg("listening os signal: SIGINT, SIGTERM") + + grpcServer = grpc.NewServer() logger.Info().Msg("initalized gRPC server") lakeService, err := lake.NewService( ctx, @@ -79,6 +99,8 @@ var Cmd = &cobra.Command{ return err } + wg.Wait() + return nil }, } From 9836fa1f85e10b22fd8259e74ec71b66e002efee Mon Sep 17 00:00:00 2001 From: h0n9 Date: Thu, 3 Aug 2023 09:16:30 -0700 Subject: [PATCH 4/5] Gracefully shutdown client --- cli/client/client.go | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/cli/client/client.go b/cli/client/client.go index 469ec2e..cec7f33 100644 --- a/cli/client/client.go +++ b/cli/client/client.go @@ -56,16 +56,20 @@ var Cmd = &cobra.Command{ wg.Add(1) go func() { defer wg.Done() - sig := <-sigCh - fmt.Println("\r\ngot", sig.String()) - if conn != nil { - fmt.Printf("closing grpc client ... ") - conn.Close() + select { + case <-ctx.Done(): + return + case s := <-sigCh: + fmt.Printf("got signal %v, attempting graceful shutdown\n", s) + if conn != nil { + fmt.Printf("closing grpc client ... ") + conn.Close() + fmt.Printf("done\n") + } + fmt.Printf("cancelling ctx ... ") + cancel() fmt.Printf("done\n") } - fmt.Printf("cancelling ctx ... ") - cancel() - fmt.Printf("done\n") }() // init privKey @@ -134,8 +138,8 @@ var Cmd = &cobra.Command{ res, err := stream.Recv() if err != nil { fmt.Println(err) - sigCh <- syscall.SIGINT - break + cancel() + return } if res.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY { continue @@ -157,13 +161,16 @@ var Cmd = &cobra.Command{ continue } printOutput(true, &msg, timestamp) - printInput(true) + // printInput(true) } }() // execute goroutine (sender) + wg.Add(1) go func() { + defer wg.Done() reader := bufio.NewReader(os.Stdin) + ok := true for { printInput(false) input, err := reader.ReadString('\n') @@ -178,6 +185,10 @@ var Cmd = &cobra.Command{ if input == "" { continue } + if !ok { + cancel() + return + } go func() { msg := Msg{ Data: []byte(input), @@ -206,15 +217,9 @@ var Cmd = &cobra.Command{ }, }, }) - if err == io.EOF { - err := stream.CloseSend() - if err != nil { - fmt.Println(err) - cancel() - } - } if err != nil { fmt.Println(err) + ok = false return } From dc81d588cdd58ec486d77565777621788b4fd209 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Thu, 3 Aug 2023 10:42:14 -0700 Subject: [PATCH 5/5] Minor fix on client input --- cli/client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/client/client.go b/cli/client/client.go index cec7f33..07209ff 100644 --- a/cli/client/client.go +++ b/cli/client/client.go @@ -161,7 +161,7 @@ var Cmd = &cobra.Command{ continue } printOutput(true, &msg, timestamp) - // printInput(true) + printInput(true) } }()