Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds the new history API #414

Merged
merged 2 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/broker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/security/license"
"github.com/emitter-io/emitter/internal/service/cluster"
"github.com/emitter-io/emitter/internal/service/history"
"github.com/emitter-io/emitter/internal/service/keyban"
"github.com/emitter-io/emitter/internal/service/keygen"
"github.com/emitter-io/emitter/internal/service/link"
Expand Down Expand Up @@ -189,6 +190,7 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error)
s.pubsub.Handle("keyban", keyban.New(s, s.keygen, s.cluster).OnRequest)
s.pubsub.Handle("link", link.New(s, s.pubsub).OnRequest)
s.pubsub.Handle("me", me.New().OnRequest)
s.pubsub.Handle("history", history.New(s, s.storage).OnRequest)

// Addresses and things
logging.LogTarget("service", "configured node name", nodeName)
Expand Down
8 changes: 4 additions & 4 deletions internal/provider/storage/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestInMemory_Query(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit)
out, err := s.Query(tc.query, zero, zero, nil, tc.limit)
assert.NoError(t, err)

count := 0
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestInMemory_lookup(t *testing.T) {
}

for _, tc := range tests {
matches := s.lookup(newLookupQuery(tc.query, zero, zero, tc.limit))
matches := s.lookup(newLookupQuery(tc.query, zero, zero, nil, tc.limit))
assert.Equal(t, tc.count, len(matches))
}
}
Expand All @@ -172,13 +172,13 @@ func TestInMemory_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10),
expectOk: true,
expectCount: 2,
},
Expand Down
18 changes: 14 additions & 4 deletions internal/provider/storage/ssd.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,10 @@ func encodeFrame(msgs message.Frame) []*badger.Entry {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
func (s *SSD) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {

// Construct a query and lookup locally first
query := newLookupQuery(ssid, from, until, limit)
query := newLookupQuery(ssid, from, until, startFromID, limit)
match := s.lookup(query)

// Issue the message survey to the cluster
Expand Down Expand Up @@ -184,11 +184,21 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {

// Since we're starting backwards, seek to the 'until' position first and then
// we'll iterate forward but have reverse time ('until' -> 'from')
prefix := message.NewPrefix(q.Ssid, q.Until)
var prefix message.ID
if len(q.StartFromID) == 0 {
prefix = message.NewPrefix(q.Ssid, q.Until)
it.Seek(prefix)
} else {
it.Seek(q.StartFromID)
if !it.Valid() {
return nil
}
it.Next()
}

matchesSize := 0
// Seek the prefix and check the key so we can quickly exit the iteration.
for it.Seek(prefix); it.Valid() &&
for ; it.Valid() &&
message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From) &&
len(matches) < q.Limit; it.Next() {
if !message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.Until) {
Expand Down
16 changes: 11 additions & 5 deletions internal/provider/storage/ssd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestSSD_Query(t *testing.T) {
assert.NoError(t, err)

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, 5)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, nil, 5)
assert.NoError(t, err)
assert.Len(t, f, 1)
})
Expand All @@ -83,6 +83,12 @@ func TestSSD_QueryOrdered(t *testing.T) {
})
}

func TestSSD_QueryStartFromID(t *testing.T) {
runSSDTest(func(store *SSD) {
testStartFromID(t, store)
})
}

func TestSSD_MaxResponseSizeReached(t *testing.T) {
runSSDTest(func(store *SSD) {
testMaxResponseSizeReached(t, store)
Expand Down Expand Up @@ -127,7 +133,7 @@ func TestSSD_QuerySurveyed(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit)
out, err := s.Query(tc.query, zero, zero, nil, tc.limit)
assert.NoError(t, err)
count := 0
for range out {
Expand All @@ -152,13 +158,13 @@ func TestSSD_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 1),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, 10),
expectOk: true,
expectCount: 2,
},
Expand Down Expand Up @@ -322,7 +328,7 @@ func benchmarkQuery(b *testing.B, store *SSD, last int, m *stats.Metric) {
return

default:
store.Query(ssid, t0, t1, last)
store.Query(ssid, t0, t1, nil, last)
m.Update(int32(last))
}
}
Expand Down
24 changes: 13 additions & 11 deletions internal/provider/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Storage interface {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error)
Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error)
}

// ------------------------------------------------------------------------------------
Expand All @@ -65,20 +65,22 @@ func window(from, until time.Time) (int64, int64) {

// The lookup query to send out to the cluster.
type lookupQuery struct {
Ssid message.Ssid // The ssid to match.
From int64 // The beginning of the time window.
Until int64 // The end of the time window.
Limit int // The maximum number of elements to return.
Ssid message.Ssid // The ssid to match.
From int64 // The beginning of the time window.
Until int64 // The end of the time window.
StartFromID message.ID // The ID to start from when retrieving message, used for pagination.
Limit int // The maximum number of elements to return.
}

// newLookupQuery creates a new lookup query
func newLookupQuery(ssid message.Ssid, from, until time.Time, limit int) lookupQuery {
func newLookupQuery(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) lookupQuery {
t0, t1 := window(from, until)
return lookupQuery{
Ssid: ssid,
From: t0,
Until: t1,
Limit: limit,
Ssid: ssid,
From: t0,
Until: t1,
StartFromID: startFromID,
Limit: limit,
}
}

Expand Down Expand Up @@ -128,7 +130,7 @@ func (s *Noop) Store(m *message.Message) error {
// Query performs a query and attempts to fetch last n messages where
// n is specified by limit argument. From and until times can also be specified
// for time-series retrieval.
func (s *Noop) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
func (s *Noop) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) {
return nil, nil
}

Expand Down
41 changes: 36 additions & 5 deletions internal/provider/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestNoop_Store(t *testing.T) {
func TestNoop_Query(t *testing.T) {
s := new(Noop)
zero := time.Unix(0, 0)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, 10)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, nil, 10)
assert.NoError(t, err)
for range r {
t.Errorf("Should be empty")
Expand Down Expand Up @@ -81,7 +81,7 @@ func testOrder(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 5)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 5)
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand All @@ -104,7 +104,7 @@ func testRetained(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 1)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 1)
assert.NoError(t, err)

assert.Len(t, f, 1)
Expand All @@ -127,7 +127,7 @@ func testRange(t *testing.T, store Storage) {
}

// Issue a query
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), 5)
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), nil, 5)
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand All @@ -153,6 +153,37 @@ func Test_configUint32(t *testing.T) {
assert.Equal(t, uint32(99999999), v)
}

// Test the StartFromID option for pagination purposes.
func testStartFromID(t *testing.T, store Storage) {
var fourth message.ID
for i := int64(0); i < 10; i++ {
payload := make([]byte, 1)
payload[0] = byte(i)
msg := message.New(message.Ssid{0, 1, 2}, []byte("a/b/c/"), payload)
msg.TTL = message.RetainedTTL
msg.ID.SetTime(msg.ID.Time() + (i * 10000))
assert.NoError(t, store.Store(msg))
if i == 4 {
fourth = msg.ID
}
}

// Issue a query, starting at the fourth message ID and going back 2.
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, fourth, 2)
assert.NoError(t, err)

assert.Len(t, f, 2)
assert.Equal(t, 2, int(f[0].Payload[0]))
assert.Equal(t, 3, int(f[1].Payload[0]))

// Issue a query, starting at the first message ID and going back 2.
f, err = store.Query([]uint32{0, 1, 2}, zero, zero, f[0].ID, 2)
assert.NoError(t, err)
assert.Equal(t, 0, int(f[0].Payload[0]))
assert.Equal(t, 1, int(f[1].Payload[0]))
}

func testMaxResponseSizeReached(t *testing.T, store Storage) {
for i := int64(0); i < 10; i++ {
payload := make([]byte, mqtt.MaxMessageSize/5)
Expand All @@ -163,7 +194,7 @@ func testMaxResponseSizeReached(t *testing.T, store Storage) {
}

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 10)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, 10)
assert.NoError(t, err)

assert.Len(t, f, 4)
Expand Down
95 changes: 95 additions & 0 deletions internal/service/history/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**********************************************************************************
* Copyright (c) 2009-2020 Misakai Ltd.
* This program is free software: you can redistribute it and/or modify it under the
* terms of the GNU Affero General Public License as published by the Free Software
* Foundation, either version 3 of the License, or(at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program. If not, see<http://www.gnu.org/licenses/>.
************************************************************************************/

package history

import (
"encoding/json"

"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/service"
)

// Request represents a historical messages request.
type Request struct {
Key string `json:"key"` // The channel key for this request.
Channel string `json:"channel"` // The target channel for this request.
StartFromID message.ID `json:"startFromID,omitempty"`
}

type Message struct {
ID message.ID `json:"id"`
Topic string `json:"topic"` // The channel of the message
Payload string `json:"payload"` // The payload of the message
}
type Response struct {
Request uint16 `json:"req,omitempty"` // The corresponding request ID.
Messages []Message `json:"messages"` // The history of messages.
}

// ForRequest sets the request ID in the response for matching
func (r *Response) ForRequest(id uint16) {
r.Request = id
}

// OnRequest handles a request of historical messages.
func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, bool) {
var request Request
if err := json.Unmarshal(payload, &request); err != nil {
return errors.ErrBadRequest, false
}

channel := security.ParseChannel([]byte(request.Channel))
if channel.ChannelType == security.ChannelInvalid {
return errors.ErrBadRequest, false
}

// Check the authorization and permissions
_, key, allowed := s.auth.Authorize(channel, security.AllowLoad)
if !allowed {
return errors.ErrUnauthorized, false
}

// Use limit = 1 if not specified, otherwise use the limit option. The limit now
// defaults to one as per MQTT spec we always need to send retained messages.
limit := int64(1)
if v, ok := channel.Last(); ok {
limit = v
}

ssid := message.NewSsid(key.Contract(), channel.Query)
t0, t1 := channel.Window() // Get the window

msgs, err := s.store.Query(ssid, t0, t1, request.StartFromID, int(limit))
if err != nil {
logging.LogError("conn", "query last messages", err)
return errors.ErrServerError, false
}

resp := &Response{
Messages: make([]Message, 0, len(msgs)),
}
for _, m := range msgs {
msg := m
resp.Messages = append(resp.Messages, Message{
ID: msg.ID,
Topic: string(msg.Channel), // The channel for this message.
Payload: string(msg.Payload), // The payload for this message.
})
}
return resp, true
}
Loading
Loading