Skip to content

Commit

Permalink
WIP Basic limiter working, tests todo
Browse files Browse the repository at this point in the history
  • Loading branch information
Florimond committed Nov 3, 2023
1 parent 19906c2 commit 656d3f9
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 83 deletions.
2 changes: 1 addition & 1 deletion emitter.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"license": "POwgodYTutTG3fW_8buxReM7_8YuPY9oyfkm_3FMZq3e3CIQj1gWZaN1lCl33X-gy-B-RbKiwusF56yFYQE:3",
"license": "PfA8IOPQD29mo7STD_-9g2DhFhDyBq2rfRjU3oqa8yi6O8DAEHZtFblFg9Vc3-XW-nDpW2aivIn7CJbRmpUOAQ:3",
"listen": ":8080",
"tls": {
"listen": ":443"
Expand Down
8 changes: 4 additions & 4 deletions internal/provider/storage/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestInMemory_Query(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit)
out, err := s.Query(tc.query, zero, zero, nil, NewMessageNumberLimiter(int64(tc.limit)))
assert.NoError(t, err)

count := 0
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestInMemory_lookup(t *testing.T) {
}

for _, tc := range tests {
matches := s.lookup(newLookupQuery(tc.query, zero, zero, tc.limit))
matches := s.lookup(newLookupQuery(tc.query, zero, zero, nil, NewMessageNumberLimiter(int64(tc.limit))))
assert.Equal(t, tc.count, len(matches))
}
}
Expand All @@ -172,13 +172,13 @@ func TestInMemory_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(1)),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(10)),
expectOk: true,
expectCount: 2,
},
Expand Down
19 changes: 14 additions & 5 deletions internal/provider/storage/ssd.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *SSD) Query(ssid message.Ssid, from, untilTime time.Time, untilID messag
}
}

match.Limit(limit)
limiter.Limit(&match)
return match, nil
}

Expand Down Expand Up @@ -175,6 +175,8 @@ func (s *SSD) OnSurvey(surveyType string, payload []byte) ([]byte, bool) {
// Lookup performs a against the storage.
func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {
matches = make(message.Frame, 0) //TODO : explore the implications of not being able to presize with q.Limit
limiter := q.Limiter()

if err := s.db.View(func(tx *badger.Txn) error {
it := tx.NewIterator(badger.IteratorOptions{
PrefetchValues: false,
Expand All @@ -183,17 +185,24 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {

// Since we're starting backwards, seek to the 'until' position first and then
// we'll iterate forward but have reverse time ('until' -> 'from')
prefix := q.UntilID
if len(prefix) == 0 {
var prefix message.ID
if len(q.UntilID) == 0 {
prefix = message.NewPrefix(q.Ssid, q.UntilTime)
it.Seek(prefix)
} else {
it.Seek(q.UntilID)
if !it.Valid() {
return nil
}
it.Next()
}

// Seek the prefix and check the key so we can quickly exit the iteration.
for it.Seek(prefix); it.Valid() &&
for ; it.Valid() &&
message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From); it.Next() {
if message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.UntilTime) {
if msg, err := loadMessage(it.Item()); err == nil {
if !q.Limiter.Admit(&msg) {
if !limiter.Admit(&msg) {
return nil
}
matches = append(matches, msg)
Expand Down
11 changes: 6 additions & 5 deletions internal/provider/storage/ssd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestSSD_Query(t *testing.T) {
assert.NoError(t, err)

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, 5)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, nil, NewMessageNumberLimiter(5))
assert.NoError(t, err)
assert.Len(t, f, 1)
})
Expand Down Expand Up @@ -121,7 +121,8 @@ func TestSSD_QuerySurveyed(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit, nil)
limiter := NewMessageNumberLimiter(int64(tc.limit))
out, err := s.Query(tc.query, zero, zero, nil, limiter)
assert.NoError(t, err)
count := 0
for range out {
Expand All @@ -146,13 +147,13 @@ func TestSSD_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1, nil),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(1)),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10, nil),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(10)),
expectOk: true,
expectCount: 2,
},
Expand Down Expand Up @@ -316,7 +317,7 @@ func benchmarkQuery(b *testing.B, store *SSD, last int, m *stats.Metric) {
return

default:
store.Query(ssid, t0, t1, last, nil)
store.Query(ssid, t0, t1, nil, NewMessageNumberLimiter(int64(last)))
m.Update(int32(last))
}
}
Expand Down
65 changes: 44 additions & 21 deletions internal/provider/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,41 +66,64 @@ func window(from, until time.Time) (int64, int64) {

// The lookup query to send out to the cluster.
type lookupQuery struct {
Ssid message.Ssid // (required) The ssid to match.
From int64 // (required) The beginning of the time window.
UntilTime int64 // Lookup stops when reaches this time.
UntilID message.ID // Lookup stops when reaches this message ID.
Limiter Limiter // The maximum number of elements to return.
Ssid message.Ssid // (required) The ssid to match.
From int64 // (required) The beginning of the time window.
UntilTime int64 // Lookup stops when reaches this time.
UntilID message.ID // Lookup stops when reaches this message ID.
LimitByCount *MessageNumberLimiter
//LimitBySize *MessageSizeLimiter
}

// newLookupQuery creates a new lookup query
func newLookupQuery(ssid message.Ssid, from, until time.Time, untilID message.ID, limiter Limiter) lookupQuery {
t0, t1 := window(from, until)
query := lookupQuery{
Ssid: ssid,
From: t0,
UntilTime: t1,
UntilID: untilID,
}

switch v := limiter.(type) {
case *MessageNumberLimiter:
query.LimitByCount = v
}
return query
}

func (q *lookupQuery) Limiter() Limiter {
switch {
case q.LimitByCount != nil:
return q.LimitByCount
default:
return &MessageNumberLimiter{}
}
}

type Limiter interface {
Admit(*message.Message) bool
Limit(*message.Frame)
}

// MessageNumberLimiter provide an Limiter implementation to replace the "limit"
// parameter in the Query() function.
type MessageNumberLimiter struct {
count int64
limit int64
count int64 `binary:"-"`
MsgLimit int64
}

func (n *MessageNumberLimiter) Admit(m *message.Message) bool {
return n.count < n.limit
func (limiter *MessageNumberLimiter) Admit(m *message.Message) bool {
admit := limiter.count < limiter.MsgLimit
limiter.count += 1
return admit
}
func NewMessageNumberLimiter(limit int64) Limiter {
return &MessageNumberLimiter{limit: limit}

func (limiter *MessageNumberLimiter) Limit(frame *message.Frame) {
frame.Limit(int(limiter.MsgLimit))
}

// newLookupQuery creates a new lookup query
func newLookupQuery(ssid message.Ssid, from, until time.Time, untilID message.ID, limiter Limiter) lookupQuery {
t0, t1 := window(from, until)
return lookupQuery{
Ssid: ssid,
From: t0,
UntilTime: t1,
UntilID: untilID,
Limiter: limiter,
}
func NewMessageNumberLimiter(limit int64) Limiter {
return &MessageNumberLimiter{MsgLimit: limit}
}

// configUint32 retrieves an uint32 from the config
Expand Down
8 changes: 4 additions & 4 deletions internal/provider/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestNoop_Store(t *testing.T) {
func TestNoop_Query(t *testing.T) {
s := new(Noop)
zero := time.Unix(0, 0)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, 10)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, nil, NewMessageNumberLimiter(10))
assert.NoError(t, err)
for range r {
t.Errorf("Should be empty")
Expand Down Expand Up @@ -80,7 +80,7 @@ func testOrder(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 5)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, NewMessageNumberLimiter(5))
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand All @@ -103,7 +103,7 @@ func testRetained(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 1)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, NewMessageNumberLimiter(1))
assert.NoError(t, err)

assert.Len(t, f, 1)
Expand All @@ -126,7 +126,7 @@ func testRange(t *testing.T, store Storage) {
}

// Issue a query
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), 5)
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), nil, NewMessageNumberLimiter(5))
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand Down
16 changes: 9 additions & 7 deletions internal/service/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/provider/storage"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/service"
)
Expand Down Expand Up @@ -80,17 +81,18 @@ func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, b
}

limit := int64(3)
if v, ok := channel.Last(); ok {
limit = v
}
messageLimiter := &limiter{
maxCount: limit,
}
// if v, ok := channel.Last(); ok {
// limit = v
// }
// messageLimiter := &limiter{
// maxCount: limit,
// }

ssid := message.NewSsid(key.Contract(), channel.Query)
t0, t1 := channel.Window() // Get the window

msgs, err := s.store.LimitedQuery(ssid, t0, t1, messageLimiter, request.LastMessageID)
messageLimiter := storage.NewMessageNumberLimiter(limit)
msgs, err := s.store.Query(ssid, t0, t1, request.LastMessageID, messageLimiter)
if err != nil {
logging.LogError("conn", "query last messages", err)
return errors.ErrServerError, false
Expand Down
Loading

0 comments on commit 656d3f9

Please sign in to comment.