Skip to content

Commit

Permalink
snapshot id + 1
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Oct 22, 2024
1 parent 3b6adce commit 59fec6a
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 15 deletions.
16 changes: 3 additions & 13 deletions interactive_engine/executor/store/mcsr/src/graph_partitioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,19 +214,16 @@ impl<G: FromStr + Send + Sync + IndexType + Eq> GraphPartitioner<G> {
{
let input_path = vertex_file
.as_os_str()
.clone()
.to_str()
.unwrap();
let input_dir_path = self
.input_dir
.as_os_str()
.clone()
.to_str()
.unwrap();
let output_path = if let Some(pos) = input_path.find(input_dir_path) {
self.partition_dir.join(
input_path
.clone()
.split_at(pos + input_dir_path.len() + 1)
.1,
)
Expand Down Expand Up @@ -260,21 +257,18 @@ impl<G: FromStr + Send + Sync + IndexType + Eq> GraphPartitioner<G> {
{
let input_path = vertex_file
.as_os_str()
.clone()
.to_str()
.unwrap();
let input_dir_path = self
.input_dir
.as_os_str()
.clone()
.to_str()
.unwrap();
let gz_loc = input_path.find(".gz").unwrap();
let input_path = input_path.split_at(gz_loc).0;
let output_path = if let Some(pos) = input_path.find(input_dir_path) {
self.partition_dir.join(
input_path
.clone()
.split_at(pos + input_dir_path.len() + 1)
.1,
)
Expand Down Expand Up @@ -327,18 +321,16 @@ impl<G: FromStr + Send + Sync + IndexType + Eq> GraphPartitioner<G> {
.unwrap()
.ends_with(".csv")
{
info!("{}", edge_file.as_os_str().clone().to_str().unwrap());
let input_path = edge_file.as_os_str().clone().to_str().unwrap();
info!("{}", edge_file.as_os_str().to_str().unwrap());
let input_path = edge_file.as_os_str().to_str().unwrap();
let input_dir_path = self
.input_dir
.as_os_str()
.clone()
.to_str()
.unwrap();
let output_path = if let Some(pos) = input_path.find(input_dir_path) {
self.partition_dir.join(
input_path
.clone()
.split_at(pos + input_dir_path.len() + 1)
.1,
)
Expand Down Expand Up @@ -373,19 +365,17 @@ impl<G: FromStr + Send + Sync + IndexType + Eq> GraphPartitioner<G> {
.unwrap()
.ends_with(".csv.gz")
{
let input_path = edge_file.as_os_str().clone().to_str().unwrap();
let input_path = edge_file.as_os_str().to_str().unwrap();
let input_dir_path = self
.input_dir
.as_os_str()
.clone()
.to_str()
.unwrap();
let gz_loc = input_path.find(".gz").unwrap();
let input_path = input_path.split_at(gz_loc).0;
let output_path = if let Some(pos) = input_path.find(input_dir_path) {
self.partition_dir.join(
input_path
.clone()
.split_at(pos + input_dir_path.len() + 1)
.1,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public List<Long> replayDMLRecordsFrom(long offset, long timestamp) throws IOExc
logWriter.append(storeId, new LogEntry(batchSnapshotId, batch));
replayCount++;
}
ids.add(batchSnapshotId);
ids.add(batchSnapshotId + 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,6 @@ public List<Long> replayDMLRecordsFrom(long offset, long timestamp) throws IOExc
replayInProgress.set(false);
}
logger.info("replay DML records finished. total replayed [{}] records", replayCount);
return List.of(batchSnapshotId);
return List.of(batchSnapshotId + 1);
}
}

0 comments on commit 59fec6a

Please sign in to comment.