Skip to content

Commit

Permalink
refactor:change maps to sync.map
Browse files Browse the repository at this point in the history
  • Loading branch information
ispiroglu committed Nov 13, 2023
1 parent d668748 commit 3b92d73
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package main
import (
"context"
"fmt"
"github.com/google/uuid"
"sync/atomic"
"time"

"github.com/google/uuid"

k "github.com/ispiroglu/mercurius/internal/logger"
"github.com/ispiroglu/mercurius/pkg/client"
"github.com/ispiroglu/mercurius/proto"
Expand Down
29 changes: 15 additions & 14 deletions internal/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func (b *Broker) Publish(event *pb.Event) (*pb.ACK, error) {
return &pb.ACK{}, nil
}

func (b *Broker) Unsubscribe(sub *Subscriber) {
b.TopicRepository.Unsubscribe(sub)
if err := b.SubscriberRepository.Unsubscribe(sub); err != nil {
b.logger.Warn("Failed to unsubscribe subscriber", zap.String("SubscriberID", sub.Id), zap.String("Subscriber Name", sub.Name), zap.Error(err))
}
}

func (b *Broker) Subscribe(ctx context.Context, topicName string, sId string, sName string) (*Subscriber, error) {
t, err := b.findOrInsertTopic(topicName)
if err != nil {
Expand All @@ -47,29 +54,23 @@ func (b *Broker) Subscribe(ctx context.Context, topicName string, sId string, sN

s, err := t.AddSubscriber(ctx, sId, sName)
if err != nil {
b.logger.Error("Broker could not add subscriber to topic", zap.String("Topic", topicName), zap.String("SubscriberID", sId)) //, zap.Error(err))
b.logger.Error("Broker could not add subscriber to topic", zap.String("Topic", topicName), zap.String("SubscriberID", sId), zap.Error(err))
return nil, err
}
return s, nil
}

func (b *Broker) Unsubscribe(sub *Subscriber) {
b.TopicRepository.Unsubscribe(sub)
if err := b.SubscriberRepository.Unsubscribe(sub); err != nil {
b.logger.Warn("", zap.Error(err))
}
}

func (b *Broker) findOrInsertTopic(topicName string) (*Topic, error) {
t, err := b.GetTopic(topicName)
if err != nil {
t, err = b.CreateTopic(topicName)

st, ok := status.FromError(err)
if !ok && st.Code() != codes.AlreadyExists {
return nil, err
} else if st.Code() == codes.AlreadyExists {
t, _ = b.GetTopic(topicName)
if err != nil {
st, ok := status.FromError(err)
if !ok && st.Code() != codes.AlreadyExists {
return nil, err
} else if st.Code() == codes.AlreadyExists {
t, _ = b.GetTopic(topicName)
}
}
}

Expand Down
47 changes: 2 additions & 45 deletions internal/broker/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,23 @@ package broker
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/ispiroglu/mercurius/internal/logger"
"github.com/ispiroglu/mercurius/proto"
"go.uber.org/zap"
)

const totalEventCount = 100 * 100 * 100
const channelSize = 1

// const subscriberBulkEventCount = 100 * 100
const subscriberBulkEventCount = 1

var messageCount = atomic.Uint64{}
var _start time.Time

type SubscriberRepository struct {
sync.RWMutex
logger *zap.Logger
StreamPools map[string]*StreamPool
}

type Subscriber struct {
logger *zap.Logger
Id string
Expand All @@ -39,20 +30,8 @@ type Subscriber struct {
Ctx context.Context
}

func (r *SubscriberRepository) Unsubscribe(subscriber *Subscriber) error {
r.Lock()
defer r.Unlock()

if _, ok := r.StreamPools[subscriber.Id]; !ok {
return status.Error(codes.NotFound, "Cannot found subscriber at repository.")
}

delete(r.StreamPools, subscriber.Id)
return nil
}

func NewSubscriber(ctx context.Context, sId string, sName string, topicName string) *Subscriber {
channelSize := 100 * 100 // This channel size should be configurable.
// This channel size should be configurable.
eventChannel := make(chan *proto.Event, channelSize)
return &Subscriber{
logger: logger.NewLogger(),
Expand All @@ -65,13 +44,6 @@ func NewSubscriber(ctx context.Context, sId string, sName string, topicName stri
}
}

func NewSubscriberRepository() *SubscriberRepository {
return &SubscriberRepository{
logger: logger.NewLogger(),
StreamPools: map[string]*StreamPool{},
}
}

func (s *Subscriber) HandleBulkEvent(stream proto.Mercurius_SubscribeServer) error {
eventBuffer := make([]*proto.Event, 0, subscriberBulkEventCount)
for {
Expand Down Expand Up @@ -99,21 +71,6 @@ func (s *Subscriber) sendEvent(bulkEvent *proto.BulkEvent, stream proto.Mercuriu
}
}

func (r *SubscriberRepository) addSubscriber(ctx context.Context, id string, subName string, topicName string) (*Subscriber, error) {
r.Lock()
defer r.Unlock()

// Handle subName conflict?

s := NewSubscriber(ctx, id, subName, topicName)
if r.StreamPools[subName] == nil {
r.StreamPools[subName] = newStreamPool(subName)
}

r.StreamPools[subName].AddSubscriber(s)
return s, nil
}

func checkSentEventCount() {
x := messageCount.Add(1)
if x == 1 {
Expand Down
49 changes: 49 additions & 0 deletions internal/broker/subscriber_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package broker

import (
"context"
"sync"
"sync/atomic"

"github.com/ispiroglu/mercurius/internal/logger"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type SubscriberRepository struct {
logger *zap.Logger
StreamPools sync.Map
poolCount atomic.Uint32
}

func NewSubscriberRepository() *SubscriberRepository {
return &SubscriberRepository{
logger: logger.NewLogger(),
StreamPools: sync.Map{},
}
}

func (r *SubscriberRepository) Unsubscribe(subscriber *Subscriber) error {

if _, ok := r.StreamPools.Load(subscriber.Id); !ok {
return status.Error(codes.NotFound, "Cannot found subscriber at repository.")
}

r.StreamPools.Delete(subscriber.Id)
r.poolCount.Store(
r.poolCount.Load() - 1,
)
return nil
}

func (r *SubscriberRepository) addSubscriber(ctx context.Context, id string, subName string, topicName string) (*Subscriber, error) {

// Handle subName conflict?
s := NewSubscriber(ctx, id, subName, topicName)
pool, _ := r.StreamPools.LoadOrStore(subName, newStreamPool(subName))

pool.(*StreamPool).AddSubscriber(s)
r.poolCount.Add(1)
return s, nil
}
77 changes: 5 additions & 72 deletions internal/broker/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,57 +22,12 @@ type Topic struct {
EventChan chan *proto.Event
}

type TopicRepository struct {
sync.RWMutex
logger *zap.Logger
Topics map[string]*Topic
}

type ITopicRepository interface {
GetTopic(string) (*Topic, error)
CreateTopic(string) (*Topic, error)
PublishEvent(*proto.Event)
AddSubscriber(context.Context, string, string) error
}

func NewTopicRepository() *TopicRepository {
return &TopicRepository{
logger: logger.NewLogger(),
Topics: map[string]*Topic{},
}
}

func (r *TopicRepository) GetTopic(name string) (*Topic, error) {
topic, exist := r.Topics[name]
if !exist {
// r.logger.Warn("Could not found topic", zap.String("Topic", name))
return nil, status.Error(codes.NotFound, "cannot found the topic called:"+name)
}

return topic, nil
}

func (r *TopicRepository) CreateTopic(name string) (*Topic, error) {
r.Lock()
defer r.Unlock()

if _, err := r.GetTopic(name); err == nil {
// r.logger.Warn("Cannot create the topic that already exists", zap.String("Topic", name))
return nil, status.Error(codes.AlreadyExists, "there is already a topic named:"+name)
}

createdTopic := newTopic(name)
r.Topics[name] = createdTopic

return createdTopic, nil
}

var publishCount = uint32(100 * 100)
var publish = atomic.Uint32{}
var start time.Time

func (t *Topic) PublishEvent(event *proto.Event) {
if len(t.SubscriberRepository.StreamPools) == 0 {
if t.SubscriberRepository.poolCount.Load() == 0 {
t.EventChan <- event
} else {
publish.Add(1)
Expand All @@ -85,15 +40,10 @@ func (t *Topic) PublishEvent(event *proto.Event) {
fmt.Println("Total Routing Time: ", time.Since(start))
}

// This line produces a sync wait
// as it waits for the previous subscriber to complete its send operation before proceeding to the next subscriber.
// maybe a worker pool to minimize this?
// One subscribers fullness affects other subscribers.
// var ts time.Time = time.Now()
for _, s := range t.SubscriberRepository.StreamPools {
s.Ch <- event
}
// fmt.Println("Rotate etmem su kadar surdu", time.Since(ts), len(t.SubscriberRepository.StreamPools))
t.SubscriberRepository.StreamPools.Range(func(k any, v interface{}) bool {
v.(*StreamPool).Ch <- event
return true
})
}
}

Expand All @@ -106,26 +56,9 @@ func (t *Topic) AddSubscriber(ctx context.Context, id string, name string) (*Sub
return nil, status.Error(codes.AlreadyExists, errorMessage)
}

// FIRSTY SUBSCRIBED
//t.logger.Info("Added subscriber", zap.String("Topic", t.Name), zap.String("sId", id), zap.String("sName", name))
// if len(t.SubscriberRepository.StreamPools) == 1 {
// if len(t.EventChan) != 0 {
// for event := range t.EventChan {
// s.EventChannel <- event
// }
// }
// }

return s, nil
}

func (r *TopicRepository) Unsubscribe(subscriber *Subscriber) {
t := subscriber.TopicName
if err := r.Topics[t].SubscriberRepository.Unsubscribe(subscriber); err != nil {
r.logger.Warn("", zap.Error(err))
}
}

func newTopic(name string) *Topic {
return &Topic{
logger: logger.NewLogger(),
Expand Down
55 changes: 55 additions & 0 deletions internal/broker/topic_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package broker

import (
"sync"

"github.com/ispiroglu/mercurius/internal/logger"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type TopicRepository struct {
logger *zap.Logger
Topics sync.Map
}

func NewTopicRepository() *TopicRepository {
return &TopicRepository{
logger: logger.NewLogger(),
Topics: sync.Map{},
}
}

func (r *TopicRepository) GetTopic(name string) (*Topic, error) {
topic, exist := r.Topics.Load(name)
if !exist {
// r.logger.Warn("Could not found topic", zap.String("Topic", name))
return nil, status.Error(codes.NotFound, "cannot found the topic called:"+name)
}

return topic.(*Topic), nil
}

func (r *TopicRepository) CreateTopic(name string) (*Topic, error) {
if _, err := r.GetTopic(name); err == nil {
// r.logger.Warn("Cannot create the topic that already exists", zap.String("Topic", name))
return nil, status.Error(codes.AlreadyExists, "there is already a topic named:"+name)
}

createdTopic := newTopic(name)
r.Topics.Store(name, createdTopic)

return createdTopic, nil
}

func (r *TopicRepository) Unsubscribe(subscriber *Subscriber) {
topic, err := r.GetTopic(subscriber.TopicName)
if err != nil {
panic(err)
}

if err := topic.SubscriberRepository.Unsubscribe(subscriber); err != nil {
r.logger.Warn("", zap.Error(err))
}
}
2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Client struct {
s *serialize.Serializer
}

const streamPerSubscriber int = 50
const streamPerSubscriber int = 20

var l = logger.NewLogger()

Expand Down

0 comments on commit 3b92d73

Please sign in to comment.