Implement high-level streaming API #59
i like the look of this but i have a million questions: what's the 'path' option for? what is 'stream' inside RootObj? what is 'emit'? i really like the idea of the … |
The 'path' option is for selecting which parts to stream. Usually the expected large document has a single large object or array suitable for streaming. Other parts are not streamed, just collected in RootObj, so the handling functions may access the non-streamed data when needed. The 'stream' atom inside RootObj marks the paths that were given in the path option and are present in the processed part of the document. If only one stream path is provided, handle_item and handle_kv see 'stream' exactly in place of the stream being processed (making their first argument redundant). The terminate callback also sees 'stream' in place of where streaming was performed. RootObj is used for accessing the document outside of the stream paths. Maybe I need to provide an example of a document and the corresponding handler calls.

'emit' is for generating user events, not jsx ones. It works like a filter: you feed in raw JSON and get back processed results (aggregated metrics, filtered and re-encoded items, etc.), so the user can avoid side effects in the callback module. I'm not sure if emitting is really needed, but I think some way of callback-initiated interruption should be possible.

I'm not good at naming. I don't insist on these names; I just used them to show the concept. |
The simple example: imagine we are performing an HTTP request whose response is the following document:

{
"total_rows": 3,
"rows": [
{"id": "doc1", "key": "doc1", "value": {"rev": "4324BB"}},
{"id": "doc2", "key": "doc2", "value": {"rev":"2441HF"}},
{"id": "doc3", "key": "doc3", "value": {"rev":"74EC24"}}
],
"offset": 0
}

Obviously, if we are going to stream this document, the source of items is the "rows" array:

{ok, Parser} = jsx_stream:init(every_nth, every_nth:init(2), [{path, [<<"rows">>]}]).

The main loop will accept data chunks from the data source (HTTP request) and feed them to the parser:

rcv_parse_loop(Req, Parser) ->
    {ok, Chunk} = recv_chunk(Req),
    handle_chunk(Chunk, Parser, Req).
handle_chunk(Chunk, Parser, Req) ->
    case jsx_stream:input(Parser, Chunk) of
        {more, Parser1} ->            % continue receive-parse loop
            rcv_parse_loop(Req, Parser1);
        {end_json, Total} ->          % Document has ended
            eof = recv_chunk(Req),    % Ensure no data is left on the stream
            {ok, Total};
        {emit, Key, Parser1} ->       % fired on every second document
            do_something(Key),        % Perform the required action
            handle_chunk(<<>>, Parser1, Req) % Continue parsing without receiving a new chunk
    end.

The corresponding callback module, every_nth:

-module(every_nth).
-compile(export_all).
-record(state, {every}).
init(N) ->
#state{every=N}.
handle_item(_Path, Index, Item, _RootObj, #state{every = N} = State) ->
    case Index rem N of
        0 -> % Item meets the condition, emit its key
            Key = proplists:get_value(<<"key">>, Item),
            {emit, Key, State};
        _ -> % No match, proceed
            {ok, State}
    end.
terminate(_, RootObj, #state{}) ->
    Total = proplists:get_value(<<"total_rows">>, RootObj),
    {ok, Total}.

During the parsing, RootObj = [{<<"total_rows">>, 3}, {<<"rows">>, stream}]. The actual representation may differ (reversed key order, maps, wrapped in a 1-tuple, etc.), but this is all we know about the document at the time handle_item is called. The corresponding handler calls are:

handle_item([<<"rows">>], 1,
[{<<"id">>,<<"doc1">>}, {<<"key">>,<<"doc1">>}, {<<"value">>, [{<<"rev">>,<<"4324BB">>}]}],
[{<<"total_rows">>, 3},{<<"rows">>, stream}], {state, 2}) ->
{ok, {state, 2}};
handle_item([<<"rows">>], 2,
[{<<"id">>,<<"doc2">>}, {<<"key">>,<<"doc2">>}, {<<"value">>, [{<<"rev">>,<<"2441HF">>}]}],
[{<<"total_rows">>, 3},{<<"rows">>, stream}], {state, 2}) ->
{emit, <<"doc2">>, {state, 2}};
handle_item([<<"rows">>], 3,
[{<<"id">>,<<"doc3">>}, {<<"key">>,<<"doc3">>}, {<<"value">>, [{<<"rev">>,<<"74EC24">>}]}],
[{<<"total_rows">>, 3},{<<"rows">>, stream}], {state, 2}) ->
{ok, {state, 2}}.

When the stream ends, the parser has also collected the "offset" key, and terminate is called:

terminate(end_json, [{<<"total_rows">>, 3}, {<<"rows">>, stream}, {<<"offset">>, 0}], {state, 2}) ->
{ok, 3}. |
ah i see now. i like your interface in general, but … |
Any news here? |
i half started working on an interface like:

handle_item(Path, Item, State) -> ...

where Path is the full path from the document root down to the item. for a document shaped like:

{ "books": [
{ "title": ...,
"author": ...,
..
},
{ "title": ...,
"author": ...,
...
}
]}

that covers both the object (key/value) and array (indexed element) cases with a single callback. for example, this json object:

// json object
{ "books": [
{ "title": "a wrinkle in time",
"author": "madeleine l'engel",
"editions": [1962, 1978, 2007]
},
{ "title": "all creatures great and small",
"author": "james herriot",
"editions": [1972, 1992, 2004, 2014]
}
]}

would make the following calls to the interface:

handle_item([<<"books">>, 0, <<"title">>], <<"a wrinkle in time">>, State)
handle_item([<<"books">>, 0, <<"author">>], <<"madeleine l'engel">>, State)
handle_item([<<"books">>, 0, <<"editions">>, 0], 1962, State)
handle_item([<<"books">>, 0, <<"editions">>, 1], 1978, State)
handle_item([<<"books">>, 0, <<"editions">>, 2], 2007, State)
handle_item([<<"books">>, 1, <<"title">>], <<"all creatures great and small">>, State)
handle_item([<<"books">>, 1, <<"author">>], <<"james herriot">>, State)
handle_item([<<"books">>, 1, <<"editions">>, 0], 1972, State)
handle_item([<<"books">>, 1, <<"editions">>, 1], 1992, State)
handle_item([<<"books">>, 1, <<"editions">>, 2], 2004, State)
handle_item([<<"books">>, 1, <<"editions">>, 3], 2014, State)
terminate(State)

valid returns from handle_item would be … how's that look? |
Well, merging handle_item and handle_kv is one thing, but the way of streaming primitive values you propose is something I'm trying to avoid. I'm not sure about all use cases, but I suppose most users will have functions handling whole objects (e.g. do_something_with_book(Book)).

Also, when you append the current item's key/index to the path, you make it quite tricky to extract the item. The user has to strip the common prefix (and check that it didn't change), then check whether the index has changed, and populate the current object according to the remaining Path. Only when the common prefix or index changes, or when handling terminate, will the user's do_something_with_book finally be called.

One more example (I hope it helps to understand the simplicity I want the new API to have). This is how such a document is usually handled without streaming:

{ok, JSON} = get_whole_body(Req),
Library = jsx:decode(JSON),
Books = proplists:get_value(<<"books">>, Library, []),
lists:foldl(fun(Book, N) -> book_db:store(Book), N + 1 end, 0, Books).

It obviously first does nothing while receiving data, then burns CPU while decoding, and finally it overloads the DB interface, possibly causing other DB clients to see large latencies. The callback module for the API I proposed will look like this:
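(A sketch, reusing the callback shape from the every_nth example above; the module name book_saver is made up for illustration:)

-module(book_saver).
-export([init/1, handle_item/5, terminate/3]).

%% no state is needed; each book is stored as soon as it is parsed
init(_) ->
    ok.

%% called once per element of the streamed "books" array
handle_item([<<"books">>], _Index, Book, _RootObj, State) ->
    book_db:store(Book),
    {ok, State}.

%% the rest of the document (outside the streamed path) arrives here
terminate(end_json, _RootObj, _State) ->
    {ok, done}.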
|
your point about the difficulty of matching at the end of the path means that

handle_item([<<"books">>, 0], _, _) -> ...
handle_item([<<"books">>, 0, <<"author">>], _, _) -> ...

should probably be

handle_item([<<"books">>], 0, _, _) -> ...
handle_item([<<"books">>, 0], <<"author">>, _, _) -> ...

i think i see what you mean by … i also think i see what you mean with your ijson example. you want to use the path to select what is returned, but you want whatever is at that path returned as discrete items. an interface like:

{ok, Filter} = jsx:start_filter(CallBackMod, Paths, InitialState),
jsx:filter(Filter, JSONChunk)

where each call to jsx:filter returns an arbitrary term emitted by the callback (and eof once the document ends), so your example would be called as:

{ok, Filter} = jsx:start_filter(my_callback_mod, [[<<"books">>]], []),
0 = jsx:filter(Filter, JSONChunk),
1 = jsx:filter(Filter, JSONChunk),
...
eof = jsx:filter(Filter, JSONChunk).

and my_callback_mod would be:

init(_) -> 0.
handle_item([<<"books">>], Index, Book, Index) ->
book_db:store(Book),
{ok, Index, Index + 1}.
terminate(N) -> {ok, eof}.

does this look like what you want? |
This:

handle_item([<<"books">>, 0], <<"author">>, Author, State) ->

requires the user to implement a state machine to be able to know when the book ends. Also, you need to send a special item when the stream ends to explicitly close the last book. Also, the first list is always reconstructed, which is unneeded overhead. Reversing that list could help with reconstruction, but could hurt clarity and induce even more prefix-tracking bugs. As a user I would definitely not like writing such necessarily-stateful callback modules.

The way of handling (deep) values inside "editions" is much more interesting here. RootObj will be tiny when the user knows where the huge object/array is. Streamed paths are not saved in RootObj (see my first example), so there is absolutely no overhead here. And if the user has specified the wrong path, the value of RootObj in terminate will help with debugging (as opposed to just dropping anything outside of the stream).

Your example with start_filter (if I understand it well) will not work, because it presumes the item count is equal to the chunk count, which is almost never true (the passed and returned values have to be more complex to deal with that, so the controlling logic will not be trivial).
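(To make this concrete, roughly the prefix-tracking the flat-path interface forces on the user; do_something_with_book is the user's own function, everything else here is illustrative:)

%% State: {CurrentIndex, CollectedPairs}
handle_item([<<"books">>, Index], Key, Value, {Index, Acc}) ->
    %% still inside the same book: keep collecting its fields
    {ok, {Index, [{Key, Value} | Acc]}};
handle_item([<<"books">>, NewIndex], Key, Value, {_PrevIndex, Acc}) ->
    %% the index changed: the previous book is complete, flush it
    do_something_with_book(lists:reverse(Acc)),
    {ok, {NewIndex, [{Key, Value}]}}.
%% deep values such as [<<"books">>, N, <<"editions">>] would need yet more clauses

terminate({_Index, Acc}) ->
    %% a special end-of-stream step is needed to close the last book
    do_something_with_book(lists:reverse(Acc)),
    ok.
|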
And now we have several API drafts and cannot agree on which of them is better. |
that's not the chunk count, it's an arbitrary term returned (optionally) as in emit. i just used it to track books processed |
Hi there. Any update on this? |
Thanks, I'll take a look. |
Here's an API draft.
Creating a new stream is something gen_server-like:
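(A sketch, reusing the jsx_stream names from the example earlier in the thread; none of this is final:)

{ok, Stream} = jsx_stream:init(CallbackMod, CallbackArgs, [{path, [<<"rows">>]}]).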
Any jsx decoder options (except stream) are accepted too. The user may provide multiple path options to stream multiple arrays/objects.

Feeding data
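(A sketch along the same lines; input/2 is fed chunks as they arrive:)

{more, Stream1} = jsx_stream:input(Stream, Chunk1),
{more, Stream2} = jsx_stream:input(Stream1, Chunk2).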
The callback module handles input items or key-value pairs:
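(A sketch of the callback shapes, following the five-argument handle_item used in the every_nth example; handle_kv's exact arguments are a guess by analogy:)

handle_item(Path, Index, Item, RootObj, State) -> {ok, NewState}.
handle_kv(Path, Key, Value, RootObj, State)    -> {ok, NewState}.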
Optionally, the callback module may emit some events:
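(For example, returning an emit tuple instead of {ok, State}, as in the every_nth example; the event term is arbitrary:)

handle_item(_Path, Index, _Item, _RootObj, State) ->
    {emit, {item_seen, Index}, State}.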
When the callback emits an event, the result of input/2 changes. The user needs to input empty strings to ensure the buffer is completely processed:
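(A sketch of that draining loop; drain/1 and handle_event/1 are placeholder names:)

drain(Stream) ->
    case jsx_stream:input(Stream, <<>>) of
        {emit, Event, Stream1} ->
            handle_event(Event),   % handle the user event, then keep draining
            drain(Stream1);
        {more, Stream1} ->
            {ok, Stream1}          % buffer fully processed
    end.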
The user may perform calls to the handler module:
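(A sketch; the function name and return shape are guesses:)

{Reply, Stream1} = jsx_stream:call(Stream, progress).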
Calls are handled by a callback function:
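(The matching callback side might look like this, by analogy with gen_server; here the state is just a counter:)

handle_call(progress, Count) ->
    {reply, {items_processed, Count}, Count}.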
Of course, state may change when handling a call.
When the JSON document ends, the terminate callback is called. If the document ends before the end of data, the remaining bytes are returned:
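(A sketch of both cases, reusing the {end_json, ...} shape from the earlier example; the three-element tuple for trailing bytes is a guess:)

%% callback side, as in the every_nth example above
terminate(end_json, RootObj, State) -> ...

%% caller side
case jsx_stream:input(Stream, Chunk) of
    {end_json, Result} ->
        {ok, Result};          % document ended exactly at the end of the data
    {end_json, Result, Rest} ->
        {ok, Result, Rest}     % Rest holds the unconsumed trailing bytes
end.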