Skip to content

Commit

Permalink
feed option funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
cynicaljoy committed Nov 1, 2024
1 parent 86c451e commit 38f5087
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 62 deletions.
18 changes: 4 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,23 +428,13 @@ func (c *Client) setHeader(key, val string) {
c.headers[key] = val
}

// FeedArgs optional arguments for [fauna.Client.Feed]
type FeedArgs struct {
// PageSize number of events to return per page
PageSize *int
// StartTs incompatible with Cursor
StartTs *time.Time
// Cursor incompatible with StartTs
Cursor *string
}

// Feed opens an event feed from the event source
func (c *Client) Feed(stream EventSource, feedArgs *FeedArgs) (*EventFeed, error) {
return newEventFeed(c, stream, feedArgs)
func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) {
return newEventFeed(c, stream, opts...)
}

// FeedFromQuery opens an event feed from a query
func (c *Client) FeedFromQuery(query *Query, feedArgs *FeedArgs) (*EventFeed, error) {
func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, error) {
res, err := c.Query(query)
if err != nil {
return nil, err
Expand All @@ -455,5 +445,5 @@ func (c *Client) FeedFromQuery(query *Query, feedArgs *FeedArgs) (*EventFeed, er
return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data)
}

return newEventFeed(c, eventSource, feedArgs)
return newEventFeed(c, eventSource, opts...)
}
15 changes: 15 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,18 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string {

return strings.ReplaceAll(params.Encode(), "&", ",")
}

// FeedOptFn function to set options on the [Client.NewEventFeed]
type FeedOptFn func(req *feedRequest)

func EventFeedCursor(cursor string) FeedOptFn {
return func(req *feedRequest) { req.Cursor = cursor }
}

func EventFeedStartTime(ts int64) FeedOptFn {
return func(req *feedRequest) { req.StartTS = ts }
}

func EventFeedPageSize(ts int) FeedOptFn {
return func(req *feedRequest) { req.PageSize = ts }
}
47 changes: 18 additions & 29 deletions event_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fauna

import (
"encoding/json"
"fmt"
)

// EventFeed represents an event feed subscription.
Expand All @@ -13,46 +12,36 @@ type EventFeed struct {

decoder *json.Decoder

lastCursor *string
pageSize *int
startTs *int64
opts []FeedOptFn
lastCursor string
}

func newEventFeed(client *Client, source EventSource, args *FeedArgs) (*EventFeed, error) {
func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*EventFeed, error) {
feed := &EventFeed{
client: client,
source: source,
}

if args != nil {
if args.StartTs != nil && args.Cursor != nil {
return nil, fmt.Errorf("StartTs and Cursor cannot be used simultaneously")
}
if args.Cursor != nil {
feed.lastCursor = args.Cursor
}

if args.StartTs != nil {
unixTime := args.StartTs.UnixMicro()
feed.startTs = &unixTime
}

feed.pageSize = args.PageSize
opts: opts,
}

return feed, nil
}

func (ef *EventFeed) open() error {
func (ef *EventFeed) open(opts ...FeedOptFn) error {
req := feedRequest{
apiRequest: apiRequest{
ef.client.ctx,
ef.client.headers,
},
Source: ef.source,
Cursor: ef.lastCursor,
PageSize: ef.pageSize,
StartTS: ef.startTs,
Source: ef.source,
Cursor: ef.lastCursor,
}

if (opts != nil) && (len(opts) > 0) {
ef.opts = append(ef.opts, opts...)
}

for _, optFn := range ef.opts {
optFn(&req)
}

byteStream, err := req.do(ef.client)
Expand All @@ -74,16 +63,16 @@ type FeedPage struct {
}

// Next retrieves the next FeedPage from the EventFeed
func (ef *EventFeed) Next(page *FeedPage) error {
if err := ef.open(); err != nil {
func (ef *EventFeed) Next(page *FeedPage, opts ...FeedOptFn) error {
if err := ef.open(opts...); err != nil {
return err
}

if err := ef.decoder.Decode(&page); err != nil {
return err
}

ef.lastCursor = &page.Cursor
ef.lastCursor = page.Cursor

return nil
}
16 changes: 6 additions & 10 deletions event_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestEventFeed(t *testing.T) {
query, queryErr := fauna.FQL(`42`, nil)
require.NoError(t, queryErr)

_, feedErr := client.FeedFromQuery(query, nil)
_, feedErr := client.FeedFromQuery(query)
require.ErrorContains(t, feedErr, "query should return a fauna.EventSource but got int")
})
})
Expand All @@ -32,7 +32,7 @@ func TestEventFeed(t *testing.T) {
query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil)
require.NoError(t, queryErr, "failed to create a query for EventSource")

feed, feedErr := client.FeedFromQuery(query, nil)
feed, feedErr := client.FeedFromQuery(query)
require.NoError(t, feedErr, "failed to init events feed")

var (
Expand All @@ -58,7 +58,7 @@ func TestEventFeed(t *testing.T) {
t.Run("get events from an EventSource", func(t *testing.T) {
eventSource := getEventSource(t, client)

feed, feedErr := client.Feed(eventSource, nil)
feed, feedErr := client.Feed(eventSource)
require.NoError(t, feedErr, "failed to init events feed")

var (
Expand All @@ -84,7 +84,7 @@ func TestEventFeed(t *testing.T) {
eventSource := getEventSource(t, client)
require.NotNil(t, eventSource, "failed to get an EventSource")

feed, feedErr := client.Feed(eventSource, nil)
feed, feedErr := client.Feed(eventSource)
require.NoError(t, feedErr, "failed to init events feed")

var page fauna.FeedPage
Expand All @@ -96,9 +96,7 @@ func TestEventFeed(t *testing.T) {
require.NotNil(t, eventSource, "failed to get an EventSource")

tenMinutesAgo := time.Now().Add(-10 * time.Minute)
feed, feedErr = client.Feed(eventSource, &fauna.FeedArgs{
StartTs: &tenMinutesAgo,
})
feed, feedErr = client.Feed(eventSource, fauna.EventFeedStartTime(tenMinutesAgo.UnixMicro()))
require.NoError(t, feedErr, "failed to init events feed")

eventsErr = feed.Next(&page)
Expand All @@ -112,9 +110,7 @@ func TestEventFeed(t *testing.T) {
eventSource := getEventSource(t, client)

pageSize := 3
feed, feedErr := client.Feed(eventSource, &fauna.FeedArgs{
PageSize: &pageSize,
})
feed, feedErr := client.Feed(eventSource, fauna.EventFeedPageSize(pageSize))
require.NoError(t, feedErr, "failed to init events feed")

var (
Expand Down
6 changes: 3 additions & 3 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error)
type feedRequest struct {
apiRequest
Source EventSource
Cursor *string `json:"cursor,omitempty"`
StartTS *int64 `json:"start_ts,omitempty"`
PageSize *int `json:"page_size,omitempty"`
Cursor string
StartTS int64
PageSize int
}

func (feedReq *feedRequest) do(cli *Client) (io.ReadCloser, error) {
Expand Down
6 changes: 3 additions & 3 deletions serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,13 @@ func encode(v any, hint string) (any, error) {

case feedRequest:
out := map[string]any{"token": string(vt.Source)}
if vt.PageSize != nil {
if vt.PageSize > 0 {
out["page_size"] = vt.PageSize
}
if vt.StartTS != nil {
if vt.StartTS > 0 {
out["start_ts"] = vt.StartTS
}
if vt.Cursor != nil {
if len(vt.Cursor) > 0 {
out["cursor"] = vt.Cursor
}
return out, nil
Expand Down
6 changes: 3 additions & 3 deletions serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,9 +656,9 @@ func TestMarshalEventSourceStructs(t *testing.T) {
Headers: map[string]string{},
},
Source: "",
Cursor: nil,
PageSize: nil,
StartTS: nil,
Cursor: "",
PageSize: 0,
StartTS: 0,
})
})
}

0 comments on commit 38f5087

Please sign in to comment.