Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Event Feeds #174

Merged
merged 24 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,10 @@ For supported functions, see
[StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v2#StreamOptFn) in
the API reference.

## Event Feeds (beta)

The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/cdc/#event-feeds). See [example](event_feed_example_test.go).

## Debug logging

To enable debug logging set the `FAUNA_DEBUG` environment variable to an integer for the value of the desired [slog.Level](https://pkg.go.dev/log/slog#Level).
Expand Down
69 changes: 61 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Client struct {
maxBackoff time.Duration

// lazily cached URLs
queryURL, streamURL *url.URL
queryURL, streamURL, feedURL *url.URL

logger Logger
}
Expand Down Expand Up @@ -197,10 +197,6 @@ func (c *Client) parseQueryURL() (*url.URL, error) {
}
}

if c.queryURL == nil {
return nil, fmt.Errorf("query url is not set")
}

return c.queryURL, nil
}

Expand All @@ -213,11 +209,19 @@ func (c *Client) parseStreamURL() (*url.URL, error) {
}
}

if c.streamURL == nil {
return nil, fmt.Errorf("stream url is not set")
return c.streamURL, nil
}

func (c *Client) parseFeedURL() (*url.URL, error) {
if c.feedURL == nil {
if feedURL, err := url.Parse(c.url); err != nil {
return nil, err
} else {
c.feedURL = feedURL.JoinPath("feed", "1")
}
}

return c.streamURL, nil
return c.feedURL, nil
}

func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, err error) {
Expand Down Expand Up @@ -423,3 +427,52 @@ func (c *Client) String() string {
func (c *Client) setHeader(key, val string) {
c.headers[key] = val
}

func (c *Client) getEventSource(fql *Query, opts ...QueryOptFn) (*EventSource, error) {
res, err := c.Query(fql, opts...)
if err != nil {
return nil, err
}

token, ok := res.Data.(EventSource)
if !ok {
return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data)
}

return &token, nil
}

// FeedFromQuery opens an event feed from the event source returned by the [fauna.Query].
func (c *Client) FeedFromQuery(fql *Query, opts ...QueryOptFn) (*EventFeed, error) {
token, err := c.getEventSource(fql, opts...)
if err != nil {
return nil, err
}

return newEventFeed(c, *token)
}

// FeedFromQueryWithStartTime initiates an event from the event source returned by the [fauna.Query] with custom options
func (c *Client) FeedFromQueryWithStartTime(fql *Query, time time.Time, opts ...QueryOptFn) (*EventFeed, error) {
token, err := c.getEventSource(fql, opts...)
if err != nil {
return nil, err
}

return newEventFeed(c, *token, EventFeedStartTime(time.UnixMicro()))
}

// Feed opens an event feed from the event source
func (c *Client) Feed(stream EventSource) (*EventFeed, error) {
return newEventFeed(c, stream)
}

// FeedWithStartTime opens an event feed from the event source with options
func (c *Client) FeedWithStartTime(stream EventSource, start time.Time) (*EventFeed, error) {
return newEventFeed(c, stream, EventFeedStartTime(start.UnixMicro()))
}

// FeedWithCursor opens an event feed from the event source with options
func (c *Client) FeedWithCursor(stream EventSource, cursor string) (*EventFeed, error) {
return newEventFeed(c, stream, EventFeedCursor(cursor))
}
cynicaljoy marked this conversation as resolved.
Show resolved Hide resolved
11 changes: 11 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,14 @@ func argsStringFromMap(input map[string]string, currentArgs ...string) string {

return strings.ReplaceAll(params.Encode(), "&", ",")
}

// FeedStartFn function to set options on the [Client.NewEventFeed]
type FeedStartFn func(req *feedRequest)
cynicaljoy marked this conversation as resolved.
Show resolved Hide resolved

func EventFeedCursor(cursor string) FeedStartFn {
return func(req *feedRequest) { req.Cursor = cursor }
}

func EventFeedStartTime(ts int64) FeedStartFn {
return func(req *feedRequest) { req.StartTS = ts }
}
cynicaljoy marked this conversation as resolved.
Show resolved Hide resolved
83 changes: 83 additions & 0 deletions event_feed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package fauna

import (
"encoding/json"
)

// EventFeed represents an event feed subscription.
type EventFeed struct {
client *Client

source EventSource
opts []FeedStartFn

decoder *json.Decoder

lastCursor string
}

func newEventFeed(client *Client, source EventSource, opts ...FeedStartFn) (*EventFeed, error) {
feed := &EventFeed{
pnwpedro marked this conversation as resolved.
Show resolved Hide resolved
client: client,
source: source,
opts: opts,
}

if err := feed.open(opts...); err != nil {
return nil, err
}
cynicaljoy marked this conversation as resolved.
Show resolved Hide resolved

return feed, nil
}

func (ef *EventFeed) open(opts ...FeedStartFn) error {
req := feedRequest{
apiRequest: apiRequest{
ef.client.ctx,
ef.client.headers,
},
Source: ef.source,
Cursor: ef.lastCursor,
}

if (opts != nil) && (len(opts) > 0) {
ef.opts = append(ef.opts, opts...)
}

for _, optFn := range ef.opts {
optFn(&req)
}

byteStream, err := req.do(ef.client)
if err != nil {
return err
}

ef.decoder = json.NewDecoder(byteStream)

return nil
}

// FeedResponse represents the response from the EventFeed.Events
type FeedResponse struct {
cynicaljoy marked this conversation as resolved.
Show resolved Hide resolved
Events []Event `json:"events"`
Cursor string `json:"cursor"`
HasNext bool `json:"has_next"`
Stats Stats `json:"stats"`
}

// Events return the next FeedResponse from the EventFeed
func (ef *EventFeed) Events() (*FeedResponse, error) {
cynicaljoy marked this conversation as resolved.
Show resolved Hide resolved
var response FeedResponse
if err := ef.open(); err != nil {
return nil, err
}

if err := ef.decoder.Decode(&response); err != nil {
return nil, err
}

ef.lastCursor = response.Cursor

return &response, nil
}
47 changes: 47 additions & 0 deletions event_feed_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package fauna_test

import (
"fmt"
"log"

"github.com/fauna/fauna-go/v3"
)

func ExampleEventFeed_Events() {
client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal))

query, queryErr := fauna.FQL(`Collection.byName("EventFeedTest")?.delete()
Collection.create({ name: "EventFeedTest" })
EventFeedTest.all().toStream()`, nil)
if queryErr != nil {
log.Fatal(queryErr.Error())
}

feed, feedErr := client.FeedFromQuery(query)
if feedErr != nil {
log.Fatal(feedErr.Error())
}

addOne, _ := fauna.FQL(`EventFeedTest.create({ foo: 'bar' })`, nil)
_, addOneErr := client.Query(addOne)
if addOneErr != nil {
log.Fatal(addOneErr.Error())
}

for {
res, eventErr := feed.Events()
if eventErr != nil {
log.Fatal(eventErr.Error())
}

for _, event := range res.Events {
fmt.Println(event.Type)
}

if !res.HasNext {
break
}
}

// Output: add
}
Loading
Loading