Skip to content

Commit

Permalink
Adds the new history API
Browse files Browse the repository at this point in the history
The history API allows retrieving any number of messages stored in a channel.
As the number of messages that can be returned in a single response is limited
by the size of a response (as per #413), a system of pagination is required.
This system is implemented with the introduction of the StartFromID query
parameter. A client that has already received a response with a certain number of
messages can send a new request to the API with the StartFromID set to the ID
of the oldest message it received (first in the array). The storage query will
then begin to retrieve messages starting at the ID preceding the ID provided in
the query, to generate the next page of messages.
  • Loading branch information
Florimond committed Nov 22, 2023
1 parent 92c4c9e commit a11ef40
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 33 deletions.
2 changes: 2 additions & 0 deletions internal/broker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/security/license"
"github.com/emitter-io/emitter/internal/service/cluster"
"github.com/emitter-io/emitter/internal/service/history"
"github.com/emitter-io/emitter/internal/service/keyban"
"github.com/emitter-io/emitter/internal/service/keygen"
"github.com/emitter-io/emitter/internal/service/link"
Expand Down Expand Up @@ -185,6 +186,7 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error)
s.pubsub.Handle("keyban", keyban.New(s, s.keygen, s.cluster).OnRequest)
s.pubsub.Handle("link", link.New(s, s.pubsub).OnRequest)
s.pubsub.Handle("me", me.New().OnRequest)
s.pubsub.Handle("history", history.New(s, s.storage).OnRequest)

// Addresses and things
logging.LogTarget("service", "configured node name", nodeName)
Expand Down
8 changes: 4 additions & 4 deletions internal/provider/storage/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestInMemory_Query(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit)
out, err := s.Query(tc.query, zero, zero, nil, tc.limit)
assert.NoError(t, err)

count := 0
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestInMemory_lookup(t *testing.T) {
}

for _, tc := range tests {
matches := s.lookup(newLookupQuery(tc.query, zero, zero, tc.limit))
matches := s.lookup(newLookupQuery(tc.query, zero, zero, nil, tc.limit))
assert.Equal(t, tc.count, len(matches))
}
}
Expand All @@ -172,13 +172,13 @@ func TestInMemory_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10),
expectOk: true,
expectCount: 2,
},
Expand Down
18 changes: 14 additions & 4 deletions internal/provider/storage/ssd.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ func encodeFrame(msgs message.Frame) []*badger.Entry {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
func (s *SSD) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {

// Construct a query and lookup locally first
query := newLookupQuery(ssid, from, until, limit)
query := newLookupQuery(ssid, from, until, startFromID, limit)
match := s.lookup(query)

// Issue the message survey to the cluster
Expand Down Expand Up @@ -184,11 +184,21 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {

// Since we're starting backwards, seek to the 'until' position first and then
// we'll iterate forward but have reverse time ('until' -> 'from')
prefix := message.NewPrefix(q.Ssid, q.Until)
var prefix message.ID
if len(q.StartFromID) == 0 {
prefix = message.NewPrefix(q.Ssid, q.Until)
it.Seek(prefix)
} else {
it.Seek(q.StartFromID)
if !it.Valid() {
return nil
}
it.Next()
}

matchesSize := 0
// Seek the prefix and check the key so we can quickly exit the iteration.
for it.Seek(prefix); it.Valid() &&
for ; it.Valid() &&
message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) &&
len(matches) < q.Limit; it.Next() {
if !message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) {
Expand Down
16 changes: 11 additions & 5 deletions internal/provider/storage/ssd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestSSD_Query(t *testing.T) {
assert.NoError(t, err)

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, 5)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, nil, 5)
assert.NoError(t, err)
assert.Len(t, f, 1)
})
Expand All @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) {
})
}

// TestSSD_QueryStartFromID runs testStartFromID (the StartFromID pagination
// check) against the *SSD store handed to the callback by runSSDTest.
func TestSSD_QueryStartFromID(t *testing.T) {
runSSDTest(func(store *SSD) {
testStartFromID(t, store)
})
}

func TestSSD_MaxResponseSizeReached(t *testing.T) {
runSSDTest(func(store *SSD) {
testMaxResponseSizeReached(t, store)
Expand Down Expand Up @@ -127,7 +133,7 @@ func TestSSD_QuerySurveyed(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit)
out, err := s.Query(tc.query, zero, zero, nil, tc.limit)
assert.NoError(t, err)
count := 0
for range out {
Expand All @@ -152,13 +158,13 @@ func TestSSD_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10),
expectOk: true,
expectCount: 2,
},
Expand Down Expand Up @@ -322,7 +328,7 @@ func benchmarkQuery(b *testing.B, store *SSD, last int, m *stats.Metric) {
return

default:
store.Query(ssid, t0, t1, last)
store.Query(ssid, t0, t1, nil, last)
m.Update(int32(last))
}
}
Expand Down
24 changes: 13 additions & 11 deletions internal/provider/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Storage interface {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error)
Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error)
}

// ------------------------------------------------------------------------------------
Expand All @@ -65,20 +65,22 @@ func window(from, until time.Time) (int64, int64) {

// The lookup query to send out to the cluster.
type lookupQuery struct {
Ssid message.Ssid // The ssid to match.
From int64 // The beginning of the time window.
Until int64 // The end of the time window.
Limit int // The maximum number of elements to return.
Ssid message.Ssid // The ssid to match.
From int64 // The beginning of the time window.
Until int64 // The end of the time window.
StartFromID message.ID // The ID to start from when retrieving message, used for pagination.
Limit int // The maximum number of elements to return.
}

// newLookupQuery creates a new lookup query
func newLookupQuery(ssid message.Ssid, from, until time.Time, limit int) lookupQuery {
func newLookupQuery(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) lookupQuery {
t0, t1 := window(from, until)
return lookupQuery{
Ssid: ssid,
From: t0,
Until: t1,
Limit: limit,
Ssid: ssid,
From: t0,
Until: t1,
StartFromID: startFromID,
Limit: limit,
}
}

Expand Down Expand Up @@ -128,7 +130,7 @@ func (s *Noop) Store(m *message.Message) error {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
func (s *Noop) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
func (s *Noop) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {
return nil, nil
}

Expand Down
41 changes: 36 additions & 5 deletions internal/provider/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestNoop_Store(t *testing.T) {
func TestNoop_Query(t *testing.T) {
s := new(Noop)
zero := time.Unix(0, 0)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, 10)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, nil, 10)
assert.NoError(t, err)
for range r {
t.Errorf("Should be empty")
Expand Down Expand Up @@ -81,7 +81,7 @@ func testOrder(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 5)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 5)
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand All @@ -104,7 +104,7 @@ func testRetained(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 1)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 1)
assert.NoError(t, err)

assert.Len(t, f, 1)
Expand All @@ -127,7 +127,7 @@ func testRange(t *testing.T, store Storage) {
}

// Issue a query
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), 5)
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), nil, 5)
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand All @@ -153,6 +153,37 @@ func Test_configUint32(t *testing.T) {
assert.Equal(t, uint32(99999999), v)
}

// testStartFromID exercises the StartFromID query option used for pagination.
//
// It stores ten retained messages on the same ssid whose single-byte payloads
// are 0..9 and whose ID timestamps increase with the payload value. It then
// checks that a query given a StartFromID returns the messages that precede
// that ID (the message carrying the ID itself is excluded), oldest first, and
// that feeding the oldest ID of one page back in yields the page before it.
func testStartFromID(t *testing.T, store Storage) {
	// ID of the message with payload 4; it acts as the first pagination cursor.
	var cursor message.ID
	for i := int64(0); i < 10; i++ {
		payload := make([]byte, 1)
		payload[0] = byte(i)
		msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), payload)
		msg.TTL = message.RetainedTTL
		// Spread the timestamps so that the ordering of the IDs is deterministic.
		msg.ID.SetTime(msg.ID.Time() + (i * 10000))
		assert.NoError(t, store.Store(msg))
		if i == 4 {
			cursor = msg.ID
		}
	}

	// First page: start from the message with payload 4 and go back 2.
	zero := time.Unix(0, 0)
	f, err := store.Query([]uint32{0, 1, 2}, zero, zero, cursor, 2)
	assert.NoError(t, err)

	// Stop here on a short page: indexing below would otherwise panic rather
	// than report a regular test failure.
	if !assert.Len(t, f, 2) {
		return
	}
	assert.Equal(t, 2, int(f[0].Payload[0]))
	assert.Equal(t, 3, int(f[1].Payload[0]))

	// Second page: start from the oldest message of the first page (payload 2)
	// and go back 2 again.
	f, err = store.Query([]uint32{0, 1, 2}, zero, zero, f[0].ID, 2)
	assert.NoError(t, err)
	if !assert.Len(t, f, 2) {
		return
	}
	assert.Equal(t, 0, int(f[0].Payload[0]))
	assert.Equal(t, 1, int(f[1].Payload[0]))
}

func testMaxResponseSizeReached(t *testing.T, store Storage) {
for i := int64(0); i < 10; i++ {
payload := make([]byte, mqtt.MaxMessageSize/5)
Expand All @@ -163,7 +194,7 @@ func testMaxResponseSizeReached(t *testing.T, store Storage) {
}

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 10)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 10)
assert.NoError(t, err)

assert.Len(t, f, 4)
Expand Down
95 changes: 95 additions & 0 deletions internal/service/history/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**********************************************************************************
* Copyright (c) 2009-2020 Misakai Ltd.
* This program is free software: you can redistribute it and/or modify it under the
* terms of the GNU Affero General Public License as published by the Free Software
* Foundation, either version 3 of the License, or(at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program. If not, see<http://www.gnu.org/licenses/>.
************************************************************************************/

package history

import (
"encoding/json"

"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/service"
)

// Request represents a historical messages request.
type Request struct {
Key string `json:"key"` // The channel key for this request.
Channel string `json:"channel"` // The target channel for this request.
StartFromID message.ID `json:"startFromID,omitempty"` // Optional pagination cursor; OnRequest forwards it to the storage query as the ID to start from.
}

// Message represents a single historical message returned in a Response.
type Message struct {
ID message.ID `json:"id"` // The ID of the message, copied from the stored message.
Topic string `json:"topic"` // The channel of the message
Payload string `json:"payload"` // The payload of the message
}
// Response represents the response to a historical messages request.
type Response struct {
Request uint16 `json:"req,omitempty"` // The corresponding request ID.
Messages []Message `json:"messages"` // The history of messages.
}

// ForRequest sets the request ID in the response, so that the response can be
// matched with the request it answers.
func (r *Response) ForRequest(id uint16) {
r.Request = id
}

// OnRequest handles a request of historical messages.
//
// The JSON payload is decoded into a Request, the requested channel is parsed
// and authorized for loading, and the store is queried for the channel's time
// window, optionally starting from request.StartFromID. It returns a *Response
// and true on success, or one of the error responses (bad request,
// unauthorized, server error) and false otherwise.
//
// NOTE(review): request.Key is decoded but never read here; presumably the key
// is expected to be embedded in request.Channel — confirm against the clients.
func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, bool) {
	var req Request
	if json.Unmarshal(payload, &req) != nil {
		return errors.ErrBadRequest, false
	}

	// Reject anything that does not parse into a valid channel.
	channel := security.ParseChannel([]byte(req.Channel))
	if channel.ChannelType == security.ChannelInvalid {
		return errors.ErrBadRequest, false
	}

	// The caller must be allowed to load messages from this channel.
	_, key, authorized := s.auth.Authorize(channel, security.AllowLoad)
	if !authorized {
		return errors.ErrUnauthorized, false
	}

	// A single message is returned unless the channel carries its own limit.
	limit := int64(1)
	if last, found := channel.Last(); found {
		limit = last
	}

	ssid := message.NewSsid(key.Contract(), channel.Query)
	from, until := channel.Window()

	frame, err := s.store.Query(ssid, from, until, req.StartFromID, int(limit))
	if err != nil {
		logging.LogError("conn", "query last messages", err)
		return errors.ErrServerError, false
	}

	// Convert every stored message into its response representation.
	history := make([]Message, len(frame))
	for i := range frame {
		history[i] = Message{
			ID:      frame[i].ID,
			Topic:   string(frame[i].Channel),
			Payload: string(frame[i].Payload),
		}
	}
	return &Response{Messages: history}, true
}
Loading

0 comments on commit a11ef40

Please sign in to comment.