Skip to content

Commit

Permalink
zmq4: remove load-balanced writer
Browse files Browse the repository at this point in the history
  • Loading branch information
sbinet committed Oct 21, 2020
1 parent 1cd6417 commit 14db8f5
Showing 1 changed file with 0 additions and 60 deletions.
60 changes: 0 additions & 60 deletions msgio.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,65 +180,6 @@ func (w *mwriter) write(ctx context.Context, msg Msg) error {
return err
}

type lbwriter struct {
ctx context.Context
c chan Msg
sem *semaphore
}

func newLBWriter(ctx context.Context) *lbwriter {
const size = 10
return &lbwriter{
ctx: ctx,
c: make(chan Msg, size),
sem: newSemaphore(),
}
}

func (lw *lbwriter) Close() error {
close(lw.c)
return nil
}

func (lw *lbwriter) addConn(w *Conn) {
lw.sem.enable()
go lw.listen(lw.ctx, w)
}

func (*lbwriter) rmConn(w *Conn) {}

func (lw *lbwriter) write(ctx context.Context, msg Msg) error {
lw.sem.lock()
select {
case <-ctx.Done():
return ctx.Err()
case lw.c <- msg:
return nil
}
}

func (lw *lbwriter) listen(ctx context.Context, w *Conn) {
defer lw.rmConn(w)
defer w.Close()

for {
select {
case <-ctx.Done():
return
case msg, ok := <-lw.c:
if !ok {
return
}
err := w.SendMsg(msg)
if err != nil {
// try another msg writer
lw.c <- msg
break
}
}
}
}

type semaphore struct {
ready chan struct{}
}
Expand All @@ -265,5 +206,4 @@ func (sem *semaphore) lock() {
var (
_ rpool = (*qreader)(nil)
_ wpool = (*mwriter)(nil)
_ wpool = (*lbwriter)(nil)
)

0 comments on commit 14db8f5

Please sign in to comment.