Skip to content

Commit

Permalink
Fix CodeFactor error
Browse files Browse the repository at this point in the history
Signed-off-by: acezen <[email protected]>
  • Loading branch information
acezen committed Feb 23, 2024
1 parent 65e0515 commit 499d0c6
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 105 deletions.
5 changes: 5 additions & 0 deletions modules/graph/loader/gar_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ limitations under the License.

namespace GraphArchive {
class GraphInfo;
class VertexInfo;
class EdgeInfo;
class PropertyGroup;
enum class AdjListType : std::uint8_t;
Expand Down Expand Up @@ -142,6 +143,10 @@ class GARFragmentLoader {
label_id_t label_id, const std::shared_ptr<arrow::Array> id_array_in,
bool all_be_local_vertex, std::shared_ptr<arrow::Array>& out);

boost::leaf::result<void> initializeVertexChunkBeginAndNum(
int vertex_label_index,
const std::shared_ptr<GraphArchive::VertexInfo>& vertex_info);

private:
Client& client_;
grape::CommSpec comm_spec_;
Expand Down
146 changes: 79 additions & 67 deletions modules/graph/loader/gar_fragment_loader_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,73 +231,7 @@ GARFragmentLoader<OID_T, VID_T, VERTEX_MAP_T>::distributeVertices() {
const auto& label = vertex_info->GetLabel();
vertex_labels_.push_back(label);
vertex_chunk_sizes_.push_back(vertex_info->GetChunkSize());
if (store_in_local_) {
// distribute the vertex chunks base on the local metadata
const auto& extra_info = graph_info_->GetExtraInfo();
if (extra_info.find(LOCAL_METADATA_KEY) == extra_info.end()) {
RETURN_GS_ERROR(
ErrorCode::kInvalidValueError,
"The local metadata key-value is not found in graph info");
}
std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY);
std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() +
local_metadata_prefix +
std::to_string(comm_spec_.fid());

std::shared_ptr<arrow::io::ReadableFile> file;
std::shared_ptr<arrow::fs::FileSystem> fs;

auto fs_result = arrow::fs::FileSystemFromUriOrPath(path);
if (!fs_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message());
}
fs = fs_result.ValueOrDie();
auto input_stream_result = fs->OpenInputStream(path);
if (!input_stream_result.status().ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError,
input_stream_result.status().message());
}
auto input_stream = input_stream_result.ValueOrDie();
// read the vertex chunk begin of vertex label i
auto read_result =
input_stream->Read(sizeof(int64_t), &vertex_chunk_begins_[i]);
if (!read_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message());
}
assert(read_result.ValueOrDie() == sizeof(int64_t));
// read the vertex chunk num of vertex label i
read_result = input_stream->Read(sizeof(int64_t), &vertex_chunk_nums_[i]);
if (!read_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message());
}
assert(read_result.ValueOrDie() == sizeof(int64_t));
} else {
// distribute the vertex chunks for fragments
auto chunk_num_result = GraphArchive::util::GetVertexChunkNum(
graph_info_->GetPrefix(), vertex_info);
RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status());
auto chunk_num = chunk_num_result.value();

if (chunk_num <= static_cast<int64_t>(comm_spec_.fnum())) {
if (chunk_num < comm_spec_.fid() + 1) {
// no vertex chunk can be assigned to this fragment
vertex_chunk_begins_[i] = 0;
vertex_chunk_nums_[i] = 0;
} else {
vertex_chunk_begins_[i] = static_cast<int64_t>(comm_spec_.fid());
vertex_chunk_nums_[i] = 1;
}
} else {
int64_t bsize = chunk_num / static_cast<int64_t>(comm_spec_.fnum());
vertex_chunk_begins_[i] =
static_cast<int64_t>(comm_spec_.fid()) * bsize;
if (comm_spec_.fid() == comm_spec_.fnum() - 1) {
vertex_chunk_nums_[i] = chunk_num - vertex_chunk_begins_[i];
} else {
vertex_chunk_nums_[i] = bsize;
}
}
}
BOOST_LEAF_CHECK(initializeVertexChunkBeginAndNum(i, vertex_info));
}
vertex_label_num_ = vertex_labels_.size();
for (size_t i = 0; i < vertex_labels_.size(); ++i) {
Expand All @@ -324,6 +258,84 @@ GARFragmentLoader<OID_T, VID_T, VERTEX_MAP_T>::distributeVertices() {
return {};
}

template <typename OID_T, typename VID_T,
template <typename, typename> class VERTEX_MAP_T>
boost::leaf::result<void>
GARFragmentLoader<OID_T, VID_T, VERTEX_MAP_T>::initializeVertexChunkBeginAndNum(
int vertex_label_index,
const std::shared_ptr<GraphArchive::VertexInfo>& vertex_info) {
if (store_in_local_) {
// distribute the vertex chunks base on the local metadata
const auto& extra_info = graph_info_->GetExtraInfo();
if (extra_info.find(LOCAL_METADATA_KEY) == extra_info.end()) {
RETURN_GS_ERROR(
ErrorCode::kInvalidValueError,
"The local metadata key-value is not found in graph info");
}
std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY);
std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() +
local_metadata_prefix + std::to_string(comm_spec_.fid());

std::shared_ptr<arrow::io::ReadableFile> file;
std::shared_ptr<arrow::fs::FileSystem> fs;

auto fs_result = arrow::fs::FileSystemFromUriOrPath(path);
if (!fs_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message());
}
fs = fs_result.ValueOrDie();
auto input_stream_result = fs->OpenInputStream(path);
if (!input_stream_result.status().ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError,
input_stream_result.status().message());
}
auto input_stream = input_stream_result.ValueOrDie();
// read the vertex chunk begin of vertex label i
auto read_result = input_stream->Read(
sizeof(int64_t), &vertex_chunk_begins_[vertex_label_index]);
if (!read_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message());
}
assert(read_result.ValueOrDie() == sizeof(int64_t));
// read the vertex chunk num of vertex label i
read_result = input_stream->Read(sizeof(int64_t),
&vertex_chunk_nums_[vertex_label_index]);
if (!read_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, read_result.status().message());
}
assert(read_result.ValueOrDie() == sizeof(int64_t));
} else {
// distribute the vertex chunks for fragments
auto chunk_num_result = GraphArchive::util::GetVertexChunkNum(
graph_info_->GetPrefix(), vertex_info);
RETURN_GS_ERROR_IF_NOT_OK(chunk_num_result.status());
auto chunk_num = chunk_num_result.value();

if (chunk_num <= static_cast<int64_t>(comm_spec_.fnum())) {
if (chunk_num < comm_spec_.fid() + 1) {
// no vertex chunk can be assigned to this fragment
vertex_chunk_begins_[vertex_label_index] = 0;
vertex_chunk_nums_[vertex_label_index] = 0;
} else {
vertex_chunk_begins_[vertex_label_index] =
static_cast<int64_t>(comm_spec_.fid());
vertex_chunk_nums_[vertex_label_index] = 1;
}
} else {
int64_t bsize = chunk_num / static_cast<int64_t>(comm_spec_.fnum());
vertex_chunk_begins_[vertex_label_index] =
static_cast<int64_t>(comm_spec_.fid()) * bsize;
if (comm_spec_.fid() == comm_spec_.fnum() - 1) {
vertex_chunk_nums_[vertex_label_index] =
chunk_num - vertex_chunk_begins_[vertex_label_index];
} else {
vertex_chunk_nums_[vertex_label_index] = bsize;
}
}
}
return {};
}

template <typename OID_T, typename VID_T,
template <typename, typename> class VERTEX_MAP_T>
boost::leaf::result<void>
Expand Down
5 changes: 5 additions & 0 deletions modules/graph/writer/arrow_fragment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ limitations under the License.
namespace GraphArchive {
class GraphInfo;
class EdgeInfo;
class VertexInfo;
enum class AdjListType : std::uint8_t;
} // namespace GraphArchive

Expand Down Expand Up @@ -151,6 +152,10 @@ class ArrowFragmentWriter {
const label_id_t edge_label, const PropertyGraphSchema& graph_schema,
std::vector<std::shared_ptr<arrow::ArrayBuilder>>& builders);

boost::leaf::result<void> writeLocalVertexChunkBeginAndNum(
const std::shared_ptr<GraphArchive::VertexInfo>& vertex_info,
int64_t vertex_chunk_begin, int64_t vertex_chunk_num);

private:
std::shared_ptr<ArrowFragment<oid_t, vid_t>> frag_;
grape::CommSpec comm_spec_;
Expand Down
85 changes: 47 additions & 38 deletions modules/graph/writer/arrow_fragment_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,47 +177,11 @@ boost::leaf::result<void> ArrowFragmentWriter<FRAG_T>::WriteVertex(
RETURN_GS_ERROR(ErrorCode::kGraphArError, st.message());
}
if (store_in_local_) {
// write local store meta data
int64_t vertex_chunk_num =
std::ceil(vertex_table->num_rows() /
static_cast<double>(vertex_info->GetChunkSize()));
const auto& extra_info = graph_info_->GetExtraInfo();
std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY);
std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() +
local_metadata_prefix + std::to_string(frag_->fid());

std::shared_ptr<arrow::fs::FileSystem> fs;
auto fs_result = arrow::fs::FileSystemFromUriOrPath(path);
if (!fs_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message());
}
fs = fs_result.ValueOrDie();
std::shared_ptr<arrow::io::OutputStream> output_stream;
auto output_stream_result = fs->OpenOutputStream(path);
if (!output_stream_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError,
output_stream_result.status().message());
}
output_stream = output_stream_result.ValueOrDie();

// write vertex chunk index begin and vertex chunk number to local
auto st = output_stream->Write(
reinterpret_cast<const uint8_t*>(&chunk_index_begin),
sizeof(chunk_index_begin));
if (!st.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, st.message());
}
st = output_stream->Write(
reinterpret_cast<const uint8_t*>(&vertex_chunk_num),
sizeof(vertex_chunk_num));
if (!st.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, st.message());
}

st = output_stream->Close();
if (!st.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, st.message());
}
BOOST_LEAF_CHECK(writeLocalVertexChunkBeginAndNum(
vertex_info, chunk_index_begin, vertex_chunk_num));
}

if (store_in_local_ || frag_->fid() == frag_->fnum() - 1) {
Expand Down Expand Up @@ -602,6 +566,51 @@ ArrowFragmentWriter<FRAG_T>::appendPropertiesToArrowArrayBuilders(
return {};
}

template <typename FRAG_T>
boost::leaf::result<void>
ArrowFragmentWriter<FRAG_T>::writeLocalVertexChunkBeginAndNum(
const std::shared_ptr<GraphArchive::VertexInfo>& vertex_info,
int64_t vertex_chunk_begin, int64_t vertex_chunk_num) {
// write local store meta data
const auto& extra_info = graph_info_->GetExtraInfo();
std::string local_metadata_prefix = extra_info.at(LOCAL_METADATA_KEY);
std::string path = graph_info_->GetPrefix() + vertex_info->GetPrefix() +
local_metadata_prefix + std::to_string(frag_->fid());

std::shared_ptr<arrow::fs::FileSystem> fs;
auto fs_result = arrow::fs::FileSystemFromUriOrPath(path);
if (!fs_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, fs_result.status().message());
}
fs = fs_result.ValueOrDie();
std::shared_ptr<arrow::io::OutputStream> output_stream;
auto output_stream_result = fs->OpenOutputStream(path);
if (!output_stream_result.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError,
output_stream_result.status().message());
}
output_stream = output_stream_result.ValueOrDie();

// write vertex chunk index begin and vertex chunk number to local
auto st = output_stream->Write(
reinterpret_cast<const uint8_t*>(&vertex_chunk_begin),
sizeof(vertex_chunk_begin));
if (!st.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, st.message());
}
st = output_stream->Write(reinterpret_cast<const uint8_t*>(&vertex_chunk_num),
sizeof(vertex_chunk_num));
if (!st.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, st.message());
}

st = output_stream->Close();
if (!st.ok()) {
RETURN_GS_ERROR(ErrorCode::kArrowError, st.message());
}
return {};
}

} // namespace vineyard

#endif // ENABLE_GAR
Expand Down

0 comments on commit 499d0c6

Please sign in to comment.