Skip to content

Commit

Permalink
[Enhancement] Support infer struct type from parquet in files() (back…
Browse files Browse the repository at this point in the history
…port #50481) (#50571)

Co-authored-by: wyb <[email protected]>
  • Loading branch information
mergify[bot] and wyb authored Sep 2, 2024
1 parent f7f5af4 commit fb9009b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 5 deletions.
46 changes: 46 additions & 0 deletions be/src/exec/parquet_schema_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ static Status get_parquet_type_from_group(const ::parquet::schema::NodePtr& node
static Status get_parquet_type_from_primitive(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc);
static Status get_parquet_type_from_list(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc);
static Status get_parquet_type_from_map(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc);
static Status try_to_infer_struct_type(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc);

Status get_parquet_type(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc) {
if (node->is_group()) {
Expand Down Expand Up @@ -121,6 +122,11 @@ static Status get_parquet_type_from_group(const ::parquet::schema::NodePtr& node
return get_parquet_type_from_map(node, type_desc);
}

auto st = try_to_infer_struct_type(node, type_desc);
if (st.ok()) {
return Status::OK();
}

// Treat unsupported types as VARCHAR.
*type_desc = TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH);
return Status::OK();
Expand Down Expand Up @@ -217,4 +223,44 @@ static Status get_parquet_type_from_map(const ::parquet::schema::NodePtr& node,
return Status::OK();
}

/*
try to infer struct type from group node.
parquet does not have struct type, there is no struct definition in parquet.
try to infer like this.
group <name> {
type field0;
type field1;
...
}
*/
static Status try_to_infer_struct_type(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc) {
// 1st level.
// group name
DCHECK(node->is_group());

auto group_node = std::static_pointer_cast<::parquet::schema::GroupNode>(node);
int field_count = group_node->field_count();
if (field_count == 0) {
return Status::Unknown("unknown type");
}

// 2nd level.
// field
std::vector<std::string> field_names;
std::vector<TypeDescriptor> field_types;
field_names.reserve(field_count);
field_types.reserve(field_count);
for (auto i = 0; i < group_node->field_count(); ++i) {
const auto& field = group_node->field(i);
field_names.emplace_back(field->name());
auto& field_type_desc = field_types.emplace_back();
RETURN_IF_ERROR(get_parquet_type(field, &field_type_desc));
}

*type_desc = TypeDescriptor::create_struct_type(field_names, field_types);

return Status::OK();
}

} //namespace starrocks
18 changes: 14 additions & 4 deletions be/test/exec/parquet_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,16 +701,26 @@ TEST_F(ParquetScannerTest, get_file_schema) {
{"col_json_map_timestamp",
TypeDescriptor::create_map_type(TypeDescriptor::from_logical_type(TYPE_DATETIME),
TypeDescriptor::from_logical_type(TYPE_INT))},
{"col_json_struct", TypeDescriptor::create_varchar_type(1048576)},
{"col_json_struct",
TypeDescriptor::create_struct_type({"s0", "s1"}, {TypeDescriptor::from_logical_type(TYPE_INT),
TypeDescriptor::create_varchar_type(1048576)})},
{"col_json_list_list", TypeDescriptor::create_array_type(TypeDescriptor::create_array_type(
TypeDescriptor::from_logical_type(TYPE_INT)))},
{"col_json_map_list",
TypeDescriptor::create_map_type(
TypeDescriptor::create_varchar_type(1048576),
TypeDescriptor::create_array_type(TypeDescriptor::from_logical_type(TYPE_INT)))},
{"col_json_list_struct", TypeDescriptor::create_array_type(TypeDescriptor::create_varchar_type(1048576))},
{"col_json_struct_struct", TypeDescriptor::create_varchar_type(1048576)},
{"col_json_struct_string", TypeDescriptor::create_varchar_type(1048576)},
{"col_json_list_struct", TypeDescriptor::create_array_type(TypeDescriptor::create_struct_type(
{"s0", "s1"}, {TypeDescriptor::from_logical_type(TYPE_INT),
TypeDescriptor::create_varchar_type(1048576)}))},
{"col_json_struct_struct",
TypeDescriptor::create_struct_type(
{"s0", "s1"},
{TypeDescriptor::from_logical_type(TYPE_INT),
TypeDescriptor::create_struct_type({"s2"}, {TypeDescriptor::from_logical_type(TYPE_INT)})})},
{"col_json_struct_string",
TypeDescriptor::create_struct_type({"s0", "s1"}, {TypeDescriptor::from_logical_type(TYPE_INT),
TypeDescriptor::create_varchar_type(1048576)})},
{"col_json_json_string", TypeDescriptor::create_varchar_type(1048576)}}},
{test_exec_dir + "/test_data/parquet_data/decimal.parquet",
{{"col_decimal32", TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL32, 9, 2)},
Expand Down
2 changes: 1 addition & 1 deletion test/sql/test_pipe/R/basic
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ desc t1;
-- result:
col_int int YES true None
col_map map<varchar(1048576),int> YES false None
col_struct varchar(1048576) YES false None
col_struct struct<a varchar(1048576), b int(11)> YES false None
-- !result
select count(*) from t1;
-- result:
Expand Down

0 comments on commit fb9009b

Please sign in to comment.