diff --git a/emitter.conf b/emitter.conf index 5fa1437e..17066b20 100644 --- a/emitter.conf +++ b/emitter.conf @@ -1,5 +1,5 @@ { - "license": "POwgodYTutTG3fW_8buxReM7_8YuPY9oyfkm_3FMZq3e3CIQj1gWZaN1lCl33X-gy-B-RbKiwusF56yFYQE:3", + "license": "PfA8IOPQD29mo7STD_-9g2DhFhDyBq2rfRjU3oqa8yi6O8DAEHZtFblFg9Vc3-XW-nDpW2aivIn7CJbRmpUOAQ:3", "listen": ":8080", "tls": { "listen": ":443" diff --git a/internal/provider/storage/memory_test.go b/internal/provider/storage/memory_test.go index 5f045087..f4cc5ad4 100644 --- a/internal/provider/storage/memory_test.go +++ b/internal/provider/storage/memory_test.go @@ -122,7 +122,7 @@ func TestInMemory_Query(t *testing.T) { }) } - out, err := s.Query(tc.query, zero, zero, tc.limit) + out, err := s.Query(tc.query, zero, zero, nil, NewMessageNumberLimiter(int64(tc.limit))) assert.NoError(t, err) count := 0 @@ -152,7 +152,7 @@ func TestInMemory_lookup(t *testing.T) { } for _, tc := range tests { - matches := s.lookup(newLookupQuery(tc.query, zero, zero, tc.limit)) + matches := s.lookup(newLookupQuery(tc.query, zero, zero, nil, NewMessageNumberLimiter(int64(tc.limit)))) assert.Equal(t, tc.count, len(matches)) } } @@ -172,13 +172,13 @@ func TestInMemory_OnSurvey(t *testing.T) { {name: "ssdstore"}, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(1)), expectOk: true, expectCount: 1, }, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(10)), expectOk: true, expectCount: 2, }, diff --git a/internal/provider/storage/ssd.go b/internal/provider/storage/ssd.go index 3309dece..0b17266e 100644 --- a/internal/provider/storage/ssd.go +++ b/internal/provider/storage/ssd.go @@ -143,7 +143,7 @@ func (s *SSD) Query(ssid message.Ssid, from, untilTime time.Time, untilID messag } } - match.Limit(limit) + limiter.Limit(&match) return match, nil } @@ -175,6 +175,8 @@ func (s *SSD) OnSurvey(surveyType string, payload []byte) ([]byte, bool) { // Lookup performs a against the storage. func (s *SSD) lookup(q lookupQuery) (matches message.Frame) { matches = make(message.Frame, 0) //TODO : explore the implications of not being able to presize with q.Limit + limiter := q.Limiter() + if err := s.db.View(func(tx *badger.Txn) error { it := tx.NewIterator(badger.IteratorOptions{ PrefetchValues: false, @@ -183,17 +185,24 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) { // Since we're starting backwards, seek to the 'until' position first and then // we'll iterate forward but have reverse time ('until' -> 'from') - prefix := q.UntilID - if len(prefix) == 0 { + var prefix message.ID + if len(q.UntilID) == 0 { prefix = message.NewPrefix(q.Ssid, q.UntilTime) + it.Seek(prefix) + } else { + it.Seek(q.UntilID) + if !it.Valid() { + return nil + } + it.Next() } // Seek the prefix and check the key so we can quickly exit the iteration. - for it.Seek(prefix); it.Valid() && + for ; it.Valid() && message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From); it.Next() { if message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.UntilTime) { if msg, err := loadMessage(it.Item()); err == nil { - if !q.Limiter.Admit(&msg) { + if !limiter.Admit(&msg) { return nil } matches = append(matches, msg) diff --git a/internal/provider/storage/ssd_test.go b/internal/provider/storage/ssd_test.go index 72083a02..a5298437 100644 --- a/internal/provider/storage/ssd_test.go +++ b/internal/provider/storage/ssd_test.go @@ -71,7 +71,7 @@ func TestSSD_Query(t *testing.T) { assert.NoError(t, err) zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, 5) + f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, nil, NewMessageNumberLimiter(5)) assert.NoError(t, err) assert.Len(t, f, 1) }) @@ -121,7 +121,8 @@ func TestSSD_QuerySurveyed(t *testing.T) { }) } - out, err := s.Query(tc.query, zero, zero, tc.limit, nil) + limiter := NewMessageNumberLimiter(int64(tc.limit)) + out, err := s.Query(tc.query, zero, zero, nil, limiter) assert.NoError(t, err) count := 0 for range out { @@ -146,13 +147,13 @@ func TestSSD_OnSurvey(t *testing.T) { {name: "ssdstore"}, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1, nil), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(1)), expectOk: true, expectCount: 1, }, { name: "ssdstore", - query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10, nil), + query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(10)), expectOk: true, expectCount: 2, }, @@ -316,7 +317,7 @@ func benchmarkQuery(b *testing.B, store *SSD, last int, m *stats.Metric) { return default: - store.Query(ssid, t0, t1, last, nil) + store.Query(ssid, t0, t1, nil, NewMessageNumberLimiter(int64(last))) m.Update(int32(last)) } } diff --git a/internal/provider/storage/storage.go b/internal/provider/storage/storage.go index 9cc46407..6bb19dc2 100644 --- a/internal/provider/storage/storage.go +++ b/internal/provider/storage/storage.go @@ -66,41 +66,64 @@ func window(from, until time.Time) (int64, int64) { // The lookup query to send out to the cluster. type lookupQuery struct { - Ssid message.Ssid // (required) The ssid to match. - From int64 // (required) The beginning of the time window. - UntilTime int64 // Lookup stops when reaches this time. - UntilID message.ID // Lookup stops when reaches this message ID. - Limiter Limiter // The maximum number of elements to return. + Ssid message.Ssid // (required) The ssid to match. + From int64 // (required) The beginning of the time window. + UntilTime int64 // Lookup stops when reaches this time. + UntilID message.ID // Lookup stops when reaches this message ID. + LimitByCount *MessageNumberLimiter + //LimitBySize *MessageSizeLimiter +} + +// newLookupQuery creates a new lookup query +func newLookupQuery(ssid message.Ssid, from, until time.Time, untilID message.ID, limiter Limiter) lookupQuery { + t0, t1 := window(from, until) + query := lookupQuery{ + Ssid: ssid, + From: t0, + UntilTime: t1, + UntilID: untilID, + } + + switch v := limiter.(type) { + case *MessageNumberLimiter: + query.LimitByCount = v + } + return query +} + +func (q *lookupQuery) Limiter() Limiter { + switch { + case q.LimitByCount != nil: + return q.LimitByCount + default: + return &MessageNumberLimiter{} + } } type Limiter interface { Admit(*message.Message) bool + Limit(*message.Frame) } // MessageNumberLimiter provide an Limiter implementation to replace the "limit" // parameter in the Query() function. type MessageNumberLimiter struct { - count int64 - limit int64 + count int64 `binary:"-"` + MsgLimit int64 } -func (n *MessageNumberLimiter) Admit(m *message.Message) bool { - return n.count < n.limit +func (limiter *MessageNumberLimiter) Admit(m *message.Message) bool { + admit := limiter.count < limiter.MsgLimit + limiter.count += 1 + return admit } -func NewMessageNumberLimiter(limit int64) Limiter { - return &MessageNumberLimiter{limit: limit} + +func (limiter *MessageNumberLimiter) Limit(frame *message.Frame) { + frame.Limit(int(limiter.MsgLimit)) } -// newLookupQuery creates a new lookup query -func newLookupQuery(ssid message.Ssid, from, until time.Time, untilID message.ID, limiter Limiter) lookupQuery { - t0, t1 := window(from, until) - return lookupQuery{ - Ssid: ssid, - From: t0, - UntilTime: t1, - UntilID: untilID, - Limiter: limiter, - } +func NewMessageNumberLimiter(limit int64) Limiter { + return &MessageNumberLimiter{MsgLimit: limit} } // configUint32 retrieves an uint32 from the config diff --git a/internal/provider/storage/storage_test.go b/internal/provider/storage/storage_test.go index 5e8f3d66..ba36f3d9 100644 --- a/internal/provider/storage/storage_test.go +++ b/internal/provider/storage/storage_test.go @@ -47,7 +47,7 @@ func TestNoop_Store(t *testing.T) { func TestNoop_Query(t *testing.T) { s := new(Noop) zero := time.Unix(0, 0) - r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, 10) + r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, nil, NewMessageNumberLimiter(10)) assert.NoError(t, err) for range r { t.Errorf("Should be empty") @@ -80,7 +80,7 @@ func testOrder(t *testing.T, store Storage) { // Issue a query zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 5) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, NewMessageNumberLimiter(5)) assert.NoError(t, err) assert.Len(t, f, 5) @@ -103,7 +103,7 @@ func testRetained(t *testing.T, store Storage) { // Issue a query zero := time.Unix(0, 0) - f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 1) + f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, NewMessageNumberLimiter(1)) assert.NoError(t, err) assert.Len(t, f, 1) @@ -126,7 +126,7 @@ func testRange(t *testing.T, store Storage) { } // Issue a query - f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), 5) + f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), nil, NewMessageNumberLimiter(5)) assert.NoError(t, err) assert.Len(t, f, 5) diff --git a/internal/service/history/history.go b/internal/service/history/history.go index f7b21a0c..97466b1e 100644 --- a/internal/service/history/history.go +++ b/internal/service/history/history.go @@ -20,6 +20,7 @@ import ( "github.com/emitter-io/emitter/internal/errors" "github.com/emitter-io/emitter/internal/message" "github.com/emitter-io/emitter/internal/provider/logging" + "github.com/emitter-io/emitter/internal/provider/storage" "github.com/emitter-io/emitter/internal/security" "github.com/emitter-io/emitter/internal/service" ) @@ -80,17 +81,18 @@ func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, b } limit := int64(3) - if v, ok := channel.Last(); ok { - limit = v - } - messageLimiter := &limiter{ - maxCount: limit, - } + // if v, ok := channel.Last(); ok { + // limit = v + // } + // messageLimiter := &limiter{ + // maxCount: limit, + // } ssid := message.NewSsid(key.Contract(), channel.Query) t0, t1 := channel.Window() // Get the window - msgs, err := s.store.LimitedQuery(ssid, t0, t1, messageLimiter, request.LastMessageID) + messageLimiter := storage.NewMessageNumberLimiter(limit) + msgs, err := s.store.Query(ssid, t0, t1, request.LastMessageID, messageLimiter) if err != nil { logging.LogError("conn", "query last messages", err) return errors.ErrServerError, false diff --git a/internal/service/history/history_test.go b/internal/service/history/history_test.go index 4918bbc2..e59a7d3c 100644 --- a/internal/service/history/history_test.go +++ b/internal/service/history/history_test.go @@ -15,23 +15,38 @@ package history import ( - "errors" + "encoding/json" "testing" - "time" "github.com/emitter-io/emitter/internal/message" "github.com/emitter-io/emitter/internal/provider/storage" + "github.com/emitter-io/emitter/internal/security" "github.com/emitter-io/emitter/internal/service/fake" "github.com/stretchr/testify/assert" ) func TestHistory(t *testing.T) { + //assert.True(t, true) ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} store := storage.NewInMemory(nil) store.Configure(nil) - trie := message.NewTrie() auth := &fake.Authorizer{ - Contract: 1, + Success: true, + Contract: uint32(1), + ExtraPerm: security.AllowLoad, + } + + request := &Request{ + Key: "key", + Channel: "a/b/c/", + } + + // Prepare the request + b, _ := json.Marshal(request) + if request == nil { + b = []byte("invalid") + } else { + auth.Target = request.Channel } // Create new service @@ -39,48 +54,46 @@ func TestHistory(t *testing.T) { c := &fake.Conn{} // Store a message - for i := 0; i < 10; i++ { + for i := 0; i < 1; i++ { store.Store(&message.Message{ ID: message.NewID(ssid), - Channel: []byte("test/"), + Channel: []byte("a/b/c/?ttl=30"), Payload: []byte("hello"), TTL: 30, }) } - request := - err := s.OnRequest(c, ) - assert.Equal(t, tc.success, err == nil) - assert.Equal(t, tc.expectLoaded, len(c.Outgoing)) - assert.Equal(t, tc.expectCount, trie.Count()) - + // Issue a request + response, ok := s.OnRequest(c, b) + println(response) + assert.Equal(t, true, ok) } // ------------------------------------------------------------------------------------ -// Noop implements Storage contract. -var _ storage.Storage = new(buggyStore) +// // Noop implements Storage contract. +// var _ storage.Storage = new(buggyStore) -// Noop represents a storage which does nothing. -type buggyStore struct{} +// // Noop represents a storage which does nothing. +// type buggyStore struct{} -// Name returns the name of the provider. -func (s *buggyStore) Name() string { - return "noop" -} +// // Name returns the name of the provider. +// func (s *buggyStore) Name() string { +// return "noop" +// } -func (s *buggyStore) Configure(config map[string]interface{}) error { - return errors.New("not working") -} +// func (s *buggyStore) Configure(config map[string]interface{}) error { +// return errors.New("not working") +// } -func (s *buggyStore) Store(m *message.Message) error { - return errors.New("not working") -} +// func (s *buggyStore) Store(m *message.Message) error { +// return errors.New("not working") +// } -func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) { - return nil, errors.New("not working") -} +// func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) { +// return nil, errors.New("not working") +// } -func (s *buggyStore) Close() error { - return errors.New("not working") -} +// func (s *buggyStore) Close() error { +// return errors.New("not working") +// } diff --git a/internal/service/pubsub/lastwill_test.go b/internal/service/pubsub/lastwill_test.go index 4a4da76e..43d5be09 100644 --- a/internal/service/pubsub/lastwill_test.go +++ b/internal/service/pubsub/lastwill_test.go @@ -130,7 +130,7 @@ func TestPubSub_LastWill(t *testing.T) { // Query the storage { - msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), 100) + msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, storage.NewMessageNumberLimiter(100)) assert.NoError(t, err) assert.Equal(t, tc.expectStored, len(msgs)) } diff --git a/internal/service/pubsub/publish_test.go b/internal/service/pubsub/publish_test.go index 5709b71f..0b17ade1 100644 --- a/internal/service/pubsub/publish_test.go +++ b/internal/service/pubsub/publish_test.go @@ -122,7 +122,7 @@ func TestPubSub_Publish(t *testing.T) { // Query the storage { - msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), 100) + msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, storage.NewMessageNumberLimiter(100)) assert.NoError(t, err) assert.Equal(t, tc.expectStored, len(msgs)) } diff --git a/internal/service/pubsub/subscribe.go b/internal/service/pubsub/subscribe.go index 8896e7f5..375e9e90 100644 --- a/internal/service/pubsub/subscribe.go +++ b/internal/service/pubsub/subscribe.go @@ -21,6 +21,7 @@ import ( "github.com/emitter-io/emitter/internal/event" "github.com/emitter-io/emitter/internal/message" "github.com/emitter-io/emitter/internal/provider/logging" + "github.com/emitter-io/emitter/internal/provider/storage" "github.com/emitter-io/emitter/internal/security" "github.com/emitter-io/emitter/internal/service" "github.com/kelindar/binary/nocopy" @@ -85,7 +86,7 @@ func (s *Service) OnSubscribe(c service.Conn, mqttTopic []byte) *errors.Error { // Check if the key has a load permission (also applies for retained) if key.HasPermission(security.AllowLoad) { t0, t1 := channel.Window() // Get the window - msgs, err := s.store.Query(ssid, t0, t1, int(limit)) + msgs, err := s.store.Query(ssid, t0, t1, nil, storage.NewMessageNumberLimiter(limit)) if err != nil { logging.LogError("conn", "query last messages", err) return errors.ErrServerError diff --git a/internal/service/pubsub/subscribe_test.go b/internal/service/pubsub/subscribe_test.go index 552fb101..2e3d0240 100644 --- a/internal/service/pubsub/subscribe_test.go +++ b/internal/service/pubsub/subscribe_test.go @@ -177,7 +177,7 @@ func (s *buggyStore) Store(m *message.Message) error { return errors.New("not working") } -func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) { +func (s *buggyStore) Query(ssid message.Ssid, from, untilTime time.Time, untilID message.ID, limiter storage.Limiter) (message.Frame, error) { return nil, errors.New("not working") }