Skip to content

Commit

Permalink
rename ChangeFeed to feed
Browse files Browse the repository at this point in the history
  • Loading branch information
erickpintor committed Oct 21, 2024
1 parent 6558da8 commit 232ae68
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 91 deletions.
94 changes: 45 additions & 49 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,12 @@ The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/tra
### Request an Event Feed

An Event Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming),
represented by a stream token, for events.
represented by an event source, for events.

To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a
To get an event source, append ``eventSource()`` or ``eventsOn()`` to a set from a
[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).

To get paginated events for the stream, pass the stream token to
``change_feed()``:
To get paginated events for the source, pass the event source to ``feed()``:

```python
from fauna import fql
Expand All @@ -310,46 +309,45 @@ To get paginated events for the stream, pass the stream token to
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
eventSource: set.eventSource()
}
'''))

initialPage = response.data['initialPage']
streamToken = response.data['streamToken']
initial_page = response.data['initialPage']
event_source = response.data['eventSource']

client.change_feed(streamToken)
client.feed(event_source)
```

You can also pass a query that produces a stream token directly to
``change_feed()``:
You can also pass a query that produces an event source directly to ``feed()``:

```python
query = fql('Product.all().changesOn(.price, .stock)')
query = fql('Product.all().eventsOn(.price, .stock)')

client.change_feed(query)
client.feed(query)
```

### Iterate on an Event Feed

``change_feed()`` returns an iterator that emits pages of events. You can use a
``feed()`` returns an iterator that emits pages of events. You can use a
generator expression to iterate through the pages:

```python
query = fql('Product.all().changesOn(.price, .stock)')
feed = client.change_feed(query)
query = fql('Product.all().eventsOn(.price, .stock)')
feed = client.feed(query)

for page in feed:
print('Page stats: ', page.stats)

for event in page:
eventType = event['type']
if (eventType == 'add'):
event_type = event['type']
if (event_type == 'add'):
print('Add event: ', event)
## ...
elif (eventType == 'update'):
elif (event_type == 'update'):
print('Update event: ', event)
## ...
elif (eventType == 'remove'):
elif (event_type == 'remove'):
print('Remove event: ', event)
## ...
```
Expand All @@ -358,11 +356,11 @@ Alternatively, you can iterate through events instead of pages with
``flatten()``:

```python
query = fql('Product.all().changesOn(.price, .stock)')
feed = client.change_feed(query)
query = fql('Product.all().eventsOn(.price, .stock)')
feed = client.feed(query)

for event in feed.flatten():
eventType = event['type']
event_type = event['type']
## ...
```

Expand All @@ -381,8 +379,8 @@ raises a ``FaunaException``:
client = Client()

try:
feed = client.change_feed(fql(
'Product.all().changesOn(.price, .stock)'
feed = client.feed(fql(
'Product.all().eventsOn(.price, .stock)'
))
for event in feed.flatten():
print(event)
Expand All @@ -393,7 +391,7 @@ raises a ``FaunaException``:

Errors can be raised at two different places:

1. At the ``change_feed`` method call;
1. At the ``feed`` method call;
2. At the page iteration.

This distinction allows for users to ignore errors originating from event
Expand All @@ -408,8 +406,8 @@ processing. For example:

# Imagine if there are some products with details = null.
# The ones without details will fail due to the toUpperCase call.
feed = client.change_feed(fql(
'Product.all().map(.details.toUpperCase()).toStream()'
feed = client.feed(fql(
'Product.all().map(.details.toUpperCase()).eventSource()'
))

for page in feed:
Expand All @@ -426,13 +424,12 @@ processing. For example:

### Event Feed options

The client configuration sets default options for the ``change_feed()``
method.
The client configuration sets default options for the ``feed()`` method.

You can pass a ``ChangeFeedOptions`` object to override these defaults:
You can pass a ``FeedOptions`` object to override these defaults:

```python
options = ChangeFeedOptions(
options = FeedOptions(
max_attempts=3,
max_backoff=20,
query_timeout=timedelta(seconds=5),
Expand All @@ -441,7 +438,7 @@ options = ChangeFeedOptions(
start_ts=None,
)

client.change_feed(fql('Product.all().toStream()'), options)
client.feed(fql('Product.all().eventSource()'), options)
```

## Event Streaming
Expand All @@ -450,12 +447,11 @@ The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn

### Start a stream

To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a
To get an event source, append ``eventSource()`` or ``eventsOn()`` to a set from a
[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).


To start and subscribe to the stream, pass the stream token to
``stream()``:
To start and subscribe to the stream, pass the event source to ``stream()``:

```python
from fauna import fql
Expand All @@ -467,21 +463,21 @@ To start and subscribe to the stream, pass the stream token to
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
eventSource: set.eventSource()
}
'''))

initialPage = response.data['initialPage']
streamToken = response.data['streamToken']
initial_page = response.data['initialPage']
event_source = response.data['eventSource']

client.stream(streamToken)
client.stream(event_source)
```

You can also pass a query that produces a stream token directly to
You can also pass a query that produces an event source directly to
``stream()``:

```python
query = fql('Product.all().changesOn(.price, .stock)')
query = fql('Product.all().eventsOn(.price, .stock)')

client.stream(query)
```
Expand All @@ -492,18 +488,18 @@ You can also pass a query that produces a stream token directly to
use a generator expression to iterate through the events:

```python
query = fql('Product.all().changesOn(.price, .stock)')
query = fql('Product.all().eventsOn(.price, .stock)')

with client.stream(query) as stream:
for event in stream:
eventType = event['type']
if (eventType == 'add'):
event_type = event['type']
if (event_type == 'add'):
print('Add event: ', event)
## ...
elif (eventType == 'update'):
elif (event_type == 'update'):
print('Update event: ', event)
## ...
elif (eventType == 'remove'):
elif (event_type == 'remove'):
print('Remove event: ', event)
## ...
```
Expand All @@ -513,7 +509,7 @@ with client.stream(query) as stream:
Use ``close()`` to close a stream:

```python
query = fql('Product.all().changesOn(.price, .stock)')
query = fql('Product.all().eventsOn(.price, .stock)')

count = 0
with client.stream(query) as stream:
Expand All @@ -540,7 +536,7 @@ client = Client()

try:
with client.stream(fql(
'Product.all().changesOn(.price, .stock)'
'Product.all().eventsOn(.price, .stock)'
)) as stream:
for event in stream:
print(event)
Expand All @@ -565,7 +561,7 @@ options = StreamOptions(
status_events=False,
)

client.stream(fql('Product.all().toStream()'), options)
client.stream(fql('Product.all().eventSource()'), options)
```

## Setup
Expand Down
2 changes: 1 addition & 1 deletion fauna/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .client import Client, QueryOptions, StreamOptions, ChangeFeedOptions
from .client import Client, QueryOptions, StreamOptions, FeedOptions, FeedPage, FeedIterator
from .endpoints import Endpoints
from .headers import Header
44 changes: 27 additions & 17 deletions fauna/client/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Iterator, Mapping, Optional, Union, List
Expand Down Expand Up @@ -70,7 +71,7 @@ class StreamOptions:


@dataclass
class ChangeFeedOptions:
class FeedOptions:
"""
A dataclass representing options available for an Event Feed.
Expand Down Expand Up @@ -461,15 +462,25 @@ def stream(
def change_feed(
self,
fql: Union[EventSource, Query],
opts: ChangeFeedOptions = ChangeFeedOptions()
) -> "ChangeFeedIterator":
opts: FeedOptions = FeedOptions(),
) -> "FeedIterator":
warnings.warn(
"The 'change_feed' method is deprecated. Prefer the 'feed' method instead.",
DeprecationWarning)
return self.feed(fql, opts)

def feed(
self,
fql: Union[EventSource, Query],
opts: FeedOptions = FeedOptions(),
) -> "FeedIterator":
"""
Opens an Event Feed in Fauna and returns an iterator that consume Fauna events.
:param fql: A Query that returns a EventSource or a EventSource.
:param opts: (Optional) Event Feed options.
:return: a :class:`ChangeFeedIterator`
:return: a :class:`FeedIterator`
:raises ClientError: Invalid options provided
:raises NetworkError: HTTP Request failed in transit
Expand Down Expand Up @@ -498,10 +509,9 @@ def change_feed(
elif self._query_timeout_ms is not None:
headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

return ChangeFeedIterator(self._session, headers,
self._endpoint + "/changefeed/1",
self._max_attempts, self._max_backoff, opts,
token)
return FeedIterator(self._session, headers,
self._endpoint + "/feed/1", self._max_attempts,
self._max_backoff, opts, token)

def _check_protocol(self, response_json: Any, status_code):
# TODO: Logic to validate wire protocol belongs elsewhere.
Expand Down Expand Up @@ -643,7 +653,7 @@ def close(self):
self._stream.close()


class ChangeFeedPage:
class FeedPage:

def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
self._events = events
Expand All @@ -660,11 +670,11 @@ def __iter__(self) -> Iterator[Any]:
yield event


class ChangeFeedIterator:
class FeedIterator:
"""A class to provide an iterator on top of Event Feed pages."""

def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
max_attempts: int, max_backoff: int, opts: ChangeFeedOptions,
max_attempts: int, max_backoff: int, opts: FeedOptions,
token: EventSource):
self._http = http
self._headers = headers
Expand All @@ -675,7 +685,7 @@ def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
self._is_done = False

if opts.start_ts is not None and opts.cursor is not None:
err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the ChangeFeedOptions."
err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
raise TypeError(err_msg)

if opts.page_size is not None:
Expand All @@ -686,19 +696,19 @@ def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
elif opts.start_ts is not None:
self._request["start_ts"] = opts.start_ts

def __iter__(self) -> Iterator[ChangeFeedPage]:
def __iter__(self) -> Iterator[FeedPage]:
self._is_done = False
return self

def __next__(self) -> ChangeFeedPage:
def __next__(self) -> FeedPage:
if self._is_done:
raise StopIteration

retryable = Retryable[Any](self._max_attempts, self._max_backoff,
self._next_page)
return retryable.run().response

def _next_page(self) -> ChangeFeedPage:
def _next_page(self) -> FeedPage:
with self._http.request(
method="POST",
url=self._endpoint,
Expand All @@ -717,8 +727,8 @@ def _next_page(self) -> ChangeFeedPage:
if "start_ts" in self._request:
del self._request["start_ts"]

return ChangeFeedPage(decoded["events"], decoded["cursor"],
QueryStats(decoded["stats"]))
return FeedPage(decoded["events"], decoded["cursor"],
QueryStats(decoded["stats"]))

def flatten(self) -> Iterator:
"""A generator that yields events instead of pages of events."""
Expand Down
Loading

0 comments on commit 232ae68

Please sign in to comment.