From 3b92d7362bee4511047aea7c02f2d8110796eaa9 Mon Sep 17 00:00:00 2001 From: Evren Ispiroglu Date: Mon, 13 Nov 2023 21:36:21 +0300 Subject: [PATCH] refactor:change maps to sync.map --- .../subscriber-client/mercurius-client-sub.go | 3 +- internal/broker/broker.go | 29 +++---- internal/broker/subscriber.go | 47 +---------- internal/broker/subscriber_repository.go | 49 ++++++++++++ internal/broker/topic.go | 77 ++----------------- internal/broker/topic_repository.go | 55 +++++++++++++ pkg/client/client.go | 2 +- 7 files changed, 129 insertions(+), 133 deletions(-) create mode 100644 internal/broker/subscriber_repository.go create mode 100644 internal/broker/topic_repository.go diff --git a/cmd/mercurius-client/subscriber-client/mercurius-client-sub.go b/cmd/mercurius-client/subscriber-client/mercurius-client-sub.go index b0301ec..528860d 100644 --- a/cmd/mercurius-client/subscriber-client/mercurius-client-sub.go +++ b/cmd/mercurius-client/subscriber-client/mercurius-client-sub.go @@ -3,10 +3,11 @@ package main import ( "context" "fmt" - "github.com/google/uuid" "sync/atomic" "time" + "github.com/google/uuid" + k "github.com/ispiroglu/mercurius/internal/logger" "github.com/ispiroglu/mercurius/pkg/client" "github.com/ispiroglu/mercurius/proto" diff --git a/internal/broker/broker.go b/internal/broker/broker.go index 55eafbd..58984d5 100644 --- a/internal/broker/broker.go +++ b/internal/broker/broker.go @@ -39,6 +39,13 @@ func (b *Broker) Publish(event *pb.Event) (*pb.ACK, error) { return &pb.ACK{}, nil } +func (b *Broker) Unsubscribe(sub *Subscriber) { + b.TopicRepository.Unsubscribe(sub) + if err := b.SubscriberRepository.Unsubscribe(sub); err != nil { + b.logger.Warn("Failed to unsubscribe subscriber", zap.String("SubscriberID", sub.Id), zap.String("Subscriber Name", sub.Name), zap.Error(err)) + } +} + func (b *Broker) Subscribe(ctx context.Context, topicName string, sId string, sName string) (*Subscriber, error) { t, err := b.findOrInsertTopic(topicName) if err != nil { @@ -47,29 +54,23 @@ func (b *Broker) Subscribe(ctx context.Context, topicName string, sId string, sN s, err := t.AddSubscriber(ctx, sId, sName) if err != nil { - b.logger.Error("Broker could not add subscriber to topic", zap.String("Topic", topicName), zap.String("SubscriberID", sId)) //, zap.Error(err)) + b.logger.Error("Broker could not add subscriber to topic", zap.String("Topic", topicName), zap.String("SubscriberID", sId), zap.Error(err)) return nil, err } return s, nil } -func (b *Broker) Unsubscribe(sub *Subscriber) { - b.TopicRepository.Unsubscribe(sub) - if err := b.SubscriberRepository.Unsubscribe(sub); err != nil { - b.logger.Warn("", zap.Error(err)) - } -} - func (b *Broker) findOrInsertTopic(topicName string) (*Topic, error) { t, err := b.GetTopic(topicName) if err != nil { t, err = b.CreateTopic(topicName) - - st, ok := status.FromError(err) - if !ok && st.Code() != codes.AlreadyExists { - return nil, err - } else if st.Code() == codes.AlreadyExists { - t, _ = b.GetTopic(topicName) + if err != nil { + st, ok := status.FromError(err) + if !ok && st.Code() != codes.AlreadyExists { + return nil, err + } else if st.Code() == codes.AlreadyExists { + t, _ = b.GetTopic(topicName) + } } } diff --git a/internal/broker/subscriber.go b/internal/broker/subscriber.go index 48b6535..dc67dad 100644 --- a/internal/broker/subscriber.go +++ b/internal/broker/subscriber.go @@ -3,19 +3,16 @@ package broker import ( "context" "fmt" - "sync" "sync/atomic" "time" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/ispiroglu/mercurius/internal/logger" "github.com/ispiroglu/mercurius/proto" "go.uber.org/zap" ) const totalEventCount = 100 * 100 * 100 +const channelSize = 1 // const subscriberBulkEventCount = 100 * 100 const subscriberBulkEventCount = 1 @@ -23,12 +20,6 @@ const subscriberBulkEventCount = 1 var messageCount = atomic.Uint64{} var _start time.Time -type SubscriberRepository struct { - sync.RWMutex - logger *zap.Logger - StreamPools map[string]*StreamPool -} - type Subscriber struct { logger *zap.Logger Id string @@ -39,20 +30,8 @@ type Subscriber struct { Ctx context.Context } -func (r *SubscriberRepository) Unsubscribe(subscriber *Subscriber) error { - r.Lock() - defer r.Unlock() - - if _, ok := r.StreamPools[subscriber.Id]; !ok { - return status.Error(codes.NotFound, "Cannot found subscriber at repository.") - } - - delete(r.StreamPools, subscriber.Id) - return nil -} - func NewSubscriber(ctx context.Context, sId string, sName string, topicName string) *Subscriber { - channelSize := 100 * 100 // This channel size should be configurable. + // This channel size should be configurable. eventChannel := make(chan *proto.Event, channelSize) return &Subscriber{ logger: logger.NewLogger(), @@ -65,13 +44,6 @@ func NewSubscriber(ctx context.Context, sId string, sName string, topicName stri } } -func NewSubscriberRepository() *SubscriberRepository { - return &SubscriberRepository{ - logger: logger.NewLogger(), - StreamPools: map[string]*StreamPool{}, - } -} - func (s *Subscriber) HandleBulkEvent(stream proto.Mercurius_SubscribeServer) error { eventBuffer := make([]*proto.Event, 0, subscriberBulkEventCount) for { @@ -99,21 +71,6 @@ func (s *Subscriber) sendEvent(bulkEvent *proto.BulkEvent, stream proto.Mercuriu } } -func (r *SubscriberRepository) addSubscriber(ctx context.Context, id string, subName string, topicName string) (*Subscriber, error) { - r.Lock() - defer r.Unlock() - - // Handle subName conflict? - - s := NewSubscriber(ctx, id, subName, topicName) - if r.StreamPools[subName] == nil { - r.StreamPools[subName] = newStreamPool(subName) - } - - r.StreamPools[subName].AddSubscriber(s) - return s, nil -} - func checkSentEventCount() { x := messageCount.Add(1) if x == 1 { diff --git a/internal/broker/subscriber_repository.go b/internal/broker/subscriber_repository.go new file mode 100644 index 0000000..f95afda --- /dev/null +++ b/internal/broker/subscriber_repository.go @@ -0,0 +1,49 @@ +package broker + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/ispiroglu/mercurius/internal/logger" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type SubscriberRepository struct { + logger *zap.Logger + StreamPools sync.Map + poolCount atomic.Uint32 +} + +func NewSubscriberRepository() *SubscriberRepository { + return &SubscriberRepository{ + logger: logger.NewLogger(), + StreamPools: sync.Map{}, + } +} + +func (r *SubscriberRepository) Unsubscribe(subscriber *Subscriber) error { + + if _, ok := r.StreamPools.Load(subscriber.Id); !ok { + return status.Error(codes.NotFound, "Cannot found subscriber at repository.") + } + + r.StreamPools.Delete(subscriber.Id) + r.poolCount.Store( + r.poolCount.Load() - 1, + ) + return nil +} + +func (r *SubscriberRepository) addSubscriber(ctx context.Context, id string, subName string, topicName string) (*Subscriber, error) { + + // Handle subName conflict? + s := NewSubscriber(ctx, id, subName, topicName) + pool, _ := r.StreamPools.LoadOrStore(subName, newStreamPool(subName)) + + pool.(*StreamPool).AddSubscriber(s) + r.poolCount.Add(1) + return s, nil +} diff --git a/internal/broker/topic.go b/internal/broker/topic.go index bc02403..b9ba02b 100644 --- a/internal/broker/topic.go +++ b/internal/broker/topic.go @@ -22,57 +22,12 @@ type Topic struct { EventChan chan *proto.Event } -type TopicRepository struct { - sync.RWMutex - logger *zap.Logger - Topics map[string]*Topic -} - -type ITopicRepository interface { - GetTopic(string) (*Topic, error) - CreateTopic(string) (*Topic, error) - PublishEvent(*proto.Event) - AddSubscriber(context.Context, string, string) error -} - -func NewTopicRepository() *TopicRepository { - return &TopicRepository{ - logger: logger.NewLogger(), - Topics: map[string]*Topic{}, - } -} - -func (r *TopicRepository) GetTopic(name string) (*Topic, error) { - topic, exist := r.Topics[name] - if !exist { - // r.logger.Warn("Could not found topic", zap.String("Topic", name)) - return nil, status.Error(codes.NotFound, "cannot found the topic called:"+name) - } - - return topic, nil -} - -func (r *TopicRepository) CreateTopic(name string) (*Topic, error) { - r.Lock() - defer r.Unlock() - - if _, err := r.GetTopic(name); err == nil { - // r.logger.Warn("Cannot create the topic that already exists", zap.String("Topic", name)) - return nil, status.Error(codes.AlreadyExists, "there is already a topic named:"+name) - } - - createdTopic := newTopic(name) - r.Topics[name] = createdTopic - - return createdTopic, nil -} - var publishCount = uint32(100 * 100) var publish = atomic.Uint32{} var start time.Time func (t *Topic) PublishEvent(event *proto.Event) { - if len(t.SubscriberRepository.StreamPools) == 0 { + if t.SubscriberRepository.poolCount.Load() == 0 { t.EventChan <- event } else { publish.Add(1) @@ -85,15 +40,10 @@ func (t *Topic) PublishEvent(event *proto.Event) { fmt.Println("Total Routing Time: ", time.Since(start)) } - // This line produces a sync wait - // as it waits for the previous subscriber to complete its send operation before proceeding to the next subscriber. - // maybe a worker pool to minimize this? - // One subscribers fullness affects other subscribers. - // var ts time.Time = time.Now() - for _, s := range t.SubscriberRepository.StreamPools { - s.Ch <- event - } - // fmt.Println("Rotate etmem su kadar surdu", time.Since(ts), len(t.SubscriberRepository.StreamPools)) + t.SubscriberRepository.StreamPools.Range(func(k any, v interface{}) bool { + v.(*StreamPool).Ch <- event + return true + }) } } @@ -106,26 +56,9 @@ func (t *Topic) AddSubscriber(ctx context.Context, id string, name string) (*Sub return nil, status.Error(codes.AlreadyExists, errorMessage) } - // FIRSTY SUBSCRIBED - //t.logger.Info("Added subscriber", zap.String("Topic", t.Name), zap.String("sId", id), zap.String("sName", name)) - // if len(t.SubscriberRepository.StreamPools) == 1 { - // if len(t.EventChan) != 0 { - // for event := range t.EventChan { - // s.EventChannel <- event - // } - // } - // } - return s, nil } -func (r *TopicRepository) Unsubscribe(subscriber *Subscriber) { - t := subscriber.TopicName - if err := r.Topics[t].SubscriberRepository.Unsubscribe(subscriber); err != nil { - r.logger.Warn("", zap.Error(err)) - } -} - func newTopic(name string) *Topic { return &Topic{ logger: logger.NewLogger(), diff --git a/internal/broker/topic_repository.go b/internal/broker/topic_repository.go new file mode 100644 index 0000000..b1a12d4 --- /dev/null +++ b/internal/broker/topic_repository.go @@ -0,0 +1,55 @@ +package broker + +import ( + "sync" + + "github.com/ispiroglu/mercurius/internal/logger" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type TopicRepository struct { + logger *zap.Logger + Topics sync.Map +} + +func NewTopicRepository() *TopicRepository { + return &TopicRepository{ + logger: logger.NewLogger(), + Topics: sync.Map{}, + } +} + +func (r *TopicRepository) GetTopic(name string) (*Topic, error) { + topic, exist := r.Topics.Load(name) + if !exist { + // r.logger.Warn("Could not found topic", zap.String("Topic", name)) + return nil, status.Error(codes.NotFound, "cannot found the topic called:"+name) + } + + return topic.(*Topic), nil +} + +func (r *TopicRepository) CreateTopic(name string) (*Topic, error) { + if _, err := r.GetTopic(name); err == nil { + // r.logger.Warn("Cannot create the topic that already exists", zap.String("Topic", name)) + return nil, status.Error(codes.AlreadyExists, "there is already a topic named:"+name) + } + + createdTopic := newTopic(name) + r.Topics.Store(name, createdTopic) + + return createdTopic, nil +} + +func (r *TopicRepository) Unsubscribe(subscriber *Subscriber) { + topic, err := r.GetTopic(subscriber.TopicName) + if err != nil { + panic(err) + } + + if err := topic.SubscriberRepository.Unsubscribe(subscriber); err != nil { + r.logger.Warn("", zap.Error(err)) + } +} diff --git a/pkg/client/client.go b/pkg/client/client.go index 1c9e547..2f346f3 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -21,7 +21,7 @@ type Client struct { s *serialize.Serializer } -const streamPerSubscriber int = 50 +const streamPerSubscriber int = 20 var l = logger.NewLogger()