diff --git a/README.md b/README.md index 1229a91..a9bc07e 100644 --- a/README.md +++ b/README.md @@ -439,6 +439,10 @@ For supported functions, see [StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v2#StreamOptFn) in the API reference. +## Event Feeds (beta) + +The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/cdc/#event-feeds). See [example](event_feed_example_test.go). + ## Debug logging To enable debug logging set the `FAUNA_DEBUG` environment variable to an integer for the value of the desired [slog.Level](https://pkg.go.dev/log/slog#Level). diff --git a/client.go b/client.go index 27c3020..db8dc95 100644 --- a/client.go +++ b/client.go @@ -72,7 +72,7 @@ type Client struct { maxBackoff time.Duration // lazily cached URLs - queryURL, streamURL *url.URL + queryURL, streamURL, feedURL *url.URL logger Logger } @@ -197,10 +197,6 @@ func (c *Client) parseQueryURL() (*url.URL, error) { } } - if c.queryURL == nil { - return nil, fmt.Errorf("query url is not set") - } - return c.queryURL, nil } @@ -213,11 +209,19 @@ func (c *Client) parseStreamURL() (*url.URL, error) { } } - if c.streamURL == nil { - return nil, fmt.Errorf("stream url is not set") + return c.streamURL, nil +} + +func (c *Client) parseFeedURL() (*url.URL, error) { + if c.feedURL == nil { + if feedURL, err := url.Parse(c.url); err != nil { + return nil, err + } else { + c.feedURL = feedURL.JoinPath("feed", "1") + } } - return c.streamURL, nil + return c.feedURL, nil } func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, err error) { @@ -423,3 +427,49 @@ func (c *Client) String() string { func (c *Client) setHeader(key, val string) { c.headers[key] = val } + +// Feed opens an event feed from the event source +func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) { + feedOpts, err := parseFeedOptions(opts...) + if err != nil { + return nil, err + } + + return newEventFeed(c, stream, feedOpts) +} + +// FeedFromQuery opens an event feed from a query +func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, error) { + feedOpts, err := parseFeedOptions(opts...) + if err != nil { + return nil, err + } + if feedOpts.Cursor != nil { + return nil, fmt.Errorf("cannot use EventFeedCursor with FeedFromQuery") + } + + res, err := c.Query(query) + if err != nil { + return nil, err + } + + eventSource, ok := res.Data.(EventSource) + if !ok { + return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) + } + + return newEventFeed(c, eventSource, feedOpts) +} + +func parseFeedOptions(opts ...FeedOptFn) (*feedOptions, error) { + feedOpts := feedOptions{} + for _, optFn := range opts { + optFn(&feedOpts) + } + + if feedOpts.StartTS != nil && feedOpts.Cursor != nil { + return nil, fmt.Errorf("cannot use EventFeedStartTime and EventFeedCursor together") + } + + return &feedOpts, nil +} diff --git a/config.go b/config.go index 5f557ce..2764596 100644 --- a/config.go +++ b/config.go @@ -167,3 +167,23 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string { return strings.ReplaceAll(params.Encode(), "&", ",") } + +// FeedOptFn function to set options on the [fauna.EventFeed] +type FeedOptFn func(req *feedOptions) + +// EventFeedCursor set the cursor for the [fauna.EventFeed] +// cannot be used with [EventFeedStartTime] or in [fauna.Client.FeedFromQuery] +func EventFeedCursor(cursor string) FeedOptFn { + return func(req *feedOptions) { req.Cursor = &cursor } +} + +// EventFeedStartTime set the start time for the [fauna.EventFeed] +// cannot be used with [EventFeedCursor] +func EventFeedStartTime(ts int64) FeedOptFn { + return func(req *feedOptions) { req.StartTS = &ts } +} + +// EventFeedPageSize set the page size for the [fauna.EventFeed] +func EventFeedPageSize(ts int) FeedOptFn { + return func(req *feedOptions) { req.PageSize = &ts } +} diff --git a/event_feed.go b/event_feed.go new file mode 100644 index 0000000..0f17139 --- /dev/null +++ b/event_feed.go @@ -0,0 +1,94 @@ +package fauna + +import ( + "encoding/json" +) + +// EventFeed represents an event feed subscription. +type EventFeed struct { + client *Client + + source EventSource + + decoder *json.Decoder + + opts *feedOptions + lastCursor string +} + +type feedOptions struct { + PageSize *int + Cursor *string + StartTS *int64 +} + +func newEventFeed(client *Client, source EventSource, opts *feedOptions) (*EventFeed, error) { + feed := &EventFeed{ + client: client, + source: source, + opts: opts, + } + + return feed, nil +} + +func (ef *EventFeed) newFeedRequest() (*feedRequest, error) { + req := feedRequest{ + apiRequest: apiRequest{ + ef.client.ctx, + ef.client.headers, + }, + Source: ef.source, + Cursor: ef.lastCursor, + } + if ef.opts.StartTS != nil { + req.StartTS = *ef.opts.StartTS + } + if ef.opts.Cursor != nil { + req.Cursor = *ef.opts.Cursor + } + if ef.opts.PageSize != nil { + req.PageSize = *ef.opts.PageSize + } + + return &req, nil +} + +func (ef *EventFeed) open() error { + req, err := ef.newFeedRequest() + if err != nil { + return err + } + + byteStream, err := req.do(ef.client) + if err != nil { + return err + } + + ef.decoder = json.NewDecoder(byteStream) + + return nil +} + +// FeedPage represents the response from [fauna.EventFeed.Next] +type FeedPage struct { + Events []Event `json:"events"` + Cursor string `json:"cursor"` + HasNext bool `json:"has_next"` + Stats Stats `json:"stats"` +} + +// Next retrieves the next FeedPage from the [fauna.EventFeed] +func (ef *EventFeed) Next(page *FeedPage) error { + if err := ef.open(); err != nil { + return err + } + + if err := ef.decoder.Decode(&page); err != nil { + return err + } + + ef.lastCursor = page.Cursor + + return nil +} diff --git a/event_feed_example_test.go b/event_feed_example_test.go new file mode 100644 index 0000000..bcc7015 --- /dev/null +++ b/event_feed_example_test.go @@ -0,0 +1,48 @@ +package fauna_test + +import ( + "fmt" + "log" + + "github.com/fauna/fauna-go/v3" +) + +func ExampleEventFeed_Next() { + client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal)) + + query, queryErr := fauna.FQL(`Collection.byName("EventFeedTest")?.delete() +Collection.create({ name: "EventFeedTest" }) +EventFeedTest.all().eventSource()`, nil) + if queryErr != nil { + log.Fatal(queryErr.Error()) + } + + feed, feedErr := client.FeedFromQuery(query) + if feedErr != nil { + log.Fatal(feedErr.Error()) + } + + addOne, _ := fauna.FQL(`EventFeedTest.create({ foo: 'bar' })`, nil) + _, addOneErr := client.Query(addOne) + if addOneErr != nil { + log.Fatal(addOneErr.Error()) + } + + for { + var page fauna.FeedPage + eventErr := feed.Next(&page) + if eventErr != nil { + log.Fatal(eventErr.Error()) + } + + for _, event := range page.Events { + fmt.Println(event.Type) + } + + if !page.HasNext { + break + } + } + + // Output: add +} diff --git a/event_feed_test.go b/event_feed_test.go new file mode 100644 index 0000000..ddec9d6 --- /dev/null +++ b/event_feed_test.go @@ -0,0 +1,228 @@ +package fauna_test + +import ( + "testing" + "time" + + "github.com/fauna/fauna-go/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventFeed(t *testing.T) { + t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) + t.Setenv(fauna.EnvFaunaSecret, "secret") + + client, clientErr := fauna.NewDefaultClient() + require.NoError(t, clientErr) + + resetCollection(t, client) + + t.Run("returns errors correctly", func(t *testing.T) { + t.Run("should error when the query doesn't return an event source", func(t *testing.T) { + query, queryErr := fauna.FQL(`42`, nil) + require.NoError(t, queryErr) + + _, feedErr := client.FeedFromQuery(query) + require.ErrorContains(t, feedErr, "query should return a fauna.EventSource but got int") + }) + + t.Run("should error when attempting to use a cursor with a query", func(t *testing.T) { + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") + + _, feedErr := client.FeedFromQuery(query, fauna.EventFeedCursor("cursor")) + require.ErrorContains(t, feedErr, "cannot use EventFeedCursor with FeedFromQuery") + }) + + t.Run("should error when attempting to use a start time and a cursor", func(t *testing.T) { + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") + + req, reqErr := client.Query(query) + require.NoError(t, reqErr, "failed to execute query") + + var response fauna.EventSource + unmarshalErr := req.Unmarshal(&response) + require.NoError(t, unmarshalErr, "failed to unmarshal EventSource") + + _, feedErr := client.Feed(response, fauna.EventFeedStartTime(time.Now().UnixMicro()), fauna.EventFeedCursor("cursor")) + require.ErrorContains(t, feedErr, "cannot use EventFeedStartTime and EventFeedCursor together") + }) + }) + + t.Run("can use event feeds from a query", func(t *testing.T) { + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") + + feed, feedErr := client.FeedFromQuery(query) + require.NoError(t, feedErr, "failed to init events feed") + + var ( + start = 5 + end = 20 + ) + + createOne(t, client, feed) + createMultipleDocs(t, client, start, end) + + var page fauna.FeedPage + eventsErr := feed.Next(&page) + require.NoError(t, eventsErr, "failed to get events from EventSource") + require.Equal(t, end-start, len(page.Events), "unexpected number of events") + }) + + t.Run("can get events from EventSource", func(t *testing.T) { + t.Run("can get an EventSource", func(t *testing.T) { + eventSource := getEventSource(t, client) + require.NotNil(t, eventSource, "failed to get an EventSource") + }) + + t.Run("get events from an EventSource", func(t *testing.T) { + eventSource := getEventSource(t, client) + + feed, feedErr := client.Feed(eventSource) + require.NoError(t, feedErr, "failed to init events feed") + + var ( + start = 5 + end = 20 + ) + + createOne(t, client, feed) + createMultipleDocs(t, client, start, end) + + var page fauna.FeedPage + eventsErr := feed.Next(&page) + require.NoError(t, eventsErr, "failed to get events from EventSource") + require.Equal(t, end-start, len(page.Events), "unexpected number of events") + }) + }) + + t.Run("can get events from history", func(t *testing.T) { + resetCollection(t, client) + + createOne(t, client, nil) + + eventSource := getEventSource(t, client) + require.NotNil(t, eventSource, "failed to get an EventSource") + + feed, feedErr := client.Feed(eventSource) + require.NoError(t, feedErr, "failed to init events feed") + + var page fauna.FeedPage + eventsErr := feed.Next(&page) + require.NoError(t, eventsErr, "failed to get events") + require.Equal(t, 0, len(page.Events), "unexpected number of events") + + eventSource = getEventSource(t, client) + require.NotNil(t, eventSource, "failed to get an EventSource") + + tenMinutesAgo := time.Now().Add(-10 * time.Minute) + feed, feedErr = client.Feed(eventSource, fauna.EventFeedStartTime(tenMinutesAgo.UnixMicro())) + require.NoError(t, feedErr, "failed to init events feed") + + eventsErr = feed.Next(&page) + require.NoError(t, eventsErr, "failed to get events") + require.Equal(t, 1, len(page.Events), "unexpected number of events") + }) + + t.Run("can use page size", func(t *testing.T) { + resetCollection(t, client) + + eventSource := getEventSource(t, client) + + pageSize := 3 + feed, feedErr := client.Feed(eventSource, fauna.EventFeedPageSize(pageSize)) + require.NoError(t, feedErr, "failed to init events feed") + + var ( + start = 5 + end = 20 + page fauna.FeedPage + seenEvents int + ) + + createOne(t, client, feed) + createMultipleDocs(t, client, start, end) + + for { + eventsErr := feed.Next(&page) + require.NoError(t, eventsErr, "failed to get events from EventSource") + + seenEvents += len(page.Events) + + if !page.HasNext { + break + } + + // every page but the last should have the right page size + require.Equal(t, pageSize, len(page.Events), "unexpected number of events") + } + + require.Equal(t, end-start, seenEvents, "unexpected number of events") + }) +} + +func resetCollection(t *testing.T, client *fauna.Client) { + t.Helper() + + setupQuery, setupQueryErr := fauna.FQL(`Collection.byName("EventFeedTest")?.delete() +Collection.create({ name: "EventFeedTest" })`, nil) + require.NoError(t, setupQueryErr, "setup query error: %s", setupQueryErr) + + _, setupErr := client.Query(setupQuery) + require.NoError(t, setupErr, "setup error: %s", setupErr) +} + +func getEventSource(t *testing.T, client *fauna.Client) fauna.EventSource { + t.Helper() + + query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) + require.NoError(t, queryErr, "failed to create a query for EventSource") + + feedRes, feedResErr := client.Query(query) + require.NoError(t, feedResErr, "failed to init events feed") + + var eventSource fauna.EventSource + unmarshalErr := feedRes.Unmarshal(&eventSource) + require.NoError(t, unmarshalErr, "failed to unmarshal EventSource") + require.NotNil(t, eventSource, "event source is nil") + require.NotEmpty(t, eventSource, "event source is empty") + + return eventSource +} + +func createOne(t *testing.T, client *fauna.Client, feed *fauna.EventFeed) { + t.Helper() + + createOneQuery, createOneQueryErr := fauna.FQL("EventFeedTest.create({ foo: 'bar' })", nil) + require.NoError(t, createOneQueryErr, "failed to init query for create statement") + require.NotNil(t, createOneQuery, "create statement is nil") + + _, createOneErr := client.Query(createOneQuery) + require.NoError(t, createOneErr, "failed to create a document") + + if feed == nil { + return + } + + var page fauna.FeedPage + eventsErr := feed.Next(&page) + require.NoError(t, eventsErr, "failed to get events") + + assert.Equal(t, 1, len(page.Events), "unexpected number of events") +} + +func createMultipleDocs(t *testing.T, client *fauna.Client, start int, end int) { + t.Helper() + + query, queryErr := fauna.FQL(`Set.sequence(${start}, ${end}).forEach(n => EventFeedTest.create({ n: n }))`, map[string]any{ + "start": start, + "end": end, + }) + require.NoError(t, queryErr, "failed to init query for create statement") + + _, err := client.Query(query) + require.NoError(t, err) +} diff --git a/request.go b/request.go index b10d166..4808e8f 100644 --- a/request.go +++ b/request.go @@ -179,3 +179,41 @@ func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) bytes = httpRes.Body return } + +type feedRequest struct { + apiRequest + Source EventSource + Cursor string + StartTS int64 + PageSize int +} + +func (feedReq *feedRequest) do(cli *Client) (io.ReadCloser, error) { + bytesOut, marshalErr := marshal(feedReq) + if marshalErr != nil { + return nil, fmt.Errorf("marshal request failed: %w", marshalErr) + } + + changeFeedURL, parseURLErr := cli.parseFeedURL() + if parseURLErr != nil { + return nil, fmt.Errorf("parse url failed: %w", parseURLErr) + } + + attempts, httpRes, postErr := feedReq.post(cli, changeFeedURL, bytesOut) + if postErr != nil { + return nil, fmt.Errorf("post request failed: %w", postErr) + } + + if httpRes.StatusCode != http.StatusOK { + qRes, err := parseQueryResponse(httpRes) + if err == nil { + if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err == nil { + err = fmt.Errorf("unknown error for http status: %d", httpRes.StatusCode) + } + } + + return nil, err + } + + return httpRes.Body, nil +} diff --git a/serializer.go b/serializer.go index 6959280..e32ee57 100644 --- a/serializer.go +++ b/serializer.go @@ -523,6 +523,19 @@ func encode(v any, hint string) (any, error) { } return out, nil + case feedRequest: + out := map[string]any{"token": string(vt.Source)} + if vt.PageSize > 0 { + out["page_size"] = vt.PageSize + } + if vt.StartTS > 0 { + out["start_ts"] = vt.StartTS + } + if len(vt.Cursor) > 0 { + out["cursor"] = vt.Cursor + } + return out, nil + case []byte: return encodeBytes(vt) } diff --git a/serializer_test.go b/serializer_test.go index fe95148..30ec022 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -1,6 +1,7 @@ package fauna import ( + "context" "encoding/base64" "reflect" "testing" @@ -623,3 +624,41 @@ func TestComposition(t *testing.T) { } }) } + +func TestMarshalEventSourceStructs(t *testing.T) { + t.Run("marshal query request", func(t *testing.T) { + marshalAndCheck(t, queryRequest{ + apiRequest: apiRequest{ + Context: context.Background(), + Headers: map[string]string{}, + }, + Query: nil, + Arguments: nil, + }) + }) + + t.Run("marshal stream request", func(t *testing.T) { + marshalAndCheck(t, streamRequest{ + apiRequest: apiRequest{ + Context: context.Background(), + Headers: map[string]string{}, + }, + Stream: "", + StartTS: 0, + Cursor: "", + }) + }) + + t.Run("marshal feed request", func(t *testing.T) { + marshalAndCheck(t, feedRequest{ + apiRequest: apiRequest{ + Context: context.Background(), + Headers: map[string]string{}, + }, + Source: "", + Cursor: "", + PageSize: 0, + StartTS: 0, + }) + }) +}