Skip to content

Commit

Permalink
[GraphAr] Align the vertices of fragment to vertex chunk size & Fix t…
Browse files Browse the repository at this point in the history
…he shuffle array multi-thread bug & Bump up GraphAr (#1604)

- Align the vertices of fragment to vertex chunk size by append nulls, so the multi-fragment writer out to gar format also correct
- Fix the gar loader shuffle oid array bug: remove the multi-thread implementation since that shuffle already use thread to send/recv, that
would make message conflict
- Bump up GraphAr to support build with local installed arrow.

Fixes #issue number
  • Loading branch information
acezen authored Oct 25, 2023
1 parent c70ff7b commit a384f8a
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 43 deletions.
7 changes: 6 additions & 1 deletion modules/graph/fragment/gar_fragment_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ class GARFragmentBuilder
std::shared_ptr<vertex_map_t> vm_ptr)
: ArrowFragmentBaseBuilder<oid_t, vid_t, vertex_map_t>(client),
client_(client),
vm_ptr_(vm_ptr) {}
vm_ptr_(vm_ptr) {
Base::set_compact_edges_(false);
VINEYARD_ASSERT(
!Base::compact_edges_,
"Compacting edges is not supported when loading from GraphAr.");
}

vineyard::Status Build(vineyard::Client& client) override;

Expand Down
25 changes: 16 additions & 9 deletions modules/graph/fragment/gar_fragment_builder_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,31 @@ boost::leaf::result<void> generate_csr(

int64_t num_chunks = src_chunks.size();
int64_t edge_num = offset_array->Value(offset_array->length() - 1);

for (int v_label = 0; v_label != vertex_label_num; ++v_label) {
auto tvnum = tvnums[v_label];
// build the arrow's offset array
std::shared_ptr<arrow::Buffer> offsets_buffer;
ARROW_OK_ASSIGN_OR_RAISE(
offsets_buffer, arrow::AllocateBuffer((tvnum + 1) * sizeof(int64_t)));
if (v_label == vertex_label) {
edge_offsets[v_label] = offset_array;
memcpy(offsets_buffer->mutable_data(),
reinterpret_cast<const uint8_t*>(offset_array->raw_values()),
offset_array->length() * sizeof(int64_t));
// we do not store the edge offset of outer vertices, so fill edge_num
// to the outer vertices offset
std::fill_n(
reinterpret_cast<int64_t*>(offsets_buffer->mutable_data() +
offset_array->length() * sizeof(int64_t)),
(tvnum + 1) - offset_array->length(), edge_num);
edges[v_label] =
std::make_shared<PodArrayBuilder<nbr_unit_t>>(client, edge_num);
} else {
auto tvnum = tvnums[v_label];
// build the arrow's offset array
std::shared_ptr<arrow::Buffer> offsets_buffer;
ARROW_OK_ASSIGN_OR_RAISE(
offsets_buffer, arrow::AllocateBuffer((tvnum + 1) * sizeof(int64_t)));
std::fill_n(reinterpret_cast<int64_t*>(offsets_buffer->mutable_data()),
tvnum + 1, 0);
edge_offsets[v_label] = std::make_shared<arrow::Int64Array>(
arrow::int64(), tvnum + 1, offsets_buffer, nullptr, 0, 0);
edges[v_label] = std::make_shared<PodArrayBuilder<nbr_unit_t>>(client, 0);
}
edge_offsets[v_label] = std::make_shared<arrow::Int64Array>(
arrow::int64(), tvnum + 1, offsets_buffer, nullptr, 0, 0);
}

std::vector<int64_t> chunk_offsets(num_chunks + 1, 0);
Expand Down
37 changes: 37 additions & 0 deletions modules/graph/loader/gar_fragment_loader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/** Copyright 2020-2023 Alibaba Group Holding Limited.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifdef ENABLE_GAR

#include "graph/loader/gar_fragment_loader.h"

#include "arrow/api.h"
#include "gar/graph_info.h"

namespace vineyard {

std::shared_ptr<arrow::Schema> ConstructSchemaFromPropertyGroup(
const GraphArchive::PropertyGroup& property_group) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (const auto& prop : property_group.GetProperties()) {
fields.emplace_back(arrow::field(
prop.name, GraphArchive::DataType::DataTypeToArrowDataType(prop.type)));
}
return arrow::schema(fields);
}

} // namespace vineyard

#endif // ENABLE_GAR
4 changes: 4 additions & 0 deletions modules/graph/loader/gar_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,15 @@ limitations under the License.
namespace GraphArchive {
class GraphInfo;
class EdgeInfo;
class PropertyGroup;
enum class AdjListType : std::uint8_t;
} // namespace GraphArchive

namespace vineyard {

std::shared_ptr<arrow::Schema> ConstructSchemaFromPropertyGroup(
const GraphArchive::PropertyGroup& property_group);

template <typename OID_T = property_graph_types::OID_TYPE,
typename VID_T = property_graph_types::VID_TYPE,
template <typename OID_T_ = typename InternalType<OID_T>::type,
Expand Down
72 changes: 45 additions & 27 deletions modules/graph/loader/gar_fragment_loader_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,29 @@ GARFragmentLoader<OID_T, VID_T, VERTEX_MAP_T>::distributeVertices() {
}
vertex_labels_.push_back(label);
vertex_chunk_sizes_.push_back(vertex_info.GetChunkSize());
auto chunk_num = GraphArchive::utils::GetVertexChunkNum(
auto chunk_num_result = GraphArchive::utils::GetVertexChunkNum(
graph_info_->GetPrefix(), vertex_info);
RETURN_GS_ERROR_IF_NOT_OK(chunk_num.status());
RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status());
// distribute the vertex chunks for fragments
int64_t bsize = chunk_num.value() / static_cast<int64_t>(comm_spec_.fnum());
vertex_chunk_begin_of_frag_[label].resize(comm_spec_.fnum() + 1, 0);
for (fid_t fid = 0; fid < comm_spec_.fnum(); ++fid) {
vertex_chunk_begin_of_frag_[label][fid] =
static_cast<gar_id_t>(fid) * bsize;
auto chunk_num = chunk_num_result.value();

if (chunk_num < static_cast<int64_t>(comm_spec_.fnum())) {
int64_t index = 0;
for (; index < chunk_num; ++index) {
vertex_chunk_begin_of_frag_[label][index] = index;
}
for (; index <= static_cast<int64_t>(comm_spec_.fnum()); ++index) {
vertex_chunk_begin_of_frag_[label][index] = chunk_num;
}
} else {
int64_t bsize = chunk_num / static_cast<int64_t>(comm_spec_.fnum());
vertex_chunk_begin_of_frag_[label].resize(comm_spec_.fnum() + 1, 0);
for (fid_t fid = 0; fid < comm_spec_.fnum(); ++fid) {
vertex_chunk_begin_of_frag_[label][fid] =
static_cast<gar_id_t>(fid) * bsize;
}
vertex_chunk_begin_of_frag_[label][comm_spec_.fnum()] = chunk_num;
}
vertex_chunk_begin_of_frag_[label][comm_spec_.fnum()] = chunk_num.value();
}
vertex_label_num_ = vertex_labels_.size();
for (size_t i = 0; i < vertex_labels_.size(); ++i) {
Expand Down Expand Up @@ -246,8 +258,8 @@ boost::leaf::result<void>
GARFragmentLoader<OID_T, VID_T, VERTEX_MAP_T>::constructVertexMap() {
std::vector<std::vector<std::shared_ptr<arrow::ChunkedArray>>> oid_lists(
vertex_label_num_);
ThreadGroup tg(comm_spec_);
auto shuffle_procedure = [&](const label_id_t label_id) -> Status {
auto shuffle_procedure =
[&](const label_id_t label_id) -> boost::leaf::result<std::nullptr_t> {
std::vector<std::shared_ptr<arrow::ChunkedArray>> shuffled_oid_array;
auto& vertex_info =
graph_info_->GetVertexInfo(vertex_labels_[label_id]).value();
Expand All @@ -267,33 +279,26 @@ GARFragmentLoader<OID_T, VID_T, VERTEX_MAP_T>::constructVertexMap() {
if (primary_key.empty()) {
std::string msg = "primary key is not found in " +
vertex_labels_[label_id] + " property groups";
return Status::Invalid(msg);
RETURN_GS_ERROR(ErrorCode::kInvalidValueError, msg);
}
auto local_oid_array =
vertex_tables_[label_id]->GetColumnByName(primary_key);
if (local_oid_array == nullptr) {
std::string msg = "primary key column " + primary_key +
" is not found in " + vertex_labels_[label_id] +
" table";
return Status::Invalid(msg);
RETURN_GS_ERROR(ErrorCode::kInvalidValueError, msg);
}
RETURN_ON_ERROR(FragmentAllGatherArray(comm_spec_, local_oid_array,
shuffled_oid_array));
VY_OK_OR_RAISE(FragmentAllGatherArray(comm_spec_, local_oid_array,
shuffled_oid_array));
for (auto const& array : shuffled_oid_array) {
oid_lists[label_id].emplace_back(
std::dynamic_pointer_cast<arrow::ChunkedArray>(array));
}
return Status::OK();
return nullptr;
};
for (label_id_t label_id = 0; label_id < vertex_label_num_; ++label_id) {
tg.AddTask(shuffle_procedure, label_id);
}
{
Status status;
for (auto const& s : tg.TakeResults()) {
status += s;
}
VY_OK_OR_RAISE(status);
BOOST_LEAF_CHECK(sync_gs_error(comm_spec_, shuffle_procedure, label_id));
}

BasicArrowVertexMapBuilder<internal_oid_t, vid_t> vm_builder(
Expand Down Expand Up @@ -362,11 +367,24 @@ GARFragmentLoader<OID_T, VID_T, VERTEX_MAP_T>::loadVertexTableOfLabel(
for (auto& t : threads) {
t.join();
}
auto pg_table = arrow::ConcatenateTables(vertex_chunk_tables);
if (!pg_table.status().ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, pg_table.status().message());
std::shared_ptr<arrow::Table> pg_table;
if (vertex_chunk_num_of_fragment > 0) {
auto pg_table_ret = arrow::ConcatenateTables(vertex_chunk_tables);
if (!pg_table_ret.status().ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError,
pg_table_ret.status().message());
}
pg_table = pg_table_ret.ValueOrDie();
} else {
auto schema = ConstructSchemaFromPropertyGroup(pg);
auto pg_table_ret = arrow::Table::MakeEmpty(schema);
if (!pg_table_ret.status().ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError,
pg_table_ret.status().message());
}
pg_table = pg_table_ret.ValueOrDie();
}
pg_tables.push_back(std::move(pg_table).ValueOrDie());
pg_tables.push_back(std::move(pg_table));
}
std::shared_ptr<arrow::Table> concat_table;
VY_OK_OR_RAISE(ConcatenateTablesColumnWise(pg_tables, concat_table));
Expand Down
2 changes: 1 addition & 1 deletion modules/graph/thirdparty/GraphAr
25 changes: 25 additions & 0 deletions modules/graph/writer/arrow_fragment_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,31 @@ void InitializeArrayArrayBuilders(
}
}

std::shared_ptr<arrow::Table> AppendNullsToArrowTable(
const std::shared_ptr<arrow::Table>& table, size_t num_rows_to_append) {
std::vector<std::shared_ptr<arrow::Array>> columns;
for (int i = 0; i < table->num_columns(); ++i) {
auto type = table->field(i)->type();
std::unique_ptr<arrow::ArrayBuilder> builder;
auto st = arrow::MakeBuilder(arrow::default_memory_pool(), type, &builder);
if (!st.ok()) {
LOG(FATAL) << "Failed to create array builder: " << st.message();
}
st = builder->AppendNulls(num_rows_to_append);
if (!st.ok()) {
LOG(FATAL) << "Failed to append null to arrow table: " << st.message();
}
std::shared_ptr<arrow::Array> nulls;
st = builder->Finish(&nulls);
if (!st.ok()) {
LOG(FATAL) << "Failed to finish null builder: " << st.message();
}
columns.push_back(nulls);
}
auto null_table = arrow::Table::Make(table->schema(), columns);
return arrow::ConcatenateTables({table, null_table}).ValueOrDie();
}

} // namespace vineyard

#endif // ENABLE_GAR
4 changes: 4 additions & 0 deletions modules/graph/writer/arrow_fragment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ void InitializeArrayArrayBuilders(
const property_graph_types::LABEL_ID_TYPE edge_label,
const PropertyGraphSchema& graph_schema);

std::shared_ptr<arrow::Table> AppendNullsToArrowTable(
const std::shared_ptr<arrow::Table>& table, size_t num_rows_to_append);

template <typename FRAG_T>
class ArrowFragmentWriter {
using oid_t = typename FRAG_T::oid_t;
Expand Down Expand Up @@ -118,6 +121,7 @@ class ArrowFragmentWriter {
std::shared_ptr<ArrowFragment<oid_t, vid_t>> frag_;
grape::CommSpec comm_spec_;
std::shared_ptr<GraphArchive::GraphInfo> graph_info_;
std::map<label_id_t, int64_t> label_id_to_vnum_;
};

} // namespace vineyard
Expand Down
21 changes: 16 additions & 5 deletions modules/graph/writer/arrow_fragment_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ template <typename FRAG_T>
boost::leaf::result<void> ArrowFragmentWriter<FRAG_T>::WriteFragment() {
BOOST_LEAF_CHECK(WriteVertices());
BOOST_LEAF_CHECK(WriteEdges());
MPI_Barrier(comm_spec_.comm());
return {};
}

Expand Down Expand Up @@ -106,14 +107,25 @@ boost::leaf::result<void> ArrowFragmentWriter<FRAG_T>::WriteVertex(
graph_info_->GetPrefix());
// write vertex data start from chunk index begin
auto vertex_table = frag_->vertex_data_table(label_id);
auto num_rows = vertex_table->num_rows();
if (frag_->fid() != frag_->fnum() - 1 &&
num_rows % vertex_info.GetChunkSize() != 0) {
// Append nulls if the number of rows is not a multiple of chunk size.
vertex_table = AppendNullsToArrowTable(
vertex_table,
vertex_info.GetChunkSize() - num_rows % vertex_info.GetChunkSize());
}
auto st = writer.WriteTable(vertex_table, chunk_index_begin);
if (!st.ok()) {
RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message());
}

if (comm_spec_.worker_id() == 0) {
if (frag_->fid() == frag_->fnum() - 1) {
// write vertex number
auto st = writer.WriteVerticesNum(vm_ptr->GetTotalNodesNum(label_id));
auto total_vertices_num = chunk_index_begin * vertex_info.GetChunkSize() +
vertex_table->num_rows();
label_id_to_vnum_[label_id] = total_vertices_num;
auto st = writer.WriteVerticesNum(total_vertices_num);
if (!st.ok()) {
RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message());
}
Expand Down Expand Up @@ -381,10 +393,9 @@ boost::leaf::result<void> ArrowFragmentWriter<FRAG_T>::writeEdgeImpl(
"GAR error: " + std::to_string(static_cast<int>(st.code())) + ", " +
st.message());
}
if (comm_spec_.worker_id() == 0) {
if (frag_->fid() == frag_->fnum() - 1) {
// write vertex number
auto st = writer.WriteVerticesNum(
frag_->GetVertexMap()->GetTotalNodesNum(main_label_id));
auto st = writer.WriteVerticesNum(label_id_to_vnum_.at(main_label_id));
if (!st.ok()) {
return Status::IOError(
"GAR error: " + std::to_string(static_cast<int>(st.code())) + ", " +
Expand Down
26 changes: 26 additions & 0 deletions modules/graph/writer/arrow_fragment_writer_string.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/** Copyright 2020-2023 Alibaba Group Holding Limited.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifdef ENABLE_GAR

#include "graph/writer/arrow_fragment_writer_impl.h"

namespace vineyard {

template class ArrowFragmentWriter<ArrowFragment<std::string, uint64_t>>;

} // namespace vineyard

#endif // ENABLE_GA

0 comments on commit a384f8a

Please sign in to comment.