Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast queue #15

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,16 @@ type Options struct {
Src uint64
Name string
Log1 protocol.Records
MaxLogLen int64
RelaxedOrder bool
Logger utils.Logger
PingPeriod time.Duration
PingWait time.Duration
PingPeriod time.Duration // how often should we ping neighbour replicae if its silent
PingWait time.Duration // how much time we wait until pong received
PebbleWriteOptions *pebble.WriteOptions
BroadcastBatchSize int
BroadcastTimeLimit time.Duration
ReadAccumTimeLimit time.Duration
BroadcastQueueMaxSize int // size in bytes, after reaching it all writes will block
BroadcastQueueMinBatchSize int // reads will wait until they have enough data or timelimit expires
// if this limit expires before read has enough data (BroadcastQueueMinBatchSize) it will return whatever it has,
// writes will cause overflow error which will result in queue shutdown and session end
BroadcastQueueTimeLimit time.Duration
ReadAccumTimeLimit time.Duration //
ReadMaxBufferSize int
ReadMinBufferSizeToProcess int
TcpReadBufferSize int
Expand All @@ -95,10 +96,6 @@ type Options struct {
}

func (o *Options) SetDefaults() {
if o.MaxLogLen == 0 {
o.MaxLogLen = 1 << 23
}

if o.PingPeriod == 0 {
o.PingPeriod = 30 * time.Second
}
Expand All @@ -118,8 +115,11 @@ func (o *Options) SetDefaults() {
o.PebbleWriteOptions = &pebble.WriteOptions{Sync: true}
}

if o.BroadcastTimeLimit == 0 {
o.BroadcastTimeLimit = time.Millisecond
if o.BroadcastQueueTimeLimit == 0 {
o.BroadcastQueueTimeLimit = time.Second
}
if o.BroadcastQueueMaxSize == 0 {
o.BroadcastQueueMaxSize = 10 * 1024 * 1024 // 10Mb
}
if o.ReadAccumTimeLimit == 0 {
o.ReadAccumTimeLimit = 5 * time.Second
Expand Down Expand Up @@ -236,9 +236,8 @@ func Open(dirname string, opts Options) (*Chotki, error) {

cho.net = protocol.NewNet(cho.log,
func(name string) protocol.FeedDrainCloserTraced { // new connection
const outQueueLimit = 1 << 20

queue := utils.NewFDQueue[protocol.Records](outQueueLimit, cho.opts.BroadcastTimeLimit, cho.opts.BroadcastBatchSize)
queue := utils.NewFDQueue[protocol.Records](cho.opts.BroadcastQueueMaxSize, cho.opts.BroadcastQueueTimeLimit, cho.opts.BroadcastQueueMinBatchSize)
if q, loaded := cho.outq.LoadAndStore(name, queue); loaded && q != nil {
cho.log.Warn(fmt.Sprintf("closing the old conn to %s", name))
if err := q.Close(); err != nil {
Expand Down Expand Up @@ -586,8 +585,8 @@ func (n *ChotkiCollector) Collect(m chan<- prometheus.Metric) {
need_to_pass[key] = true
}
n.chotki.outq.Range(func(key string, value protocol.DrainCloser) bool {
if q, ok := value.(*utils.FDQueue[protocol.Records, []byte]); ok {
m <- prometheus.MustNewConstMetric(n.outq_size, prometheus.GaugeValue, float64(q.Len()), key)
if q, ok := value.(*utils.FDQueue[protocol.Records]); ok {
m <- prometheus.MustNewConstMetric(n.outq_size, prometheus.GaugeValue, float64(q.Size()), key)
nw_collected[key] = struct{}{}
need_to_pass[key] = false
}
Expand Down
18 changes: 8 additions & 10 deletions protocol/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func tlsConfig(servername string) *tls.Config {
}
}

type TracedQueue[S ~[]E, E any] struct {
*utils.FDQueue[S, E]
type TracedQueue[T ~[][]byte] struct {
*utils.FDQueue[T]
}

func (t *TracedQueue[S, E]) GetTraceId() string {
func (t *TracedQueue[T]) GetTraceId() string {
return ""
}

Expand All @@ -77,28 +77,26 @@ func TestTCPDepot_Connect(t *testing.T) {

log := utils.NewDefaultLogger(slog.LevelDebug)

lCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
lCon := utils.NewFDQueue[Records](1000, time.Minute, 1)
l := NewNet(log, func(_ string) FeedDrainCloserTraced {
return &TracedQueue[Records, []byte]{lCon}
return &TracedQueue[Records]{lCon}
}, func(_ string, t Traced) { lCon.Close() }, &NetTlsConfigOpt{tlsConfig("a.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute})

err := l.Listen(loop)
assert.Nil(t, err)

cCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
cCon := utils.NewFDQueue[Records](1000, time.Minute, 1)
c := NewNet(log, func(_ string) FeedDrainCloserTraced {
return &TracedQueue[Records, []byte]{cCon}
return &TracedQueue[Records]{cCon}
}, func(_ string, t Traced) { cCon.Close() }, &NetTlsConfigOpt{tlsConfig("b.chotki.local")}, &NetWriteTimeoutOpt{Timeout: 1 * time.Minute})

err = c.Connect(loop)
assert.Nil(t, err)
time.Sleep(time.Second) // Wait connection, todo use events

// send a record
err = cCon.Drain(context.Background(), Records{Record('M', []byte("Hi there"))})
assert.Nil(t, err)

time.Sleep(1000 * time.Millisecond)
rec, err := lCon.Feed(context.Background())
assert.Nil(t, err)
assert.Greater(t, len(rec), 0)
Expand Down Expand Up @@ -136,7 +134,7 @@ func TestTCPDepot_ConnectFailed(t *testing.T) {

cCon := utils.NewFDQueue[Records](16, time.Millisecond, 0)
c := NewNet(log, func(_ string) FeedDrainCloserTraced {
return &TracedQueue[Records, []byte]{cCon}
return &TracedQueue[Records]{cCon}
}, func(_ string, t Traced) { cCon.Close() }, &NetTlsConfigOpt{tlsConfig("b.chotki.local")})

err := c.Connect(loop)
Expand Down
Loading
Loading