Any plan to support JSON or JSONB? #7845
Comments
DataFusion supports reading JSON in, and some basic things like field access work. However, it doesn't have the range of operators that Postgres does. This is similar to #5493. It would be great if someone wanted to start working on adding more native JSON support (starting by spec'ing it out and writing the appropriate tickets), but at the moment I don't know of anyone doing so. |
Indexed field access is only valid for a limited set of Arrow types; DataFusion/Arrow does not have a JSON/JSONB datatype, nor the related JSON functions. We could solve it in two ways: add a real JSON/JSONB type, or treat string/binary data as JSON in the JSON functions and operators.
The first one is more natural: the type is more precise and could seamlessly integrate with Parquet's JSON/BSON type. The second one could be faster to land, but there would be some type issues. |
A third way could be to parse JSON data into Arrow structs. One limitation of this approach is that it requires all the JSON records to have the same schema, which is often, but not always, the case. The more dynamic "JSON/BSON" type approach would allow varying schemas, but would not be as fast. |
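As a rough sketch of what that third approach looks like with today's APIs (file name, table name, and column names here are hypothetical), newline-delimited JSON can already be read into Arrow-typed tables, with nested objects becoming Struct columns:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register an NDJSON file; the schema is inferred and fixed up front,
    // so every record must fit the same structure.
    ctx.register_json("events", "events.ndjson", NdJsonReadOptions::default())
        .await?;

    // Nested objects become Arrow Struct columns, so indexed field access
    // works, but there are no general JSON/JSONB operators.
    let df = ctx
        .sql("SELECT payload['name'] AS name FROM events")
        .await?;
    df.show().await?;
    Ok(())
}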
I also think there is a solution that is part way between what @dojiong proposes in #7845 (comment): store BSON in a binary column. |
Yeah, JSON is schemaless; it's hard to fit it into a fixed Arrow Struct schema.
That could land quickly, without modifying Arrow's types. But there are some limitations:
If we use Binary for BSON, we should consider a convenient way to convert BSON data to other datatypes. |
A third option (or maybe you meant this by either 1 or 2): have JSON and BSON be extension types, so DataFusion could recognize them via field metadata. I think the Arrow community would be open to eventually having canonical extension types for these, given their prevalence in various formats, including Parquet.
I wonder if |
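To make the extension-type idea concrete, here is a minimal sketch (the column name and extension name are made up for illustration) of tagging a plain Utf8 column as JSON via Arrow's standard extension metadata key, which an engine could then inspect:

use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Store the JSON as ordinary Utf8, but mark it with the Arrow
    // extension-type metadata key so consumers can recognize it.
    let json_field = Field::new("attributes", DataType::Utf8, true).with_metadata(
        HashMap::from([("ARROW:extension:name".to_string(), "example.json".to_string())]),
    );
    let schema = Arc::new(Schema::new(vec![json_field]));

    // A query engine could check this metadata when deciding whether
    // JSON operators / functions apply to the column.
    let field = schema.field(0);
    let is_json = field
        .metadata()
        .get("ARROW:extension:name")
        .is_some_and(|name| name.as_str() == "example.json");
    println!("{} tagged as JSON: {}", field.name(), is_json);
}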
I didn't find a way to use extension types in DataFusion; do you mean adding extension type support to DataFusion? But DataFusion uses Arrow's DataType, and it may be harder to add a JSON type to Arrow.
That's one way to achieve it, then |
Maybe adding an ExtensionType to Arrow's DataType would be more natural:

trait ExtensionType {
fn inner_type(&self) -> &DataType;
// ....
}
enum DataType {
Int8,
Utf8,
// ....
Extension(Arc<dyn ExtensionType>)
}
impl DataType {
fn raw_type(&self) -> &DataType {
let mut ty = self;
while let Self::Extension(ext) = ty {
ty = ext.inner_type();
}
ty
}
} |
@dojiong Discussion of support for extension types can be found at apache/arrow-rs#4472 |
Related proposal for user defined types: #7923 |
We (Coralogix) built our own binary jsonb format (we call it jsona, for json arrow) that we are planning on open-sourcing in the next couple of months (hopefully in the Jan/Feb time frame; we need to fill in some details). It has some nifty features for vectorized processing. In broad strokes, we took the tape representation and split it across child arrays.
So you have three child arrays. If this is something others would be interested in, we would be happy to upstream it into arrow-rs proper as a native array type (not in the sense of adding it to the Arrow spec, but as something with "native" APIs in arrow-rs to avoid the ceremony around dealing with struct arrays) and add support in DataFusion. The benefits of this over just using JSONB in a regular binary array (and the reason we built it) are roughly: |
|
Definitely interested! |
Cross posting. There are some interesting ideas in #9103 (comment) |
I think there are two major themes here:

Theme 1: how to query such semi-structured data

DataFusion today supports the Arrow type system, which, while powerful, can not describe arbitrary JSON schemas. For example, you have to know up front what all the possible fields are for a StructArray. Systems like Postgres handle this by adding a new type and operators (see the Postgres docs), for example the containment and existence operators used below:

-- This array contains the primitive string value:
SELECT '["foo", "bar"]'::jsonb @> '"bar"'::jsonb;
-- String exists as array element:
SELECT '["foo", "bar", "baz"]'::jsonb ? 'bar';
-- String exists as object key:
SELECT '{"foo": "bar"}'::jsonb ? 'foo';
-- Object values are not considered:
SELECT '{"foo": "bar"}'::jsonb ? 'bar'; -- yields false Theme 2: how to store such data efficientlyOnce we have some sort of idea how to query the data then there is the question about how to store / implement those queries efficiently. I think that is where #7845 (comment) from @thinkharderdev and ArroyoSystems/arroyo#477 from @mwylde are coming from Suggestions on paths forwardI personally think it may be tough to reach a consensus on storing JSON vs BSON vs JSONA as they seem to have different usecases and pros/cons (interop vs performance for example) However, I could easily see this supported as "extension types" (basically views over StringArray/BinaryArray). I think we could implement such support today using functions, albiet with a poor UX. Like why can't we do something like -- create jsonb constant from a json string, like `make_json`
select make_jsonb('{"foo": "bar"}');
-- create json string from jsonb
select json_from_jsonb(<binary json>);
-- test if data in binary_column contains the structure {foo: bar}
select jsonb_contains(binary_column, make_jsonb('{"foo": "bar"}'));
🤔 |
Our immediate concern (which motivated our json extension type and the changes in https://github.com/ArroyoSystems/arrow-rs/tree/49.0.0/json) is being able to support partial deserialization/serialization of JSON. The Arrow type system is necessarily limited compared to the schemas enabled by, for example, json-schema, and for a system like ours it's important to be able to handle real-world JSON with arbitrarily complex schemas. We do that by taking a json schema and attempting to convert it into Arrow as far as that's possible; when we encounter a subschema that isn't representable as Arrow, we use utf8 with the json extension type. Then, with those arrow-json changes, we're able to partially deserialize/serialize the arrow-schema fields while leaving the unsupported fields as string-encoded JSON. Similarly, for tables defined via SQL DDL, we support an analogous mechanism. We would be interested in more native JSON support, particularly if we could parse the JSON once and store it in some form that enables more efficient JSON functions (which seems to be the direction that @thinkharderdev is going in with the tape representation). |
How do people query such types? Do you have native operator support (like Postgres)? BTW, adding something new to the Arrow spec is possible, but it is quite a process; there is a slightly less intense path as well. However, I think it might be good to figure out how to support this initially and then consider trying to standardize. It isn't clear to me that there is a consensus on the actual storage / representation yet. It does seem there is a consensus on the need to be able to access / query types with an arbitrary structure. |
We support a few JSON functions (https://doc.arroyo.dev/sql/scalar-functions#json-functions) for querying JSON data, and for more complex needs users can write Rust UDFs with serde_json. In practice both approaches seem pretty widely used. |
From a user's standpoint, I've run into this now with DataFusion SQL. As a SQL user I am hurting mostly from the lack of a get_json_object() function to extract keys from within strings. For example, Delta Lake has no native JSON type, so my applications rely on String columns which are known a priori to contain JSON data, and then a common query pattern might be:

SELECT id, get_json_object(ctx_data, '$.name') as name FROM sometable WHERE ds = current_date()

Where the schema might be:
In the meantime I might just shove a UDF into place. |
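In case it helps anyone in the same situation, here is a rough sketch of what the body of such a stop-gap scalar function could look like (get_json_object here is a hypothetical name, not an existing DataFusion function; the path handling is simplified; serde_json is assumed as a dependency; and it would still need to be wrapped in a ScalarUDF and registered on the SessionContext):

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::common::cast::as_string_array;
use datafusion::common::ScalarValue;
use datafusion::error::Result;
use datafusion::logical_expr::ColumnarValue;

/// Hypothetical stop-gap: extract a value from a JSON string column using a
/// simplified '$.a.b' path, returning Utf8 (null when the path is missing).
fn get_json_object(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    // First argument: the column of JSON strings.
    let json_values = match &args[0] {
        ColumnarValue::Array(array) => as_string_array(array)?,
        _ => panic!("first argument: expected a string array"),
    };
    // Second argument: the path literal, e.g. '$.name'.
    let path = match &args[1] {
        ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => s.clone(),
        _ => panic!("second argument: expected a string literal"),
    };
    let keys: Vec<&str> = path.trim_start_matches("$.").split('.').collect();

    let array: StringArray = json_values
        .iter()
        .map(|json| {
            let mut value: serde_json::Value = serde_json::from_str(json?).ok()?;
            for key in &keys {
                value = value.get_mut(*key)?.take();
            }
            match value {
                serde_json::Value::Null => None,
                serde_json::Value::String(s) => Some(s),
                other => Some(other.to_string()),
            }
        })
        .collect();
    Ok(ColumnarValue::Array(Arc::new(array) as ArrayRef))
}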
I'd very much like it if there were jsonb or json support. I want to use the pg_analytics extension and they use datafusion. |
I think we are now pretty close to being able to support JSON / JSONB via scalar functions. The basic idea might be:
This is basically how array function support is now implemented. Here is the library of array functions: https://github.com/apache/arrow-datafusion/tree/main/datafusion/functions-array/src, and here is the rewrite. |
FYI this topic came up in our first meetup #8522 |
@alamb if you're interested in JSON parsing support, I might be interested in contributing. We (Pydantic) maintain a very fast Rust JSON parser (generally significantly faster than serde-json), jiter, which I think would be perfect for this case as it allows you to iterate over the JSON rather than materialize the data before querying it. (Jiter is pretty stable and included in pydantic-core, meaning it's the most downloaded non-std JSON parser for Python.) I did a brief experiment implementing a json_contains function with it:

fn json_contains(args: &[ColumnarValue]) -> Result<ColumnarValue> {
// first argument is the array of json values
let json_values = match &args[0] {
// ColumnarValue::Array(array) => as_generic_binary_array::<i32>(array),
ColumnarValue::Array(array) => as_string_array(array),
_ => panic!("first argument: unexpected argument type"),
};
// second argument is the key name
let key_name = match &args[1] {
ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => s.clone(),
_ => panic!("second argument: unexpected argument type"),
};
let array = json_values
.iter()
.map(|json| {
if let Some(json) = json {
Some(jiter_json_contains(json.as_bytes(), &key_name))
} else {
None
}
})
.collect::<BooleanArray>();
Ok(ColumnarValue::from(Arc::new(array) as ArrayRef))
}
fn jiter_json_contains(json_data: &[u8], expected_key: &str) -> bool {
let mut jiter = Jiter::new(json_data, false);
let first_key = match jiter.next_object() {
Ok(Some(key)) => key,
_ => return false,
};
if first_key == expected_key {
return true;
}
// TODO we should skip over the next value, rather than fully parse it, needs a small change to jiter
if jiter.next_value().is_err() {
return false;
}
while let Ok(Some(key)) = jiter.next_key() {
if key == expected_key {
return true;
}
// TODO we should skip over the next value, rather than fully parse it, needs a small change to jiter
if jiter.next_value().is_err() {
return false;
}
}
false
}

Using that and querying a set of parquet files (28 GB), even without further optimisation, I got these results:

--------------------- count(*) ---------------------
-- duckdb:
SELECT count(*) FROM records_parquet_local;
-- 102275349 in 99ms
-- datafusion:
SELECT count(*) FROM records;
-- 102275349 in 58ms
--------------------- ilike span_name ---------------------
-- duckdb:
SELECT count(*) FROM records_parquet_local where span_name ilike 'converting traces to database%';
-- 3151751 in 1.805s
-- datafusion:
SELECT count(*) FROM records where span_name ilike 'converting traces to database%';
-- 3151751 in 1068ms
--------------------- ilike span_name, list attributes ---------------------
-- duckdb:
SELECT attributes FROM records_parquet_local where span_name ilike 'converting traces to database%';
-- 7.6s
-- datafusion:
SELECT attributes FROM records where span_name ilike 'converting traces to database%';
-- 13.9s
--------------------- json_contains attributes ---------------------
-- duckdb
SELECT count(*) FROM records_parquet_local where list_contains(json_keys(attributes), 'size');
-- 6165747 in 11.5s
-- datafusion
SELECT count(*) FROM records where json_contains(attributes, 'size');
-- 6165747 in 18.0s

The slowdown relative to duckdb seems to come mostly from reading the attributes column rather than from json_contains itself (compare the "list attributes" query above). Of course, if you want to use an alternative storage structure for a custom representation, that should be possible too. |
For what it’s worth I think having the ability to performantly parse JSON stored as a String or Binary is valuable in and of itself. You don’t always control how the data gets stored. If a binary format improves performance that’s a great option for when you can control the storage format. |
Tiny update to my example above: I realised there's a much better comparison query.

-- datafusion
SELECT count(*) FROM records where json_contains(attributes, 'size');
-- 6165747 in 18.0s
-- datafusion
SELECT count(*) FROM records where attributes like '%"size":%';
-- 6165747 in 15.4s
-- datafusion
SELECT count(*) FROM records where attributes ilike '%"size":%';
-- 6165747 in 22.6s

So even unoptimised, jiter-based json_contains is roughly competitive with a naive LIKE / ILIKE string match. |
Nice -- thank you for the offer and information @samuelcolvin and @adriangb.

High level proposal

I think it would initially be possible to implement JSON support using the existing DataFusion APIs (and implement it in a separate crate). I think @philippemnoel is also interested in such an endeavor as well. Using the existing extension APIs would both:
While I don't think I personally have the bandwidth to help implement the JSON functionality, I think the API design is critical to the success of DataFusion and I would be very interested in helping make it happen.

Specific proposal

So, in this case the solution might look like:
Then using the JSON functionality would look something like:

let ctx = SessionContext::new();
datafusion_json_functions::register_all(&ctx)?;
let results = ctx
    .sql("SELECT count(*) FROM records where json_contains(attributes, 'size')")
    .await?
    .collect()
    .await?;

Next Steps

Once we have that working, I think we could then discuss with the community whether we should incorporate this set of functions into the main DataFusion repo for maintenance (given the demand, I suspect this would be a popular idea).

New Repo

If anyone is interested: I created https://github.com/datafusion-contrib/datafuison-functions-json in the datafusion-contrib organization. I can add you as admin if you are interested. |
This is great news @alamb. Just to confirm, are you happy with using jiter? If so, I'm happy to help both with implementing the functions and with making some alterations in jiter to make things faster. Feel free to invite me, or I can just submit PRs. |
If you are going to write the code, I think you should decide how to do so :) If we get to the point where we want to propose moving the code from datafusion-contrib into the main DataFusion repo, we can revisit the choice then.
That is a great idea. I suspect that will make both DataFusion and jiter better
I invited you -- I don't think I will have time to work on / review PRs at the moment, but I suspect others on this ticket may. Please chime in if you do. |
Hi @alamb, we (GreptimeDB) are also planning to support JSON. We are participating in the OSPP (Open Source Promotion Plan), similar to Google Summer of Code, and are currently recruiting students to implement this feature. We aim to find an efficient and elegant solution for storing JSON data, and as part of this effort we also plan to evaluate various storage implementations of JSON formats in the OLAP scenario. Thanks to @thinkharderdev's work; JSONA greatly inspired me. I propose a JSONA variant which may work well in our scenario.

A naive proposal of a JSONA variant

For JSON:
The |
Thanks @WenyXu -- sounds very neat. FWIW, I think @samuelcolvin is also thinking about the representation in datafusion-contrib/datafusion-functions-json#2 |
https://github.com/datafusion-contrib/datafusion-functions-json now provides the following methods. I think we're nearly ready for a first release; see datafusion-contrib/datafusion-functions-json#5.
Cast expressions on json_get are rewritten to the equivalent typed functions, so

select * from foo where json_get(attributes, 'bar')::string='ham'

will be rewritten to:

select * from foo where json_get_str(attributes, 'bar')='ham'

Oh, and as per the micro-benchmarks on pydantic/jiter#84, the performance of these methods should now be on a par with duckdb. @alamb, you might want to take a look at the comparison against serde-json (which I believe arrow-json uses). |
@samuelcolvin this is awesome! I'm very excited to try it in Arroyo. |
I've done some further digging into fast querying of semi-structured data; in particular, I have a prototype library "batson" (binary alternative to (J)SON), pydantic/jiter#136. Batson is heavily inspired by Postgres's JSONB type and Snowflake's Variant type (see datafusion-contrib/datafusion-functions-variant#11), and the Apache initiative Open Variant. The main advantages of batson are that it maintains the order of items in objects and handles large ints. Performance:
(For reference, and for those wondering why datafusion-functions-json doesn't use serde-json: batson is 106x and 588x faster than serde-json in the same cases, respectively.)

So far so good. The problem is that, end to end, batson isn't actually that much faster than the existing JSON-string functions. |
The improvement would be even smaller in cases (like ours) where the data is actually being read from an object store. The main difference is that decompression takes up the lion's share of the time when reading parquet files locally. Running a query against the attributes column:

In memory:
From parquet:

(Happy to share the code I used, if it helps anyone.) We can see why that is using samply:
So making batson faster, or implementing it at all, isn't immediately that useful. Simple data shredding (as described in Open Variant here) won't help, I believe, since DataFusion will fetch and decompress the body column if it is included in the query, even if no rows in the body column are accessed.

Ways forward:
Any pointers from those who understand DataFusion better than me (👀 @alamb @andygrove @jayzhan211) on the best way forward would be much appreciated. |
What is the explain plan for this query? Maybe we could push down the filter. |
This seems like the right idea to pursue to me. The pushdown_filters option may help here
(we have it turned on at InfluxData), but it is not turned on by default as it isn't faster for all cases yet: #12524 |
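For anyone wanting to experiment, here is a minimal sketch of turning the parquet filter-pushdown options on programmatically (the table path is hypothetical, and a json_contains UDF such as the one from datafusion-functions-json is assumed to be registered; the corresponding config keys are datafusion.execution.parquet.pushdown_filters and reorder_filters):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Evaluate pushed-down predicates while decoding parquet row groups
    // instead of materializing whole batches first (off by default).
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    config.options_mut().execution.parquet.reorder_filters = true;

    let ctx = SessionContext::new_with_config(config);
    // The same options can be set via SQL:
    //   SET datafusion.execution.parquet.pushdown_filters = true;

    ctx.register_parquet("records", "records/", ParquetReadOptions::default())
        .await?;

    // Assumes a json_contains UDF has been registered on `ctx`.
    let df = ctx
        .sql("SELECT count(*) FROM records WHERE json_contains(attributes, 'size')")
        .await?;
    df.show().await?;
    Ok(())
}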
I've shared the code I'm using to run these tests: https://github.com/samuelcolvin/batson-perf

Unfortunately, setting pushdown_filters didn't resolve it. With it enabled, overall decompression still dominates. You can view this flamegraph properly here. Since these flamegraphs run right to left (you can see when functions were actually running by selecting them and viewing the timeline), the chronology seems to be:
My guess is that |
I think this falls into the category of "pushdown_filters still needs more optimization so it doesn't slow things down" -- it is a matter of engineering I think (probably preventing the same column from being decoded more than once, e.g. apache/arrow-rs#6454 (comment)). |
I think this is a general issue with low-selectivity filters pushed down to the parquet scan. How the row filtering works now is that the column will be decoded once to execute the filter and then decoded again to produce the output batches. If the decoding time is non-trivial (eg it requires zstd decompression of a lot of data) and the filter is not particularly selective then the redundant decoding can easily more than offset the cost of just materializing the whole column and filtering. When the row filtering was initially implemented we discussed keeping the decoded data cached but ended up deciding against it because it can potentially consume a lot of memory |
Yes I would like to revisit this assumption eventually but I need to study the code more to convince myself it is possible |
FWIW arrow-json doesn't make use of serde-json for reading or writing anymore - apache/arrow-rs#3479 apache/arrow-rs#5318 |
Is there any way to do per-batch rewrites, or otherwise modify how an operator gets applied? The "traditional" way of getting performance from JSON data in an analytical system is to dynamically create columns for keys (e.g. ClickHouse). For our use case that breaks down because the data is extremely variable, e.g. the same key can hold completely different types in different places. |
FWIW DataFusion has pretty strong assumptions that the schema matches (aka inserting extra columns would likely be challenging). One thing to potentially look into is building an "inverted index" externally, maybe 🤔 That is basically "state of the art" in text search / search engines 🤔 |
Is your feature request related to a problem or challenge?
DataFusion does not support a JSON/JSONB datatype. Is there a plan to support it in the future?
Describe the solution you'd like
support JSON/JSONB data
Describe alternatives you've considered
Treat string/binary data as json/jsonb in json functions or json operators.
Additional context
json functions in postgres: functions-json