diff --git a/modules/graph/loader/gar_fragment_loader.h b/modules/graph/loader/gar_fragment_loader.h index 9d41ff034..3235dbb3a 100644 --- a/modules/graph/loader/gar_fragment_loader.h +++ b/modules/graph/loader/gar_fragment_loader.h @@ -43,6 +43,7 @@ limitations under the License. namespace GraphArchive { class GraphInfo; +class VertexInfo; class EdgeInfo; class PropertyGroup; enum class AdjListType : std::uint8_t; @@ -142,6 +143,10 @@ class GARFragmentLoader { label_id_t label_id, const std::shared_ptr id_array_in, bool all_be_local_vertex, std::shared_ptr& out); + boost::leaf::result initializeVertexChunkBeginAndNum( + int vertex_label_index, + const std::shared_ptr& vertex_info); + private: Client& client_; grape::CommSpec comm_spec_; diff --git a/modules/graph/loader/gar_fragment_loader_impl.h b/modules/graph/loader/gar_fragment_loader_impl.h index e28f3ed97..adf1ae8de 100644 --- a/modules/graph/loader/gar_fragment_loader_impl.h +++ b/modules/graph/loader/gar_fragment_loader_impl.h @@ -231,73 +231,7 @@ GARFragmentLoader::distributeVertices() { const auto& label = vertex_info->GetLabel(); vertex_labels_.push_back(label); vertex_chunk_sizes_.push_back(vertex_info->GetChunkSize()); - if (store_in_local_) { - // distribute the vertex chunks base on the local metadata - const auto& extra_info = graph_info_->GetExtraInfo(); - if (extra_info.find(LOCAL_METADATA_KEY) == extra_info.end()) { - RETURN_GS_ERROR( - ErrorCode::kInvalidValueError, - "The local metadata key-value is not found in graph info"); - } - std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); - std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + - local_metadata_prefix + - std::to_string(comm_spec_.fid()); - - std::shared_ptr file; - std::shared_ptr fs; - - auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); - if (!fs_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); - } - fs = fs_result.ValueOrDie(); - auto 
input_stream_result = fs->OpenInputStream(path); - if (!input_stream_result.status().ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, - input_stream_result.status().message()); - } - auto input_stream = input_stream_result.ValueOrDie(); - // read the vertex chunk begin of vertex label i - auto read_result = - input_stream->Read(sizeof(int64_t), &vertex_chunk_begins_[i]); - if (!read_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); - } - assert(read_result.ValueOrDie() == sizeof(int64_t)); - // read the vertex chunk num of vertex label i - read_result = input_stream->Read(sizeof(int64_t), &vertex_chunk_nums_[i]); - if (!read_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); - } - assert(read_result.ValueOrDie() == sizeof(int64_t)); - } else { - // distribute the vertex chunks for fragments - auto chunk_num_result = GraphArchive::util::GetVertexChunkNum( - graph_info_->GetPrefix(), vertex_info); - RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status()); - auto chunk_num = chunk_num_result.value(); - - if (chunk_num <= static_cast(comm_spec_.fnum())) { - if (chunk_num < comm_spec_.fid() + 1) { - // no vertex chunk can be assigned to this fragment - vertex_chunk_begins_[i] = 0; - vertex_chunk_nums_[i] = 0; - } else { - vertex_chunk_begins_[i] = static_cast(comm_spec_.fid()); - vertex_chunk_nums_[i] = 1; - } - } else { - int64_t bsize = chunk_num / static_cast(comm_spec_.fnum()); - vertex_chunk_begins_[i] = - static_cast(comm_spec_.fid()) * bsize; - if (comm_spec_.fid() == comm_spec_.fnum() - 1) { - vertex_chunk_nums_[i] = chunk_num - vertex_chunk_begins_[i]; - } else { - vertex_chunk_nums_[i] = bsize; - } - } - } + BOOST_LEAF_CHECK(initializeVertexChunkBeginAndNum(i, vertex_info)); } vertex_label_num_ = vertex_labels_.size(); for (size_t i = 0; i < vertex_labels_.size(); ++i) { @@ -324,6 +258,84 @@ GARFragmentLoader::distributeVertices() { return {}; } +template class VERTEX_MAP_T> 
+boost::leaf::result +GARFragmentLoader::initializeVertexChunkBeginAndNum( + int vertex_label_index, + const std::shared_ptr& vertex_info) { + if (store_in_local_) { + // distribute the vertex chunks based on the local metadata + const auto& extra_info = graph_info_->GetExtraInfo(); + if (extra_info.find(LOCAL_METADATA_KEY) == extra_info.end()) { + RETURN_GS_ERROR( + ErrorCode::kInvalidValueError, + "The local metadata key-value is not found in graph info"); + } + std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); + std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + + local_metadata_prefix + std::to_string(comm_spec_.fid()); + + std::shared_ptr file; + std::shared_ptr fs; + + auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); + if (!fs_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); + } + fs = fs_result.ValueOrDie(); + auto input_stream_result = fs->OpenInputStream(path); + if (!input_stream_result.status().ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, + input_stream_result.status().message()); + } + auto input_stream = input_stream_result.ValueOrDie(); + // read chunk begin of label vertex_label_index (length checked by assert) + auto read_result = input_stream->Read( + sizeof(int64_t), &vertex_chunk_begins_[vertex_label_index]); + if (!read_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); + } + assert(read_result.ValueOrDie() == sizeof(int64_t)); + // read chunk num of label vertex_label_index (length checked by assert) + read_result = input_stream->Read(sizeof(int64_t), + &vertex_chunk_nums_[vertex_label_index]); + if (!read_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message()); + } + assert(read_result.ValueOrDie() == sizeof(int64_t)); + } else { + // distribute the vertex chunks for fragments + auto chunk_num_result = GraphArchive::util::GetVertexChunkNum( + graph_info_->GetPrefix(), vertex_info); + 
RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status()); + auto chunk_num = chunk_num_result.value(); + + if (chunk_num <= static_cast(comm_spec_.fnum())) { + if (chunk_num < comm_spec_.fid() + 1) { + // no vertex chunk can be assigned to this fragment + vertex_chunk_begins_[vertex_label_index] = 0; + vertex_chunk_nums_[vertex_label_index] = 0; + } else { + vertex_chunk_begins_[vertex_label_index] = + static_cast(comm_spec_.fid()); + vertex_chunk_nums_[vertex_label_index] = 1; + } + } else { + int64_t bsize = chunk_num / static_cast(comm_spec_.fnum()); + vertex_chunk_begins_[vertex_label_index] = + static_cast(comm_spec_.fid()) * bsize; + if (comm_spec_.fid() == comm_spec_.fnum() - 1) { + vertex_chunk_nums_[vertex_label_index] = + chunk_num - vertex_chunk_begins_[vertex_label_index]; + } else { + vertex_chunk_nums_[vertex_label_index] = bsize; + } + } + } + return {}; +} + template class VERTEX_MAP_T> boost::leaf::result diff --git a/modules/graph/writer/arrow_fragment_writer.h b/modules/graph/writer/arrow_fragment_writer.h index 85634eaaf..3a43f492c 100644 --- a/modules/graph/writer/arrow_fragment_writer.h +++ b/modules/graph/writer/arrow_fragment_writer.h @@ -46,6 +46,7 @@ limitations under the License. 
namespace GraphArchive { class GraphInfo; class EdgeInfo; +class VertexInfo; enum class AdjListType : std::uint8_t; } // namespace GraphArchive @@ -151,6 +152,10 @@ class ArrowFragmentWriter { const label_id_t edge_label, const PropertyGraphSchema& graph_schema, std::vector>& builders); + boost::leaf::result writeLocalVertexChunkBeginAndNum( + const std::shared_ptr& vertex_info, + int64_t vertex_chunk_begin, int64_t vertex_chunk_num); + private: std::shared_ptr> frag_; grape::CommSpec comm_spec_; diff --git a/modules/graph/writer/arrow_fragment_writer_impl.h b/modules/graph/writer/arrow_fragment_writer_impl.h index 6ca75c5b6..a54ff5e57 100644 --- a/modules/graph/writer/arrow_fragment_writer_impl.h +++ b/modules/graph/writer/arrow_fragment_writer_impl.h @@ -177,47 +177,11 @@ boost::leaf::result ArrowFragmentWriter::WriteVertex( RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message()); } if (store_in_local_) { - // write local store meta data int64_t vertex_chunk_num = std::ceil(vertex_table->num_rows() / static_cast(vertex_info->GetChunkSize())); - const auto& extra_info = graph_info_->GetExtraInfo(); - std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); - std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + - local_metadata_prefix + std::to_string(frag_->fid()); - - std::shared_ptr fs; - auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); - if (!fs_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); - } - fs = fs_result.ValueOrDie(); - std::shared_ptr output_stream; - auto output_stream_result = fs->OpenOutputStream(path); - if (!output_stream_result.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, - output_stream_result.status().message()); - } - output_stream = output_stream_result.ValueOrDie(); - - // write vertex chunk index begin and vertex chunk number to local - auto st = output_stream->Write( - reinterpret_cast(&chunk_index_begin), - sizeof(chunk_index_begin)); - if (!st.ok()) 
{ - RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); - } - st = output_stream->Write( - reinterpret_cast(&vertex_chunk_num), - sizeof(vertex_chunk_num)); - if (!st.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); - } - - st = output_stream->Close(); - if (!st.ok()) { - RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); - } + BOOST_LEAF_CHECK(writeLocalVertexChunkBeginAndNum( + vertex_info, chunk_index_begin, vertex_chunk_num)); } if (store_in_local_ || frag_->fid() == frag_->fnum() - 1) { @@ -602,6 +566,51 @@ ArrowFragmentWriter::appendPropertiesToArrowArrayBuilders( return {}; } +template +boost::leaf::result +ArrowFragmentWriter::writeLocalVertexChunkBeginAndNum( + const std::shared_ptr& vertex_info, + int64_t vertex_chunk_begin, int64_t vertex_chunk_num) { + // write local store metadata; NOTE(review): at() assumes the key exists + const auto& extra_info = graph_info_->GetExtraInfo(); + std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY); + std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() + + local_metadata_prefix + std::to_string(frag_->fid()); + + std::shared_ptr fs; + auto fs_result = arrow::fs::FileSystemFromUriOrPath(path); + if (!fs_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message()); + } + fs = fs_result.ValueOrDie(); + std::shared_ptr output_stream; + auto output_stream_result = fs->OpenOutputStream(path); + if (!output_stream_result.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, + output_stream_result.status().message()); + } + output_stream = output_stream_result.ValueOrDie(); + + // write vertex chunk index begin and vertex chunk number to local + auto st = output_stream->Write( + reinterpret_cast(&vertex_chunk_begin), + sizeof(vertex_chunk_begin)); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); + } + st = output_stream->Write(reinterpret_cast(&vertex_chunk_num), + sizeof(vertex_chunk_num)); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, 
st.message()); + } + + st = output_stream->Close(); + if (!st.ok()) { + RETURN_GS_ERROR(ErrorCode::kArrowError, st.message()); + } + return {}; +} + } // namespace vineyard #endif // ENABLE_GAR