SPEC.md v 0.2 #17

Open
wants to merge 2 commits into
base: main
117 changes: 65 additions & 52 deletions SPEC.md
@@ -1,5 +1,15 @@
# ZeroEventHub spec

## Version

Specification version: 0.2.

ZeroEventHub has a limited number of servers and clients at the moment.

If you are considering using ZeroEventHub outside of Vipps MobilePay,
please let us know in a GitHub issue, and we will instantly freeze the standard
as 1.0 and handle future upgrade cases in a backwards-compatible manner,
or include a versioning mechanism in the protocol.

## Abstract

@@ -124,23 +134,12 @@ Partitions are provided so that event processing can more easily be
parallelized both at the Producer and the Consumer. In this example
the publisher has documented that 4 partitions are available -- the
client cannot change this, but has to document its assumption in the
`n` parameter. Also the client chooses to process 2 of these
parititions in a single request -- presumably there is another thread
processing the remaining 2 partitions in parallel. This method of
maintaining independent cursor allows the consumer flexibility in
advancing all cursors in parallel in one chain of calls, or to orchestrate
multiple chains of calls on different subset of partitions, and to split &
merge partitions at will (up to the limit supported by the producer).

**WARNING**: If comparing with the same events published on Event Hub,
while the partition keys should be the same, the mapping of partition
keys to partitions will be different. Events will be "re-partitioned"
when published to Event Hub vs. the Event API.

`n` parameter.

### Authorization & use of consumer identity

Standard service-to-service MSI, out of scope for this spec.
The authorization method is standard service-to-service HTTP;
out of scope for this spec.

As part of the authorization the publisher will usually figure out a
name/identity of the consumer. The publisher may use this to log access,
@@ -176,19 +175,19 @@ or use a more complex setup to "re-partition" on the fly.
### Example request:

```
GET https://myservice/my-kind-of-entity/feed/v1?cursor0=1000240213123&cursor1=1231231231242&pagesizehint=1000&n=4&headers=ce_tracestate,ce_id
GET https://myservice/my-kind-of-entity/feed?n=4&partition=0&cursor=1000240213123&pagesizehint=1000&headers=tracestate,id
```


### Parameters
### Parameters supported by all servers

* **n**: Number of partitions the client assumes the server to have,
in total. If there is mismatch, the server is free to either
emulate the behaviour or return 400 Bad Request.

* **cursorN**: Pass in one cursor for each partition you wish to
consume; where `N` is a number in the range `0...n-1`.
Each `cursor` is an opaque string that should be passed
* **partition**: Which partition to read; an integer in the range `0..n-1`.

* **cursor**: The cursor for the given partition.
Each `cursor` is an opaque string that should be passed
back as-is, but is limited to ASCII printable characters. Two special
cursors are used; `_first` means to start from the beginning of time,
and `_last` starts at an arbitrary point "around now".
@@ -203,21 +202,23 @@ GET https://myservice/my-kind-of-entity/feed/v1?cursor0=1000240213123&cursor1=12
sensible default for the dataset.

* **headers**: In event transports (such as Event Hub) the headers are
primarily of use to middlewares. With zeroeventhub the consumption is more
primarily of use to middlewares. With ZeroEventHub the consumption is more
direct, and therefore headers of events are not returned by default,
and the header parameter is used to request which headers one wants.
The special value `_all` can be used to request all headers. The parameter
is optional and its absence means that no headers will be returned.

See the example above for more detailed description of the interaction of
`n` and `cursorN`.
### Parameters optionally supported

* **wait**: The number of milliseconds (as an integer) to wait for at least
one event in the response before returning (i.e., longpolling).
This allows clients to open an HTTP request and get back a response
the moment an event happens to minimize
latency for seldom-occurring events.

Consumers are encouraged to pass contiguous ranges of cursors and the
same number of cursors from every thread. I.e., pass
`cursor2=...&cursor3=...`, DO NOT pass `cursor2=...&cursor4=...` (not
contiguous) and DO NOT pass `cursor1=...&cursor2=...` (does not spread
evenly across threads). This recommentation in this spec makes the
pattern predictable in the case that producers can optimize for it.
* **stream**: The HTTP response will be held open for the given number of
milliseconds (as an integer). Events will be written and flushed continuously
as they happen.
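
The parameters above could be assembled on the client side roughly as follows. This is only a sketch: `FeedRequest` and `BuildFeedURL` are hypothetical names that appear neither in this spec nor in the Go library, and the assumption that a zero value means "do not send the parameter" is ours.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
	"time"
)

// FeedRequest is a hypothetical helper type collecting the v0.2 query parameters.
type FeedRequest struct {
	PartitionCount int           // sent as `n`
	Partition      int           // sent as `partition`, must be in 0..n-1
	Cursor         string        // opaque cursor, or `_first` / `_last`
	PageSizeHint   int           // 0: omit and let the server choose
	Headers        []string      // empty: no headers requested
	Wait           time.Duration // 0: omit (optional parameter)
	Stream         time.Duration // 0: omit (optional parameter)
}

// BuildFeedURL renders the request as a URL with an encoded query string.
func BuildFeedURL(base string, r FeedRequest) (string, error) {
	if r.Partition < 0 || r.Partition >= r.PartitionCount {
		return "", fmt.Errorf("partition %d outside range 0..%d", r.Partition, r.PartitionCount-1)
	}
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set("n", strconv.Itoa(r.PartitionCount))
	q.Set("partition", strconv.Itoa(r.Partition))
	q.Set("cursor", r.Cursor)
	if r.PageSizeHint > 0 {
		q.Set("pagesizehint", strconv.Itoa(r.PageSizeHint))
	}
	if len(r.Headers) > 0 {
		q.Set("headers", strings.Join(r.Headers, ","))
	}
	// `wait` and `stream` are integers in milliseconds.
	if r.Wait > 0 {
		q.Set("wait", strconv.FormatInt(r.Wait.Milliseconds(), 10))
	}
	if r.Stream > 0 {
		q.Set("stream", strconv.FormatInt(r.Stream.Milliseconds(), 10))
	}
	// Encode last, once every parameter has been added.
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	s, err := BuildFeedURL("https://myservice/my-kind-of-entity/feed", FeedRequest{
		PartitionCount: 4,
		Partition:      0,
		Cursor:         "1000240213123",
		PageSizeHint:   1000,
		Headers:        []string{"tracestate", "id"},
		Wait:           5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

Note that `url.Values.Encode` sorts parameters by key and percent-encodes the comma in `headers`, so the output differs textually from the example request while carrying the same parameters.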

### Response

@@ -230,13 +231,12 @@ to the streaming format itself.

#### Events

An event has the form `{"partition": ..., "headers": {...}, "data": { ... }}`,
An event has the form `{"headers": {...}, "data": { ... }}`,
here is an example displayed with whitespace for clarity
(newlines must not be present within each event on the wire):

```
{
"partition": 0,
"headers": {
"header1": "value1",
"header2": "value2",
@@ -249,22 +249,36 @@ here is an example displayed with whitespace for clarity
```

If `headers` is empty -- as is the case when not requesting headers in the request --
it can be non-present in the struct
it can be non-present in the struct.

#### Checkpoint
*Please note:* Currently all backends will also emit the key `partition` on every
line. This field will however be gone when all clients have upgraded to
the latest version of the spec.

A checkpoint has the form `{"partition": ..., "cursor": ...}`. The client can save this
#### Checkpoints and event ordering

A checkpoint has the form `{"cursor": ...}`. The client can save this
cursor value in order to start the stream at the same point.
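
A consumer has to tell the two line shapes apart while reading the newline-delimited response. The sketch below assumes that a line carrying a `cursor` key is a checkpoint and anything else is an event, that header values and cursors are JSON strings, and that the legacy `partition` key can simply be ignored. `Event`, `ParseLine` and `ReadFeed` are hypothetical names, not the library's API.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Event mirrors the documented form {"headers": {...}, "data": {...}}.
type Event struct {
	Headers map[string]string
	Data    json.RawMessage
}

// feedLine is the union of both line shapes; unknown keys such as the
// legacy "partition" are ignored by encoding/json.
type feedLine struct {
	Cursor  *string           `json:"cursor"`
	Headers map[string]string `json:"headers"`
	Data    json.RawMessage   `json:"data"`
}

// ParseLine returns a non-nil event for an event line,
// or a nil event and the cursor for a checkpoint line.
func ParseLine(raw []byte) (*Event, string, error) {
	var l feedLine
	if err := json.Unmarshal(raw, &l); err != nil {
		return nil, "", fmt.Errorf("invalid feed line: %w", err)
	}
	if l.Cursor != nil {
		return nil, *l.Cursor, nil
	}
	return &Event{Headers: l.Headers, Data: l.Data}, "", nil
}

// ReadFeed scans a response body line by line and dispatches each line.
func ReadFeed(r io.Reader, onEvent func(Event) error, onCheckpoint func(string) error) error {
	scanner := bufio.NewScanner(r)
	// Allow lines up to 1 MiB; the scanner default is 64 KiB.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		raw := bytes.TrimSpace(scanner.Bytes())
		if len(raw) == 0 {
			continue
		}
		ev, cursor, err := ParseLine(raw)
		if err != nil {
			return err
		}
		if ev != nil {
			err = onEvent(*ev)
		} else {
			err = onCheckpoint(cursor)
		}
		if err != nil {
			return err
		}
	}
	return scanner.Err()
}

func main() {
	body := `{"headers":{"id":"1"},"data":{"amount":10}}
{"partition":0,"data":{"amount":20}}
{"cursor":"1000240213124"}
`
	err := ReadFeed(strings.NewReader(body),
		func(e Event) error { fmt.Printf("event: %s\n", e.Data); return nil },
		func(c string) error { fmt.Printf("checkpoint: %s\n", c); return nil },
	)
	if err != nil {
		panic(err)
	}
}
```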

Between checkpoints, events are unsorted and may arrive in a different order if the
same/similar request is done again.
In general, events transported over ZeroEventHub should follow the
*partitioned log* event communication model of Kafka, Event Hub, etc;
i.e., events should arrive in-order and should be processed in the
order they are received in the response.

*However*, there is no guarantee that two requests
from a given `cursor` will produce an identical response each time.
This is because while from the perspective of the API we are consuming
a single partition, the server *could* be emulating this behaviour
from a larger set of real physical partitions, and it may be that
returning data from the underlying partitions happens in a non-deterministic
manner to minimize latency. What is important
is:

* The client should treat events as ordered in the order in which they arrive
* Any ordering between events *that matters* should be reproducible
(typically, event ordering will be the same between calls for some
underlying physical partition of events)

Checkpoints are also allowed to come back differently if the same/similar request is done
again. This is because the cursor may really be a composite cursor on several internal
partitions in the service, and a checkpoint may be emitted for an advance of any individual
internal partition. The requirement for checkpoints is simply that if a client
persisted all events *before* a checkpoint, and then passes the checkpoint cursor
in on the next call, then it will be able to properly follow the stream of the events.
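
One way a client might satisfy the checkpoint requirement is to buffer events and commit them only when a checkpoint arrives. The sketch below is illustrative and assumes an in-memory store; `BatchingReceiver` and its methods are hypothetical names, and dropping the uncommitted tail is a design choice on our part rather than something the spec mandates.

```go
package main

import "fmt"

// BatchingReceiver buffers events until a checkpoint arrives, then commits
// the buffered events together with the checkpoint cursor.
type BatchingReceiver struct {
	pending   []string // events received since the last checkpoint
	Committed []string // events persisted before some checkpoint
	Cursor    string   // cursor of the last checkpoint; resume from here
}

// Event records an event that is not yet covered by a checkpoint.
func (b *BatchingReceiver) Event(data string) {
	b.pending = append(b.pending, data)
}

// Checkpoint persists everything received before it and stores the cursor.
// A real client would do both in a single transaction.
func (b *BatchingReceiver) Checkpoint(cursor string) {
	b.Committed = append(b.Committed, b.pending...)
	b.pending = nil
	b.Cursor = cursor
}

// DiscardPending drops events that arrived after the last checkpoint and
// returns how many were dropped. They are expected to be delivered again
// when the next request resumes from Cursor.
func (b *BatchingReceiver) DiscardPending() int {
	n := len(b.pending)
	b.pending = nil
	return n
}

func main() {
	r := &BatchingReceiver{Cursor: "_first"}
	r.Event(`{"id":1}`)
	r.Event(`{"id":2}`)
	r.Checkpoint("1000240213124")
	r.Event(`{"id":3}`) // the response ended before another checkpoint
	dropped := r.DiscardPending()
	fmt.Println(len(r.Committed), r.Cursor, dropped)
}
```

A client that instead processes events immediately, without waiting for a checkpoint, has to tolerate seeing the trailing events again after resuming from the last saved cursor.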

### Recommendations

@@ -314,16 +328,6 @@ processing you need to handle re-processing anyway! But, if there is a real
need to simply store a state without reading more events you can always
pass `store_cursor=1&pagesizehint=0`...

## Possible future extension: Long-polling

An extra argument:

* **wait=NNN** If there are no new events, wait for this many seconds
before returning.

This allows clients to open an HTTP request and get back a response
the moment an event happens.

## Future possibilities

* The newline-delimited JSON works for other formats more typical for streaming
@@ -338,3 +342,12 @@ the moment an event happens.
* One could allow subscribing to subsets of the event feed. Such extensions
may be standardized or just bolted on by each backend in a manner that
fits the event data...

## Changelog

* 0.2: Each call can only consume a single partition;
moved from `cursor0=c` to `partition=0&cursor=c`, in order to
simplify the specification and implementations. Described optional
flags `wait` and `stream`.

* 0.1: Initial release.
108 changes: 88 additions & 20 deletions go/api.go
@@ -6,15 +6,15 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"time"
)

const (
@@ -56,13 +56,25 @@ type EventReceiver interface {
Checkpoint(partitionID int, cursor string) error
}

type Options struct {
PageSizeHint int
Wait time.Duration
Stream time.Duration
Headers []string
}

func (o Options) AllHeaders() Options {
o.Headers = []string{All}
return o
}

// EventFetcher is a generic-based interface providing a contract for fetching events: both for the server side and
// client side implementations.
type EventFetcher interface {
// FetchEvents method accepts array of Cursor's along with an EventReceiver and Options.
// Leave Options.PageSizeHint = 0 for having the server choose a default / no hint.
// Options.Headers specifies headers to be returned, or none, if it's empty.
FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint int, receiver EventReceiver, headers ...string) error
FetchEvents(ctx context.Context, cursors []Cursor, receiver EventReceiver, options Options) error
}

// API is a generic-based interface that has to be implemented on a server side.
@@ -79,17 +91,35 @@ type API interface {
type NDJSONEventSerializer struct {
encoder *json.Encoder
writer io.Writer
flusher http.Flusher
}

func NewNDJSONEventSerializer(writer io.Writer) *NDJSONEventSerializer {
type NoopFlusher struct{}

func (n NoopFlusher) Flush() {
}

func NewNDJSONEventSerializer(writer io.Writer, flush bool) *NDJSONEventSerializer {
var flusher http.Flusher
if flush {
flusher, _ = writer.(http.Flusher)
}
if flusher == nil {
flusher = NoopFlusher{}
}
return &NDJSONEventSerializer{
encoder: json.NewEncoder(writer),
writer: writer,
flusher: flusher,
}
}

func (s NDJSONEventSerializer) writeNdJsonLine(item interface{}) error {
return s.encoder.Encode(item)
if err := s.encoder.Encode(item); err != nil {
return err
}
s.flusher.Flush()
return nil
}

func (s NDJSONEventSerializer) Checkpoint(partitionID int, cursor string) error {
@@ -171,6 +201,7 @@ func HandlerWithoutRoute(api API, getLogger func(request *http.Request) logrus.F
http.Error(writer, ErrHandshakePartitionCountMissing.Error(), ErrHandshakePartitionCountMissing.Status())
return
}

if n, err := strconv.Atoi(query.Get("n")); err != nil {
http.Error(writer, err.Error(), http.StatusBadRequest)
return
@@ -180,18 +211,45 @@ func HandlerWithoutRoute(api API, getLogger func(request *http.Request) logrus.F
return
}
}
var pageSizeHint int

var options Options
if query.Has("pagesizehint") {
if x, err := strconv.Atoi(query.Get("pagesizehint")); err != nil {
http.Error(writer, err.Error(), http.StatusBadRequest)
return
} else {
pageSizeHint = x
options.PageSizeHint = x
}
}
var headers []string

parseMilliseconds := func(key string) (time.Duration, error) {
if query.Has(key) {
intResult, err := strconv.Atoi(query.Get(key))
if err != nil {
return 0, err
}
return time.Duration(intResult) * time.Millisecond, nil
} else {
return 0, nil
}
}

var err error

options.Wait, err = parseMilliseconds("wait")
if err != nil {
http.Error(writer, err.Error(), http.StatusBadRequest)
return
}

options.Stream, err = parseMilliseconds("stream")
if err != nil {
http.Error(writer, err.Error(), http.StatusBadRequest)
return
}

if query.Has("headers") {
headers = strings.Split(strings.TrimSuffix(query.Get("headers"), ","), ",")
options.Headers = strings.Split(strings.TrimSuffix(query.Get("headers"), ","), ",")
}
cursors, err := parseCursors(api.GetPartitionCount(), query)
if err != nil {
@@ -202,11 +260,13 @@ func HandlerWithoutRoute(api API, getLogger func(request *http.Request) logrus.F
WithField("event", api.GetName()).
WithField("PartitionCount", api.GetPartitionCount()).
WithField("Cursors", cursors).
WithField("PageSizeHint", pageSizeHint).
WithField("Headers", headers)
WithField("PageSizeHint", options.PageSizeHint).
WithField("Headers", options.Headers).
WithField("Wait", options.Wait.Milliseconds()).
WithField("Stream", options.Stream.Milliseconds())
fields.Info()
serializer := NewNDJSONEventSerializer(writer)
err = api.FetchEvents(request.Context(), cursors, pageSizeHint, serializer, headers...)
serializer := NewNDJSONEventSerializer(writer, options.Stream != 0)
err = api.FetchEvents(request.Context(), cursors, serializer, options)
if err != nil {
logger.WithField("event", api.GetName()+".fetch_events_error").WithError(err).Info()
http.Error(writer, "Internal server error", http.StatusInternalServerError)
@@ -307,7 +367,7 @@ type checkpointOrEvent struct {
}

// FetchEvents is a client-side implementation that queries the server and properly deserializes received data.
func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint int, r EventReceiver, headers ...string) error {
func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, r EventReceiver, options Options) error {
if len(cursors) == 0 {
return ErrCursorsMissing
}
@@ -321,17 +381,25 @@ func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint

q := req.URL.Query()
q.Add("n", fmt.Sprintf("%d", c.partitionCount))
if pageSizeHint != DefaultPageSize {
q.Add("pagesizehint", fmt.Sprintf("%d", pageSizeHint))
if options.PageSizeHint != DefaultPageSize {
q.Add("pagesizehint", fmt.Sprintf("%d", options.PageSizeHint))
}
for _, cursor := range cursors {
q.Add(fmt.Sprintf("cursor%d", cursor.PartitionID), fmt.Sprintf("%s", cursor.Cursor))
}
if len(headers) != 0 {
q.Add("headers", strings.Join(headers, ","))
if len(options.Headers) != 0 {
q.Add("headers", strings.Join(options.Headers, ","))
}
if options.Stream != 0 {
q.Add("stream", fmt.Sprintf("%d", options.Stream.Milliseconds()))
}

if options.Wait != 0 {
q.Add("wait", fmt.Sprintf("%d", options.Wait.Milliseconds()))
}

// Encode only after every parameter has been added; otherwise `stream` and `wait` are never sent.
req.URL.RawQuery = q.Encode()

if err := c.requestProcessor(req); err != nil {
return err
}