diff --git a/modules/basic/ds/arrow.cc b/modules/basic/ds/arrow.cc index 23291c0bc..89be03e24 100644 --- a/modules/basic/ds/arrow.cc +++ b/modules/basic/ds/arrow.cc @@ -36,6 +36,7 @@ limitations under the License. #include "client/client.h" #include "client/ds/blob.h" #include "common/util/logging.h" // IWYU pragma: keep +#include "common/util/macros.h" namespace vineyard { @@ -108,6 +109,31 @@ class ArrowArrayBuilderVisitor { return Status::OK(); } + Status Visit(const arrow::Date32Type*) { + builder_ = std::make_shared(client_, array_); + return Status::OK(); + } + + Status Visit(const arrow::Date64Type*) { + builder_ = std::make_shared(client_, array_); + return Status::OK(); + } + + Status Visit(const arrow::Time32Type*) { + builder_ = std::make_shared(client_, array_); + return Status::OK(); + } + + Status Visit(const arrow::Time64Type*) { + builder_ = std::make_shared(client_, array_); + return Status::OK(); + } + + Status Visit(const arrow::TimestampType*) { + builder_ = std::make_shared(client_, array_); + return Status::OK(); + } + Status Visit(const arrow::StringType*) { builder_ = std::make_shared(client_, array_); return Status::OK(); @@ -295,10 +321,15 @@ std::shared_ptr CastToArray(std::shared_ptr object) { template void NumericArray::PostConstruct(const ObjectMeta& meta) { + std::shared_ptr data_type; + if (this->data_type_.empty()) { + data_type = ConvertToArrowType::TypeValue(); + } else { + data_type = type_name_to_arrow_type(this->data_type_); + } this->array_ = std::make_shared( - ConvertToArrowType::TypeValue(), this->length_, - this->buffer_->ArrowBufferOrEmpty(), this->null_bitmap_->ArrowBuffer(), - this->null_count_, this->offset_); + data_type, this->length_, this->buffer_->ArrowBufferOrEmpty(), + this->null_bitmap_->ArrowBuffer(), this->null_count_, this->offset_); } template class NumericArray; @@ -311,14 +342,15 @@ template class NumericArray; template class NumericArray; template class NumericArray; template class NumericArray; +template class NumericArray; +template class NumericArray; +template class NumericArray; +template class NumericArray; +template class NumericArray; template NumericArrayBuilder::NumericArrayBuilder(Client& client) - : NumericArrayBaseBuilder(client) { - std::shared_ptr array; - CHECK_ARROW_ERROR(ArrowBuilderType{}.Finish(&array)); - this->arrays_.emplace_back(array); -} + : NumericArrayBaseBuilder(client) {} template NumericArrayBuilder::NumericArrayBuilder( @@ -353,12 +385,19 @@ template Status NumericArrayBuilder::Build(Client& client) { memory::VineyardMemoryPool pool(client); std::shared_ptr array; - RETURN_ON_ARROW_ERROR_AND_ASSIGN( - array, arrow_shim::Concatenate(std::move(this->arrays_), &pool)); + if (this->arrays_.empty()) { + CHECK_ARROW_ERROR(ArrowBuilderType(ConvertToArrowType::TypeValue(), + arrow::default_memory_pool()) + .Finish(&array)); + } else { + RETURN_ON_ARROW_ERROR_AND_ASSIGN( + array, arrow_shim::Concatenate(std::move(this->arrays_), &pool)); + } std::shared_ptr array_ = std::dynamic_pointer_cast(array); this->set_length_(array_->length()); + this->set_data_type_(type_name_from_arrow_type(array_->type())); this->set_null_count_(array_->null_count()); this->set_offset_(array_->offset()); TAKE_BUFFER_OR_NULL_AND_APPLY(this, client, set_buffer_, pool, @@ -377,6 +416,11 @@ template class NumericArrayBuilder; template class NumericArrayBuilder; template class NumericArrayBuilder; template class NumericArrayBuilder; +template class NumericArrayBuilder; +template class NumericArrayBuilder; +template class NumericArrayBuilder; +template class NumericArrayBuilder; +template class NumericArrayBuilder; template FixedNumericArrayBuilder::FixedNumericArrayBuilder(Client& client, @@ -384,7 +428,7 @@ FixedNumericArrayBuilder::FixedNumericArrayBuilder(Client& client, : NumericArrayBaseBuilder(client), client_(client), size_(size) { if (size_ > 0) { VINEYARD_CHECK_OK(client.CreateBlob(size_ * sizeof(T), writer_)); - data_ = reinterpret_cast(writer_->data()); + data_ = reinterpret_cast*>(writer_->data()); } } @@ -408,7 +452,7 @@ Status FixedNumericArrayBuilder::Make( out->size_ = size; if (out->size_ > 0) { RETURN_ON_ERROR(client.CreateBlob(out->size_ * sizeof(T), out->writer_)); - out->data_ = reinterpret_cast(out->writer_->data()); + out->data_ = reinterpret_cast*>(out->writer_->data()); } return Status::OK(); } @@ -426,7 +470,7 @@ Status FixedNumericArrayBuilder::Make( "cannot make builder of size > 0 with a null buffer"); } out->writer_ = std::move(writer); - out->data_ = reinterpret_cast(out->writer_->data()); + out->data_ = reinterpret_cast*>(out->writer_->data()); } return Status::OK(); } @@ -462,7 +506,8 @@ size_t FixedNumericArrayBuilder::size() const { } template -T* FixedNumericArrayBuilder::MutablePointer(int64_t i) const { +ArrowValueType* FixedNumericArrayBuilder::MutablePointer( + int64_t i) const { if (data_) { return data_ + i; } @@ -470,7 +515,7 @@ T* FixedNumericArrayBuilder::MutablePointer(int64_t i) const { } template -T* FixedNumericArrayBuilder::data() const { +ArrowValueType* FixedNumericArrayBuilder::data() const { return data_; } @@ -498,6 +543,11 @@ template class FixedNumericArrayBuilder; template class FixedNumericArrayBuilder; template class FixedNumericArrayBuilder; template class FixedNumericArrayBuilder; +template class FixedNumericArrayBuilder; +template class FixedNumericArrayBuilder; +template class FixedNumericArrayBuilder; +template class FixedNumericArrayBuilder; +template class FixedNumericArrayBuilder; void BooleanArray::PostConstruct(const ObjectMeta& meta) { this->array_ = std::make_shared( diff --git a/modules/basic/ds/arrow.h b/modules/basic/ds/arrow.h index 79ee1e603..1653abb23 100644 --- a/modules/basic/ds/arrow.h +++ b/modules/basic/ds/arrow.h @@ -104,9 +104,9 @@ class FixedNumericArrayBuilder : public NumericArrayBaseBuilder { size_t size() const; - T* MutablePointer(int64_t i) const; + ArrowValueType* MutablePointer(int64_t i) const; - T* data() const; + ArrowValueType* data() const; Status Build(Client& client) override; @@ -116,7 +116,7 @@ class FixedNumericArrayBuilder : public NumericArrayBaseBuilder { Client& client_; size_t size_ = 0; std::unique_ptr writer_ = nullptr; - T* data_ = nullptr; + ArrowValueType* data_ = nullptr; }; using Int8Builder = NumericArrayBuilder; @@ -129,6 +129,11 @@ using UInt32Builder = NumericArrayBuilder; using UInt64Builder = NumericArrayBuilder; using FloatBuilder = NumericArrayBuilder; using DoubleBuilder = NumericArrayBuilder; +using Date32Builder = NumericArrayBuilder; +using Date64Builder = NumericArrayBuilder; +using Time32Builder = NumericArrayBuilder; +using Time64Builder = NumericArrayBuilder; +using TimestampBuilder = NumericArrayBuilder; using FixedInt8Builder = FixedNumericArrayBuilder; using FixedInt16Builder = FixedNumericArrayBuilder; @@ -140,6 +145,11 @@ using FixedUInt32Builder = FixedNumericArrayBuilder; using FixedUInt64Builder = FixedNumericArrayBuilder; using FixedFloatBuilder = FixedNumericArrayBuilder; using FixedDoubleBuilder = FixedNumericArrayBuilder; +using FixedDate32Builder = FixedNumericArrayBuilder; +using FixedDate64Builder = FixedNumericArrayBuilder; +using FixedTime32Builder = FixedNumericArrayBuilder; +using FixedTime64Builder = FixedNumericArrayBuilder; +using FixedTimestampBuilder = FixedNumericArrayBuilder; /** * @brief BooleanArrayBuilder is designed for constructing Arrow arrays of diff --git a/modules/basic/ds/arrow.vineyard-mod b/modules/basic/ds/arrow.vineyard-mod index e6fc2ac57..7935accf1 100644 --- a/modules/basic/ds/arrow.vineyard-mod +++ b/modules/basic/ds/arrow.vineyard-mod @@ -78,10 +78,11 @@ class [[vineyard]] NumericArray : public PrimitiveArray, const size_t length() const { return array_->length(); } - const T* raw_values() const { return array_->raw_values(); } + const ArrowValueType* raw_values() const { return array_->raw_values(); } private: [[shared]] size_t length_; + [[shared(optional)]] String data_type_; [[shared]] int64_t null_count_, offset_; [[shared]] std::shared_ptr buffer_, null_bitmap_; @@ -100,6 +101,11 @@ using UInt32Array = NumericArray; using UInt64Array = NumericArray; using FloatArray = NumericArray; using DoubleArray = NumericArray; +using Date32Array = NumericArray; +using Date64Array = NumericArray; +using Time32Array = NumericArray; +using Time64Array = NumericArray; +using TimestampArray = NumericArray; class BooleanArrayBaseBuilder; diff --git a/modules/basic/ds/arrow_utils.cc b/modules/basic/ds/arrow_utils.cc index a1f1441f8..d14462635 100644 --- a/modules/basic/ds/arrow_utils.cc +++ b/modules/basic/ds/arrow_utils.cc @@ -53,9 +53,15 @@ std::shared_ptr FromAnyType(AnyType type) { case AnyType::String: return arrow::large_utf8(); case AnyType::Date32: - return arrow::int32(); + return arrow::date32(); case AnyType::Date64: - return arrow::int64(); + return arrow::date64(); + case AnyType::Time32: + return arrow::time32(DefaultTimeUnit); + case AnyType::Time64: + return arrow::time64(DefaultTimeUnit); + case AnyType::Timestamp: + return arrow::timestamp(DefaultTimeUnit); default: return arrow::null(); } @@ -197,58 +203,73 @@ Status EmptyTableBuilder::Build(const std::shared_ptr& schema, std::shared_ptr dummy; auto type = schema->field(i)->type(); - if (type == arrow::boolean()) { + if (arrow::boolean()->Equals(type)) { arrow::BooleanBuilder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); - } else if (type == arrow::uint64()) { + } else if (arrow::uint64()->Equals(type)) { arrow::UInt64Builder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); - } else if (type == arrow::int64()) { + } else if (arrow::int64()->Equals(type)) { arrow::Int64Builder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); - } else if (type == arrow::uint32()) { + } else if (arrow::uint32()->Equals(type)) { arrow::UInt32Builder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); - } else if (type == arrow::int32()) { + } else if (arrow::int32()->Equals(type)) { arrow::Int32Builder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); - } else if (type == arrow::float32()) { + } else if (arrow::float32()->Equals(type)) { arrow::FloatBuilder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); - } else if (type == arrow::float64()) { + } else if (arrow::float64()->Equals(type)) { arrow::DoubleBuilder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); - } else if (type == arrow::utf8()) { + } else if (arrow::utf8()->Equals(type)) { arrow::StringBuilder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); - } else if (type == arrow::large_utf8()) { + } else if (arrow::large_utf8()->Equals(type)) { arrow::LargeStringBuilder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); - } else if (type == arrow::list(arrow::uint64())) { + } else if (arrow::date32()->Equals(type)) { + arrow::Date32Builder builder; + RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); + } else if (arrow::date64()->Equals(type)) { + arrow::Date64Builder builder; + RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); + } else if (type->id() == arrow::Type::TIME32) { + arrow::Time32Builder builder(type, arrow::default_memory_pool()); + RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); + } else if (type->id() == arrow::Type::TIME64) { + arrow::Time64Builder builder(type, arrow::default_memory_pool()); + RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); + } else if (type->id() == arrow::Type::TIMESTAMP) { + arrow::TimestampBuilder builder(type, arrow::default_memory_pool()); + RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); + } else if (arrow::list(arrow::uint64())->Equals(type)) { auto builder = std::make_shared(); arrow::ListBuilder list_builder(arrow::default_memory_pool(), builder); RETURN_ON_ARROW_ERROR(list_builder.Finish(&dummy)); - } else if (type == arrow::list(arrow::int64())) { + } else if (arrow::list(arrow::int64())->Equals(type)) { auto builder = std::make_shared(); arrow::ListBuilder list_builder(arrow::default_memory_pool(), builder); RETURN_ON_ARROW_ERROR(list_builder.Finish(&dummy)); - } else if (type == arrow::list(arrow::uint32())) { + } else if (arrow::list(arrow::uint32())->Equals(type)) { auto builder = std::make_shared(); arrow::ListBuilder list_builder(arrow::default_memory_pool(), builder); RETURN_ON_ARROW_ERROR(list_builder.Finish(&dummy)); - } else if (type == arrow::list(arrow::int32())) { + } else if (arrow::list(arrow::int32())->Equals(type)) { auto builder = std::make_shared(); arrow::ListBuilder list_builder(arrow::default_memory_pool(), builder); RETURN_ON_ARROW_ERROR(list_builder.Finish(&dummy)); - } else if (type == arrow::list(arrow::float64())) { + } else if (arrow::list(arrow::float64())->Equals(type)) { auto builder = std::make_shared(); arrow::ListBuilder list_builder(arrow::default_memory_pool(), builder); RETURN_ON_ARROW_ERROR(list_builder.Finish(&dummy)); - } else if (type == arrow::list(arrow::int64())) { + } else if (arrow::list(arrow::int64())->Equals(type)) { auto builder = std::make_shared(); arrow::ListBuilder list_builder(arrow::default_memory_pool(), builder); RETURN_ON_ARROW_ERROR(list_builder.Finish(&dummy)); - } else if (type == arrow::null()) { + } else if (arrow::null()->Equals(type)) { arrow::NullBuilder builder; RETURN_ON_ARROW_ERROR(builder.Finish(&dummy)); } else { @@ -659,6 +680,42 @@ std::shared_ptr AddMetadataToRecordBatch( return batch->ReplaceSchemaMetadata(metadata); } +namespace detail { + +inline std::string type_name_from_arrow_date_unit( + arrow::TimeUnit::type const& unit) { + switch (unit) { + case arrow::TimeUnit::SECOND: + return "[S]"; + case arrow::TimeUnit::MILLI: + return "[MS]"; + case arrow::TimeUnit::MICRO: + return "[US]"; + case arrow::TimeUnit::NANO: + return "[NS]"; + default: + return "Unsupported time unit: '" + std::to_string(static_cast(unit)) + + "'"; + } +} + +inline arrow::TimeUnit::type type_name_to_arrow_date_unit(const char* unit) { + if (std::strncmp(unit, "[S]", 3) == 0) { + return arrow::TimeUnit::SECOND; + } else if (std::strncmp(unit, "[MS]", 4) == 0) { + return arrow::TimeUnit::MILLI; + } else if (std::strncmp(unit, "[US]", 4) == 0) { + return arrow::TimeUnit::MICRO; + } else if (std::strncmp(unit, "[NS]", 4) == 0) { + return arrow::TimeUnit::NANO; + } else { + LOG(ERROR) << "Unsupported time unit: '" << unit << "'"; + return arrow::TimeUnit::SECOND; + } +} + +} // namespace detail + std::shared_ptr type_name_to_arrow_type( const std::string& name) { if (name == "bool") { @@ -686,6 +743,41 @@ std::shared_ptr type_name_to_arrow_type( } else if (name == "string" || name == "std::string" || name == "str" || name == "std::__1::string" || name == "std::__cxx11::string") { return vineyard::ConvertToArrowType::TypeValue(); + } else if (name == "date32[day]") { + return vineyard::ConvertToArrowType::TypeValue(); + } else if (name == "date64[ms]") { + return vineyard::ConvertToArrowType::TypeValue(); + } else if (name.substr(0, std::string("time[32]").length()) == + std::string("time[32]")) { + const std::string unit_content = + name.substr(std::string("time[32]").length()); + arrow::TimeUnit::type unit = DefaultTimeUnit; + if (unit_content.length() >= 3) { + unit = detail::type_name_to_arrow_date_unit(unit_content.c_str()); + } + return arrow::time32(unit); + } else if (name.substr(0, std::string("time[64]").length()) == + std::string("time[64]")) { + const std::string unit_content = + name.substr(std::string("time[64]").length()); + arrow::TimeUnit::type unit = DefaultTimeUnit; + if (unit_content.length() >= 3) { + unit = detail::type_name_to_arrow_date_unit(unit_content.c_str()); + } + return arrow::time64(unit); + } else if (name.substr(0, std::string("timestamp").length()) == + std::string("timestamp")) { + const std::string unit_content = + name.substr(std::string("timestamp").length()); + arrow::TimeUnit::type unit = DefaultTimeUnit; + if (unit_content.length() >= 3) { + unit = detail::type_name_to_arrow_date_unit(unit_content.c_str()); + std::string timezone = + name.substr(std::string("timestamp").length() + + detail::type_name_from_arrow_date_unit(unit).length()); + return arrow::timestamp(unit, timezone); + } + return arrow::timestamp(unit); } else if (name.substr(0, std::string("list::TypeValue()->Equals( type)) { return type_name(); + } else if (vineyard::ConvertToArrowType::TypeValue() + ->Equals(type)) { + return "date32[day]"; + } else if (vineyard::ConvertToArrowType::TypeValue() + ->Equals(type)) { + return "date64[ms]"; + } else if (type->id() == arrow::Type::TIME32) { + auto time32_type = std::dynamic_pointer_cast(type); + const std::string unit = + detail::type_name_from_arrow_date_unit(time32_type->unit()); + return "time[32]" + unit; + } else if (type->id() == arrow::Type::TIME64) { + auto time64_type = std::dynamic_pointer_cast(type); + const std::string unit = + detail::type_name_from_arrow_date_unit(time64_type->unit()); + return "time[64]" + unit; + } else if (type->id() == arrow::Type::TIMESTAMP) { + auto timestamp_type = std::dynamic_pointer_cast(type); + const std::string unit = + detail::type_name_from_arrow_date_unit(timestamp_type->unit()); + return "timestamp" + unit + "[" + timestamp_type->timezone() + "]"; } else if (type != nullptr && type->id() == arrow::Type::LIST) { auto list_type = std::static_pointer_cast(type); return "listvalue_type()) + @@ -804,6 +917,21 @@ const void* get_arrow_array_data(std::shared_ptr const& array) { } else if (array->type()->Equals(arrow::large_utf8())) { return reinterpret_cast( std::dynamic_pointer_cast(array).get()); + } else if (array->type()->Equals(arrow::date32())) { + return reinterpret_cast( + std::dynamic_pointer_cast(array).get()); + } else if (array->type()->Equals(arrow::date64())) { + return reinterpret_cast( + std::dynamic_pointer_cast(array).get()); + } else if (array->type()->id() == arrow::Type::TIME32) { + return reinterpret_cast( + std::dynamic_pointer_cast(array).get()); + } else if (array->type()->id() == arrow::Type::TIME64) { + return reinterpret_cast( + std::dynamic_pointer_cast(array).get()); + } else if (array->type()->id() == arrow::Type::TIMESTAMP) { + return reinterpret_cast( + std::dynamic_pointer_cast(array).get()); } else if (array->type()->id() == arrow::Type::LIST) { return reinterpret_cast( std::dynamic_pointer_cast(array).get()); @@ -847,8 +975,7 @@ Status TypeLoosen(const std::vector>& schemas, "Empty table list cannot be used for normalizing schema"); // Perform type lossen. - // Date32 -> int32 - // Timestamp -> int64 -> double -> utf8 binary (not supported) + // int64 -> double -> utf8 binary (not supported) // Timestamp value are stored as as number of seconds, milliseconds, // microseconds or nanoseconds since UNIX epoch. @@ -872,15 +999,6 @@ Status TypeLoosen(const std::vector>& schemas, if (res->Equals(arrow::boolean())) { res = arrow::int32(); } - if (res->Equals(arrow::date32())) { - res = arrow::int32(); - } - if (res->Equals(arrow::date64())) { - res = arrow::int64(); - } - if (res->id() == arrow::Type::TIMESTAMP) { - res = arrow::int64(); - } if (res->Equals(arrow::int64())) { for (size_t j = 1; j < fields[i].size(); ++j) { if (fields[i][j]->type()->Equals(arrow::float64())) { @@ -898,6 +1016,42 @@ Status TypeLoosen(const std::vector>& schemas, if (res->Equals(arrow::utf8())) { res = arrow::large_utf8(); } + // Note [date, time, and timestamp conversion rules] + // + // GIE has specific own unit and timezone conversion for dates, times and + // timestamps, see also: + // https://github.com/alibaba/GraphScope/blob/main/interactive_engine/executor/ir/proto/common.proto#L58-L72 + // + // More specifically, + // + // - Date32: for int32 days since 1970-01-01 + // - Time32: for int32 milliseconds past midnight + // - Timestamp: int64 milliseconds since 1970-01-01 00:00:00.000000 (in an + // unspecified timezone) + // the default timezone when parsing value is UTC in GIE. + // + // Thus we got the following conversion rules: + // + // - Date32: no change + // - Date64 -> Timestamp + // - Time32_* -> Time32_MS + // - Time64_* -> Time32_MS + // - Timestamp_* -> Timestamp_MS_UTC + if (res->Equals(arrow::date32())) { + res = arrow::date32(); + } + if (res->Equals(arrow::date64())) { + res = arrow::timestamp(arrow::TimeUnit::MILLI, "UTC"); + } + if (res->id() == arrow::Type::TIME32) { + res = arrow::time32(arrow::TimeUnit::MILLI); + } + if (res->id() == arrow::Type::TIME64) { + res = arrow::time32(arrow::TimeUnit::MILLI); + } + if (res->id() == arrow::Type::TIMESTAMP) { + res = arrow::timestamp(arrow::TimeUnit::MILLI, "UTC"); + } lossen_fields[i] = lossen_fields[i]->WithType(res); } schema = std::make_shared(lossen_fields, metadata); @@ -994,14 +1148,13 @@ Status CastBatchToSchema(const std::shared_ptr& batch, "not consistent"); std::vector> new_columns; for (int64_t i = 0; i < batch->num_columns(); ++i) { - auto col = batch->column(i); - if (batch->schema()->field(i)->type()->Equals(schema->field(i)->type())) { - new_columns.push_back(col); - continue; - } + auto array = batch->column(i); auto from_type = batch->schema()->field(i)->type(); auto to_type = schema->field(i)->type(); - auto array = col; + if (from_type->Equals(to_type)) { + new_columns.push_back(array); + continue; + } std::shared_ptr out; if (arrow::compute::CanCast(*from_type, *to_type)) { RETURN_ON_ERROR(GeneralCast(array, to_type, out)); @@ -1034,16 +1187,16 @@ Status CastTableToSchema(const std::shared_ptr& table, "The schema of original table and expected schema is not consistent"); std::vector> new_columns; for (int64_t i = 0; i < table->num_columns(); ++i) { - auto col = table->column(i); + auto chunked_column = table->column(i); if (table->field(i)->type()->Equals(schema->field(i)->type())) { - new_columns.push_back(col); + new_columns.push_back(chunked_column); continue; } auto from_type = table->field(i)->type(); auto to_type = schema->field(i)->type(); std::vector> chunks; - for (int64_t j = 0; j < col->num_chunks(); ++j) { - auto array = col->chunk(j); + for (int64_t j = 0; j < chunked_column->num_chunks(); ++j) { + auto array = chunked_column->chunk(j); std::shared_ptr out; if (arrow::compute::CanCast(*from_type, *to_type)) { RETURN_ON_ERROR(GeneralCast(array, to_type, out)); @@ -1082,7 +1235,12 @@ inline bool IsDataTypeConsolidatable(std::shared_ptr type) { case arrow::Type::UINT32: case arrow::Type::UINT64: case arrow::Type::FLOAT: - case arrow::Type::DOUBLE: { + case arrow::Type::DOUBLE: + case arrow::Type::DATE32: + case arrow::Type::DATE64: + case arrow::Type::TIME32: + case arrow::Type::TIME64: + case arrow::Type::TIMESTAMP: { return true; } default: { @@ -1161,6 +1319,31 @@ inline void AssignArrayWithStrideUntyped(std::shared_ptr array, stride, offset); return; } + case arrow::Type::DATE32: { + AssignArrayWithStride( + array->data()->buffers[1], target, length, stride, offset); + return; + } + case arrow::Type::DATE64: { + AssignArrayWithStride( + array->data()->buffers[1], target, length, stride, offset); + return; + } + case arrow::Type::TIME32: { + AssignArrayWithStride( + array->data()->buffers[1], target, length, stride, offset); + return; + } + case arrow::Type::TIME64: { + AssignArrayWithStride( + array->data()->buffers[1], target, length, stride, offset); + return; + } + case arrow::Type::TIMESTAMP: { + AssignArrayWithStride( + array->data()->buffers[1], target, length, stride, offset); + return; + } default: { } } diff --git a/modules/basic/ds/arrow_utils.h b/modules/basic/ds/arrow_utils.h index c41116f38..83276d189 100644 --- a/modules/basic/ds/arrow_utils.h +++ b/modules/basic/ds/arrow_utils.h @@ -57,12 +57,16 @@ class NullArrayBuilder; class BooleanArray; class BooleanArrayBuilder; -#define CONVERT_TO_ARROW_TYPE( \ - type, data_type, array_type, vineyard_array_type, builder_type, \ - vineyard_builder_type, fixed_vineyard_builder_type, type_value) \ +constexpr arrow::TimeUnit::type DefaultTimeUnit = arrow::TimeUnit::NANO; + +#define CONVERT_TO_ARROW_TYPE(type, data_type, value_type, array_type, \ + vineyard_array_type, builder_type, \ + vineyard_builder_type, \ + fixed_vineyard_builder_type, type_value) \ template <> \ struct ConvertToArrowType { \ using DataType = data_type; \ + using ValueType = value_type; \ using ArrayType = array_type; \ using VineyardArrayType = vineyard_array_type; \ using BuilderType = builder_type; \ @@ -74,6 +78,9 @@ class BooleanArrayBuilder; template using ArrowDataType = typename ConvertToArrowType::DataType; +template +using ArrowValueType = typename ConvertToArrowType::ValueType; + template using ArrowArrayType = typename ConvertToArrowType::ArrayType; @@ -105,73 +112,97 @@ using LargeStringArrayBuilder = GenericBinaryArrayBuilder; -CONVERT_TO_ARROW_TYPE(void, arrow::NullType, arrow::NullArray, NullArray, +CONVERT_TO_ARROW_TYPE(void, arrow::NullType, void, arrow::NullArray, NullArray, arrow::NullBuilder, NullArrayBuilder, void, arrow::null()) -CONVERT_TO_ARROW_TYPE(bool, arrow::BooleanType, arrow::BooleanArray, +CONVERT_TO_ARROW_TYPE(bool, arrow::BooleanType, bool, arrow::BooleanArray, BooleanArray, arrow::BooleanBuilder, BooleanArrayBuilder, void, arrow::boolean()) -CONVERT_TO_ARROW_TYPE(int8_t, arrow::Int8Type, arrow::Int8Array, +CONVERT_TO_ARROW_TYPE(int8_t, arrow::Int8Type, int8_t, arrow::Int8Array, NumericArray, arrow::Int8Builder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::int8()) -CONVERT_TO_ARROW_TYPE(uint8_t, arrow::UInt8Type, arrow::UInt8Array, +CONVERT_TO_ARROW_TYPE(uint8_t, arrow::UInt8Type, uint8_t, arrow::UInt8Array, NumericArray, arrow::UInt8Builder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::uint8()) -CONVERT_TO_ARROW_TYPE(int16_t, arrow::Int16Type, arrow::Int16Array, +CONVERT_TO_ARROW_TYPE(int16_t, arrow::Int16Type, int16_t, arrow::Int16Array, NumericArray, arrow::Int16Builder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::int16()) -CONVERT_TO_ARROW_TYPE(uint16_t, arrow::UInt16Type, arrow::UInt16Array, +CONVERT_TO_ARROW_TYPE(uint16_t, arrow::UInt16Type, uint16_t, arrow::UInt16Array, NumericArray, arrow::UInt16Builder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::uint16()) -CONVERT_TO_ARROW_TYPE(int32_t, arrow::Int32Type, arrow::Int32Array, +CONVERT_TO_ARROW_TYPE(int32_t, arrow::Int32Type, int32_t, arrow::Int32Array, NumericArray, arrow::Int32Builder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::int32()) -CONVERT_TO_ARROW_TYPE(uint32_t, arrow::UInt32Type, arrow::UInt32Array, +CONVERT_TO_ARROW_TYPE(uint32_t, arrow::UInt32Type, uint32_t, arrow::UInt32Array, NumericArray, arrow::UInt32Builder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::uint32()) -CONVERT_TO_ARROW_TYPE(int64_t, arrow::Int64Type, arrow::Int64Array, +CONVERT_TO_ARROW_TYPE(int64_t, arrow::Int64Type, int64_t, arrow::Int64Array, NumericArray, arrow::Int64Builder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::int64()) -CONVERT_TO_ARROW_TYPE(uint64_t, arrow::UInt64Type, arrow::UInt64Array, +CONVERT_TO_ARROW_TYPE(uint64_t, arrow::UInt64Type, uint64_t, arrow::UInt64Array, NumericArray, arrow::UInt64Builder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::uint64()) -CONVERT_TO_ARROW_TYPE(float, arrow::FloatType, arrow::FloatArray, +CONVERT_TO_ARROW_TYPE(float, arrow::FloatType, float, arrow::FloatArray, NumericArray, arrow::FloatBuilder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::float32()) -CONVERT_TO_ARROW_TYPE(double, arrow::DoubleType, arrow::DoubleArray, +CONVERT_TO_ARROW_TYPE(double, arrow::DoubleType, double, arrow::DoubleArray, NumericArray, arrow::DoubleBuilder, NumericArrayBuilder, FixedNumericArrayBuilder, arrow::float64()) -CONVERT_TO_ARROW_TYPE(RefString, arrow::LargeStringType, +CONVERT_TO_ARROW_TYPE(RefString, arrow::LargeStringType, RefString, arrow::LargeStringArray, BaseBinaryArray, arrow::LargeStringBuilder, LargeStringArrayBuilder, void, arrow::large_utf8()) -CONVERT_TO_ARROW_TYPE(std::string, arrow::LargeStringType, +CONVERT_TO_ARROW_TYPE(std::string, arrow::LargeStringType, std::string, arrow::LargeStringArray, BaseBinaryArray, arrow::LargeStringBuilder, LargeStringArrayBuilder, void, arrow::large_utf8()) CONVERT_TO_ARROW_TYPE(arrow_string_view, arrow::LargeStringType, - arrow::LargeStringArray, + arrow_string_view, arrow::LargeStringArray, BaseBinaryArray, arrow::LargeStringBuilder, LargeStringArrayBuilder, void, arrow::large_utf8()) +CONVERT_TO_ARROW_TYPE(arrow::Date32Type, arrow::Date32Type, + arrow::Date32Type::c_type, arrow::Date32Array, + NumericArray, arrow::Date32Builder, + NumericArrayBuilder, + FixedNumericArrayBuilder, + arrow::date32()) +CONVERT_TO_ARROW_TYPE(arrow::Date64Type, arrow::Date64Type, + arrow::Date64Type::c_type, arrow::Date64Array, + NumericArray, arrow::Date64Builder, + NumericArrayBuilder, + FixedNumericArrayBuilder, + arrow::date64()) +CONVERT_TO_ARROW_TYPE(arrow::Time32Type, arrow::Time32Type, + arrow::Time32Type::c_type, arrow::Time32Array, + NumericArray, arrow::Time32Builder, + NumericArrayBuilder, + FixedNumericArrayBuilder, + arrow::time32(DefaultTimeUnit)) +CONVERT_TO_ARROW_TYPE(arrow::Time64Type, arrow::Time64Type, + arrow::Time64Type::c_type, arrow::Time64Array, + NumericArray, arrow::Time64Builder, + NumericArrayBuilder, + FixedNumericArrayBuilder, + arrow::time64(DefaultTimeUnit)) CONVERT_TO_ARROW_TYPE(arrow::TimestampType, arrow::TimestampType, - arrow::TimestampArray, void, arrow::TimestampBuilder, - void, void, arrow::timestamp(arrow::TimeUnit::MILLI)) -CONVERT_TO_ARROW_TYPE(arrow::Date32Type, arrow::Date32Type, arrow::Date32Array, - void, arrow::Date32Builder, void, void, arrow::date32()) -CONVERT_TO_ARROW_TYPE(arrow::Date64Type, arrow::Date64Type, arrow::Date64Array, - void, arrow::Date64Builder, void, void, arrow::date64()) + arrow::TimestampType::c_type, arrow::TimestampArray, + NumericArray, + arrow::TimestampBuilder, + NumericArrayBuilder, + FixedNumericArrayBuilder, + arrow::timestamp(DefaultTimeUnit)) std::shared_ptr FromAnyType(AnyType type); diff --git a/modules/basic/ds/types.cc b/modules/basic/ds/types.cc index 06955354f..f94f6bcf5 100644 --- a/modules/basic/ds/types.cc +++ b/modules/basic/ds/types.cc @@ -38,6 +38,12 @@ AnyType ParseAnyType(const std::string& type_name) { return AnyType::Date32; } else if (type_name == "date64") { return AnyType::Date64; + } else if (type_name == "time32") { + return AnyType::Time32; + } else if (type_name == "time64") { + return AnyType::Time64; + } else if (type_name == "timestamp") { + return AnyType::Timestamp; } else { return AnyType::Undefined; } @@ -63,6 +69,12 @@ std::string GetAnyTypeName(AnyType type) { return "date32"; case AnyType::Date64: return "date64"; + case AnyType::Time32: + return "time32"; + case AnyType::Time64: + return "time64"; + case AnyType::Timestamp: + return "timestamp"; default: return "undefined"; } @@ -83,6 +95,12 @@ IdType ParseIdType(const std::string& type_name) { return IdType::Date32; } else if (type_name == "date64") { return IdType::Date64; + } else if (type_name == "time32") { + return IdType::Time32; + } else if (type_name == "time64") { + return IdType::Time64; + } else if (type_name == "timestamp") { + return IdType::Timestamp; } else { return IdType::Undefined; } @@ -104,6 +122,12 @@ std::string GetIdTypeName(IdType type) { return "date32"; case IdType::Date64: return "date64"; + case IdType::Time32: + return "time32"; + case IdType::Time64: + return "time64"; + case IdType::Timestamp: + return "timestamp"; default: return "undefined"; } diff --git a/modules/basic/ds/types.h b/modules/basic/ds/types.h index 9b8f80756..34c49bf0c 100644 --- a/modules/basic/ds/types.h +++ b/modules/basic/ds/types.h @@ -39,6 +39,9 @@ enum class AnyType { String = 7, Date32 = 8, Date64 = 9, + Time32 = 10, + Time64 = 11, + Timestamp = 12, }; template @@ -91,6 +94,21 @@ struct AnyTypeEnum { static constexpr AnyType value = AnyType::Date64; }; +template <> +struct AnyTypeEnum { + static constexpr AnyType value = AnyType::Time32; +}; + +template <> +struct AnyTypeEnum { + static constexpr AnyType value = AnyType::Time64; +}; + +template <> +struct AnyTypeEnum { + static constexpr AnyType value = AnyType::Timestamp; +}; + enum class IdType { Undefined = 0, Int32 = 1, @@ -100,6 +118,9 @@ enum class IdType { String = 5, Date32 = 6, Date64 = 7, + Time32 = 8, + Time64 = 9, + Timestamp = 10, }; template @@ -142,6 +163,21 @@ struct IdTypeEnum { static constexpr IdType value = IdType::Date64; }; +template <> +struct IdTypeEnum { + static constexpr IdType value = IdType::Time32; +}; + +template <> +struct IdTypeEnum { + static constexpr IdType value = IdType::Time64; +}; + +template <> +struct IdTypeEnum { + static constexpr IdType value = IdType::Timestamp; +}; + AnyType ParseAnyType(const std::string& type_name); std::string GetAnyTypeName(AnyType type); diff --git a/modules/graph/fragment/graph_schema.cc b/modules/graph/fragment/graph_schema.cc index 4b56832a1..d871b7c7d 100644 --- a/modules/graph/fragment/graph_schema.cc +++ b/modules/graph/fragment/graph_schema.cc @@ -34,6 +34,43 @@ namespace vineyard { namespace detail { +inline std::string ArrowDateTypeUnitToString( + arrow::TimeUnit::type const& unit) { + switch (unit) { + case arrow::TimeUnit::SECOND: + return "[S]"; + case arrow::TimeUnit::MILLI: + return "[MS]"; + case arrow::TimeUnit::MICRO: + return "[US]"; + case arrow::TimeUnit::NANO: + return "[NS]"; + default: + return "Unsupported time unit: '" + std::to_string(static_cast(unit)) + + "'"; + } +} + +inline arrow::TimeUnit::type ArrowDateTypeUnitFromString(const char* unit) { + if (std::strncmp(unit, "[S]", 3) == 0) { + return arrow::TimeUnit::SECOND; + } else if (std::strncmp(unit, "[MS]", 4) == 0) { + return arrow::TimeUnit::MILLI; + } else if (std::strncmp(unit, "[US]", 4) == 0) { + return arrow::TimeUnit::MICRO; + } else if (std::strncmp(unit, "[NS]", 4) == 0) { + return arrow::TimeUnit::NANO; + } else { + LOG(ERROR) << "Unsupported time unit: '" << unit << "'"; + return arrow::TimeUnit::SECOND; + } +} + +inline arrow::TimeUnit::type ArrowDateTypeUnitFromString( + std::string const& unit) { + return ArrowDateTypeUnitFromString(unit.c_str()); +} + std::string PropertyTypeToString(PropertyType type) { if (type == nullptr) { return "NULL"; @@ -63,6 +100,23 @@ std::string PropertyTypeToString(PropertyType type) { return "STRING"; } else if (arrow::large_utf8()->Equals(type)) { return "STRING"; + } else if (arrow::date32()->Equals(type)) { + return "DATE32[DAY]"; + } else if (arrow::date64()->Equals(type)) { + return "DATE64[MS]"; + } else if (type->id() == arrow::Type::TIME32) { + auto time32_type = std::dynamic_pointer_cast(type); + const std::string unit = ArrowDateTypeUnitToString(time32_type->unit()); + return "TIME[32]" + unit; + } else if (type->id() == arrow::Type::TIME64) { + auto time64_type = std::dynamic_pointer_cast(type); + const std::string unit = ArrowDateTypeUnitToString(time64_type->unit()); + return "TIME[64]" + unit; + } else if (type->id() == arrow::Type::TIMESTAMP) { + auto timestamp_type = std::dynamic_pointer_cast(type); + const std::string unit = ArrowDateTypeUnitToString(timestamp_type->unit()); + const std::string timezone = timestamp_type->timezone(); + return "TIMESTAMP" + unit + "[" + timezone + "]"; } else if (type->id() == arrow::Type::LIST) { auto ty = std::dynamic_pointer_cast(type); return "LIST" + PropertyTypeToString(ty->value_type()); @@ -113,6 +167,42 @@ PropertyType PropertyTypeFromString(const std::string& type) { return arrow::float64(); } else if (type_upper == "STRING") { return arrow::large_utf8(); + } else if (type_upper == "DATE32[DAY]") { + return arrow::date32(); + } else if (type_upper == "DATE64[MS]") { + return arrow::date64(); + } else if (type_upper.substr(0, std::string("TIME[32]").length()) == + std::string("TIME[32]")) { + const std::string unit_content = + type_upper.substr(std::string("TIME[32]").length()); + arrow::TimeUnit::type unit = DefaultTimeUnit; + if (unit_content.length() >= 3) { + unit = ArrowDateTypeUnitFromString(unit_content); + } + return arrow::time32(unit); + } else if (type_upper.substr(0, std::string("TIME[64]").length()) == + std::string("TIME[64]")) { + const std::string unit_content = + type_upper.substr(std::string("TIME[64]").length()); + arrow::TimeUnit::type unit = DefaultTimeUnit; + if (unit_content.length() >= 3) { + unit = ArrowDateTypeUnitFromString(unit_content); + } + return arrow::time64(unit); + } else if (type_upper.substr(0, std::string("TIMESTAMP").length()) == + std::string("TIMESTAMP")) { + const std::string unit_content = + type_upper.substr(std::string("TIMESTAMP").length()); + arrow::TimeUnit::type unit = DefaultTimeUnit; + if (unit_content.length() >= 3) { + unit = ArrowDateTypeUnitFromString(unit_content); + std::string timezone = + type_upper.substr(std::string("TIMESTAMP").length() + + ArrowDateTypeUnitToString(unit).length()); + timezone = timezone.substr(1, timezone.length() - 2); + return arrow::timestamp(unit); + } + return arrow::timestamp(DefaultTimeUnit); } else if (type_upper == "LISTINT") { return arrow::list(arrow::int32()); } else if (type_upper == "LISTLONG") { diff --git a/modules/graph/fragment/property_graph_types.h b/modules/graph/fragment/property_graph_types.h index 3622e78ce..3c10a99e0 100644 --- a/modules/graph/fragment/property_graph_types.h +++ b/modules/graph/fragment/property_graph_types.h @@ -209,6 +209,11 @@ class EdgeDataColumn { vineyard::ConvertToArrowType::TypeValue())) { data_ = std::dynamic_pointer_cast>(array) ->raw_values(); + } else if (array->type()->id() == arrow::Type::TIME32 || + array->type()->id() == arrow::Type::TIME64 || + array->type()->id() == arrow::Type::TIMESTAMP) { + data_ = + reinterpret_cast(array->data()->buffers[1]->data()); } else { data_ = NULL; } @@ -263,6 +268,12 @@ class VertexDataColumn { data_ = std::dynamic_pointer_cast>(array) ->raw_values() - static_cast(range.begin().GetValue()); + } else if (array->type()->id() == arrow::Type::TIME32 || + array->type()->id() == arrow::Type::TIME64 || + array->type()->id() == arrow::Type::TIMESTAMP) { + data_ = + reinterpret_cast(array->data()->buffers[1]->data()) - + range.begin().GetValue(); } else { data_ = NULL; } diff --git a/modules/graph/loader/gar_fragment_loader_impl.h b/modules/graph/loader/gar_fragment_loader_impl.h index 99d95eeb0..59c6e120f 100644 --- a/modules/graph/loader/gar_fragment_loader_impl.h +++ b/modules/graph/loader/gar_fragment_loader_impl.h @@ -135,14 +135,14 @@ GARFragmentLoader::LoadEdgeTables() { const auto& edge_info = edge.second; if (edge_info.ContainAdjList( GraphArchive::AdjListType::ordered_by_source)) { - loadEdgeTableOfLabel(edge_info, - GraphArchive::AdjListType::ordered_by_source); + BOOST_LEAF_CHECK(loadEdgeTableOfLabel( + edge_info, GraphArchive::AdjListType::ordered_by_source)); } if (this->directed_) { if (edge_info.ContainAdjList( GraphArchive::AdjListType::ordered_by_dest)) { - loadEdgeTableOfLabel(edge_info, - GraphArchive::AdjListType::ordered_by_dest); + BOOST_LEAF_CHECK(loadEdgeTableOfLabel( + edge_info, GraphArchive::AdjListType::ordered_by_dest)); } } } diff --git a/modules/graph/test/arrow_fragment_extend_test.cc b/modules/graph/test/arrow_fragment_extend_test.cc index f87395d93..b38a6d364 100644 --- a/modules/graph/test/arrow_fragment_extend_test.cc +++ b/modules/graph/test/arrow_fragment_extend_test.cc @@ -129,20 +129,20 @@ bool Validate(vineyard::Client& client, const grape::CommSpec& comm_spec, LOG(ERROR) << "different inner edges number"; return false; } - for (int j = 0; j < true_ivs[i].size(); ++j) { + for (size_t j = 0; j < true_ivs[i].size(); ++j) { if (true_ivs[i][j] != test_ivs[i][j]) { LOG(ERROR) << "ground-truth v is " << true_ivs[i][j] << "and program is " << test_ivs[i][j]; return false; } } - for (int j = 0; j < true_ies[i].size(); ++j) { + for (size_t j = 0; j < true_ies[i].size(); ++j) { if (true_ies[i][j] != test_ies[i][j]) { LOG(ERROR) << "different inner edge"; return false; } } - for (int j = 0; j < true_oes[i].size(); ++j) { + for (size_t j = 0; j < true_oes[i].size(); ++j) { if (true_oes[i][j] != test_oes[i][j]) { LOG(ERROR) << "different outgoing edge"; return false; diff --git a/modules/graph/utils/table_shuffler.cc b/modules/graph/utils/table_shuffler.cc index 49d4058f0..e7659d26c 100644 --- a/modules/graph/utils/table_shuffler.cc +++ b/modules/graph/utils/table_shuffler.cc @@ -304,30 +304,34 @@ static inline void select_list_items(std::shared_ptr array, TableAppender::TableAppender(std::shared_ptr schema) { for (const auto& field : schema->fields()) { std::shared_ptr type = field->type(); - if (type == arrow::uint64()) { + if (arrow::uint64()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type == arrow::int64()) { + } else if (arrow::int64()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type == arrow::uint32()) { + } else if (arrow::uint32()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type == arrow::int32()) { + } else if (arrow::int32()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type == arrow::float32()) { + } else if (arrow::float32()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type == arrow::float64()) { + } else if (arrow::float64()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type == arrow::large_binary()) { + } else if (arrow::large_binary()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type == arrow::large_utf8()) { + } else if (arrow::large_utf8()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type == arrow::null()) { + } else if (arrow::null()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type->id() == arrow::Type::TIMESTAMP) { - funcs_.push_back(AppendHelper::append); - } else if (type == arrow::date32()) { + } else if (arrow::date32()->Equals(type)) { funcs_.push_back(AppendHelper::append); - } else if (type == arrow::date64()) { + } else if (arrow::date64()->Equals(type)) { funcs_.push_back(AppendHelper::append); + } else if (type->id() == arrow::Type::TIME32) { + funcs_.push_back(AppendHelper::append); + } else if (type->id() == arrow::Type::TIME64) { + funcs_.push_back(AppendHelper::append); + } else if (type->id() == arrow::Type::TIMESTAMP) { + funcs_.push_back(AppendHelper::append); } else { LOG(ERROR) << "Datatype [" << type->ToString() << "] not implemented..."; } @@ -554,7 +558,9 @@ Status CheckSchemaConsistency(const arrow::Schema& schema, void SerializeSelectedItems(grape::InArchive& arc, std::shared_ptr array, const std::vector& offset) { - if (array->type()->Equals(arrow::float64())) { + if (array->type()->Equals(arrow::null())) { + detail::serialize_null_items(arc, array, offset); + } else if (array->type()->Equals(arrow::float64())) { detail::serialize_typed_items(arc, array, offset); } else if (array->type()->Equals(arrow::float32())) { detail::serialize_typed_items(arc, array, offset); @@ -570,6 +576,21 @@ void SerializeSelectedItems(grape::InArchive& arc, detail::serialize_string_items(arc, array, offset); } else if (array->type()->Equals(arrow::null())) { detail::serialize_null_items(arc, array, offset); + } else if (array->type()->Equals(arrow::date32())) { + detail::serialize_typed_items(arc, array, + offset); + } else if (array->type()->Equals(arrow::date64())) { + detail::serialize_typed_items(arc, array, + offset); + } else if (array->type()->id() == arrow::Type::TIME32) { + detail::serialize_typed_items(arc, array, + offset); + } else if (array->type()->id() == arrow::Type::TIME64) { + detail::serialize_typed_items(arc, array, + offset); + } else if (array->type()->id() == arrow::Type::TIMESTAMP) { + detail::serialize_typed_items(arc, array, + offset); } else if (array->type()->Equals(arrow::large_list(arrow::float64()))) { detail::serialize_list_items(arc, array, offset); } else if (array->type()->Equals(arrow::large_list(arrow::float32()))) { @@ -602,7 +623,9 @@ void SerializeSelectedRows(grape::InArchive& arc, void DeserializeSelectedItems(grape::OutArchive& arc, int64_t num, arrow::ArrayBuilder* builder) { - if (builder->type()->Equals(arrow::float64())) { + if (builder->type()->Equals(arrow::null())) { + detail::deserialize_null_items(arc, num, builder); + } else if (builder->type()->Equals(arrow::float64())) { detail::deserialize_typed_items(arc, num, builder); } else if (builder->type()->Equals(arrow::float32())) { detail::deserialize_typed_items(arc, num, builder); @@ -616,8 +639,21 @@ void DeserializeSelectedItems(grape::OutArchive& arc, int64_t num, detail::deserialize_typed_items(arc, num, builder); } else if (builder->type()->Equals(arrow::large_utf8())) { detail::deserialize_string_items(arc, num, builder); - } else if (builder->type()->Equals(arrow::null())) { - detail::deserialize_null_items(arc, num, builder); + } else if (builder->type()->Equals(arrow::date32())) { + detail::deserialize_typed_items(arc, num, + builder); + } else if (builder->type()->Equals(arrow::date64())) { + detail::deserialize_typed_items(arc, num, + builder); + } else if (builder->type()->id() == arrow::Type::TIME32) { + detail::deserialize_typed_items(arc, num, + builder); + } else if (builder->type()->id() == arrow::Type::TIME64) { + detail::deserialize_typed_items(arc, num, + builder); + } else if (builder->type()->id() == arrow::Type::TIMESTAMP) { + detail::deserialize_typed_items(arc, num, + builder); } else if (builder->type()->Equals(arrow::large_list(arrow::float64()))) { detail::deserialize_list_items(arc, num, builder); } else if (builder->type()->Equals(arrow::large_list(arrow::float32()))) { @@ -663,7 +699,9 @@ void DeserializeSelectedRows(grape::OutArchive& arc, void SelectItems(std::shared_ptr array, const std::vector offset, arrow::ArrayBuilder* builder) { - if (array->type()->Equals(arrow::float64())) { + if (array->type()->Equals(arrow::null())) { + detail::select_null_items(array, offset, builder); + } else if (array->type()->Equals(arrow::float64())) { detail::select_typed_items(array, offset, builder); } else if (array->type()->Equals(arrow::float32())) { detail::select_typed_items(array, offset, builder); @@ -677,8 +715,16 @@ void SelectItems(std::shared_ptr array, detail::select_typed_items(array, offset, builder); } else if (array->type()->Equals(arrow::large_utf8())) { detail::select_string_items(array, offset, builder); - } else if (array->type()->Equals(arrow::null())) { - detail::select_null_items(array, offset, builder); + } else if (array->type()->Equals(arrow::date32())) { + detail::select_typed_items(array, offset, builder); + } else if (array->type()->Equals(arrow::date64())) { + detail::select_typed_items(array, offset, builder); + } else if (array->type()->id() == arrow::Type::TIME32) { + detail::select_typed_items(array, offset, builder); + } else if (array->type()->id() == arrow::Type::TIME64) { + detail::select_typed_items(array, offset, builder); + } else if (array->type()->id() == arrow::Type::TIMESTAMP) { + detail::select_typed_items(array, offset, builder); } else if (array->type()->Equals(arrow::large_list(arrow::float64()))) { detail::select_list_items(array, offset, builder); } else if (array->type()->Equals(arrow::large_list(arrow::float32()))) { diff --git a/modules/graph/utils/table_shuffler.h b/modules/graph/utils/table_shuffler.h index e84080404..466ad0984 100644 --- a/modules/graph/utils/table_shuffler.h +++ b/modules/graph/utils/table_shuffler.h @@ -137,6 +137,26 @@ struct AppendHelper { } }; +template <> +struct AppendHelper { + static Status append(arrow::ArrayBuilder* builder, + std::shared_ptr array, size_t offset) { + RETURN_ON_ARROW_ERROR(dynamic_cast(builder)->Append( + std::dynamic_pointer_cast(array)->GetView(offset))); + return Status::OK(); + } +}; + +template <> +struct AppendHelper { + static Status append(arrow::ArrayBuilder* builder, + std::shared_ptr array, size_t offset) { + RETURN_ON_ARROW_ERROR(dynamic_cast(builder)->Append( + std::dynamic_pointer_cast(array)->GetView(offset))); + return Status::OK(); + } +}; + template <> struct AppendHelper { static Status append(arrow::ArrayBuilder* builder, diff --git a/modules/graph/writer/arrow_fragment_writer.cc b/modules/graph/writer/arrow_fragment_writer.cc index 70d108b46..1c3ca1e23 100644 --- a/modules/graph/writer/arrow_fragment_writer.cc +++ b/modules/graph/writer/arrow_fragment_writer.cc @@ -65,6 +65,21 @@ void InitializeArrayArrayBuilders( builders[col_id] = std::make_shared(); } else if (arrow::large_utf8()->Equals(prop_type)) { builders[col_id] = std::make_shared(); + } else if (arrow::date32()->Equals(prop_type)) { + builders[col_id] = std::make_shared(); + } else if (arrow::date64()->Equals(prop_type)) { + builders[col_id] = std::make_shared(); + } else if (prop_type->id() == arrow::Type::TIME32) { + builders[col_id] = std::make_shared( + prop_type, arrow::default_memory_pool()); + } else if (prop_type->id() == arrow::Type::TIME64) { + builders[col_id] = std::make_shared( + prop_type, arrow::default_memory_pool()); + } else if (prop_type->id() == arrow::Type::TIMESTAMP) { + builders[col_id] = std::make_shared( + prop_type, arrow::default_memory_pool()); + } else { + LOG(FATAL) << "Unsupported property type: " << prop_type->ToString(); } ++col_id; } diff --git a/modules/graph/writer/arrow_fragment_writer_impl.h b/modules/graph/writer/arrow_fragment_writer_impl.h index 2ebdbbbf8..5ea412af9 100644 --- a/modules/graph/writer/arrow_fragment_writer_impl.h +++ b/modules/graph/writer/arrow_fragment_writer_impl.h @@ -456,6 +456,34 @@ ArrowFragmentWriter::appendPropertiesToArrowArrayBuilders( builders[col_id]); ARROW_OK_OR_RAISE( builder->Append(edge.template get_data(pid))); + } else if (arrow::date32()->Equals(prop_type)) { + auto builder = + std::dynamic_pointer_cast(builders[col_id]); + ARROW_OK_OR_RAISE(builder->Append( + edge.template get_data(pid))); + } else if (arrow::date64()->Equals(prop_type)) { + auto builder = + std::dynamic_pointer_cast(builders[col_id]); + ARROW_OK_OR_RAISE(builder->Append( + edge.template get_data(pid))); + } else if (prop_type->id() == arrow::Type::TIME32) { + auto builder = + std::dynamic_pointer_cast(builders[col_id]); + ARROW_OK_OR_RAISE(builder->Append( + edge.template get_data(pid))); + } else if (prop_type->id() == arrow::Type::TIME64) { + auto builder = + std::dynamic_pointer_cast(builders[col_id]); + ARROW_OK_OR_RAISE(builder->Append( + edge.template get_data(pid))); + } else if (prop_type->id() == arrow::Type::TIMESTAMP) { + auto builder = + std::dynamic_pointer_cast(builders[col_id]); + ARROW_OK_OR_RAISE(builder->Append( + edge.template get_data(pid))); + } else { + RETURN_GS_ERROR(ErrorCode::kInvalidValueError, + "Unsupported property type: " + prop_type->ToString()); } ++col_id; } diff --git a/python/vineyard/core/codegen/cpp.py b/python/vineyard/core/codegen/cpp.py index ce6574298..3325737b9 100644 --- a/python/vineyard/core/codegen/cpp.py +++ b/python/vineyard/core/codegen/cpp.py @@ -114,6 +114,11 @@ def codegen_create(class_header, class_name, class_name_elaborated, meth=False): construct_meta_tpl = ''' meta.GetKeyValue("{name}", this->{name});''' +construct_meta_optional_tpl = ''' + if (meta.HasKey("{name}")) {{ + meta.GetKeyValue("{name}", this->{name}); + }}''' + construct_plain_tpl = ''' this->{name}.Construct(meta.GetMemberMeta("{name}"));''' @@ -176,7 +181,10 @@ def codegen_construct( spec = parse_codegen_spec_from_type(field) name = field.spelling if spec.is_meta: - tpl = construct_meta_tpl + if spec.optional: + tpl = construct_meta_optional_tpl + else: + tpl = construct_meta_tpl if spec.is_plain: if spec.star: tpl = construct_plain_star_tpl diff --git a/python/vineyard/core/codegen/parsing.py b/python/vineyard/core/codegen/parsing.py index fe5d36027..0e55a87a9 100644 --- a/python/vineyard/core/codegen/parsing.py +++ b/python/vineyard/core/codegen/parsing.py @@ -49,13 +49,14 @@ # # __attribute__((annotate("vineyard"))): vineyard classes # __attribute__((annotate("shared"))): shared member/method -# __attribute__((annotate("streamable"))): shared member/method +# __attribute__((annotate("shared(optional)"))): shared member/method, optional +# __attribute__((annotate("vineyard(streamable)"))): shared member/method # __attribute__((annotate("distributed"))): shared member/method # class CodeGenKind: - def __init__(self, kind='meta', element_type=None): + def __init__(self, kind='meta', element_type=None, optional: bool = False): self.kind = kind if element_type is None: self.element_type = None @@ -72,6 +73,13 @@ def __init__(self, kind='meta', element_type=None): else: self.deref = '*' + # whether the metadata or member field is optional + self.optional = optional + + @property + def is_optional(self): + return self.optional + @property def is_meta(self): return self.kind == 'meta' @@ -98,19 +106,25 @@ def is_dict(self): def __repr__(self): star_str = '*' if self.star else '' + rep = None if self.is_meta: - return 'meta' + rep = 'meta' if self.is_plain: - return '%s%s' % (self.element_type, star_str) + rep = '%s%s' % (self.element_type, star_str) if self.is_list: - return '[%s%s]' % (self.element_type, star_str) + rep = '[%s%s]' % (self.element_type, star_str) if self.is_dlist: - return '[[%s%s]]' % (self.element_type, star_str) + rep = '[[%s%s]]' % (self.element_type, star_str) if self.is_set: - return '{%s%s}' % (self.element_type, star_str) + rep = '{%s%s}' % (self.element_type, star_str) if self.is_dict: - return '{%s: %s%s}' % (self.element_type[0], self.element_type[1], star_str) - raise RuntimeError('Invalid codegen kind: %s' % self.kind) + rep = '{%s: %s%s}' % (self.element_type[0], self.element_type[1], star_str) + if rep is not None: + if self.is_optional: + rep = 'optional(%s)' % rep + return rep + else: + raise RuntimeError('Invalid codegen kind: %s' % self.kind) def figure_out_namespace(node: Cursor) -> Optional[str]: @@ -215,6 +229,7 @@ def parse_codegen_spec_from_type(node: Cursor): 'Pointer of pointer %s is not supported' % node.type.spelling ) + optional = check_serialize_attribute(node) == 'shared(optional)' basename = typename.split('<')[0] namespace = figure_out_namespace(node_type.get_declaration()) @@ -242,10 +257,12 @@ def parse_codegen_spec_from_type(node: Cursor): 'pointer of primitive types inside Tuple/List is not ' 'supported: %s' % node.type.spelling ) - return CodeGenKind('meta') + return CodeGenKind('meta', optional=optional) else: typekind = 'list' - return CodeGenKind(typekind, (element_typename, inside_star)) + return CodeGenKind( + typekind, (element_typename, inside_star), optional=optional + ) if is_dict_type(namespace, basename): key_type = node_type.get_template_argument_type(0) @@ -258,17 +275,19 @@ def parse_codegen_spec_from_type(node: Cursor): 'pointer of primitive types inside Map is not supported: %s' % node.type.spelling ) - return CodeGenKind('meta') + return CodeGenKind('meta', optional=optional) else: return CodeGenKind( - 'dict', ((key_typename,), (value_typename, inside_star)) + 'dict', + ((key_typename,), (value_typename, inside_star)), + optional=optional, ) if is_primitive_types(node, node_type, typename, star): - return CodeGenKind('meta') + return CodeGenKind('meta', optional=optional) else: # directly return: generate data members, in pointer format - return CodeGenKind('plain', (basename, star)) + return CodeGenKind('plain', (basename, star), optional=optional) ############################################################################### @@ -392,6 +411,7 @@ def check_serialize_attribute(node): 'vineyard', 'vineyard(streamable)', 'shared', + 'shared(optional)', 'distributed', ]: if child.spelling.startswith(attr_kind): @@ -474,7 +494,7 @@ def find_fields(definition): if child.kind == CursorKind.FIELD_DECL: attribute = check_serialize_attribute(child) - if attribute in ['shared', 'distributed']: + if attribute in ['shared', 'shared(optional)', 'distributed']: fields.append(child) continue @@ -484,7 +504,7 @@ def find_fields(definition): raise ValueError( 'The annotation "[[distributed]]" is not allowed on methods' ) - if attribute == 'shared': + if attribute == 'shared' or attribute == 'shared(optional)': fields.append(child) if not has_post_construct and child.spelling == 'PostConstruct': for body in child.get_children(): @@ -587,7 +607,13 @@ def validate_and_strip_input_file(source): content = '\n'.join(content) # pass: rewrite `[[...]]` with `__attribute__((annotate(...)))` - attributes = ['vineyard', 'vineyard(streamable)', 'shared', 'distributed'] + attributes = [ + 'vineyard', + 'vineyard(streamable)', + 'shared', + 'shared(optional)', + 'distributed', + ] for attr in attributes: content = content.replace( '[[%s]]' % attr, '__attribute__((annotate("%s")))' % attr diff --git a/src/common/util/macros.h b/src/common/util/macros.h index b7e1d02cf..8a0f4c2a8 100644 --- a/src/common/util/macros.h +++ b/src/common/util/macros.h @@ -46,5 +46,9 @@ namespace vineyard { #define GET_MACRO3(_1, _2, _3, _4, NAME, ...) NAME #endif +// Allow comma in macro's argument, see also: +// https://stackoverflow.com/a/13842612/5080177 +#define ARG(...) __VA_ARGS__ + } // namespace vineyard #endif // SRC_COMMON_UTIL_MACROS_H_