diff --git a/client.go b/client.go index c974f34..21f2165 100644 --- a/client.go +++ b/client.go @@ -428,8 +428,7 @@ func (c *Client) setHeader(key, val string) { c.headers[key] = val } -// FeedFromQuery opens an event feed from the event source returned by the [fauna.Query]. -func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, error) { +func (c *Client) getEventSource(fql *Query, opts ...QueryOptFn) (*EventSource, error) { res, err := c.Query(fql, opts...) if err != nil { return nil, err @@ -440,25 +439,50 @@ func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, erro return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) } - return newEventFeed(c, token) + return &token, nil } -// FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] from the given start time -func (c *Client) FeedFromQueryWithStartTime(fql *Query, startTime time.Time, opts ...QueryOptFn) (*EventFeed, error) { - res, err := c.Query(fql, opts...) +// FeedFromQuery opens an event feed from the event source returned by the [fauna.Query]. +func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, error) { + token, err := c.getEventSource(fql, opts...) if err != nil { return nil, err } - token, ok := res.Data.(EventSource) - if !ok { - return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) + return newEventFeed(c, *token) +} + +// FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] with custom options +func (c *Client) FeedFromQueryWithStartTime(fql *Query, start FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) { + token, err := c.getEventSource(fql, opts...) + if err != nil { + return nil, err } - return newEventFeed(c, token, FeedStartFn(startTime.UnixMicro())) + return newEventFeed(c, *token, start) +} + +// FeedFromQueryWithCursor initiates an event from the event source returned by the [fauna.Query] with custom options +func (c *Client) FeedFromQueryWithCursor(fql *Query, cursor FeedStartFn, opts ...QueryOptFn) (*EventFeed, error) { + token, err := c.getEventSource(fql, opts...) + if err != nil { + return nil, err + } + + return newEventFeed(c, *token, cursor) } // Feed opens an event feed from the event source -func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) { - return newEventFeed(c, stream, opts...) +func (c *Client) Feed(stream EventSource) (*EventFeed, error) { + return newEventFeed(c, stream) +} + +// FeedWithStartTime opens an event feed from the event source with options +func (c *Client) FeedWithStartTime(stream EventSource, start FeedStartFn) (*EventFeed, error) { + return newEventFeed(c, stream, start) +} + +// FeedWithCursor opens an event feed from the event source with options +func (c *Client) FeedWithCursor(stream EventSource, cursor FeedStartFn) (*EventFeed, error) { + return newEventFeed(c, stream, cursor) } diff --git a/config.go b/config.go index 74fca53..2ef7de8 100644 --- a/config.go +++ b/config.go @@ -168,9 +168,13 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { return strings.ReplaceAll(params.Encode(), "&", ",") } -// FeedOptFn function to set options on the [Client.NewEventFeed] -type FeedOptFn func(req *feedRequest) +// FeedStartFn function to set options on the [Client.NewEventFeed] +type FeedStartFn func(req *feedRequest) -func FeedStartFn(ts int64) FeedOptFn { +func EventFeedCursor(cursor string) FeedStartFn { + return func(req *feedRequest) { req.Cursor = cursor } +} + +func EventFeedStartTime(ts int64) FeedStartFn { return func(req *feedRequest) { req.StartTS = ts } } diff --git a/event_feed.go b/event_feed.go index bb8169c..a588a62 100644 --- a/event_feed.go +++ b/event_feed.go @@ -9,14 +9,14 @@ type EventFeed struct { client *Client source EventSource - opts []FeedOptFn + opts []FeedStartFn decoder *json.Decoder lastCursor string } -func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*EventFeed, error) { +func newEventFeed(client *Client, source EventSource, opts ...FeedStartFn) (*EventFeed, error) { feed := &EventFeed{ client: client, source: source, @@ -30,7 +30,7 @@ func newEventFeed(client *Client, source EventSource, opts ...FeedOptFn) (*Event return feed, nil } -func (ef *EventFeed) open(opts ...FeedOptFn) error { +func (ef *EventFeed) open(opts ...FeedStartFn) error { req := feedRequest{ apiRequest: apiRequest{ ef.client.ctx, diff --git a/event_feed_test.go b/event_feed_test.go index ed80f71..fe4ba46 100644 --- a/event_feed_test.go +++ b/event_feed_test.go @@ -92,7 +92,7 @@ func TestEventFeed(t *testing.T) { eventSource = getEventSource(t, client) require.NotNil(t, eventSource, "failed to get an EventSource") - feed, feedErr = client.Feed(eventSource, fauna.FeedStartFn(time.Now().Add(-time.Minute*10).UnixMicro())) + feed, feedErr = client.FeedWithStartTime(eventSource, fauna.EventFeedStartTime(time.Now().Add(-time.Minute*10).UnixMicro())) require.NoError(t, feedErr, "failed to init events feed") feedRes, eventsErr := feed.Events()