Skip to content

Commit

Permalink
[Enhancement] Support infer struct type from parquet in files() (Star…
Browse files Browse the repository at this point in the history
…Rocks#50481)

Signed-off-by: wyb <[email protected]>
  • Loading branch information
wyb authored and va-os-commits committed Sep 10, 2024
1 parent cf62a95 commit 1028ffc
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 10 deletions.
46 changes: 46 additions & 0 deletions be/src/exec/parquet_schema_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ static Status get_parquet_type_from_group(const ::parquet::schema::NodePtr& node
static Status get_parquet_type_from_primitive(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc);
static Status get_parquet_type_from_list(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc);
static Status get_parquet_type_from_map(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc);
static Status try_to_infer_struct_type(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc);

Status get_parquet_type(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc) {
if (node->is_group()) {
Expand Down Expand Up @@ -121,6 +122,11 @@ static Status get_parquet_type_from_group(const ::parquet::schema::NodePtr& node
return get_parquet_type_from_map(node, type_desc);
}

auto st = try_to_infer_struct_type(node, type_desc);
if (st.ok()) {
return Status::OK();
}

// Treat unsupported types as VARCHAR.
*type_desc = TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH);
return Status::OK();
Expand Down Expand Up @@ -217,4 +223,44 @@ static Status get_parquet_type_from_map(const ::parquet::schema::NodePtr& node,
return Status::OK();
}

/*
try to infer struct type from group node.
parquet does not have struct type, there is no struct definition in parquet.
try to infer like this.
group <name> {
type field0;
type field1;
...
}
*/
static Status try_to_infer_struct_type(const ::parquet::schema::NodePtr& node, TypeDescriptor* type_desc) {
// 1st level.
// group name
DCHECK(node->is_group());

auto group_node = std::static_pointer_cast<::parquet::schema::GroupNode>(node);
int field_count = group_node->field_count();
if (field_count == 0) {
return Status::Unknown("unknown type");
}

// 2nd level.
// field
std::vector<std::string> field_names;
std::vector<TypeDescriptor> field_types;
field_names.reserve(field_count);
field_types.reserve(field_count);
for (auto i = 0; i < group_node->field_count(); ++i) {
const auto& field = group_node->field(i);
field_names.emplace_back(field->name());
auto& field_type_desc = field_types.emplace_back();
RETURN_IF_ERROR(get_parquet_type(field, &field_type_desc));
}

*type_desc = TypeDescriptor::create_struct_type(field_names, field_types);

return Status::OK();
}

} //namespace starrocks
18 changes: 14 additions & 4 deletions be/test/exec/parquet_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,16 +701,26 @@ TEST_F(ParquetScannerTest, get_file_schema) {
{"col_json_map_timestamp",
TypeDescriptor::create_map_type(TypeDescriptor::from_logical_type(TYPE_DATETIME),
TypeDescriptor::from_logical_type(TYPE_INT))},
{"col_json_struct", TypeDescriptor::create_varchar_type(1048576)},
{"col_json_struct",
TypeDescriptor::create_struct_type({"s0", "s1"}, {TypeDescriptor::from_logical_type(TYPE_INT),
TypeDescriptor::create_varchar_type(1048576)})},
{"col_json_list_list", TypeDescriptor::create_array_type(TypeDescriptor::create_array_type(
TypeDescriptor::from_logical_type(TYPE_INT)))},
{"col_json_map_list",
TypeDescriptor::create_map_type(
TypeDescriptor::create_varchar_type(1048576),
TypeDescriptor::create_array_type(TypeDescriptor::from_logical_type(TYPE_INT)))},
{"col_json_list_struct", TypeDescriptor::create_array_type(TypeDescriptor::create_varchar_type(1048576))},
{"col_json_struct_struct", TypeDescriptor::create_varchar_type(1048576)},
{"col_json_struct_string", TypeDescriptor::create_varchar_type(1048576)},
{"col_json_list_struct", TypeDescriptor::create_array_type(TypeDescriptor::create_struct_type(
{"s0", "s1"}, {TypeDescriptor::from_logical_type(TYPE_INT),
TypeDescriptor::create_varchar_type(1048576)}))},
{"col_json_struct_struct",
TypeDescriptor::create_struct_type(
{"s0", "s1"},
{TypeDescriptor::from_logical_type(TYPE_INT),
TypeDescriptor::create_struct_type({"s2"}, {TypeDescriptor::from_logical_type(TYPE_INT)})})},
{"col_json_struct_string",
TypeDescriptor::create_struct_type({"s0", "s1"}, {TypeDescriptor::from_logical_type(TYPE_INT),
TypeDescriptor::create_varchar_type(1048576)})},
{"col_json_json_string", TypeDescriptor::create_varchar_type(1048576)}}},
{test_exec_dir + "/test_data/parquet_data/decimal.parquet",
{{"col_decimal32", TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL32, 9, 2)},
Expand Down
Loading

0 comments on commit 1028ffc

Please sign in to comment.