Skip to content

Commit

Permalink
[enhancement](schema change) Standardize the behavior after a table sche…
Browse files Browse the repository at this point in the history
…ma change.
  • Loading branch information
hubgeter committed Jan 27, 2025
1 parent 744691a commit 47f4e9f
Show file tree
Hide file tree
Showing 12 changed files with 6,395 additions and 115 deletions.
181 changes: 131 additions & 50 deletions be/src/vec/exec/format/column_type_convert.cpp

Large diffs are not rendered by default.

410 changes: 365 additions & 45 deletions be/src/vec/exec/format/column_type_convert.h

Large diffs are not rendered by default.

61 changes: 47 additions & 14 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,20 +474,42 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,
const auto* value = literal_data.data;
try {
switch (type->getKind()) {
case orc::TypeKind::BOOLEAN:
case orc::TypeKind::BOOLEAN: {
if (primitive_type != TYPE_BOOLEAN) {
return std::make_tuple(false, orc::Literal(false));
}
return std::make_tuple(true, orc::Literal(bool(*((uint8_t*)value))));
}
case orc::TypeKind::BYTE:
return std::make_tuple(true, orc::Literal(int64_t(*((int8_t*)value))));
case orc::TypeKind::SHORT:
return std::make_tuple(true, orc::Literal(int64_t(*((int16_t*)value))));
case orc::TypeKind::INT:
return std::make_tuple(true, orc::Literal(int64_t(*((int32_t*)value))));
case orc::TypeKind::LONG:
case orc::TypeKind::LONG: {
if constexpr (primitive_type == TYPE_TINYINT) {
return std::make_tuple(true, orc::Literal(int64_t(*((int8_t*)value))));
} else if constexpr (primitive_type == TYPE_SMALLINT) {
return std::make_tuple(true, orc::Literal(int64_t(*((int16_t*)value))));
} else if constexpr (primitive_type == TYPE_INT) {
return std::make_tuple(true, orc::Literal(int64_t(*((int32_t*)value))));
} else if constexpr (primitive_type == TYPE_BIGINT) {
return std::make_tuple(true, orc::Literal(int64_t(*((int64_t*)value))));
}
return std::make_tuple(false, orc::Literal(false));
}
return std::make_tuple(true, orc::Literal(*((int64_t*)value)));
case orc::TypeKind::FLOAT:
return std::make_tuple(true, orc::Literal(double(*((float*)value))));
case orc::TypeKind::DOUBLE:
return std::make_tuple(true, orc::Literal(*((double*)value)));
case orc::TypeKind::FLOAT: {
if constexpr (primitive_type == TYPE_FLOAT) {
return std::make_tuple(true, orc::Literal(double(*((float*)value))));
} else if constexpr (primitive_type == TYPE_DOUBLE) {
return std::make_tuple(true, orc::Literal(double(*((double*)value))));
}
return std::make_tuple(false, orc::Literal(false));
}
case orc::TypeKind::DOUBLE: {
if (primitive_type == TYPE_DOUBLE) {
return std::make_tuple(true, orc::Literal(*((double*)value)));
}
return std::make_tuple(false, orc::Literal(false));
}
case orc::TypeKind::STRING:
[[fallthrough]];
case orc::TypeKind::BINARY:
Expand All @@ -496,7 +518,11 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,
// case orc::TypeKind::CHAR:
// [[fallthrough]];
case orc::TypeKind::VARCHAR: {
return std::make_tuple(true, orc::Literal(literal_data.data, literal_data.size));
if (primitive_type == TYPE_STRING || primitive_type == TYPE_CHAR ||
primitive_type == TYPE_VARCHAR) {
return std::make_tuple(true, orc::Literal(literal_data.data, literal_data.size));
}
return std::make_tuple(false, orc::Literal(false));
}
case orc::TypeKind::DECIMAL: {
int128_t decimal_value;
Expand All @@ -508,8 +534,10 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,
decimal_value = *((int32_t*)value);
} else if constexpr (primitive_type == TYPE_DECIMAL64) {
decimal_value = *((int64_t*)value);
} else {
} else if constexpr (primitive_type == TYPE_DECIMAL128I) {
decimal_value = *((int128_t*)value);
} else {
return std::make_tuple(false, orc::Literal(false));
}
return std::make_tuple(true, orc::Literal(orc::Int128(uint64_t(decimal_value >> 64),
uint64_t(decimal_value)),
Expand All @@ -523,12 +551,14 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,
cctz::civil_day civil_date(date_v1.year(), date_v1.month(), date_v1.day());
day_offset =
cctz::convert(civil_date, utc0).time_since_epoch().count() / (24 * 60 * 60);
} else { // primitive_type == TYPE_DATEV2
} else if (primitive_type == TYPE_DATEV2) {
const DateV2Value<DateV2ValueType> date_v2 =
*reinterpret_cast<const DateV2Value<DateV2ValueType>*>(value);
cctz::civil_day civil_date(date_v2.year(), date_v2.month(), date_v2.day());
day_offset =
cctz::convert(civil_date, utc0).time_since_epoch().count() / (24 * 60 * 60);
} else {
return std::make_tuple(false, orc::Literal(false));
}
return std::make_tuple(true, orc::Literal(orc::PredicateDataType::DATE, day_offset));
}
Expand All @@ -545,14 +575,16 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,
datetime_v1.minute(), datetime_v1.second());
seconds = cctz::convert(civil_seconds, utc0).time_since_epoch().count();
nanos = 0;
} else { // primitive_type == TYPE_DATETIMEV2
} else if (primitive_type == TYPE_DATETIMEV2) {
const DateV2Value<DateTimeV2ValueType> datetime_v2 =
*reinterpret_cast<const DateV2Value<DateTimeV2ValueType>*>(value);
cctz::civil_second civil_seconds(datetime_v2.year(), datetime_v2.month(),
datetime_v2.day(), datetime_v2.hour(),
datetime_v2.minute(), datetime_v2.second());
seconds = cctz::convert(civil_seconds, utc0).time_since_epoch().count();
nanos = datetime_v2.microsecond() * 1000;
} else {
return std::make_tuple(false, orc::Literal(false));
}
return std::make_tuple(true, orc::Literal(seconds, nanos));
}
Expand Down Expand Up @@ -1754,7 +1786,8 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name, Colum

if (!_converters.contains(converter_key)) {
std::unique_ptr<converter::ColumnTypeConverter> converter =
converter::ColumnTypeConverter::get_converter(src_type, data_type);
converter::ColumnTypeConverter::get_converter(src_type, data_type,
converter::FileFormat::ORC);
if (!converter->support()) {
return Status::InternalError(
"The column type of '{}' has changed and is not supported: ", col_name,
Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/exec/format/parquet/parquet_column_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ namespace doris::vectorized::parquet {
const cctz::time_zone ConvertParams::utc0 = cctz::utc_time_zone();

#define FOR_LOGICAL_DECIMAL_TYPES(M) \
M(TYPE_DECIMALV2) \
M(TYPE_DECIMAL32) \
M(TYPE_DECIMAL64) \
M(TYPE_DECIMAL128I) \
Expand Down Expand Up @@ -133,8 +132,8 @@ static void get_decimal_converter(FieldSchema* field_schema, TypeDescriptor src_
std::unique_ptr<PhysicalToLogicalConverter>& physical_converter) {
const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema;
if (is_decimal(remove_nullable(dst_logical_type))) {
// using destination decimal type, avoid type and scale change
src_logical_type = remove_nullable(dst_logical_type)->get_type_as_type_descriptor();
src_logical_type = create_decimal(parquet_schema.precision, parquet_schema.scale, false)
->get_type_as_type_descriptor();
}

tparquet::Type::type src_physical_type = parquet_schema.type;
Expand Down Expand Up @@ -298,8 +297,8 @@ std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_conv

if (physical_converter->support()) {
physical_converter->_convert_params = std::move(convert_params);
physical_converter->_logical_converter =
converter::ColumnTypeConverter::get_converter(src_logical_type, dst_logical_type);
physical_converter->_logical_converter = converter::ColumnTypeConverter::get_converter(
src_logical_type, dst_logical_type, converter::FileFormat::PARQUET);
if (!physical_converter->_logical_converter->support()) {
physical_converter.reset(new UnsupportedConverter(
"Unsupported type change: " +
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/parquet_column_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ struct ConvertParams {
*
* Ultimate performance optimization:
* 1. If the process of (First => Second) is consistent, e.g., from BYTE_ARRAY to string, no additional copies and conversions will be introduced;
* 2. If process of (Second => Third) is consistent, eg. from decimal(12, 4) to decimal(8, 2), no additional copies and conversions will be introduced;
* 2. If process of (Second => Third) is consistent, no additional copies and conversions will be introduced;
* 3. Null map is shared among all processes, so no additional copies and conversions will be introduced in the null map;
* 4. Only create one physical column in physical conversion, and reused in each loop;
* 5. Only create one logical column in logical conversion, and reused in each loop;
Expand Down
Loading

0 comments on commit 47f4e9f

Please sign in to comment.