From a384f8a1320144875792a06b9d2f64545704d4bf Mon Sep 17 00:00:00 2001 From: Weibin Zeng Date: Wed, 25 Oct 2023 13:57:31 +0800 Subject: [PATCH] [GraphAr] Align the vertices of fragment to vertex chunk size & Fix the shuffle array multi-thread bug & Bump up GraphAr (#1604) - Align the vertices of each fragment to the vertex chunk size by appending nulls, so that the output of the multi-fragment writer in GAR format is also correct - Fix the GAR loader's oid array shuffle bug: remove the multi-threaded implementation, since the shuffle already uses threads to send/recv, which would make the messages conflict - Bump up GraphAr to support building with a locally installed Arrow. Fixes #issue number --- modules/graph/fragment/gar_fragment_builder.h | 7 +- .../fragment/gar_fragment_builder_impl.h | 25 ++++--- modules/graph/loader/gar_fragment_loader.cc | 37 ++++++++++ modules/graph/loader/gar_fragment_loader.h | 4 ++ .../graph/loader/gar_fragment_loader_impl.h | 72 ++++++++++++------- modules/graph/thirdparty/GraphAr | 2 +- modules/graph/writer/arrow_fragment_writer.cc | 25 +++++++ modules/graph/writer/arrow_fragment_writer.h | 4 ++ .../graph/writer/arrow_fragment_writer_impl.h | 21 ++++-- .../writer/arrow_fragment_writer_string.cc | 26 +++++++ 10 files changed, 180 insertions(+), 43 deletions(-) create mode 100644 modules/graph/loader/gar_fragment_loader.cc create mode 100644 modules/graph/writer/arrow_fragment_writer_string.cc diff --git a/modules/graph/fragment/gar_fragment_builder.h b/modules/graph/fragment/gar_fragment_builder.h index d2fc42686..b1737f75a 100644 --- a/modules/graph/fragment/gar_fragment_builder.h +++ b/modules/graph/fragment/gar_fragment_builder.h @@ -53,7 +53,12 @@ class GARFragmentBuilder std::shared_ptr vm_ptr) : ArrowFragmentBaseBuilder(client), client_(client), - vm_ptr_(vm_ptr) {} + vm_ptr_(vm_ptr) { + Base::set_compact_edges_(false); + VINEYARD_ASSERT( + !Base::compact_edges_, + "Compacting edges is not supported when loading from GraphAr."); + } vineyard::Status Build(vineyard::Client& 
client) override; diff --git a/modules/graph/fragment/gar_fragment_builder_impl.h b/modules/graph/fragment/gar_fragment_builder_impl.h index 1bc792903..fd8a31403 100644 --- a/modules/graph/fragment/gar_fragment_builder_impl.h +++ b/modules/graph/fragment/gar_fragment_builder_impl.h @@ -136,24 +136,31 @@ boost::leaf::result generate_csr( int64_t num_chunks = src_chunks.size(); int64_t edge_num = offset_array->Value(offset_array->length() - 1); - for (int v_label = 0; v_label != vertex_label_num; ++v_label) { + auto tvnum = tvnums[v_label]; + // build the arrow's offset array + std::shared_ptr offsets_buffer; + ARROW_OK_ASSIGN_OR_RAISE( + offsets_buffer, arrow::AllocateBuffer((tvnum + 1) * sizeof(int64_t))); if (v_label == vertex_label) { - edge_offsets[v_label] = offset_array; + memcpy(offsets_buffer->mutable_data(), + reinterpret_cast(offset_array->raw_values()), + offset_array->length() * sizeof(int64_t)); + // we do not store the edge offset of outer vertices, so fill edge_num + // to the outer vertices offset + std::fill_n( + reinterpret_cast(offsets_buffer->mutable_data() + + offset_array->length() * sizeof(int64_t)), + (tvnum + 1) - offset_array->length(), edge_num); edges[v_label] = std::make_shared>(client, edge_num); } else { - auto tvnum = tvnums[v_label]; - // build the arrow's offset array - std::shared_ptr offsets_buffer; - ARROW_OK_ASSIGN_OR_RAISE( - offsets_buffer, arrow::AllocateBuffer((tvnum + 1) * sizeof(int64_t))); std::fill_n(reinterpret_cast(offsets_buffer->mutable_data()), tvnum + 1, 0); - edge_offsets[v_label] = std::make_shared( - arrow::int64(), tvnum + 1, offsets_buffer, nullptr, 0, 0); edges[v_label] = std::make_shared>(client, 0); } + edge_offsets[v_label] = std::make_shared( + arrow::int64(), tvnum + 1, offsets_buffer, nullptr, 0, 0); } std::vector chunk_offsets(num_chunks + 1, 0); diff --git a/modules/graph/loader/gar_fragment_loader.cc b/modules/graph/loader/gar_fragment_loader.cc new file mode 100644 index 000000000..43a2f11d2 --- 
/dev/null +++ b/modules/graph/loader/gar_fragment_loader.cc @@ -0,0 +1,37 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifdef ENABLE_GAR + +#include "graph/loader/gar_fragment_loader.h" + +#include "arrow/api.h" +#include "gar/graph_info.h" + +namespace vineyard { + +std::shared_ptr ConstructSchemaFromPropertyGroup( + const GraphArchive::PropertyGroup& property_group) { + std::vector> fields; + for (const auto& prop : property_group.GetProperties()) { + fields.emplace_back(arrow::field( + prop.name, GraphArchive::DataType::DataTypeToArrowDataType(prop.type))); + } + return arrow::schema(fields); +} + +} // namespace vineyard + +#endif // ENABLE_GAR diff --git a/modules/graph/loader/gar_fragment_loader.h b/modules/graph/loader/gar_fragment_loader.h index cf32fa318..5f13708a8 100644 --- a/modules/graph/loader/gar_fragment_loader.h +++ b/modules/graph/loader/gar_fragment_loader.h @@ -44,11 +44,15 @@ limitations under the License. 
namespace GraphArchive { class GraphInfo; class EdgeInfo; +class PropertyGroup; enum class AdjListType : std::uint8_t; } // namespace GraphArchive namespace vineyard { +std::shared_ptr ConstructSchemaFromPropertyGroup( + const GraphArchive::PropertyGroup& property_group); + template ::type, diff --git a/modules/graph/loader/gar_fragment_loader_impl.h b/modules/graph/loader/gar_fragment_loader_impl.h index fffe5c47c..275cbbeca 100644 --- a/modules/graph/loader/gar_fragment_loader_impl.h +++ b/modules/graph/loader/gar_fragment_loader_impl.h @@ -202,17 +202,29 @@ GARFragmentLoader::distributeVertices() { } vertex_labels_.push_back(label); vertex_chunk_sizes_.push_back(vertex_info.GetChunkSize()); - auto chunk_num = GraphArchive::utils::GetVertexChunkNum( + auto chunk_num_result = GraphArchive::utils::GetVertexChunkNum( graph_info_->GetPrefix(), vertex_info); - RETURN_GS_ERROR_IF_NOT_OK(chunk_num.status()); + RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status()); // distribute the vertex chunks for fragments - int64_t bsize = chunk_num.value() / static_cast(comm_spec_.fnum()); - vertex_chunk_begin_of_frag_[label].resize(comm_spec_.fnum() + 1, 0); - for (fid_t fid = 0; fid < comm_spec_.fnum(); ++fid) { - vertex_chunk_begin_of_frag_[label][fid] = - static_cast(fid) * bsize; + auto chunk_num = chunk_num_result.value(); + + if (chunk_num < static_cast(comm_spec_.fnum())) { + int64_t index = 0; + for (; index < chunk_num; ++index) { + vertex_chunk_begin_of_frag_[label][index] = index; + } + for (; index <= static_cast(comm_spec_.fnum()); ++index) { + vertex_chunk_begin_of_frag_[label][index] = chunk_num; + } + } else { + int64_t bsize = chunk_num / static_cast(comm_spec_.fnum()); + vertex_chunk_begin_of_frag_[label].resize(comm_spec_.fnum() + 1, 0); + for (fid_t fid = 0; fid < comm_spec_.fnum(); ++fid) { + vertex_chunk_begin_of_frag_[label][fid] = + static_cast(fid) * bsize; + } + vertex_chunk_begin_of_frag_[label][comm_spec_.fnum()] = chunk_num; } - 
vertex_chunk_begin_of_frag_[label][comm_spec_.fnum()] = chunk_num.value(); } vertex_label_num_ = vertex_labels_.size(); for (size_t i = 0; i < vertex_labels_.size(); ++i) { @@ -246,8 +258,8 @@ boost::leaf::result GARFragmentLoader::constructVertexMap() { std::vector>> oid_lists( vertex_label_num_); - ThreadGroup tg(comm_spec_); - auto shuffle_procedure = [&](const label_id_t label_id) -> Status { + auto shuffle_procedure = + [&](const label_id_t label_id) -> boost::leaf::result { std::vector> shuffled_oid_array; auto& vertex_info = graph_info_->GetVertexInfo(vertex_labels_[label_id]).value(); @@ -267,7 +279,7 @@ GARFragmentLoader::constructVertexMap() { if (primary_key.empty()) { std::string msg = "primary key is not found in " + vertex_labels_[label_id] + " property groups"; - return Status::Invalid(msg); + RETURN_GS_ERROR(ErrorCode::kInvalidValueError, msg); } auto local_oid_array = vertex_tables_[label_id]->GetColumnByName(primary_key); @@ -275,25 +287,18 @@ GARFragmentLoader::constructVertexMap() { std::string msg = "primary key column " + primary_key + " is not found in " + vertex_labels_[label_id] + " table"; - return Status::Invalid(msg); + RETURN_GS_ERROR(ErrorCode::kInvalidValueError, msg); } - RETURN_ON_ERROR(FragmentAllGatherArray(comm_spec_, local_oid_array, - shuffled_oid_array)); + VY_OK_OR_RAISE(FragmentAllGatherArray(comm_spec_, local_oid_array, + shuffled_oid_array)); for (auto const& array : shuffled_oid_array) { oid_lists[label_id].emplace_back( std::dynamic_pointer_cast(array)); } - return Status::OK(); + return nullptr; }; for (label_id_t label_id = 0; label_id < vertex_label_num_; ++label_id) { - tg.AddTask(shuffle_procedure, label_id); - } - { - Status status; - for (auto const& s : tg.TakeResults()) { - status += s; - } - VY_OK_OR_RAISE(status); + BOOST_LEAF_CHECK(sync_gs_error(comm_spec_, shuffle_procedure, label_id)); } BasicArrowVertexMapBuilder vm_builder( @@ -362,11 +367,24 @@ GARFragmentLoader::loadVertexTableOfLabel( for (auto& t : 
threads) { t.join(); } - auto pg_table = arrow::ConcatenateTables(vertex_chunk_tables); - if (!pg_table.status().ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, pg_table.status().message()); + std::shared_ptr pg_table; + if (vertex_chunk_num_of_fragment > 0) { + auto pg_table_ret = arrow::ConcatenateTables(vertex_chunk_tables); + if (!pg_table_ret.status().ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, + pg_table_ret.status().message()); + } + pg_table = pg_table_ret.ValueOrDie(); + } else { + auto schema = ConstructSchemaFromPropertyGroup(pg); + auto pg_table_ret = arrow::Table::MakeEmpty(schema); + if (!pg_table_ret.status().ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, + pg_table_ret.status().message()); + } + pg_table = pg_table_ret.ValueOrDie(); } - pg_tables.push_back(std::move(pg_table).ValueOrDie()); + pg_tables.push_back(std::move(pg_table)); } std::shared_ptr concat_table; VY_OK_OR_RAISE(ConcatenateTablesColumnWise(pg_tables, concat_table)); diff --git a/modules/graph/thirdparty/GraphAr b/modules/graph/thirdparty/GraphAr index bccd591ac..c6b689190 160000 --- a/modules/graph/thirdparty/GraphAr +++ b/modules/graph/thirdparty/GraphAr @@ -1 +1 @@ -Subproject commit bccd591ac7bbd5d9ef4bf8162b01667e2047e28c +Subproject commit c6b689190f58247f26632281b5f0cf51435fc655 diff --git a/modules/graph/writer/arrow_fragment_writer.cc b/modules/graph/writer/arrow_fragment_writer.cc index fe9d88bfc..70d108b46 100644 --- a/modules/graph/writer/arrow_fragment_writer.cc +++ b/modules/graph/writer/arrow_fragment_writer.cc @@ -70,6 +70,31 @@ void InitializeArrayArrayBuilders( } } +std::shared_ptr AppendNullsToArrowTable( + const std::shared_ptr& table, size_t num_rows_to_append) { + std::vector> columns; + for (int i = 0; i < table->num_columns(); ++i) { + auto type = table->field(i)->type(); + std::unique_ptr builder; + auto st = arrow::MakeBuilder(arrow::default_memory_pool(), type, &builder); + if (!st.ok()) { + LOG(FATAL) << "Failed to create array builder: " << 
st.message(); + } + st = builder->AppendNulls(num_rows_to_append); + if (!st.ok()) { + LOG(FATAL) << "Failed to append null to arrow table: " << st.message(); + } + std::shared_ptr nulls; + st = builder->Finish(&nulls); + if (!st.ok()) { + LOG(FATAL) << "Failed to finish null builder: " << st.message(); + } + columns.push_back(nulls); + } + auto null_table = arrow::Table::Make(table->schema(), columns); + return arrow::ConcatenateTables({table, null_table}).ValueOrDie(); +} + } // namespace vineyard #endif // ENABLE_GAR diff --git a/modules/graph/writer/arrow_fragment_writer.h b/modules/graph/writer/arrow_fragment_writer.h index e165b1e8a..0becaf81b 100644 --- a/modules/graph/writer/arrow_fragment_writer.h +++ b/modules/graph/writer/arrow_fragment_writer.h @@ -64,6 +64,9 @@ void InitializeArrayArrayBuilders( const property_graph_types::LABEL_ID_TYPE edge_label, const PropertyGraphSchema& graph_schema); +std::shared_ptr AppendNullsToArrowTable( + const std::shared_ptr& table, size_t num_rows_to_append); + template class ArrowFragmentWriter { using oid_t = typename FRAG_T::oid_t; @@ -118,6 +121,7 @@ class ArrowFragmentWriter { std::shared_ptr> frag_; grape::CommSpec comm_spec_; std::shared_ptr graph_info_; + std::map label_id_to_vnum_; }; } // namespace vineyard diff --git a/modules/graph/writer/arrow_fragment_writer_impl.h b/modules/graph/writer/arrow_fragment_writer_impl.h index 728d23a96..2ebdbbbf8 100644 --- a/modules/graph/writer/arrow_fragment_writer_impl.h +++ b/modules/graph/writer/arrow_fragment_writer_impl.h @@ -65,6 +65,7 @@ template boost::leaf::result ArrowFragmentWriter::WriteFragment() { BOOST_LEAF_CHECK(WriteVertices()); BOOST_LEAF_CHECK(WriteEdges()); + MPI_Barrier(comm_spec_.comm()); return {}; } @@ -106,14 +107,25 @@ boost::leaf::result ArrowFragmentWriter::WriteVertex( graph_info_->GetPrefix()); // write vertex data start from chunk index begin auto vertex_table = frag_->vertex_data_table(label_id); + auto num_rows = vertex_table->num_rows(); + if 
(frag_->fid() != frag_->fnum() - 1 && + num_rows % vertex_info.GetChunkSize() != 0) { + // Append nulls if the number of rows is not a multiple of chunk size. + vertex_table = AppendNullsToArrowTable( + vertex_table, + vertex_info.GetChunkSize() - num_rows % vertex_info.GetChunkSize()); + } auto st = writer.WriteTable(vertex_table, chunk_index_begin); if (!st.ok()) { RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message()); } - if (comm_spec_.worker_id() == 0) { + if (frag_->fid() == frag_->fnum() - 1) { // write vertex number - auto st = writer.WriteVerticesNum(vm_ptr->GetTotalNodesNum(label_id)); + auto total_vertices_num = chunk_index_begin * vertex_info.GetChunkSize() + + vertex_table->num_rows(); + label_id_to_vnum_[label_id] = total_vertices_num; + auto st = writer.WriteVerticesNum(total_vertices_num); if (!st.ok()) { RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message()); } @@ -381,10 +393,9 @@ boost::leaf::result ArrowFragmentWriter::writeEdgeImpl( "GAR error: " + std::to_string(static_cast(st.code())) + ", " + st.message()); } - if (comm_spec_.worker_id() == 0) { + if (frag_->fid() == frag_->fnum() - 1) { // write vertex number - auto st = writer.WriteVerticesNum( - frag_->GetVertexMap()->GetTotalNodesNum(main_label_id)); + auto st = writer.WriteVerticesNum(label_id_to_vnum_.at(main_label_id)); if (!st.ok()) { return Status::IOError( "GAR error: " + std::to_string(static_cast(st.code())) + ", " + diff --git a/modules/graph/writer/arrow_fragment_writer_string.cc b/modules/graph/writer/arrow_fragment_writer_string.cc new file mode 100644 index 000000000..47f9ec278 --- /dev/null +++ b/modules/graph/writer/arrow_fragment_writer_string.cc @@ -0,0 +1,26 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifdef ENABLE_GAR + +#include "graph/writer/arrow_fragment_writer_impl.h" + +namespace vineyard { + +template class ArrowFragmentWriter>; + +} // namespace vineyard + +#endif // ENABLE_GA