Skip to content

Commit

Permalink
Merge pull request #21 from ispiroglu/stream-pool
Browse files Browse the repository at this point in the history
Implemented Stream pool
  • Loading branch information
ispiroglu committed Dec 22, 2023
2 parents f493393 + 3b92d73 commit f833bdf
Show file tree
Hide file tree
Showing 17 changed files with 458 additions and 351 deletions.
8 changes: 0 additions & 8 deletions .idea/.gitignore

This file was deleted.

16 changes: 10 additions & 6 deletions cmd/mercurius-client/publisher-client/mercurius-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync/atomic"
"time"

"github.com/google/uuid"

logger2 "github.com/ispiroglu/mercurius/internal/logger"
"github.com/ispiroglu/mercurius/pkg/client"
"go.uber.org/zap"
Expand All @@ -23,15 +25,17 @@ var N = 100 * 100
var start time.Time

func main() {
c, err := client.NewClient(CLIENT_NAME, ADDR)
id, _ := uuid.NewUUID()
c, err := client.NewClient(id, ADDR)
if err != nil {
logger.Error("Err", zap.Error(err))
}

logger.Info("Published Event")

var z time.Duration
wg := sync.WaitGroup{}
wg.Add(N)
uintN := uint64(N)
for i := 0; i < N; i++ {
go func(w *sync.WaitGroup) {
for j := 0; j < 1; j++ {
Expand All @@ -43,19 +47,19 @@ func main() {
start = time.Now()
}
fmt.Println(x)
if x == 1000*1000 {
z := time.Since(start)
fmt.Println("Execution time: ", z)
if x == uintN {
z = time.Since(start)
}
fmt.Println(strconv.FormatUint(x, 10))
//time.Sleep(time.Millisecond)
}
w.Done()
}(&wg)
//time.Sleep(200 * time.Second)
}

wg.Wait()
fmt.Println("Execution time: ", z)

}

// package main
Expand Down
19 changes: 12 additions & 7 deletions cmd/mercurius-client/subscriber-client/mercurius-client-sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync/atomic"
"time"

"github.com/google/uuid"

k "github.com/ispiroglu/mercurius/internal/logger"
"github.com/ispiroglu/mercurius/pkg/client"
"github.com/ispiroglu/mercurius/proto"
Expand All @@ -15,7 +17,8 @@ import (
const ADDR = "0.0.0.0:9000"
const TopicName = "one-to-one"
const CLIENT_NAME = "Sample Client"
const N = 100 * 100 * 100
const subCount = 100
const N = 100 * 100 * subCount

var messageCount = atomic.Uint64{}
var start = time.Time{}
Expand All @@ -24,12 +27,14 @@ var ctx, _ = context.WithCancel(context.Background())
var ch = make(chan struct{})

func main() {
c, err := client.NewClient(CLIENT_NAME, ADDR)
if err != nil {
logger.Error("Err", zap.Error(err))
}
for i := 0; i < 100; i++ {

for i := 0; i < subCount; i++ {
go func() {
id, _ := uuid.NewUUID()
c, err := client.NewClient(id, ADDR)
if err != nil {
logger.Error("Err", zap.Error(err))
}
if err := c.Subscribe(TopicName, ctx, handler); err != nil {
logger.Error("Err", zap.Error(err))
}
Expand All @@ -45,7 +50,7 @@ func handler(e *proto.Event) error {
if x == 1 {
start = time.Now()
}
fmt.Println(x)
// fmt.Println(string(e.Body))
if x == N {
z := time.Since(start)
fmt.Println("Execution time: ", z)
Expand Down
3 changes: 2 additions & 1 deletion cmd/mercurius/mercurius.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package main
import (
"net"

"net/http"

"github.com/ispiroglu/mercurius/internal/logger"
sv "github.com/ispiroglu/mercurius/internal/server"
"github.com/ispiroglu/mercurius/proto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"google.golang.org/grpc"
"net/http"
)

const ADDR = "0.0.0.0:9000"
Expand Down
37 changes: 15 additions & 22 deletions internal/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ func NewBroker() *Broker {
}

func (b *Broker) Publish(event *pb.Event) (*pb.ACK, error) {
//b.logger.Info("Broker received event for publishing", zap.String("Topic", event.Topic))

t, err := b.findOrInsertTopic(event.Topic)
if err != nil {
return nil, err
Expand All @@ -41,43 +39,38 @@ func (b *Broker) Publish(event *pb.Event) (*pb.ACK, error) {
return &pb.ACK{}, nil
}

func (b *Broker) Subscribe(ctx context.Context, topicName string, sId string, sName string) (*Subscriber, error) {
b.logger.Info("Broker received subscription request", zap.String("Topic", topicName), zap.String("SubscriberID", sId))
func (b *Broker) Unsubscribe(sub *Subscriber) {
b.TopicRepository.Unsubscribe(sub)
if err := b.SubscriberRepository.Unsubscribe(sub); err != nil {
b.logger.Warn("Failed to unsubscribe subscriber", zap.String("SubscriberID", sub.Id), zap.String("Subscriber Name", sub.Name), zap.Error(err))
}
}

func (b *Broker) Subscribe(ctx context.Context, topicName string, sId string, sName string) (*Subscriber, error) {
t, err := b.findOrInsertTopic(topicName)
if err != nil {
return nil, err
}

s, err := t.AddSubscriber(ctx, sId, sName)
if err != nil {
b.logger.Error("Broker could not add subscriber to topic", zap.String("Topic", topicName), zap.String("SubscriberID", sId)) //, zap.Error(err))
b.logger.Error("Broker could not add subscriber to topic", zap.String("Topic", topicName), zap.String("SubscriberID", sId), zap.Error(err))
return nil, err
}
go b.SubscriberRepository.addSub(s)
return s, nil
}

func (b *Broker) Unsubscribe(sub *Subscriber) {
b.logger.Info("Unsubscribing", zap.String("ID", sub.Id), zap.String("Subscriber Name", sub.Name))
b.TopicRepository.Unsubscribe(sub)
b.SubscriberRepository.Lock()
delete(b.SubscriberRepository.Subscribers, sub.Id)
b.SubscriberRepository.Unlock()
}

func (b *Broker) findOrInsertTopic(topicName string) (*Topic, error) {
t, err := b.GetTopic(topicName)
if err != nil {
b.logger.Info("Broker cannot found the topic to subscribe", zap.String("Topic", topicName)) //, zap.Error(err))
t, err = b.CreateTopic(topicName)

st, ok := status.FromError(err)
if !ok && st.Code() != codes.AlreadyExists {
b.logger.Error("Broker cannot create topic", zap.String("Topic", topicName)) //, zap.Error(err), zap.Bool("A", st.Code() != codes.AlreadyExists), zap.String("Code", st.String()))
return nil, err
} else if st.Code() == codes.AlreadyExists {
t, _ = b.GetTopic(topicName)
if err != nil {
st, ok := status.FromError(err)
if !ok && st.Code() != codes.AlreadyExists {
return nil, err
} else if st.Code() == codes.AlreadyExists {
t, _ = b.GetTopic(topicName)
}
}
}

Expand Down
37 changes: 37 additions & 0 deletions internal/broker/stream_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package broker

import (
"github.com/ispiroglu/mercurius/proto"
"sync"
)

type StreamPool struct {
SubscriberName string
Streams []*Subscriber
Ch chan *proto.Event
sync.Mutex
}

func newStreamPool(name string) *StreamPool {

return &StreamPool{
SubscriberName: name,
Streams: make([]*Subscriber, 0),
Ch: make(chan *proto.Event),
Mutex: sync.Mutex{},
}
}

func (p *StreamPool) AddSubscriber(s *Subscriber) {
p.Lock()
defer p.Unlock()

p.Streams = append(p.Streams, s)
go s.worker(p.Ch)
}

func (s *Subscriber) worker(ch chan *proto.Event) {
for e := range ch {
s.EventChannel <- e
}
}
86 changes: 45 additions & 41 deletions internal/broker/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@ package broker

import (
"context"
"sync"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"fmt"
"sync/atomic"
"time"

"github.com/ispiroglu/mercurius/internal/logger"
"github.com/ispiroglu/mercurius/proto"
"go.uber.org/zap"
)

type SubscriberRepository struct {
sync.RWMutex
logger *zap.Logger
Subscribers map[string]*Subscriber
}
const totalEventCount = 100 * 100 * 100
const channelSize = 1

// const subscriberBulkEventCount = 100 * 100
const subscriberBulkEventCount = 1

var messageCount = atomic.Uint64{}
var _start time.Time

type Subscriber struct {
logger *zap.Logger
Expand All @@ -28,52 +30,54 @@ type Subscriber struct {
Ctx context.Context
}

func (r *SubscriberRepository) Unsubscribe(subscriber *Subscriber) error {
r.Lock()
defer r.Unlock()

if _, ok := r.Subscribers[subscriber.Id]; !ok {
return status.Error(codes.NotFound, "Cannot found subscriber at repository.")
}

delete(r.Subscribers, subscriber.Id)
return nil
}

func NewSubscriber(ctx context.Context, sId string, sName string, topicName string) *Subscriber {
eq := make(chan *proto.Event)
// This channel size should be configurable.
eventChannel := make(chan *proto.Event, channelSize)
return &Subscriber{
logger: logger.NewLogger(),
Id: sId,
Name: sName,
EventChannel: eq,
RetryQueue: SubscriberRetryHandler.CreateRetryQueue(sId, eq),
EventChannel: eventChannel,
RetryQueue: SubscriberRetryHandler.CreateRetryQueue(sId, eventChannel),
TopicName: topicName,
Ctx: ctx,
}
}

func NewSubscriberRepository() *SubscriberRepository {
return &SubscriberRepository{
logger: logger.NewLogger(),
Subscribers: map[string]*Subscriber{},
func (s *Subscriber) HandleBulkEvent(stream proto.Mercurius_SubscribeServer) error {
eventBuffer := make([]*proto.Event, 0, subscriberBulkEventCount)
for {
select {
case <-s.Ctx.Done():
return nil
case event := <-s.EventChannel:
checkSentEventCount()
eventBuffer = append(eventBuffer, event)
if len(eventBuffer) == subscriberBulkEventCount {
bulkEvent := &proto.BulkEvent{
EventList: eventBuffer,
}
s.sendEvent(bulkEvent, stream)
eventBuffer = eventBuffer[:0]
}
}
}
}

func (r *SubscriberRepository) addSubscriber(ctx context.Context, id string, subName string, topicName string) (*Subscriber, error) {
r.Lock()
defer r.Unlock()
if r.Subscribers[id] != nil {
return nil, status.Error(codes.AlreadyExists, "Already Exists")
func (s *Subscriber) sendEvent(bulkEvent *proto.BulkEvent, stream proto.Mercurius_SubscribeServer) {
if err := stream.Send(bulkEvent); err != nil {
s.logger.Error("Error on sending event", zap.String("Error: ", err.Error()), zap.String("SubscriberID", s.Id), zap.String("Subscriber Name", s.Name)) //, zap.Error(err))
// sub.RetryQueue <- event
}

s := NewSubscriber(ctx, id, subName, topicName)
r.Subscribers[id] = s
return s, nil
}

func (r *SubscriberRepository) addSub(s *Subscriber) {
r.Lock()
defer r.Unlock()
r.Subscribers[s.Id] = s
func checkSentEventCount() {
x := messageCount.Add(1)
if x == 1 {
_start = time.Now()
}
if x == totalEventCount {
z := time.Since(_start)
fmt.Println("Total stream send time: ", z)
}
}
Loading

0 comments on commit f833bdf

Please sign in to comment.