Skip to content

Commit

Permalink
sequential algorithm runable
Browse files Browse the repository at this point in the history
  • Loading branch information
SiberiaWolfP committed Apr 1, 2024
1 parent a50334c commit df01f0e
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ void LogicalPathFindingOperator::ResolveTypes() {
types = children[0]->types;
auto right_types = children[1]->types;
types.insert(types.end(), right_types.begin(), right_types.end());
// types = {LogicalType::BIGINT, LogicalType::BIGINT};
}

string LogicalPathFindingOperator::ParamsToString() const {
Expand Down
163 changes: 156 additions & 7 deletions duckpgq/src/duckpgq/operators/physical_path_finding_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class PathFindingLocalState : public LocalSinkState {
if (global_csr.is_ready) {
// Add the tasks (src, dst) to sink
// Optimizations: Eliminate duplicate sources/destinations
input.Print();
// input.Print();
local_tasks.Append(input);
local_tasks.Print();
// local_tasks.Print();
return;
}
CreateCSR(input, global_csr);
Expand Down Expand Up @@ -112,7 +112,7 @@ void PathFindingLocalState::CreateCSR(DataChunk &input,
global_csr.edge_ids[static_cast<int64_t>(pos) - 1] = edge_id;
return 1;
});
global_csr.Print(); // Debug print
// global_csr.Print(); // Debug print
}

class PathFindingGlobalState : public GlobalSinkState {
Expand All @@ -129,18 +129,22 @@ class PathFindingGlobalState : public GlobalSinkState {

PathFindingGlobalState(PathFindingGlobalState &prev)
: GlobalSinkState(prev), global_tasks(prev.global_tasks),
scan_state(prev.scan_state), append_state(prev.append_state),
global_csr(std::move(prev.global_csr)), child(prev.child + 1) {}

void Sink(DataChunk &input, PathFindingLocalState &lstate) const {
lstate.Sink(input, *global_csr);
}

unique_ptr<GlobalCompressedSparseRow> global_csr;
size_t child;

// pairs is a 2-column table with src and dst
ColumnDataCollection global_tasks;
// pairs with path exists
// ColumnDataCollection global_results;
ColumnDataScanState scan_state;
ColumnDataAppendState append_state;

unique_ptr<GlobalCompressedSparseRow> global_csr;
size_t child;
};

unique_ptr<GlobalSinkState>
Expand Down Expand Up @@ -178,18 +182,163 @@ PhysicalPathFinding::Combine(ExecutionContext &context,

gstate.global_tasks.Combine(lstate.local_tasks);
client_profiler.Flush(context.thread.profiler);
gstate.global_tasks.Print();
// gstate.global_tasks.Print();
return SinkCombineResultType::FINISHED;
}

//===--------------------------------------------------------------------===//
// Finalize
//===--------------------------------------------------------------------===//

static bool IterativeLength(int64_t v_size, int64_t *v, vector<int64_t> &e,
vector<std::bitset<LANE_LIMIT>> &seen,
vector<std::bitset<LANE_LIMIT>> &visit,
vector<std::bitset<LANE_LIMIT>> &next) {
bool change = false;
for (auto i = 0; i < v_size; i++) {
next[i] = 0;
}
for (auto i = 0; i < v_size; i++) {
if (visit[i].any()) {
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
auto n = e[offset];
next[n] = next[n] | visit[i];
}
}
}
for (auto i = 0; i < v_size; i++) {
next[i] = next[i] & ~seen[i];
seen[i] = seen[i] | next[i];
change |= next[i].any();
}
return change;
}

static void IterativeLengthFunction(const unique_ptr<PathFindingGlobalState::GlobalCompressedSparseRow> &csr,
DataChunk &pairs, Vector &result) {
int64_t v_size = csr->v_size;
int64_t *v = (int64_t *)csr->v;
vector<int64_t> &e = csr->e;

// get src and dst vectors for searches
auto &src = pairs.data[0];
auto &dst = pairs.data[1];
UnifiedVectorFormat vdata_src;
UnifiedVectorFormat vdata_dst;
src.ToUnifiedFormat(pairs.size(), vdata_src);
dst.ToUnifiedFormat(pairs.size(), vdata_dst);

auto src_data = FlatVector::GetData<int64_t>(src);
auto dst_data = FlatVector::GetData<int64_t>(dst);

ValidityMask &result_validity = FlatVector::Validity(result);

// create result vector
result.SetVectorType(VectorType::FLAT_VECTOR);
auto result_data = FlatVector::GetData<int64_t>(result);

// create temp SIMD arrays
vector<std::bitset<LANE_LIMIT>> seen(v_size);
vector<std::bitset<LANE_LIMIT>> visit1(v_size);
vector<std::bitset<LANE_LIMIT>> visit2(v_size);

// maps lane to search number
short lane_to_num[LANE_LIMIT];
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1; // inactive
}

idx_t started_searches = 0;
while (started_searches < pairs.size()) {

// empty visit vectors
for (auto i = 0; i < v_size; i++) {
seen[i] = 0;
visit1[i] = 0;
}

// add search jobs to free lanes
uint64_t active = 0;
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
lane_to_num[lane] = -1;
while (started_searches < pairs.size()) {
int64_t search_num = started_searches++;
int64_t src_pos = vdata_src.sel->get_index(search_num);
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
if (!vdata_src.validity.RowIsValid(src_pos)) {
result_validity.SetInvalid(search_num);
result_data[search_num] = (uint64_t)-1; /* no path */
} else if (src_data[src_pos] == dst_data[dst_pos]) {
result_data[search_num] =
(uint64_t)0; // path of length 0 does not require a search
} else {
visit1[src_data[src_pos]][lane] = true;
lane_to_num[lane] = search_num; // active lane
active++;
break;
}
}
}

// make passes while a lane is still active
for (int64_t iter = 1; active; iter++) {
if (!IterativeLength(v_size, v, e, seen, (iter & 1) ? visit1 : visit2,
(iter & 1) ? visit2 : visit1)) {
break;
}
// detect lanes that finished
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = lane_to_num[lane];
if (search_num >= 0) { // active lane
int64_t dst_pos = vdata_dst.sel->get_index(search_num);
if (seen[dst_data[dst_pos]][lane]) {
result_data[search_num] =
iter; /* found at iter => iter = path length */
lane_to_num[lane] = -1; // mark inactive
active--;
}
}
}
}

// no changes anymore: any still active searches have no path
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
int64_t search_num = lane_to_num[lane];
if (search_num >= 0) { // active lane
result_validity.SetInvalid(search_num);
result_data[search_num] = (int64_t)-1; /* no path */
lane_to_num[lane] = -1; // mark inactive
}
}
}
}


SinkFinalizeType
PhysicalPathFinding::Finalize(Pipeline &pipeline, Event &event,
ClientContext &context,
OperatorSinkFinalizeInput &input) const {
auto &gstate = input.global_state.Cast<PathFindingGlobalState>();
auto &csr = gstate.global_csr;
auto &global_tasks = gstate.global_tasks;
if (global_tasks.Count() != 0) {
DataChunk pairs;
global_tasks.InitializeScanChunk(pairs);
ColumnDataScanState scan_state;
global_tasks.InitializeScan(scan_state);
while (global_tasks.Scan(scan_state, pairs)) {
Vector result(LogicalType::BIGINT, true, true);
IterativeLengthFunction(csr, pairs, result);
// store the result
// gstate.global_results.InitializeAppend(gstate.append_state);
// gstate.global_results.Append(gstate.append_state, pairs);
// // debug print
// gstate.global_results.Print();
}
}

// Move to the next input child
++gstate.child;

return SinkFinalizeType::READY;
}
Expand Down
5 changes: 3 additions & 2 deletions test/sql/path-finding/parallel_path_finding.test
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
require duckpgq

statement ok
CREATE TABLE pairs(src INT, dst INT); INSERT INTO pairs(src, dst) VALUES (0, 1), (1, 2), (2,0);
CREATE TABLE pairs(src BIGINT, dst BIGINT); INSERT INTO pairs(src, dst) VALUES (0, 1), (1, 2), (2,0);

statement ok
create table student(id INT); INSERT INTO student(id) VALUES (10), (20), (30), (40);
Expand Down Expand Up @@ -38,7 +38,7 @@ knows SOURCE KEY (src) REFERENCES student (id)
# COLUMNS (*)
# );

statement ok
query II
SELECT *
FROM pairs AS p
WHERE p.src BETWEEN (SELECT CREATE_CSR_EDGE(
Expand All @@ -55,6 +55,7 @@ WHERE p.src BETWEEN (SELECT CREATE_CSR_EDGE(
LEFT JOIN knows k ON k.src = a.id
GROUP BY a.rowid) t
ON t.a_rowid = a.rowid) AND p.dst;
----


# CAST (
Expand Down

0 comments on commit df01f0e

Please sign in to comment.