Implement high-level streaming API #59

Open
stolen opened this issue Jul 15, 2014 · 15 comments

@stolen

stolen commented Jul 15, 2014

Here's an API draft.

Creating a new stream is something gen_server-like:

{ok, Stream} = jsx_stream:init(my_callback_module, {state, initial},
       [{path, [<<"result">>,<<"items">>]}]).

Any jsx decoder options (except stream) are accepted too. User may provide multiple path options to stream multiple arrays/objects.

Feeding data

{more, Stream1} = jsx_stream:input(Stream, Data).

Callback module handles input items or key-value pairs:

-module(my_callback_module).
% called when specified path contains an array
handle_item(Path, Index, Item, RootObj, State) ->
  % Path = [<<"result">>,<<"items">>] as specified by init option
  % Index is integer item index in array, 1..Length
  % Item is the actual array item: string, number, boolean, null, list or object
  % RootObj is root JSON object collected so far with magic 'stream' item 
  %        in place of configured stream path. E.g.
  %   RootObj = [
  %        {<<"foo">>, 1},
  %        {<<"result">>, [
  %          {<<"a">>, <<"z">>},
  %          {<<"items">>, stream}
  %          ]}
  %        ]
  {ok, State#state{last_index = Index}}.

% called for each key-value pair when path contains an object
handle_kv(Path, Key, Value, RootObj, State) ->
  {ok, State}.

Optionally callback module may emit some events:

handle_item(Path, Index, Item, RootObj, #state{last_index = 100} = State) ->
  Event = {wow, hundred, elements},
  {emit, Event, State#state{last_index = Index}};

When callback emits an event, the result of input/2 changes. User needs to input empty strings to ensure buffer is completely processed:

handle_data_from_socket(Data, Stream) ->
  case jsx_stream:input(Stream, Data) of
    {more, Stream1} ->
      {ok, Stream1};
    {emit, Event, Stream1} ->
      handle_event(Event),
      handle_data_from_socket(<<>>, Stream1)
  end.

User may perform calls to handler module:

{ItemsProcessed, Stream1} = jsx_stream:call(Stream, items_processed).

Calls are handled by callback function:

handle_call(items_processed, RootObj, #state{last_index = Index} = State) ->
  {Index, State}.

Of course, state may change when handling a call.

When JSON document ends, terminate callback is called:

terminate(end_json, RootObj, #state{last_index = Index} = State) ->
  {ok, Index}.

If the document ends before the end of data, remaining bytes are returned:

case jsx_stream:input(Stream, FinalChunk) of
  {end_json, LastIndex} ->
    {ok, LastIndex};
  {end_json, LastIndex, BytesLeft} ->
    do_something(LastIndex),
    handle_next_document(BytesLeft);
  {more, Stream1} ->
    handle_unfinished_stream(Stream1)
end.
@talentdeficit
Owner

i like the look of this but i have a million questions:

what's the path option for? is it a way to selectively ignore parts of the stream?

is the stream token inserted in RootObj a placeholder for where parsing has currently paused?

what is the RootObj used for?

is emit for inserting new events into the stream that may not be present?

i really like the idea of the gen_server style callback module for streaming (particularly for direct access to the internal state) but i'd probably want different names to make it clear there's no separate process involved

@stolen
Author

stolen commented Jul 16, 2014

The 'path' option selects which parts to stream. Usually a large document is expected to have a single large object or array suitable for streaming. Other parts are not streamed, just collected in RootObj, so the handling functions can access non-streamed data when needed.

'stream' inside RootObj marks paths given in the path option that are present in the already processed part of the document. If only one stream path is provided, handle_item and handle_kv see 'stream' exactly in place of the stream being processed (making their first argument redundant). The terminate/3 callback also sees 'stream' in place of where streaming was performed.

RootObj is used for accessing document outside of stream paths.

Maybe I need to provide example of a document and corresponding handler calls.

'emit' is for generating user events, not jsx ones. That's like a filter: you feed a raw JSON and get some processed results (aggregated metrics, filtered and re-encoded items, etc.) so user can avoid side-effects in callback module. Not sure if emitting is really needed but I think some way of callback-initiated interruption should be possible.

I'm not good at naming. I don't insist on these names, just used them to show a concept.

@stolen
Author

stolen commented Jul 16, 2014

The simple example:

Imagine we perform GET /somedatabase/_all_docs on CouchDB and want to do something with every N-th document stored there as soon as we meet it. The response body, according to the docs, looks like this (I moved the "offset" field to the end to show how terminate/3 could be useful):

{
  "total_rows": 3,
  "rows": [
    {"id": "doc1", "key": "doc1", "value": {"rev": "4324BB"}},
    {"id": "doc2", "key": "doc2", "value": {"rev":"2441HF"}},
    {"id": "doc3", "key": "doc3", "value": {"rev":"74EC24"}}
  ],
  "offset": 0
}

Obviously, if we are going to stream this document, the source of items is "rows" key.
So, we initialize the stream parser and tell our callback module to emit every second item in it:

{ok, Parser} = jsx_stream:init(every_nth, every_nth:init(2), [{path, [<<"rows">>]}]).

The main loop will accept data chunks from the data source (HTTP request) and feed them to the parser:

rcv_parse_loop(Req, Parser) ->
  {ok, Chunk} = recv_chunk(Req),
  handle_chunk(Chunk, Parser, Req).

handle_chunk(Chunk, Parser, Req) ->
  case jsx_stream:input(Parser, Chunk) of
    {more, Parser1} -> % continue receive-parse loop
      rcv_parse_loop(Req, Parser1);
    {end_json, Total} -> % Document has ended
      eof = recv_chunk(Req), % Ensure no data is left on the stream
      {ok, Total};
    {emit, Key, Parser1} -> % fired on every second document
      do_something(Key), % Perform the required action
      handle_chunk(<<>>, Parser1, Req) % Continue parsing without receiving new chunk
  end.

The corresponding every_nth module does a single thing: it emits the key of every Nth document in the stream. Finally it returns the value of "total_rows", which is external to the stream:

-module(every_nth).
-compile(export_all).
-record(state, {every}).

init(N) -> 
  #state{every=N}.

handle_item(_Path, Index, Item, _RootObj, #state{every = N} = State) ->
  case Index rem N of
    0 -> % Item meets the condition, emit its key
      Key = proplists:get_value(<<"key">>, Item),
      {emit, Key, State};
    _ -> % No match, proceed
      {ok, State}
  end.

terminate(_, RootObj, #state{}) ->
  Total = proplists:get_value(<<"total_rows">>, RootObj),
  {ok, Total}.

During the parsing every_nth:handle_item is called three times. RootObj is the same for these calls:

RootObj = [{<<"total_rows">>, 3},{<<"rows">>, stream}].

Actual representation may differ (reversed key order, maps, wrapped in 1-tuple, etc.) but this is all we know about the document at the time of calling this function.
So, args and return values for calls during the stream:

handle_item([<<"rows">>], 1, 
    [{<<"id">>,<<"doc1">>}, {<<"key">>,<<"doc1">>}, {<<"value">>, [{<<"rev">>,<<"4324BB">>}]}],
    [{<<"total_rows">>, 3},{<<"rows">>, stream}], {state, 2}) ->
  {ok, {state, 2}};
handle_item([<<"rows">>], 2, 
    [{<<"id">>,<<"doc2">>}, {<<"key">>,<<"doc2">>}, {<<"value">>, [{<<"rev">>,<<"2441HF">>}]}],
    [{<<"total_rows">>, 3},{<<"rows">>, stream}], {state, 2}) ->
  {emit, <<"doc2">>, {state, 2}};
handle_item([<<"rows">>], 3, 
    [{<<"id">>,<<"doc3">>}, {<<"key">>,<<"doc3">>}, {<<"value">>, [{<<"rev">>,<<"74EC24">>}]}],
    [{<<"total_rows">>, 3},{<<"rows">>, stream}], {state, 2}) ->
  {ok, {state, 2}}.

When stream ends, the parser has collected the key "offset" which goes after the streamed path. So, terminate is called with different RootObj:

terminate(end_json, [{<<"total_rows">>, 3},{<<"rows">>, stream}, {<<"offset">>, 0}], {state,2}) ->
  {ok, 3}.
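
The call sequence above can be replayed without any parser, which may help when checking a callback module in isolation. The following is only a sketch: `replay_sketch` and `replay/3` are hypothetical names, not part of jsx or of the proposed jsx_stream, and the driver simply walks already-decoded items in order. The callback uses the intended rule (emit the key of every Nth item) with the plain integer N as its state.

```erlang
-module(replay_sketch).
-export([run/0, replay/3]).

%% Intended rule of the every_nth example: emit the key of every
%% Nth item, otherwise just continue. State is the integer N.
handle_item(_Path, Index, Item, _RootObj, N) ->
    case Index rem N of
        0 -> {emit, proplists:get_value(<<"key">>, Item), N};
        _ -> {ok, N}
    end.

%% Hypothetical driver standing in for the proposed parser: calls
%% handle_item/5 for each item (1-based index) and collects events.
replay(Items, RootObj, State0) ->
    Indexed = lists:zip(lists:seq(1, length(Items)), Items),
    {Events, State} = lists:foldl(
        fun({Index, Item}, {Acc, S}) ->
            case handle_item([<<"rows">>], Index, Item, RootObj, S) of
                {ok, S1} -> {Acc, S1};
                {emit, Event, S1} -> {[Event | Acc], S1}
            end
        end, {[], State0}, Indexed),
    {lists:reverse(Events), State}.

%% Replays the three rows of the CouchDB example with N = 2.
run() ->
    Rows = [[{<<"id">>, Id}, {<<"key">>, Id}]
            || Id <- [<<"doc1">>, <<"doc2">>, <<"doc3">>]],
    RootObj = [{<<"total_rows">>, 3}, {<<"rows">>, stream}],
    replay(Rows, RootObj, 2).
```

With the three rows of the example, `replay_sketch:run()` collects a single event, the key of the second document.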

@talentdeficit
Owner

ah i see now. i like your interface in general but RootObj won't really work without first parsing the entire json body. still, i really like the idea of selecting which parts of the json you are interested in via Path and the idea of a gen_server like interface. let me think about it for a day or two and work on it a bit

@stolen
Author

stolen commented Jul 19, 2014

RootObj may work if server guarantees field order. Yes, it is uncommon for JSON, but it is possible (especially when JSON is generated from proplist).
Also if this API is good enough it may be reused later in parsers of other formats (msgpack, bed, etc.) where RootObj could be more predictable.

@stolen
Author

stolen commented Jul 23, 2014

Any news here?
I may need this api soon, so I'm going to start implementing it.
Please tell what you think of naming, arity, argument order, etc. Changing the interface later will break some code.

@talentdeficit
Owner

i half started working on an interface like:

handle_item(Path, Item, State) -> ...

where Path would be something like [<<"books">>, 0, <<"author">>] as in:

{ "books": [
  { "title": ...,
    "author": ...,
    ..
  },
  { "title": ...,
    "author": ...,
    ...
  }
]}

that covers both the Path and Index components of the interface you sketched out. i still don't know if RootObj is doable in a reasonable manner and i'm not sure what to do when the value in a kv pair is an object or an array. possibly something like:

// json object
{ "books": [
  { "title": "a wrinkle in time",
    "author": "madeleine l'engel",
    "editions": [1962, 1978, 2007]
  },
  { "title": "all creatures great and small",
    "author": "james herriot",
    "editions": [1972, 1992, 2004, 2014]
  }
]}

would make the following calls to the interface:

handle_item([<<"books">>, 0, <<"title">>], <<"a wrinkle in time">>, State)
handle_item([<<"books">>, 0, <<"author">>], <<"madeleine l'engel">>, State)
handle_item([<<"books">>, 0, <<"editions">>, 0], 1962, State)
handle_item([<<"books">>, 0, <<"editions">>, 1], 1978, State)
handle_item([<<"books">>, 0, <<"editions">>, 2], 2007, State)
handle_item([<<"books">>, 1, <<"title">>], <<"all creatures great and small">>, State)
handle_item([<<"books">>, 1, <<"author">>], <<"james herriot">>, State)
handle_item([<<"books">>, 1, <<"editions">>, 0], 1972, State)
handle_item([<<"books">>, 1, <<"editions">>, 1], 1992, State)
handle_item([<<"books">>, 1, <<"editions">>, 2], 2004, State)
handle_item([<<"books">>, 1, <<"editions">>, 3], 2014, State)
terminate(State)

valid returns from handle_item would be {ok, State} or {emit, UserData, State}

how's that look?

@stolen
Author

stolen commented Jul 24, 2014

Well, merging handle_kv and handle_item was something I was thinking about. An integer vs. binary index/key makes it easy to distinguish the parent type.

The way of streaming primitive values you propose is something I'm trying to avoid. I'm not sure about all use cases, but I suppose most users will have functions that handle whole objects (e.g. send_book_entry_to_websocket(BookObj, Socket)), and forcing them to do deep object reconstruction themselves makes this API too complex to use (just a bit easier than the current one).
Again, see how they do it in Python (https://github.com/isagalaev/ijson): objects = ijson.items(f, 'earth.europe.item'). It's not very flexible, but it returns one 'earth.europe' element at a time no matter how deep it is. I think jsx should be able to do at least the same thing.

Also when you append current item's key/index to the path you make it quite tricky to extract it. User has to strip common prefix (and check if it didn't change), then check if the index has changed, populate current object according to the remaining Path. When common prefix or index changes, and when handling terminate finally user's do_something_with_book will be called.
So, for trivial case instead of two functions of three lines you force user to write tens of lines of buggy stateful boilerplate code. I think that this boilerplate should be inside jsx.
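
To make the boilerplate argument concrete, here is a rough sketch of what a user-side callback might need just to regroup path-based events into per-book units. Everything in it is an assumption for illustration: the module name, the state tuple, and the `handle_item/3` and `terminate/1` shapes taken from the path-based proposal. It only groups `{RestOfPath, Value}` pairs per book; rebuilding nested values such as "editions" would need still more code.

```erlang
-module(path_regroup_sketch).
-export([handle_item/3, terminate/1, run/0]).

%% State: {Prefix, CurrentIndex | undefined, FieldsSoFar, FinishedBooks}
handle_item(Path, Value, {Prefix, Cur, Fields, Done}) ->
    case strip_prefix(Prefix, Path) of
        [Cur | Rest] ->                         % same book as before
            {ok, {Prefix, Cur, [{Rest, Value} | Fields], Done}};
        [Next | Rest] when is_integer(Next) ->  % a new book has started
            {ok, {Prefix, Next, [{Rest, Value}], flush(Cur, Fields, Done)}};
        _ ->                                    % outside the streamed path
            {ok, {Prefix, Cur, Fields, Done}}
    end.

%% The last book is only known to be complete when the stream ends.
terminate({_Prefix, Cur, Fields, Done}) ->
    lists:reverse(flush(Cur, Fields, Done)).

%% Drops the common prefix, or returns nomatch if Path is elsewhere.
strip_prefix([], Rest) -> Rest;
strip_prefix([H | T], [H | Rest]) -> strip_prefix(T, Rest);
strip_prefix(_, _) -> nomatch.

%% Moves the fields collected for the current book to the done list.
flush(undefined, _Fields, Done) -> Done;
flush(_Index, Fields, Done) -> [lists:reverse(Fields) | Done].

%% Feeds a few hand-written events through the callbacks.
run() ->
    Events = [{[<<"books">>, 0, <<"title">>], <<"a">>},
              {[<<"books">>, 0, <<"editions">>, 0], 1962},
              {[<<"books">>, 1, <<"title">>], <<"b">>}],
    Init = {[<<"books">>], undefined, [], []},
    State = lists:foldl(
        fun({P, V}, S) -> {ok, S1} = handle_item(P, V, S), S1 end,
        Init, Events),
    terminate(State).
```

Even this reduced version needs prefix stripping, index tracking and an explicit flush at the end of the stream.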

One more example (I hope it helps to understand the simplicity I want the new API to have):
The old naive inefficient code:

{ok, JSON} = get_whole_body(Req),
Library = jsx:decode(JSON),
Books = proplists:get_value(<<"books">>, Library, []),
lists:foldl(fun(Book, N) -> book_db:store(Book), N+1 end, 0, Books).

It obviously first does nothing while receiving data, then burns CPU while decoding, finally it overloads DB interface possibly causing other DB clients to have large latencies.

The callback module for the API I proposed will look like this:

-module(book_streamer).
-compile(export_all).
% Arguments follow the draft above: Path, Index, Item, RootObj, State
handle_item(_Path, _Index, Book, _RootObj, N) ->
  book_db:store(Book),
  {ok, N+1}.
terminate(_Reason, _RootObj, N) ->
  {ok, N}.


I don't need to do tricky (and costly) list comparisons and tracking any extra state here. Simple solution for a simple task. And most tasks are simple.

Could you show the equivalent callback module for the API you propose? It should be immune to changing main array path from "books" to deep "result.books".

As for RootObj in `handle_item` I'll tell about our case.
There is a Python backend which returns the call result as a huge (and sometimes quite small) list of objects under the "results" key. The backend is written by Python developers who are mostly unaware of processing data on the fly.
We needed to implement a trivial sanity check: the backend says how long the array is. At first this was done with a "length" key. Usually the whole response is encoded in such a way that "length" appears before "results": it is added earlier and sorts first. So the parser callback may abort processing when "length" is 3 and the result index is over 3. Otherwise we would have to parse the whole document (say, 1000 results) just to throw it away.
Of course, the results length is metadata and should be declared in an HTTP header (and now it is), but there are lots of people on the internet who don't understand what these headers are for, yet still provide some unique services.
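
A sanity check of this kind could look roughly like the sketch below. It assumes the five-argument handle_item from the first draft and reuses the `{emit, Event, State}` return to signal the problem; the module name and the `too_many_results` event are made up for illustration, and whether the caller then stops feeding input is left to the code around the parser.

```erlang
-module(length_check_sketch).
-export([handle_item/5, run/0]).

%% Emits an error event as soon as the item index exceeds the
%% "length" value already collected in RootObj; otherwise continues.
handle_item(_Path, Index, _Item, RootObj, State) ->
    case proplists:get_value(<<"length">>, RootObj) of
        Len when is_integer(Len), Index > Len ->
            {emit, {error, {too_many_results, Index, Len}}, State};
        _ ->
            %% "length" missing, not yet seen, or not exceeded
            {ok, State}
    end.

%% Calls the callback for the 3rd and 4th item of a document that
%% announced a length of 3.
run() ->
    RootObj = [{<<"length">>, 3}, {<<"results">>, stream}],
    [handle_item([<<"results">>], I, [], RootObj, nostate) || I <- [3, 4]].
```

If "length" happens to come after "results" in the document, the check silently does nothing, which matches the caveat about field order.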

Maybe RootObj should be handled in separate callback (say, `start_stream`) explicitly enabled (or just optional, checked for existence). Or even that callback should be merged with `terminate` to something like `handle_root_event(Event, RootObj, State) where Event :: end_json | {stream, path()}`. I don't know. I just think that already collected data should be accessible before the very end of the document.

@talentdeficit
Owner

your point about difficulty in matching at the end of the Path is a good one.

handle_item([<<"books">>, 0], _, _) -> ...
handle_item([<<"books">>, 0, <<"author">>], _, _) -> ...

should probably be

handle_item([<<"books">>], 0, _, _) -> ...
handle_item([<<"books">>, 0], <<"author">> , _, _) -> ...

i think i see what you mean by RootObj. it's just the current state of the collected term? it adds a lot of overhead for what i imagine will be a rarely used term. most users of a streaming interface will be using it to filter or act on json results without saving them i suspect. if you really need it there's an (undocumented) interface in jsx_to_term you could use to build the object as events are read and store it in state, maybe?

i also think i see what you mean with your ijson example. you want to use the path to select what is returned but you want whatever is at that path returned as discrete items. an interface like:

{ok, Filter} = jsx:start_filter(CallBackMod, Paths, InitialState),
jsx:filter(Filter, JSONChunk)

where CallBackMod is your callback module, Paths is a list of paths (as above) you want returned and InitialState is passed to CallBackMod:init/1

so your example would be called as:

{ok, Filter} = jsx:start_filter(my_callback_mod, [[<<"books">>]], []),
0 = jsx:filter(Filter, JSONChunk),
1 = jsx:filter(Filter, JSONChunk),
...
eof = jsx:filter(Filter, JSONChunk).

and my_callback_mod would be:

init(_) -> 0.

handle_item([<<"books">>], Index, Book, Index) ->
  book_db:store(Book),
  {ok, Index, Index + 1}.

terminate(N) -> {ok, eof}.

does this look like what you want?

@stolen
Author

stolen commented Jul 24, 2014

This:

handle_item([<<"books">>, 0], <<"author">> , Author, State) -> 

requires user to implement state machine to be able to know when the book ends. Also you need to send special item when the stream ends to explicitly close the last book. Also the first list is always reconstructed, which is unneeded overhead. Reversing that list could help with reconstruction but could affect clarity and induce even more prefix-tracking bugs. As a user I would definitely not like writing such necessarily-stateful callback modules. The way of handling (deep) values inside "editions" is much more interesting here.

RootObj will be tiny when user knows where the huge object/array is. Streamed paths are not saved in RootObj (see my first example), so absolutely no overhead here. And if user has specified the wrong path, the value of RootObj in terminate will help with debugging (opposed to just dropping anything outside of the stream).

Your example with start_filter (if I understand it well) will not work because it presumes item count is equal to chunk count which is almost never true (passed and returned values have to be more complex to deal with that, so the controlling logic will not be trivial).

@stolen
Author

stolen commented Jul 24, 2014

And now we have several API drafts and cannot agree on which of them is better.
Why don't we ask the potential users (erlang-questions mailing list) which one they would prefer?

@talentdeficit
Owner

that's not the chunk count, it's an arbitrary term returned (optionally) as in emit. i just used it to track books processed

@sumerman

Hi there. Any update on this?

@talentdeficit
Owner

parts of what was talked about here i've implemented as a separate library here

there's also a different interface to working with json in erlang i've started here that might be of interest

@sumerman

sumerman commented Feb 2, 2015

Thanks, I'll take a look.
