From 65e0515f13b071b8a748a4fb71258efadac12edf Mon Sep 17 00:00:00 2001 From: acezen Date: Fri, 23 Feb 2024 19:07:03 +0800 Subject: [PATCH 1/2] Upgrade GraphAr to v0.11.2 and revise the gar loader and writer Signed-off-by: acezen --- .github/workflows/build-test-graph.yml | 1 - modules/graph/CMakeLists.txt | 2 +- .../fragment/gar_fragment_builder_impl.h | 20 +- .../fragment/gar_fragment_builder_string.cc | 32 -- modules/graph/loader/gar_fragment_loader.cc | 6 +- modules/graph/loader/gar_fragment_loader.h | 55 ++-- .../graph/loader/gar_fragment_loader_impl.h | 297 ++++++++++-------- .../loader/gar_fragment_loader_string.cc | 28 -- modules/graph/test/arrow_fragment_gar_test.cc | 63 +++- modules/graph/thirdparty/GraphAr | 2 +- modules/graph/writer/arrow_fragment_writer.h | 40 ++- .../graph/writer/arrow_fragment_writer_impl.h | 226 +++++++++---- modules/graph/writer/util.cc | 73 +++++ modules/graph/writer/util.h | 40 +++ test/runner.py | 15 +- 15 files changed, 594 insertions(+), 306 deletions(-) delete mode 100644 modules/graph/fragment/gar_fragment_builder_string.cc delete mode 100644 modules/graph/loader/gar_fragment_loader_string.cc create mode 100644 modules/graph/writer/util.cc create mode 100644 modules/graph/writer/util.h diff --git a/.github/workflows/build-test-graph.yml b/.github/workflows/build-test-graph.yml index 179be9cb5..d082e1b8b 100644 --- a/.github/workflows/build-test-graph.yml +++ b/.github/workflows/build-test-graph.yml @@ -222,7 +222,6 @@ jobs: export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib:/usr/local/lib64:/usr/local/lib/x86_64-linux-gnu export VINEYARD_DATA_DIR=`pwd`/gstest - export GAR_DATA_DIR=`pwd`/gar-test export TMPDIR="${TMPDIR:-$(dirname $(mktemp))}" rm -rf default.etcd diff --git a/modules/graph/CMakeLists.txt b/modules/graph/CMakeLists.txt index 2529a090b..1cceb3df8 100644 --- a/modules/graph/CMakeLists.txt +++ b/modules/graph/CMakeLists.txt @@ -134,7 +134,7 @@ install(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/powturbo/include" if(BUILD_VINEYARD_GRAPH_WITH_GAR) target_compile_definitions(vineyard_graph PUBLIC -DENABLE_GAR) - find_package(gar QUIET) + find_package(gar 0.11.2 QUIET) if (gar_FOUND) message(STATUS "-- Found GraphAr: ${GAR_LIBRARIES}") target_include_directories(vineyard_graph PRIVATE ${GAR_INCLUDE_DIRS}) diff --git a/modules/graph/fragment/gar_fragment_builder_impl.h b/modules/graph/fragment/gar_fragment_builder_impl.h index fd8a31403..661290ad4 100644 --- a/modules/graph/fragment/gar_fragment_builder_impl.h +++ b/modules/graph/fragment/gar_fragment_builder_impl.h @@ -140,27 +140,31 @@ boost::leaf::result generate_csr( auto tvnum = tvnums[v_label]; // build the arrow's offset array std::shared_ptr offsets_buffer; + int64_t buffer_length = static_cast(tvnum + 1); ARROW_OK_ASSIGN_OR_RAISE( - offsets_buffer, arrow::AllocateBuffer((tvnum + 1) * sizeof(int64_t))); + offsets_buffer, arrow::AllocateBuffer(buffer_length * sizeof(int64_t))); if (v_label == vertex_label) { + int64_t array_length = offset_array->length(); + VINEYARD_ASSERT(array_length <= buffer_length, + "Invalid offset array: the offset array length is larger " + "than the tvnum + 1."); memcpy(offsets_buffer->mutable_data(), reinterpret_cast(offset_array->raw_values()), - offset_array->length() * sizeof(int64_t)); + array_length * sizeof(int64_t)); // we do not store the edge offset of outer vertices, so fill edge_num // to the outer vertices offset - std::fill_n( - reinterpret_cast(offsets_buffer->mutable_data() + - offset_array->length() * sizeof(int64_t)), - (tvnum + 1) - offset_array->length(), edge_num); + std::fill_n(reinterpret_cast(offsets_buffer->mutable_data() + + array_length * sizeof(int64_t)), + buffer_length - array_length, edge_num); edges[v_label] = std::make_shared>(client, edge_num); } else { std::fill_n(reinterpret_cast(offsets_buffer->mutable_data()), - tvnum + 1, 0); + buffer_length, 0); edges[v_label] = std::make_shared>(client, 0); } edge_offsets[v_label] = std::make_shared( - arrow::int64(), tvnum + 1, offsets_buffer, nullptr, 0, 0); + arrow::int64(), buffer_length, offsets_buffer, nullptr, 0, 0); } std::vector chunk_offsets(num_chunks + 1, 0); diff --git a/modules/graph/fragment/gar_fragment_builder_string.cc b/modules/graph/fragment/gar_fragment_builder_string.cc deleted file mode 100644 index 254e79af3..000000000 --- a/modules/graph/fragment/gar_fragment_builder_string.cc +++ /dev/null @@ -1,32 +0,0 @@ -/** Copyright 2020-2023 Alibaba Group Holding Limited. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#ifdef ENABLE_GAR - -#include "graph/fragment/gar_fragment_builder_impl.h" - -namespace vineyard { - -template class GARFragmentBuilder< - std::string, uint32_t, - ArrowVertexMap::type, uint32_t>>; - -template class GARFragmentBuilder< - std::string, uint64_t, - ArrowVertexMap::type, uint64_t>>; - -} // namespace vineyard - -#endif // ENABLE_GAR diff --git a/modules/graph/loader/gar_fragment_loader.cc b/modules/graph/loader/gar_fragment_loader.cc index 43a2f11d2..d3d0b2a0b 100644 --- a/modules/graph/loader/gar_fragment_loader.cc +++ b/modules/graph/loader/gar_fragment_loader.cc @@ -17,15 +17,17 @@ limitations under the License. #include "graph/loader/gar_fragment_loader.h" +#include "gar/util/data_type.h" + #include "arrow/api.h" #include "gar/graph_info.h" namespace vineyard { std::shared_ptr ConstructSchemaFromPropertyGroup( - const GraphArchive::PropertyGroup& property_group) { + const std::shared_ptr& property_group) { std::vector> fields; - for (const auto& prop : property_group.GetProperties()) { + for (const auto& prop : property_group->GetProperties()) { fields.emplace_back(arrow::field( prop.name, GraphArchive::DataType::DataTypeToArrowDataType(prop.type))); } diff --git a/modules/graph/loader/gar_fragment_loader.h b/modules/graph/loader/gar_fragment_loader.h index 5f13708a8..9d41ff034 100644 --- a/modules/graph/loader/gar_fragment_loader.h +++ b/modules/graph/loader/gar_fragment_loader.h @@ -51,7 +51,7 @@ enum class AdjListType : std::uint8_t; namespace vineyard { std::shared_ptr ConstructSchemaFromPropertyGroup( - const GraphArchive::PropertyGroup& property_group); + const std::shared_ptr& property_group); template & vertex_labels = {}, + const std::vector>& edge_labels = {}, + bool directed = true, bool generate_eid = false, + bool store_in_local = false); ~GARFragmentLoader() = default; @@ -111,7 +122,7 @@ class GARFragmentLoader { const std::string& vertex_label); boost::leaf::result loadEdgeTableOfLabel( - const GraphArchive::EdgeInfo& edge_info, + const std::shared_ptr& edge_info, GraphArchive::AdjListType adj_list_type); boost::leaf::result initSchema(PropertyGraphSchema& schema); @@ -131,34 +142,12 @@ class GARFragmentLoader { label_id_t label_id, const std::shared_ptr id_array_in, bool all_be_local_vertex, std::shared_ptr& out); - fid_t getPartitionId(gar_id_t oid, label_id_t label_id) { - auto chunk_index = oid / vertex_chunk_sizes_[label_id]; - auto& vertex_chunk_begins = - vertex_chunk_begin_of_frag_[vertex_labels_[label_id]]; - // binary search - fid_t low = 0, high = comm_spec_.fnum(); - while (low <= high) { - fid_t mid = (low + high) / 2; - if (vertex_chunk_begins[mid] <= chunk_index && - vertex_chunk_begins[mid + 1] > chunk_index) { - return mid; - } else if (vertex_chunk_begins[mid] > chunk_index) { - high = mid - 1; - } else { - low = mid + 1; - } - } - return low; - } - private: Client& client_; grape::CommSpec comm_spec_; std::shared_ptr vm_ptr_; std::shared_ptr graph_info_; - std::map> vertex_chunk_begin_of_frag_; - bool directed_; label_id_t vertex_label_num_; @@ -177,6 +166,10 @@ class GARFragmentLoader { bool generate_eid_; IdParser vid_parser_; + + bool store_in_local_; + std::vector vertex_chunk_begins_; + std::vector vertex_chunk_nums_; }; namespace detail { diff --git a/modules/graph/loader/gar_fragment_loader_impl.h b/modules/graph/loader/gar_fragment_loader_impl.h index 59c6e120f..e28f3ed97 100644 --- a/modules/graph/loader/gar_fragment_loader_impl.h +++ b/modules/graph/loader/gar_fragment_loader_impl.h @@ -24,13 +24,17 @@ limitations under the License. #include #include +#include "arrow/filesystem/api.h" +#include "gar/graph_info.h" #include "gar/reader/arrow_chunk_reader.h" +#include "gar/util/adj_list_type.h" #include "gar/util/general_params.h" #include "graph/fragment/property_graph_utils.h" #include "graph/fragment/property_graph_utils_impl.h" #include "graph/loader/fragment_loader_utils.h" #include "graph/loader/gar_fragment_loader.h" +#include "graph/writer/util.h" namespace vineyard { @@ -47,19 +51,47 @@ template class VERTEX_MAP_T> GARFragmentLoader::GARFragmentLoader( Client& client, const grape::CommSpec& comm_spec, - const std::string& graph_info_yaml, bool directed, bool generate_eid) + const std::string& graph_info_yaml, + const std::vector& vertex_labels, + const std::vector>& edge_labels, bool directed, + bool generate_eid, bool store_in_local) : client_(client), comm_spec_(comm_spec), directed_(directed), - generate_eid_(generate_eid) { + generate_eid_(generate_eid), + store_in_local_(store_in_local) { // Load graph info. auto maybe_graph_info = GraphArchive::GraphInfo::Load(graph_info_yaml); if (!maybe_graph_info.status().ok()) { LOG(ERROR) << "Failed to load graph info from " << graph_info_yaml << ", error: " << maybe_graph_info.status().message(); } - graph_info_ = std::make_shared( - std::move(maybe_graph_info.value())); + graph_info_ = maybe_graph_info.value(); + if (!vertex_labels.empty() && !edge_labels.empty()) { + // project a subgraph from the original graph. + GraphArchive::VertexInfoVector project_vertex_infos; + GraphArchive::EdgeInfoVector project_edge_infos; + for (const auto& label : vertex_labels) { + auto vertex_info = graph_info_->GetVertexInfo(label); + if (vertex_info == nullptr) { + LOG(ERROR) << "Vertex label " << label << " is not found in graph info"; + } + project_vertex_infos.push_back(vertex_info); + } + for (const auto& triple : edge_labels) { + auto edge_info = + graph_info_->GetEdgeInfo(triple[0], triple[1], triple[2]); + if (edge_info == nullptr) { + LOG(ERROR) << "Edge label " << triple[0] << " -> " << triple[1] + << " -> " << triple[2] << " is not found in graph info"; + } + project_edge_infos.push_back(edge_info); + } + graph_info_ = GraphArchive::CreateGraphInfo( + graph_info_->GetName(), project_vertex_infos, project_edge_infos, + graph_info_->GetPrefix(), graph_info_->version(), + graph_info_->GetExtraInfo()); + } } template ::LoadEdgeTables() { csr_edge_tables_with_label_.resize(edge_label_num_); csc_edge_tables_with_label_.resize(edge_label_num_); // read the adj list chunk tables - for (const auto& edge : graph_info_->GetEdgeInfos()) { - const auto& edge_info = edge.second; - if (edge_info.ContainAdjList( + for (const auto& edge_info : graph_info_->GetEdgeInfos()) { + if (edge_info->HasAdjacentListType( GraphArchive::AdjListType::ordered_by_source)) { BOOST_LEAF_CHECK(loadEdgeTableOfLabel( edge_info, GraphArchive::AdjListType::ordered_by_source)); } if (this->directed_) { - if (edge_info.ContainAdjList( + if (edge_info->HasAdjacentListType( GraphArchive::AdjListType::ordered_by_dest)) { BOOST_LEAF_CHECK(loadEdgeTableOfLabel( edge_info, GraphArchive::AdjListType::ordered_by_dest)); @@ -193,37 +224,79 @@ template class VERTEX_MAP_T> boost::leaf::result GARFragmentLoader::distributeVertices() { - for (const auto& item : graph_info_->GetVertexInfos()) { - const auto& label = item.first; - const auto& vertex_info = item.second; - if (std::find(vertex_labels_.begin(), vertex_labels_.end(), label) != - vertex_labels_.end()) { - continue; - } + vertex_chunk_begins_.resize(graph_info_->VertexInfoNum(), 0); + vertex_chunk_nums_.resize(graph_info_->VertexInfoNum(), 0); + for (int i = 0; i < graph_info_->VertexInfoNum(); ++i) { + const auto& vertex_info = graph_info_->GetVertexInfoByIndex(i); + const auto& label = vertex_info->GetLabel(); vertex_labels_.push_back(label); - vertex_chunk_sizes_.push_back(vertex_info.GetChunkSize()); - auto chunk_num_result = GraphArchive::util::GetVertexChunkNum( - graph_info_->GetPrefix(), vertex_info); - RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status()); - // distribute the vertex chunks for fragments - auto chunk_num = chunk_num_result.value(); - - if (chunk_num < static_cast(comm_spec_.fnum())) { - int64_t index = 0; - for (; index < chunk_num; ++index) { - vertex_chunk_begin_of_frag_[label][index] = index; + vertex_chunk_sizes_.push_back(vertex_info->GetChunkSize()); + if (store_in_local_) { + // distribute the vertex chunks base on the local metadata + const auto& extra_info = graph_info_->GetExtraInfo(); + if (extra_info.find(LOCAL_METADATA_KEY) == extra_info.end()) { + RETURN_GS_ERROR( + ErrorCode::kInvalidValueError, + "The local metadata key-value is not found in graph info"); + } + std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); + std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + + local_metadata_prefix + + std::to_string(comm_spec_.fid()); + + std::shared_ptr file; + std::shared_ptr fs; + + auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); + if (!fs_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); + } + fs = fs_result.ValueOrDie(); + auto input_stream_result = fs->OpenInputStream(path); + if (!input_stream_result.status().ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, + input_stream_result.status().message()); + } + auto input_stream = input_stream_result.ValueOrDie(); + // read the vertex chunk begin of vertex label i + auto read_result = + input_stream->Read(sizeof(int64_t), &vertex_chunk_begins_[i]); + if (!read_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); } - for (; index <= static_cast(comm_spec_.fnum()); ++index) { - vertex_chunk_begin_of_frag_[label][index] = chunk_num; + assert(read_result.ValueOrDie() == sizeof(int64_t)); + // read the vertex chunk num of vertex label i + read_result = input_stream->Read(sizeof(int64_t), &vertex_chunk_nums_[i]); + if (!read_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); } + assert(read_result.ValueOrDie() == sizeof(int64_t)); } else { - int64_t bsize = chunk_num / static_cast(comm_spec_.fnum()); - vertex_chunk_begin_of_frag_[label].resize(comm_spec_.fnum() + 1, 0); - for (fid_t fid = 0; fid < comm_spec_.fnum(); ++fid) { - vertex_chunk_begin_of_frag_[label][fid] = - static_cast(fid) * bsize; + // distribute the vertex chunks for fragments + auto chunk_num_result = GraphArchive::util::GetVertexChunkNum( + graph_info_->GetPrefix(), vertex_info); + RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status()); + auto chunk_num = chunk_num_result.value(); + + if (chunk_num <= static_cast(comm_spec_.fnum())) { + if (chunk_num < comm_spec_.fid() + 1) { + // no vertex chunk can be assigned to this fragment + vertex_chunk_begins_[i] = 0; + vertex_chunk_nums_[i] = 0; + } else { + vertex_chunk_begins_[i] = static_cast(comm_spec_.fid()); + vertex_chunk_nums_[i] = 1; + } + } else { + int64_t bsize = chunk_num / static_cast(comm_spec_.fnum()); + vertex_chunk_begins_[i] = + static_cast(comm_spec_.fid()) * bsize; + if (comm_spec_.fid() == comm_spec_.fnum() - 1) { + vertex_chunk_nums_[i] = chunk_num - vertex_chunk_begins_[i]; + } else { + vertex_chunk_nums_[i] = bsize; + } } - vertex_chunk_begin_of_frag_[label][comm_spec_.fnum()] = chunk_num; } } vertex_label_num_ = vertex_labels_.size(); @@ -231,12 +304,11 @@ GARFragmentLoader::distributeVertices() { vertex_label_to_index_[vertex_labels_[i]] = i; } - for (const auto& item : graph_info_->GetEdgeInfos()) { - auto& edge_info = item.second; + for (const auto& edge_info : graph_info_->GetEdgeInfos()) { // record edge label - const auto& edge_label = edge_info.GetEdgeLabel(); - const auto& src_label = edge_info.GetSrcLabel(); - const auto& dst_label = edge_info.GetDstLabel(); + const auto& edge_label = edge_info->GetEdgeLabel(); + const auto& src_label = edge_info->GetSrcLabel(); + const auto& dst_label = edge_info->GetDstLabel(); auto it = std::find(edge_labels_.begin(), edge_labels_.end(), edge_label); if (it == edge_labels_.end()) { edge_labels_.push_back(edge_label); @@ -261,32 +333,13 @@ GARFragmentLoader::constructVertexMap() { auto shuffle_procedure = [&](const label_id_t label_id) -> boost::leaf::result { std::vector> shuffled_oid_array; - auto& vertex_info = - graph_info_->GetVertexInfo(vertex_labels_[label_id]).value(); - const auto& property_groups = vertex_info.GetPropertyGroups(); - std::string primary_key; - for (const auto& pg : property_groups) { - for (const auto& prop : pg.GetProperties()) { - if (prop.is_primary) { - primary_key = prop.name; - break; - } - } - if (!primary_key.empty()) { - break; - } - } - if (primary_key.empty()) { - std::string msg = "primary key is not found in " + - vertex_labels_[label_id] + " property groups"; - RETURN_GS_ERROR(ErrorCode::kInvalidValueError, msg); - } - auto local_oid_array = - vertex_tables_[label_id]->GetColumnByName(primary_key); + const auto& vertex_info = + graph_info_->GetVertexInfo(vertex_labels_[label_id]); + auto local_oid_array = vertex_tables_[label_id]->GetColumnByName( + GraphArchive::GeneralParams::kVertexIndexCol); if (local_oid_array == nullptr) { - std::string msg = "primary key column " + primary_key + - " is not found in " + vertex_labels_[label_id] + - " table"; + std::string msg = "vertex index column is not found in " + + vertex_labels_[label_id] + " table"; RETURN_GS_ERROR(ErrorCode::kInvalidValueError, msg); } VY_OK_OR_RAISE(FragmentAllGatherArray(comm_spec_, local_oid_array, @@ -314,18 +367,12 @@ template GARFragmentLoader::loadVertexTableOfLabel( const std::string& vertex_label) { - auto maybe_vertex_info = graph_info_->GetVertexInfo(vertex_label); - RETURN_GS_ERROR_IF_NOT_OK(maybe_vertex_info.status()); - auto& vertex_info = maybe_vertex_info.value(); - const auto& label = vertex_info.GetLabel(); - label_id_t label_id = vertex_label_to_index_[label]; - auto vertex_chunk_begin = - vertex_chunk_begin_of_frag_[label][comm_spec_.fid()]; - auto vertex_chunk_num_of_fragment = - vertex_chunk_begin_of_frag_[label][comm_spec_.fid() + 1] - - vertex_chunk_begin_of_frag_[label][comm_spec_.fid()]; - auto chunk_size = vertex_info.GetChunkSize(); - const auto& property_groups = vertex_info.GetPropertyGroups(); + auto vertex_info = graph_info_->GetVertexInfo(vertex_label); + label_id_t label_id = vertex_label_to_index_[vertex_label]; + auto vertex_chunk_begin = vertex_chunk_begins_[label_id]; + auto vertex_chunk_num_of_fragment = vertex_chunk_nums_[label_id]; + auto chunk_size = vertex_info->GetChunkSize(); + const auto& property_groups = vertex_info->GetPropertyGroups(); table_vec_t pg_tables; int64_t thread_num = @@ -339,9 +386,8 @@ GARFragmentLoader::loadVertexTableOfLabel( std::atomic cur_chunk_index(0); for (int64_t i = 0; i < thread_num; ++i) { threads[i] = std::thread([&]() -> boost::leaf::result { - auto maybe_reader = - GraphArchive::ConstructVertexPropertyArrowChunkReader( - *(graph_info_.get()), label, pg); + auto maybe_reader = GraphArchive::VertexPropertyArrowChunkReader::Make( + vertex_info, pg, graph_info_->GetPrefix()); RETURN_GS_ERROR_IF_NOT_OK(maybe_reader.status()); auto& reader = maybe_reader.value(); while (true) { @@ -354,8 +400,8 @@ GARFragmentLoader::loadVertexTableOfLabel( int64_t iter = begin; while (iter != end) { RETURN_GS_ERROR_IF_NOT_OK( - reader.seek((vertex_chunk_begin + iter) * chunk_size)); - auto chunk_table = reader.GetChunk(); + reader->seek((vertex_chunk_begin + iter) * chunk_size)); + auto chunk_table = reader->GetChunk(); RETURN_GS_ERROR_IF_NOT_OK(chunk_table.status()); vertex_chunk_tables[iter] = chunk_table.value(); ++iter; @@ -394,7 +440,7 @@ GARFragmentLoader::loadVertexTableOfLabel( std::shared_ptr table_out; VY_OK_OR_RAISE(CastTableToSchema(concat_table, normalized_schema, table_out)); auto metadata = std::make_shared(); - metadata->Append("label", label); + metadata->Append("label", vertex_label); metadata->Append("label_id", std::to_string(label_id)); metadata->Append("type", PropertyGraphSchema::VERTEX_TYPE_NAME); metadata->Append("retain_oid", std::to_string(false)); @@ -406,32 +452,29 @@ template class VERTEX_MAP_T> boost::leaf::result GARFragmentLoader::loadEdgeTableOfLabel( - const GraphArchive::EdgeInfo& edge_info, + const std::shared_ptr& edge_info, GraphArchive::AdjListType adj_list_type) { - auto src_label = edge_info.GetSrcLabel(); - auto dst_label = edge_info.GetDstLabel(); - auto edge_label = edge_info.GetEdgeLabel(); - const auto& property_groups = - edge_info.GetPropertyGroups(adj_list_type).value(); + auto src_label = edge_info->GetSrcLabel(); + auto dst_label = edge_info->GetDstLabel(); + auto edge_label = edge_info->GetEdgeLabel(); + const auto& property_groups = edge_info->GetPropertyGroups(); int64_t vertex_chunk_begin = 0; int64_t vertex_chunk_num_of_fragment = 0; - int64_t edge_chunk_size = edge_info.GetChunkSize(); + int64_t edge_chunk_size = edge_info->GetChunkSize(); int64_t vertex_chunk_size; if (adj_list_type == GraphArchive::AdjListType::ordered_by_source) { vertex_chunk_begin = - vertex_chunk_begin_of_frag_[src_label][comm_spec_.fid()]; + vertex_chunk_begins_[vertex_label_to_index_[src_label]]; vertex_chunk_num_of_fragment = - vertex_chunk_begin_of_frag_[src_label][comm_spec_.fid() + 1] - - vertex_chunk_begin_of_frag_[src_label][comm_spec_.fid()]; - vertex_chunk_size = edge_info.GetSrcChunkSize(); + vertex_chunk_nums_[vertex_label_to_index_[src_label]]; + vertex_chunk_size = edge_info->GetSrcChunkSize(); } else { vertex_chunk_begin = - vertex_chunk_begin_of_frag_[dst_label][comm_spec_.fid()]; + vertex_chunk_begins_[vertex_label_to_index_[dst_label]]; vertex_chunk_num_of_fragment = - vertex_chunk_begin_of_frag_[dst_label][comm_spec_.fid() + 1] - - vertex_chunk_begin_of_frag_[dst_label][comm_spec_.fid()]; - vertex_chunk_size = edge_info.GetDstChunkSize(); + vertex_chunk_nums_[vertex_label_to_index_[dst_label]]; + vertex_chunk_size = edge_info->GetDstChunkSize(); } std::vector> offset_arrays( vertex_chunk_num_of_fragment); @@ -449,11 +492,10 @@ GARFragmentLoader::loadEdgeTableOfLabel( for (int64_t i = 0; i < thread_num; ++i) { threads[i] = std::thread([&]() -> boost::leaf::result { auto maybe_offset_reader = - GraphArchive::ConstructAdjListOffsetArrowChunkReader( - *(graph_info_.get()), src_label, edge_label, dst_label, - adj_list_type); + GraphArchive::AdjListOffsetArrowChunkReader::Make( + edge_info, adj_list_type, graph_info_->GetPrefix()); RETURN_GS_ERROR_IF_NOT_OK(maybe_offset_reader.status()); - auto& offset_reader = maybe_offset_reader.value(); + auto offset_reader = maybe_offset_reader.value(); while (true) { int64_t begin = cur_chunk.fetch_add(batch_size); if (begin >= vertex_chunk_num_of_fragment) { @@ -465,8 +507,8 @@ GARFragmentLoader::loadEdgeTableOfLabel( while (iter != end) { int64_t vertex_chunk_id = iter + vertex_chunk_begin; RETURN_GS_ERROR_IF_NOT_OK( - offset_reader.seek(vertex_chunk_id * vertex_chunk_size)); - auto offset_result = offset_reader.GetChunk(); + offset_reader->seek(vertex_chunk_id * vertex_chunk_size)); + auto offset_result = offset_reader->GetChunk(); RETURN_GS_ERROR_IF_NOT_OK(offset_result.status()); offset_arrays[iter] = std::dynamic_pointer_cast( offset_result.value()); @@ -500,24 +542,28 @@ GARFragmentLoader::loadEdgeTableOfLabel( auto total_edge_chunk_num = agg_edge_chunk_num.back(); table_vec_t edge_chunk_tables(total_edge_chunk_num); - std::vector edge_property_chunk_tables(property_groups.size()); - for (size_t i = 0; i < property_groups.size(); ++i) { + std::vector edge_property_chunk_tables( + edge_info->PropertyGroupNum()); + for (int i = 0; i < edge_info->PropertyGroupNum(); ++i) { edge_property_chunk_tables[i].resize(total_edge_chunk_num); } std::atomic cur(0); batch_size = (total_edge_chunk_num + thread_num - 1) / thread_num; for (int64_t i = 0; i < thread_num; ++i) { threads[i] = std::thread([&]() -> boost::leaf::result { - auto expect = GraphArchive::ConstructAdjListArrowChunkReader( - *(graph_info_.get()), src_label, edge_label, dst_label, - adj_list_type); - RETURN_GS_ERROR_IF_NOT_OK(expect.status()); - auto& reader = expect.value(); - std::vector + auto maybe_reader = GraphArchive::AdjListArrowChunkReader::Make( + edge_info, adj_list_type, graph_info_->GetPrefix()); + RETURN_GS_ERROR_IF_NOT_OK(maybe_reader.status()); + auto reader = maybe_reader.value(); + std::vector< + std::shared_ptr> property_readers; for (const auto& pg : property_groups) { - property_readers.emplace_back(edge_info, pg, adj_list_type, - graph_info_->GetPrefix()); + auto maybe_pg_reader = + GraphArchive::AdjListPropertyArrowChunkReader::Make( + edge_info, pg, adj_list_type, graph_info_->GetPrefix()); + RETURN_GS_ERROR_IF_NOT_OK(maybe_pg_reader.status()); + property_readers.emplace_back(maybe_pg_reader.value()); } while (true) { int64_t begin = cur.fetch_add(batch_size); @@ -534,15 +580,15 @@ GARFragmentLoader::loadEdgeTableOfLabel( auto vertex_chunk_id = chunk_pair.first + vertex_chunk_begin; auto edge_chunk_index = chunk_pair.second; RETURN_GS_ERROR_IF_NOT_OK( - reader.seek_chunk_index(vertex_chunk_id, edge_chunk_index)); - auto edge_chunk_result = reader.GetChunk(); + reader->seek_chunk_index(vertex_chunk_id, edge_chunk_index)); + auto edge_chunk_result = reader->GetChunk(); RETURN_GS_ERROR_IF_NOT_OK(edge_chunk_result.status()); edge_chunk_tables[iter] = edge_chunk_result.value(); for (size_t j = 0; j < property_groups.size(); ++j) { auto& pg_reader = property_readers[j]; RETURN_GS_ERROR_IF_NOT_OK( - pg_reader.seek_chunk_index(vertex_chunk_id, edge_chunk_index)); - auto pg_result = pg_reader.GetChunk(); + pg_reader->seek_chunk_index(vertex_chunk_id, edge_chunk_index)); + auto pg_result = pg_reader->GetChunk(); RETURN_GS_ERROR_IF_NOT_OK(pg_result.status()); edge_property_chunk_tables[j][iter] = pg_result.value(); } @@ -559,6 +605,7 @@ GARFragmentLoader::loadEdgeTableOfLabel( auto adj_list_table = arrow::ConcatenateTables(edge_chunk_tables); RETURN_GS_ERROR_IF_NOT_OK(adj_list_table.status()); std::shared_ptr adj_list_table_with_gid; + // internal id to global id BOOST_LEAF_ASSIGN( adj_list_table_with_gid, parseEdgeIdArrays(std::move(adj_list_table).ValueOrDie(), @@ -743,24 +790,24 @@ GARFragmentLoader::parseIdChunkedArrayChunk( } vid_t* builder = reinterpret_cast(buffer->mutable_data()); - const auto& label_name = vertex_labels_[label_id]; const gar_id_t* ids = reinterpret_cast(id_array->raw_values()); if (all_be_local_vertex) { gar_id_t start_id = - vertex_chunk_begin_of_frag_[label_name][comm_spec_.fid()] * - vertex_chunk_sizes_[label_id]; + vertex_chunk_begins_[label_id] * vertex_chunk_sizes_[label_id]; for (int64_t k = 0; k != id_array->length(); ++k) { builder[k] = vid_parser_.GenerateId(comm_spec_.fid(), label_id, ids[k] - start_id); } } else { + vid_t gid = 0; for (int64_t k = 0; k != id_array->length(); ++k) { - fid_t fid = getPartitionId(ids[k], label_id); - builder[k] = vid_parser_.GenerateId( - fid, label_id, - ids[k] - vertex_chunk_begin_of_frag_[label_name][fid] * - vertex_chunk_sizes_[label_id]); + if (vm_ptr_->GetGid(label_id, ids[k], gid)) { + builder[k] = gid; + } else { + LOG(WARNING) << "vertex " << ids[k] << " is not found in fragment " + << comm_spec_.fid(); + } } } out = std::make_shared>( diff --git a/modules/graph/loader/gar_fragment_loader_string.cc b/modules/graph/loader/gar_fragment_loader_string.cc deleted file mode 100644 index e04c71022..000000000 --- a/modules/graph/loader/gar_fragment_loader_string.cc +++ /dev/null @@ -1,28 +0,0 @@ -/** Copyright 2020-2023 Alibaba Group Holding Limited. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#ifdef ENABLE_GAR - -#include "graph/loader/gar_fragment_loader_impl.h" - -namespace vineyard { - -template class GARFragmentLoader; - -template class GARFragmentLoader; - -} // namespace vineyard - -#endif // ENABLE_GAR diff --git a/modules/graph/test/arrow_fragment_gar_test.cc b/modules/graph/test/arrow_fragment_gar_test.cc index 5ebed0054..4a33c7418 100644 --- a/modules/graph/test/arrow_fragment_gar_test.cc +++ b/modules/graph/test/arrow_fragment_gar_test.cc @@ -25,6 +25,7 @@ limitations under the License. #include "graph/fragment/arrow_fragment.h" #include "graph/fragment/graph_schema.h" +#include "graph/loader/arrow_fragment_loader.h" #include "graph/loader/gar_fragment_loader.h" #include "graph/writer/arrow_fragment_writer.h" @@ -32,6 +33,8 @@ using namespace vineyard; // NOLINT(build/namespaces) using GraphType = ArrowFragment; +using StringGraphType = + ArrowFragment; using LabelType = typename GraphType::label_id_t; void traverse_graph(std::shared_ptr graph, const std::string& path) { @@ -67,11 +70,14 @@ void traverse_graph(std::shared_ptr graph, const std::string& path) { } } -boost::leaf::result write_out_to_gar(const grape::CommSpec& comm_spec, - std::shared_ptr graph, - const std::string& graph_yaml_path) { - auto writer = std::make_unique>( - graph, comm_spec, graph_yaml_path); +boost::leaf::result write_out_to_gar( + const grape::CommSpec& comm_spec, std::shared_ptr graph, + const std::string& output_path, const std::string& file_type) { + auto writer = std::make_unique>( + graph, comm_spec, /* graph_name */ "graph", output_path, + /* vertex_chunk_size */ 512, + /* edge_chunk_size */ 1024, file_type); + BOOST_LEAF_CHECK(writer->WriteGraphInfo(output_path)); BOOST_LEAF_CHECK(writer->WriteFragment()); LOG(INFO) << "[worker-" << comm_spec.worker_id() << "] generate GAR files..."; return 0; @@ -80,19 +86,21 @@ boost::leaf::result write_out_to_gar(const grape::CommSpec& comm_spec, int main(int argc, char** argv) { if (argc < 3) { printf( - "usage: ./arrow_fragment_test " - "[directed]\n"); + "usage: ./arrow_fragment_gar_test vdata_path edata_path " + "output_path file_type\n"); return 1; } int index = 1; std::string ipc_socket = std::string(argv[index++]); - std::string graph_yaml_path = - vineyard::ExpandEnvironmentVariables(argv[index++]); - int directed = 1; - if (argc > index) { - directed = atoi(argv[index]); - } + std::string v_file_path = vineyard::ExpandEnvironmentVariables(argv[index++]); + std::string e_file_path = vineyard::ExpandEnvironmentVariables(argv[index++]); + std::string output_path = vineyard::ExpandEnvironmentVariables(argv[index++]); + std::string file_type = std::string(argv[index++]); + + std::string v_file_suffix = ".csv#header_row=true&label=person"; + std::string e_file_suffix = + ".csv#header_row=true&label=knows&src_label=person&dst_label=person"; vineyard::Client client; VINEYARD_CHECK_OK(client.Connect(ipc_socket)); @@ -105,12 +113,38 @@ int main(int argc, char** argv) { grape::CommSpec comm_spec; comm_spec.Init(MPI_COMM_WORLD); + // Load graph from csv + vineyard::ObjectID frag_group; + { + std::string vfile = v_file_path + v_file_suffix; + std::string efile = e_file_path + e_file_suffix; + auto loader = std::make_unique< + ArrowFragmentLoader>( + client, comm_spec, std::vector{efile}, + std::vector{vfile}, /*directed*/ true, + /*generate_eid*/ false, /*retain_oid*/ true); + frag_group = loader->LoadFragmentAsFragmentGroup().value(); + LOG(INFO) << "Loaded fragment group: " << ObjectIDToString(frag_group); + } + + // Write out to GAR files + { + auto fg = std::dynamic_pointer_cast( + client.GetObject(frag_group)); + auto fid = comm_spec.WorkerToFrag(comm_spec.worker_id()); + auto frag_id = fg->Fragments().at(fid); + auto arrow_frag = + std::static_pointer_cast(client.GetObject(frag_id)); + write_out_to_gar(comm_spec, arrow_frag, output_path, file_type); + } + // Load from GAR files { + std::string graph_yaml_path = output_path + "graph.graph.yaml"; auto loader = std::make_unique>( - client, comm_spec, graph_yaml_path, directed != 0); + client, comm_spec, graph_yaml_path); vineyard::ObjectID fragment_id = loader->LoadFragment().value(); std::shared_ptr graph = @@ -119,7 +153,6 @@ int main(int argc, char** argv) { << "]: " << ObjectIDToString(fragment_id); traverse_graph(graph, "./xx/output_graph_" + std::to_string(graph->fid())); - write_out_to_gar(comm_spec, graph, graph_yaml_path); } } grape::FinalizeMPIComm(); diff --git a/modules/graph/thirdparty/GraphAr b/modules/graph/thirdparty/GraphAr index f37895d8c..4158bb816 160000 --- a/modules/graph/thirdparty/GraphAr +++ b/modules/graph/thirdparty/GraphAr @@ -1 +1 @@ -Subproject commit f37895d8c393f0587282b16f70fcea70bf654aff +Subproject commit 4158bb8167694b0bca5d799c9ce3f52b71b61141 diff --git a/modules/graph/writer/arrow_fragment_writer.h b/modules/graph/writer/arrow_fragment_writer.h index 0becaf81b..85634eaaf 100644 --- a/modules/graph/writer/arrow_fragment_writer.h +++ b/modules/graph/writer/arrow_fragment_writer.h @@ -85,13 +85,46 @@ class ArrowFragmentWriter { using fragment_t = FRAG_T; + protected: + static constexpr const char* MARKER = "PROGRESS--GRAPH-LOADING-"; + public: + /** + * @brief Initialize ArrowFragmentWriter with graph info + * + * @param frag The fragment to write + * @param comm_spec The communicator specification + * @param graph_info_yaml The graph info yaml path + */ ArrowFragmentWriter(const std::shared_ptr& frag, const grape::CommSpec& comm_spec, const std::string& graph_info_yaml); + /** + * @brief Initialize ArrowFragmentWriter with base graph info information + * The graph info will be generated from the fragment schema and the given + * information. + * + * @param frag The fragment to write + * @param comm_spec The communicator specification + * @param graph_name The graph name + * @param path The path to store the fragment + * @param vertex_block_size The vertex chunk size + * @param edge_block_size The edge chunk size + * @param file_type The file type to store the fragment + * @param store_in_local Whether write the fragment to local + */ + ArrowFragmentWriter(const std::shared_ptr& frag, + const grape::CommSpec& comm_spec, + const std::string& graph_name, + const std::string& out_path, int64_t vertex_block_size, + int64_t edge_block_size, const std::string& file_type, + bool store_in_local = false); + ~ArrowFragmentWriter() = default; + boost::leaf::result WriteGraphInfo(const std::string& output_path); + boost::leaf::result WriteFragment(); boost::leaf::result WriteVertices(); @@ -106,8 +139,9 @@ class ArrowFragmentWriter { private: boost::leaf::result writeEdgeImpl( - const GraphArchive::EdgeInfo& edge_info, label_id_t src_label_id, - label_id_t edge_label_id, label_id_t dst_label_id, + const std::shared_ptr& edge_info, + label_id_t src_label_id, label_id_t edge_label_id, + label_id_t dst_label_id, const std::vector& main_start_chunk_indices, const std::vector& another_start_chunk_indices, const vertex_range_t& vertices, GraphArchive::AdjListType adj_list_type); @@ -122,6 +156,8 @@ class ArrowFragmentWriter { grape::CommSpec comm_spec_; std::shared_ptr graph_info_; std::map label_id_to_vnum_; + + bool store_in_local_; }; } // namespace vineyard diff --git a/modules/graph/writer/arrow_fragment_writer_impl.h b/modules/graph/writer/arrow_fragment_writer_impl.h index 5ea412af9..6ca75c5b6 100644 --- a/modules/graph/writer/arrow_fragment_writer_impl.h +++ b/modules/graph/writer/arrow_fragment_writer_impl.h @@ -29,9 +29,14 @@ #include #include +#include "arrow/filesystem/api.h" #include "boost/leaf/error.hpp" #include "boost/leaf/result.hpp" #include "gar/graph_info.h" +#include "gar/util/adj_list_type.h" +#include "gar/util/data_type.h" +#include "gar/util/file_type.h" +#include "gar/util/general_params.h" #include "gar/writer/arrow_chunk_writer.h" #include "grape/worker/comm_spec.h" @@ -42,37 +47,93 @@ #include "graph/utils/partitioner.h" #include "graph/utils/thread_group.h" #include "graph/writer/arrow_fragment_writer.h" +#include "graph/writer/util.h" #include "io/io/i_io_adaptor.h" #include "io/io/io_factory.h" +namespace GAR = GraphArchive; + namespace vineyard { template ArrowFragmentWriter::ArrowFragmentWriter( const std::shared_ptr& frag, const grape::CommSpec& comm_spec, const std::string& graph_info_yaml) - : frag_(frag), comm_spec_(comm_spec) { + : frag_(frag), comm_spec_(comm_spec), store_in_local_(false) { // Load graph info. - auto maybe_graph_info = GraphArchive::GraphInfo::Load(graph_info_yaml); + auto maybe_graph_info = GAR::GraphInfo::Load(graph_info_yaml); if (!maybe_graph_info.status().ok()) { LOG(ERROR) << "Failed to load graph info from " << graph_info_yaml; } - graph_info_ = std::make_shared( - std::move(maybe_graph_info.value())); + graph_info_ = maybe_graph_info.value(); +} + +template +ArrowFragmentWriter::ArrowFragmentWriter( + const std::shared_ptr& frag, const grape::CommSpec& comm_spec, + const std::string& graph_name, const std::string& out_path, + int64_t vertex_block_size, int64_t edge_block_size, + const std::string& file_type, bool store_in_local) + : frag_(frag), comm_spec_(comm_spec), store_in_local_(store_in_local) { + auto& schema = frag_->schema(); + graph_info_ = generate_graph_info_with_schema( + schema, graph_name, out_path, vertex_block_size, edge_block_size, + GAR::StringToFileType(file_type), store_in_local); +} + +template +boost::leaf::result ArrowFragmentWriter::WriteGraphInfo( + const std::string& output_path) { + LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "WRITING-GRAPH-INFO-0"; + if (store_in_local_ || frag_->fid() == frag_->fnum() - 1) { + // store in local: all fragments write graph info + // otherwise, only the last fragment writes graph info + for (const auto& vertex_info : graph_info_->GetVertexInfos()) { + const auto& label = vertex_info->GetLabel(); + auto st = vertex_info->Save(output_path + label + ".vertex.yaml"); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message()); + } + } + for (const auto& edge_info : graph_info_->GetEdgeInfos()) { + const auto& src_label = edge_info->GetSrcLabel(); + const auto& edge_label = edge_info->GetEdgeLabel(); + const auto& dst_label = edge_info->GetDstLabel(); + auto st = edge_info->Save(output_path + src_label + "_" + edge_label + + "_" + dst_label + ".edge.yaml"); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message()); + } + } + const auto& graph_name = graph_info_->GetName(); + auto st = graph_info_->Save(output_path + graph_name + ".graph.yaml"); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message()); + } + } + LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "WRITING-GRAPH-INFO-100"; + + return {}; } template boost::leaf::result ArrowFragmentWriter::WriteFragment() { + LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "WRITING-VERTICES-0"; BOOST_LEAF_CHECK(WriteVertices()); + LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "WRITING-VERTICES-100"; + LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "WRITING-EDGES-0"; BOOST_LEAF_CHECK(WriteEdges()); + LOG_IF(INFO, !comm_spec_.worker_id()) << MARKER << "WRITING-EDGES-100"; MPI_Barrier(comm_spec_.comm()); return {}; } template boost::leaf::result ArrowFragmentWriter::WriteVertices() { - for (auto& item : graph_info_->GetVertexInfos()) { - std::string label = item.first; + for (const auto& vertex_info : graph_info_->GetVertexInfos()) { + const auto& label = vertex_info->GetLabel(); + LOG_IF(INFO, !comm_spec_.worker_id()) + << MARKER << "WRITING-VERTEX: " << label; BOOST_LEAF_CHECK(WriteVertex(label)); } return {}; @@ -81,12 +142,7 @@ boost::leaf::result ArrowFragmentWriter::WriteVertices() { template boost::leaf::result ArrowFragmentWriter::WriteVertex( const std::string& label) { - auto maybe_vertex_info = graph_info_->GetVertexInfo(label); - if (maybe_vertex_info.has_error()) { - RETURN_GS_ERROR(ErrorCode::kGraphArError, - maybe_vertex_info.status().message()); - } - auto& vertex_info = maybe_vertex_info.value(); + auto vertex_info = graph_info_->GetVertexInfo(label); auto& schema = frag_->schema(); auto label_id = schema.GetVertexLabelId(label); @@ -101,31 +157,82 @@ boost::leaf::result ArrowFragmentWriter::WriteVertex( for (fid_t fid = 0; fid < frag_->fid(); ++fid) { chunk_index_begin += static_cast( std::ceil(vm_ptr->GetInnerVertexSize(fid, label_id) / - static_cast(vertex_info.GetChunkSize()))); + static_cast(vertex_info->GetChunkSize()))); } - GraphArchive::VertexPropertyWriter writer(vertex_info, - graph_info_->GetPrefix()); + auto maybe_writer = GraphArchive::VertexPropertyWriter::Make( + vertex_info, graph_info_->GetPrefix()); + auto writer = maybe_writer.value(); // write vertex data start from chunk index begin auto vertex_table = frag_->vertex_data_table(label_id); auto num_rows = vertex_table->num_rows(); if (frag_->fid() != frag_->fnum() - 1 && - num_rows % vertex_info.GetChunkSize() != 0) { + num_rows % vertex_info->GetChunkSize() != 0) { // Append nulls if the number of rows is not a multiple of chunk size. vertex_table = AppendNullsToArrowTable( vertex_table, - vertex_info.GetChunkSize() - num_rows % vertex_info.GetChunkSize()); + vertex_info->GetChunkSize() - num_rows % vertex_info->GetChunkSize()); } - auto st = writer.WriteTable(vertex_table, chunk_index_begin); + auto st = writer->WriteTable(vertex_table, chunk_index_begin); if (!st.ok()) { RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message()); } + if (store_in_local_) { + // write local store meta data + int64_t vertex_chunk_num = + std::ceil(vertex_table->num_rows() / + static_cast(vertex_info->GetChunkSize())); + const auto& extra_info = graph_info_->GetExtraInfo(); + std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); + std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + + local_metadata_prefix + std::to_string(frag_->fid()); + + std::shared_ptr fs; + auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); + if (!fs_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); + } + fs = fs_result.ValueOrDie(); + std::shared_ptr output_stream; + auto output_stream_result = fs->OpenOutputStream(path); + if (!output_stream_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, + output_stream_result.status().message()); + } + output_stream = output_stream_result.ValueOrDie(); - if (frag_->fid() == frag_->fnum() - 1) { + // write vertex chunk index begin and vertex chunk number to local + auto st = output_stream->Write( + reinterpret_cast(&chunk_index_begin), + sizeof(chunk_index_begin)); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); + } + st = output_stream->Write( + reinterpret_cast(&vertex_chunk_num), + sizeof(vertex_chunk_num)); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); + } + + st = output_stream->Close(); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); + } + } + + if (store_in_local_ || frag_->fid() == frag_->fnum() - 1) { + GAR::IdType last_chunk_index_begin = 0; + for (fid_t fid = 0; fid < frag_->fnum() - 1; ++fid) { + last_chunk_index_begin += static_cast( + std::ceil(vm_ptr->GetInnerVertexSize(fid, label_id) / + static_cast(vertex_info->GetChunkSize()))); + } // write vertex number - auto total_vertices_num = chunk_index_begin * vertex_info.GetChunkSize() + - vertex_table->num_rows(); + auto total_vertices_num = + last_chunk_index_begin * vertex_info->GetChunkSize() + + vm_ptr->GetInnerVertexSize(frag_->fnum() - 1, label_id); label_id_to_vnum_[label_id] = total_vertices_num; - auto st = writer.WriteVerticesNum(total_vertices_num); + auto st = writer->WriteVerticesNum(total_vertices_num); if (!st.ok()) { RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message()); } @@ -136,10 +243,13 @@ boost::leaf::result ArrowFragmentWriter::WriteVertex( template boost::leaf::result ArrowFragmentWriter::WriteEdges() { - for (auto& item : graph_info_->GetEdgeInfos()) { - const auto src_label = item.second.GetSrcLabel(); - const auto edge_label = item.second.GetEdgeLabel(); - const auto dst_label = item.second.GetDstLabel(); + for (const auto& edge_info : graph_info_->GetEdgeInfos()) { + const auto& src_label = edge_info->GetSrcLabel(); + const auto& edge_label = edge_info->GetEdgeLabel(); + const auto& dst_label = edge_info->GetDstLabel(); + LOG_IF(INFO, !comm_spec_.worker_id()) + << MARKER << "WRITING-EDGE: " << src_label << "_" << edge_label << "_" + << dst_label; BOOST_LEAF_CHECK(WriteEdge(src_label, edge_label, dst_label)); } return {}; @@ -149,13 +259,7 @@ template boost::leaf::result ArrowFragmentWriter::WriteEdge( const std::string& src_label, const std::string& edge_label, const std::string& dst_label) { - auto maybe_edge_info = - graph_info_->GetEdgeInfo(src_label, edge_label, dst_label); - if (maybe_edge_info.has_error()) { - RETURN_GS_ERROR(ErrorCode::kGraphArError, - maybe_edge_info.status().message()); - } - auto& edge_info = maybe_edge_info.value(); + auto edge_info = graph_info_->GetEdgeInfo(src_label, edge_label, dst_label); // check if the edge information is valid in fragment bool is_valid_edge = true; @@ -193,21 +297,22 @@ boost::leaf::result ArrowFragmentWriter::WriteEdge( src_vertex_chunk_begin_indices[fid] + static_cast( std::ceil(vm_ptr->GetInnerVertexSize(fid, src_label_id) / - static_cast(edge_info.GetSrcChunkSize()))); + static_cast(edge_info->GetSrcChunkSize()))); dst_vertex_chunk_begin_indices[fid + 1] = dst_vertex_chunk_begin_indices[fid] + static_cast( std::ceil(vm_ptr->GetInnerVertexSize(fid, dst_label_id) / - static_cast(edge_info.GetDstChunkSize()))); + static_cast(edge_info->GetDstChunkSize()))); } - if (edge_info.ContainAdjList(GraphArchive::AdjListType::ordered_by_source)) { + if (edge_info->HasAdjacentListType( + GraphArchive::AdjListType::ordered_by_source)) { auto inner_vertices = frag_->InnerVertices(src_label_id); writeEdgeImpl(edge_info, src_label_id, edge_label_id, dst_label_id, src_vertex_chunk_begin_indices, dst_vertex_chunk_begin_indices, inner_vertices, GraphArchive::AdjListType::ordered_by_source); } - if (edge_info.ContainAdjList( + if (edge_info->HasAdjacentListType( GraphArchive::AdjListType::unordered_by_source)) { auto inner_vertices = frag_->InnerVertices(src_label_id); writeEdgeImpl(edge_info, src_label_id, edge_label_id, dst_label_id, @@ -215,14 +320,16 @@ boost::leaf::result ArrowFragmentWriter::WriteEdge( dst_vertex_chunk_begin_indices, inner_vertices, GraphArchive::AdjListType::unordered_by_source); } - if (edge_info.ContainAdjList(GraphArchive::AdjListType::ordered_by_dest)) { + if (edge_info->HasAdjacentListType( + GraphArchive::AdjListType::ordered_by_dest)) { auto inner_vertices = frag_->InnerVertices(dst_label_id); writeEdgeImpl(edge_info, dst_label_id, edge_label_id, src_label_id, dst_vertex_chunk_begin_indices, src_vertex_chunk_begin_indices, inner_vertices, GraphArchive::AdjListType::ordered_by_dest); } - if (edge_info.ContainAdjList(GraphArchive::AdjListType::unordered_by_dest)) { + if (edge_info->HasAdjacentListType( + GraphArchive::AdjListType::unordered_by_dest)) { auto inner_vertices = frag_->InnerVertices(dst_label_id); writeEdgeImpl(edge_info, dst_label_id, edge_label_id, src_label_id, dst_vertex_chunk_begin_indices, @@ -235,8 +342,9 @@ boost::leaf::result ArrowFragmentWriter::WriteEdge( template boost::leaf::result ArrowFragmentWriter::writeEdgeImpl( - const GraphArchive::EdgeInfo& edge_info, label_id_t main_label_id, - label_id_t edge_label_id, label_id_t another_label_id, + const std::shared_ptr& edge_info, + label_id_t main_label_id, label_id_t edge_label_id, + label_id_t another_label_id, const std::vector& main_start_chunk_indices, const std::vector& another_start_chunk_indices, const vertex_range_t& vertices, GraphArchive::AdjListType adj_list_type) { @@ -246,15 +354,15 @@ boost::leaf::result ArrowFragmentWriter::writeEdgeImpl( std::vector> fields; if (adj_list_type == GraphArchive::AdjListType::ordered_by_source || adj_list_type == GraphArchive::AdjListType::unordered_by_source) { - main_vertex_chunk_size = edge_info.GetSrcChunkSize(); - another_vertex_chunk_size = edge_info.GetDstChunkSize(); + main_vertex_chunk_size = edge_info->GetSrcChunkSize(); + another_vertex_chunk_size = edge_info->GetDstChunkSize(); fields = { arrow::field(GraphArchive::GeneralParams::kSrcIndexCol, arrow::int64()), arrow::field(GraphArchive::GeneralParams::kDstIndexCol, arrow::int64())}; } else { - main_vertex_chunk_size = edge_info.GetDstChunkSize(); - another_vertex_chunk_size = edge_info.GetSrcChunkSize(); + main_vertex_chunk_size = edge_info->GetDstChunkSize(); + another_vertex_chunk_size = edge_info->GetSrcChunkSize(); fields = { arrow::field(GraphArchive::GeneralParams::kDstIndexCol, arrow::int64()), arrow::field(GraphArchive::GeneralParams::kSrcIndexCol, @@ -263,16 +371,17 @@ boost::leaf::result ArrowFragmentWriter::writeEdgeImpl( auto main_start_chunk_index = main_start_chunk_indices[frag_->fid()]; auto another_start_chunk_index = another_start_chunk_indices[frag_->fid()]; - GraphArchive::EdgeChunkWriter writer(edge_info, graph_info_->GetPrefix(), - adj_list_type); + auto maybe_writer = GraphArchive::EdgeChunkWriter::Make( + edge_info, graph_info_->GetPrefix(), adj_list_type); + auto writer = maybe_writer.value(); size_t vertex_chunk_num = std::ceil(vertices.size() / static_cast(main_vertex_chunk_size)); // collect properties auto& graph_schema = frag_->schema(); std::set properties; - for (auto& pg : edge_info.GetPropertyGroups(adj_list_type).value()) { - for (auto& property : pg.GetProperties()) { + for (const auto& pg : edge_info->GetPropertyGroups()) { + for (const auto& property : pg->GetProperties()) { label_id_t property_label_id = graph_schema.GetEdgePropertyId(edge_label_id, property.name); if (property_label_id == -1) { @@ -357,7 +466,7 @@ boost::leaf::result ArrowFragmentWriter::writeEdgeImpl( // write the adj list chunks FinishArrowArrayBuilders(builders, column_arrays); auto table = arrow::Table::Make(table_schema, column_arrays); - auto s = writer.WriteTable(table, vertex_chunk_index); + auto s = writer->WriteTable(table, vertex_chunk_index); if (!s.ok()) { return Status::IOError( "GAR error: " + std::to_string(static_cast(s.code())) + ", " + @@ -367,9 +476,12 @@ boost::leaf::result ArrowFragmentWriter::writeEdgeImpl( // write the offset chunks if (adj_list_type == GraphArchive::AdjListType::ordered_by_source || adj_list_type == GraphArchive::AdjListType::ordered_by_dest) { - while (distance % main_vertex_chunk_size != 0) { - RETURN_ON_ARROW_ERROR(offset_builder.Append(edge_offset)); - ++distance; + if (frag_->fid() != frag_->fnum() - 1) { + // not the last fragment, align the offset chunk size + while (distance % main_vertex_chunk_size != 0) { + RETURN_ON_ARROW_ERROR(offset_builder.Append(edge_offset)); + ++distance; + } } RETURN_ON_ARROW_ERROR( offset_builder.Append(edge_offset)); // append the last offset @@ -378,7 +490,7 @@ boost::leaf::result ArrowFragmentWriter::writeEdgeImpl( arrow::schema({arrow::field(GraphArchive::GeneralParams::kOffsetCol, arrow::int64())}), offset_columns); - auto st = writer.WriteOffsetChunk(offset_table, vertex_chunk_index); + auto st = writer->WriteOffsetChunk(offset_table, vertex_chunk_index); if (!st.ok()) { return Status::IOError( "GAR error: " + std::to_string(static_cast(st.code())) + ", " + @@ -387,15 +499,15 @@ boost::leaf::result ArrowFragmentWriter::writeEdgeImpl( } // write edge num of vertex chunk - auto st = writer.WriteEdgesNum(vertex_chunk_index, edge_offset); + auto st = writer->WriteEdgesNum(vertex_chunk_index, edge_offset); if (!st.ok()) { return Status::IOError( "GAR error: " + std::to_string(static_cast(st.code())) + ", " + st.message()); } - if (frag_->fid() == frag_->fnum() - 1) { + if (store_in_local_ || frag_->fid() == frag_->fnum() - 1) { // write vertex number - auto st = writer.WriteVerticesNum(label_id_to_vnum_.at(main_label_id)); + auto st = writer->WriteVerticesNum(label_id_to_vnum_.at(main_label_id)); if (!st.ok()) { return Status::IOError( "GAR error: " + std::to_string(static_cast(st.code())) + ", " + diff --git a/modules/graph/writer/util.cc b/modules/graph/writer/util.cc new file mode 100644 index 000000000..9c3336453 --- /dev/null +++ b/modules/graph/writer/util.cc @@ -0,0 +1,73 @@ + +/** Copyright 2020-2023 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "gar/util/adj_list_type.h" +#include "gar/util/data_type.h" +#include "gar/util/file_type.h" + +#include "graph/writer/util.h" + +namespace GAR = GraphArchive; + +namespace vineyard { + +std::shared_ptr generate_graph_info_with_schema( + const PropertyGraphSchema& schema, const std::string& graph_name, + const std::string& path, int64_t vertex_block_size, int64_t edge_block_size, + GAR::FileType file_type, bool store_in_local) { + GraphArchive::VertexInfoVector vertex_infos; + GraphArchive::EdgeInfoVector edge_infos; + + for (const auto& entry : schema.vertex_entries()) { + std::vector properties; + for (const auto& prop : entry.props_) { + properties.emplace_back(GAR::Property( + prop.name, GAR::DataType::ArrowDataTypeToDataType(prop.type), false)); + } + auto pg = GAR::CreatePropertyGroup(properties, file_type); + auto vertex_info = + GAR::CreateVertexInfo(entry.label, vertex_block_size, {pg}); + vertex_infos.emplace_back(vertex_info); + } + GAR::AdjacentListVector default_adjacent_lists{ + GAR::CreateAdjacentList(GAR::AdjListType::ordered_by_source, file_type), + GAR::CreateAdjacentList(GAR::AdjListType::ordered_by_dest, file_type)}; + for (const auto& entry : schema.edge_entries()) { + std::vector properties; + for (const auto& prop : entry.props_) { + properties.emplace_back(GAR::Property( + prop.name, GAR::DataType::ArrowDataTypeToDataType(prop.type), false)); + } + auto pg = GAR::CreatePropertyGroup(properties, file_type); + for (const auto& relation : entry.relations) { + auto edge_info = GAR::CreateEdgeInfo( + relation.first, entry.label, relation.second, edge_block_size, + vertex_block_size, vertex_block_size, true /* directed */, + default_adjacent_lists, {pg}); + edge_infos.emplace_back(edge_info); + } + } + std::unordered_map extra_info; + if (store_in_local) { + extra_info[LOCAL_METADATA_KEY] = LOCAL_METADATA_VALUE; + } + return GAR::CreateGraphInfo(graph_name, vertex_infos, edge_infos, path, + nullptr, extra_info); +} +} // namespace vineyard diff --git a/modules/graph/writer/util.h b/modules/graph/writer/util.h new file mode 100644 index 000000000..371428410 --- /dev/null +++ b/modules/graph/writer/util.h @@ -0,0 +1,40 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MODULES_GRAPH_WRITER_UTIL_H_ +#define MODULES_GRAPH_WRITER_UTIL_H_ + +#include +#include + +#include "gar/graph_info.h" +#include "gar/util/file_type.h" +#include "graph/fragment/graph_schema.h" + +namespace GAR = GraphArchive; + +namespace vineyard { + +#define LOCAL_METADATA_KEY "local_meta_prefix" +#define LOCAL_METADATA_VALUE "__local_metadata__" + +std::shared_ptr generate_graph_info_with_schema( + const PropertyGraphSchema& schema, const std::string& graph_name, + const std::string& path, int64_t vertex_block_size, int64_t edge_block_size, + GAR::FileType file_type, bool store_in_local); + +} // namespace vineyard + +#endif // MODULES_GRAPH_WRITER_UTIL_H_ diff --git a/test/runner.py b/test/runner.py index 5148226b4..94c658263 100755 --- a/test/runner.py +++ b/test/runner.py @@ -527,17 +527,26 @@ def run_graph_tests(meta, allocator, endpoints, tests): run_test( tests, 'arrow_fragment_gar_test', - '$GAR_DATA_DIR/ldbc_sample/csv/ldbc_sample.graph.yml', + '$VINEYARD_DATA_DIR/p2p_v', + '$VINEYARD_DATA_DIR/p2p_e', + '$TMPDIR/', + 'csv', ) run_test( tests, 'arrow_fragment_gar_test', - '$GAR_DATA_DIR/ldbc_sample/orc/ldbc_sample.graph.yml', + '$VINEYARD_DATA_DIR/p2p_v', + '$VINEYARD_DATA_DIR/p2p_e', + '$TMPDIR/', + 'parquet', ) run_test( tests, 'arrow_fragment_gar_test', - '$GAR_DATA_DIR/ldbc_sample/parquet/ldbc_sample.graph.yml', + '$VINEYARD_DATA_DIR/p2p_v', + '$VINEYARD_DATA_DIR/p2p_e', + '$TMPDIR/', + 'orc', ) From 499d0c64fbac428cd736fcfb8f37b077c37ff268 Mon Sep 17 00:00:00 2001 From: acezen Date: Fri, 23 Feb 2024 19:59:06 +0800 Subject: [PATCH 2/2] Fix CodeFactor error Signed-off-by: acezen --- modules/graph/loader/gar_fragment_loader.h | 5 + .../graph/loader/gar_fragment_loader_impl.h | 146 ++++++++++-------- modules/graph/writer/arrow_fragment_writer.h | 5 + .../graph/writer/arrow_fragment_writer_impl.h | 85 +++++----- 4 files changed, 136 insertions(+), 105 deletions(-) diff --git a/modules/graph/loader/gar_fragment_loader.h b/modules/graph/loader/gar_fragment_loader.h index 9d41ff034..3235dbb3a 100644 --- a/modules/graph/loader/gar_fragment_loader.h +++ b/modules/graph/loader/gar_fragment_loader.h @@ -43,6 +43,7 @@ limitations under the License. namespace GraphArchive { class GraphInfo; +class VertexInfo; class EdgeInfo; class PropertyGroup; enum class AdjListType : std::uint8_t; @@ -142,6 +143,10 @@ class GARFragmentLoader { label_id_t label_id, const std::shared_ptr id_array_in, bool all_be_local_vertex, std::shared_ptr& out); + boost::leaf::result initializeVertexChunkBeginAndNum( + int vertex_label_index, + const std::shared_ptr& vertex_info); + private: Client& client_; grape::CommSpec comm_spec_; diff --git a/modules/graph/loader/gar_fragment_loader_impl.h b/modules/graph/loader/gar_fragment_loader_impl.h index e28f3ed97..adf1ae8de 100644 --- a/modules/graph/loader/gar_fragment_loader_impl.h +++ b/modules/graph/loader/gar_fragment_loader_impl.h @@ -231,73 +231,7 @@ GARFragmentLoader::distributeVertices() { const auto& label = vertex_info->GetLabel(); vertex_labels_.push_back(label); vertex_chunk_sizes_.push_back(vertex_info->GetChunkSize()); - if (store_in_local_) { - // distribute the vertex chunks base on the local metadata - const auto& extra_info = graph_info_->GetExtraInfo(); - if (extra_info.find(LOCAL_METADATA_KEY) == extra_info.end()) { - RETURN_GS_ERROR( - ErrorCode::kInvalidValueError, - "The local metadata key-value is not found in graph info"); - } - std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); - std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + - local_metadata_prefix + - std::to_string(comm_spec_.fid()); - - std::shared_ptr file; - std::shared_ptr fs; - - auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); - if (!fs_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); - } - fs = fs_result.ValueOrDie(); - auto input_stream_result = fs->OpenInputStream(path); - if (!input_stream_result.status().ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, - input_stream_result.status().message()); - } - auto input_stream = input_stream_result.ValueOrDie(); - // read the vertex chunk begin of vertex label i - auto read_result = - input_stream->Read(sizeof(int64_t), &vertex_chunk_begins_[i]); - if (!read_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); - } - assert(read_result.ValueOrDie() == sizeof(int64_t)); - // read the vertex chunk num of vertex label i - read_result = input_stream->Read(sizeof(int64_t), &vertex_chunk_nums_[i]); - if (!read_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); - } - assert(read_result.ValueOrDie() == sizeof(int64_t)); - } else { - // distribute the vertex chunks for fragments - auto chunk_num_result = GraphArchive::util::GetVertexChunkNum( - graph_info_->GetPrefix(), vertex_info); - RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status()); - auto chunk_num = chunk_num_result.value(); - - if (chunk_num <= static_cast(comm_spec_.fnum())) { - if (chunk_num < comm_spec_.fid() + 1) { - // no vertex chunk can be assigned to this fragment - vertex_chunk_begins_[i] = 0; - vertex_chunk_nums_[i] = 0; - } else { - vertex_chunk_begins_[i] = static_cast(comm_spec_.fid()); - vertex_chunk_nums_[i] = 1; - } - } else { - int64_t bsize = chunk_num / static_cast(comm_spec_.fnum()); - vertex_chunk_begins_[i] = - static_cast(comm_spec_.fid()) * bsize; - if (comm_spec_.fid() == comm_spec_.fnum() - 1) { - vertex_chunk_nums_[i] = chunk_num - vertex_chunk_begins_[i]; - } else { - vertex_chunk_nums_[i] = bsize; - } - } - } + BOOST_LEAF_CHECK(initializeVertexChunkBeginAndNum(i, vertex_info)); } vertex_label_num_ = vertex_labels_.size(); for (size_t i = 0; i < vertex_labels_.size(); ++i) { @@ -324,6 +258,84 @@ GARFragmentLoader::distributeVertices() { return {}; } +template class VERTEX_MAP_T> +boost::leaf::result +GARFragmentLoader::initializeVertexChunkBeginAndNum( + int vertex_label_index, + const std::shared_ptr& vertex_info) { + if (store_in_local_) { + // distribute the vertex chunks base on the local metadata + const auto& extra_info = graph_info_->GetExtraInfo(); + if (extra_info.find(LOCAL_METADATA_KEY) == extra_info.end()) { + RETURN_GS_ERROR( + ErrorCode::kInvalidValueError, + "The local metadata key-value is not found in graph info"); + } + std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); + std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + + local_metadata_prefix + std::to_string(comm_spec_.fid()); + + std::shared_ptr file; + std::shared_ptr fs; + + auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); + if (!fs_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); + } + fs = fs_result.ValueOrDie(); + auto input_stream_result = fs->OpenInputStream(path); + if (!input_stream_result.status().ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, + input_stream_result.status().message()); + } + auto input_stream = input_stream_result.ValueOrDie(); + // read the vertex chunk begin of vertex label i + auto read_result = input_stream->Read( + sizeof(int64_t), &vertex_chunk_begins_[vertex_label_index]); + if (!read_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); + } + assert(read_result.ValueOrDie() == sizeof(int64_t)); + // read the vertex chunk num of vertex label i + read_result = input_stream->Read(sizeof(int64_t), + &vertex_chunk_nums_[vertex_label_index]); + if (!read_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); + } + assert(read_result.ValueOrDie() == sizeof(int64_t)); + } else { + // distribute the vertex chunks for fragments + auto chunk_num_result = GraphArchive::util::GetVertexChunkNum( + graph_info_->GetPrefix(), vertex_info); + RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status()); + auto chunk_num = chunk_num_result.value(); + + if (chunk_num <= static_cast(comm_spec_.fnum())) { + if (chunk_num < comm_spec_.fid() + 1) { + // no vertex chunk can be assigned to this fragment + vertex_chunk_begins_[vertex_label_index] = 0; + vertex_chunk_nums_[vertex_label_index] = 0; + } else { + vertex_chunk_begins_[vertex_label_index] = + static_cast(comm_spec_.fid()); + vertex_chunk_nums_[vertex_label_index] = 1; + } + } else { + int64_t bsize = chunk_num / static_cast(comm_spec_.fnum()); + vertex_chunk_begins_[vertex_label_index] = + static_cast(comm_spec_.fid()) * bsize; + if (comm_spec_.fid() == comm_spec_.fnum() - 1) { + vertex_chunk_nums_[vertex_label_index] = + chunk_num - vertex_chunk_begins_[vertex_label_index]; + } else { + vertex_chunk_nums_[vertex_label_index] = bsize; + } + } + } + return {}; +} + template class VERTEX_MAP_T> boost::leaf::result diff --git a/modules/graph/writer/arrow_fragment_writer.h b/modules/graph/writer/arrow_fragment_writer.h index 85634eaaf..3a43f492c 100644 --- a/modules/graph/writer/arrow_fragment_writer.h +++ b/modules/graph/writer/arrow_fragment_writer.h @@ -46,6 +46,7 @@ limitations under the License. namespace GraphArchive { class GraphInfo; class EdgeInfo; +class VertexInfo; enum class AdjListType : std::uint8_t; } // namespace GraphArchive @@ -151,6 +152,10 @@ class ArrowFragmentWriter { const label_id_t edge_label, const PropertyGraphSchema& graph_schema, std::vector>& builders); + boost::leaf::result writeLocalVertexChunkBeginAndNum( + const std::shared_ptr& vertex_info, + int64_t vertex_chunk_begin, int64_t vertex_chunk_num); + private: std::shared_ptr> frag_; grape::CommSpec comm_spec_; diff --git a/modules/graph/writer/arrow_fragment_writer_impl.h b/modules/graph/writer/arrow_fragment_writer_impl.h index 6ca75c5b6..a54ff5e57 100644 --- a/modules/graph/writer/arrow_fragment_writer_impl.h +++ b/modules/graph/writer/arrow_fragment_writer_impl.h @@ -177,47 +177,11 @@ boost::leaf::result ArrowFragmentWriter::WriteVertex( RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message()); } if (store_in_local_) { - // write local store meta data int64_t vertex_chunk_num = std::ceil(vertex_table->num_rows() / static_cast(vertex_info->GetChunkSize())); - const auto& extra_info = graph_info_->GetExtraInfo(); - std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); - std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + - local_metadata_prefix + std::to_string(frag_->fid()); - - std::shared_ptr fs; - auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); - if (!fs_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); - } - fs = fs_result.ValueOrDie(); - std::shared_ptr output_stream; - auto output_stream_result = fs->OpenOutputStream(path); - if (!output_stream_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, - output_stream_result.status().message()); - } - output_stream = output_stream_result.ValueOrDie(); - - // write vertex chunk index begin and vertex chunk number to local - auto st = output_stream->Write( - reinterpret_cast(&chunk_index_begin), - sizeof(chunk_index_begin)); - if (!st.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); - } - st = output_stream->Write( - reinterpret_cast(&vertex_chunk_num), - sizeof(vertex_chunk_num)); - if (!st.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); - } - - st = output_stream->Close(); - if (!st.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); - } + BOOST_LEAF_CHECK(writeLocalVertexChunkBeginAndNum( + vertex_info, chunk_index_begin, vertex_chunk_num)); } if (store_in_local_ || frag_->fid() == frag_->fnum() - 1) { @@ -602,6 +566,51 @@ ArrowFragmentWriter::appendPropertiesToArrowArrayBuilders( return {}; } +template +boost::leaf::result +ArrowFragmentWriter::writeLocalVertexChunkBeginAndNum( + const std::shared_ptr& vertex_info, + int64_t vertex_chunk_begin, int64_t vertex_chunk_num) { + // write local store meta data + const auto& extra_info = graph_info_->GetExtraInfo(); + std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); + std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + + local_metadata_prefix + std::to_string(frag_->fid()); + + std::shared_ptr fs; + auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); + if (!fs_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); + } + fs = fs_result.ValueOrDie(); + std::shared_ptr output_stream; + auto output_stream_result = fs->OpenOutputStream(path); + if (!output_stream_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, + output_stream_result.status().message()); + } + output_stream = output_stream_result.ValueOrDie(); + + // write vertex chunk index begin and vertex chunk number to local + auto st = output_stream->Write( + reinterpret_cast(&vertex_chunk_begin), + sizeof(vertex_chunk_begin)); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); + } + st = output_stream->Write(reinterpret_cast(&vertex_chunk_num), + sizeof(vertex_chunk_num)); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); + } + + st = output_stream->Close(); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); + } + return {}; +} + } // namespace vineyard #endif // ENABLE_GAR