multiple mpmc queue #37

Open
wants to merge 27 commits into base: master
Commits
27 commits
30ac35f
Changed spsc queue to mpmc queue.
Oct 19, 2020
caf826b
run experiments.
Oct 26, 2020
74b8e6a
added multiple mpmc queue.
Oct 31, 2020
48c6a19
revised concurrent queue.
Nov 1, 2020
f208a2d
applied partitioning the tasks.
Nov 3, 2020
04e1570
get debug info for the project.
Nov 3, 2020
a2b1824
multiple mpmc queue supports partitioning.
Nov 13, 2020
a1bd255
added bulk update.
Nov 15, 2020
6f2cb0b
integrated google benchmark
Nov 16, 2020
23056e2
added delete to the benchmark
Nov 16, 2020
22ba5f5
commented google benchmark.
Nov 19, 2020
a5c4723
added work stealing for threads.
Nov 19, 2020
c9b1f99
added global mpmcQ
Nov 25, 2020
a248f78
Merge branch 'mpmcQ' of https://github.com/domargan/parallel-packed-csr
Nov 25, 2020
649e5cc
Merge branch 'numa-mpmcQ' of https://github.com/domargan/parallel-pac…
Nov 25, 2020
097a2c2
added multiple mpmcQ
Nov 25, 2020
6383a67
removed bulk update.
Nov 25, 2020
9fefbef
removed work stealing
Nov 25, 2020
f8acffe
revised the number of queues.
Nov 29, 2020
2736a9d
added cluster threads distribution.
Dec 3, 2020
8e934ad
removed function calls for number of cpus
Dec 3, 2020
bd19872
added balanced queue distribution.
Dec 4, 2020
950e017
incorporated constraint of the number of queues.
Dec 4, 2020
800277a
balanced threads distribution over queues and activated work stealing.
Dec 6, 2020
d0033a3
clustered threads distribution.
Dec 6, 2020
6477257
corrected the name of the thread-to-partition mapping data structure.
Dec 7, 2020
d0b7a43
added support for clustered/balanced thread distribution.
Dec 7, 2020
11 changes: 11 additions & 0 deletions CMakeLists.txt
@@ -1,5 +1,6 @@
cmake_minimum_required(VERSION 3.8 FATAL_ERROR)
set(CMAKE_VERBOSE_MAKEFILE ON)
include(ExternalProject)

project(parallel-packed-csr VERSION 0.1 LANGUAGES CXX)

@@ -14,6 +15,11 @@ set(CMAKE_CXX_FLAGS_RELEASE "-O3")

set(PROJECT_SOURCE_DIR ${CMAKE_SOURCE_DIR}/src)

ExternalProject_Add(googlebenchmark
URL "https://github.com/google/benchmark/archive/v1.5.0.tar.gz"
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${parallel-packed-csr_BINARY_DIR}/deps -DBENCHMARK_DOWNLOAD_DEPENDENCIES=ON -DBENCHMARK_ENABLE_GTEST_TESTS=OFF -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
)

file(GLOB_RECURSE parallel-packed-csr_SOURCES "${PROJECT_SOURCE_DIR}/*.cpp")
file(GLOB_RECURSE parallel-packed-csr_HEADERS "${PROJECT_SOURCE_DIR}/*.h")

@@ -38,6 +44,11 @@ set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
target_link_libraries(parallel-packed-csr PRIVATE Threads::Threads numa)

add_dependencies(parallel-packed-csr googlebenchmark)
target_link_libraries(parallel-packed-csr PRIVATE ${parallel-packed-csr_BINARY_DIR}/deps/lib/${CMAKE_SHARED_LIBRARY_PREFIX}benchmark.a)
target_link_libraries(parallel-packed-csr PRIVATE ${parallel-packed-csr_BINARY_DIR}/deps/lib/${CMAKE_SHARED_LIBRARY_PREFIX}benchmark_main.a)
target_include_directories(parallel-packed-csr SYSTEM PUBLIC ${parallel-packed-csr_BINARY_DIR}/deps/include)

list(REMOVE_ITEM parallel-packed-csr_SOURCES ${PROJECT_SOURCE_DIR}/main.cpp)
add_executable(tests ${parallel-packed-csr_SOURCES} ${parallel-packed-csr_TEST_SOURCES})
add_executable(tests-tsan ${parallel-packed-csr_SOURCES} ${parallel-packed-csr_TEST_SOURCES})
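Note: the ExternalProject_Add block above fetches Google Benchmark v1.5.0 into deps/ and the main target now links the static benchmark and benchmark_main archives. A minimal sketch of a micro-benchmark that this linkage makes possible; the BM_EdgeInsert name and the measured body are illustrative assumptions, not code from this PR:

    #include <benchmark/benchmark.h>

    // Hypothetical micro-benchmark: the name and the measured body are
    // placeholders; the PR itself only wires the library into the build.
    static void BM_EdgeInsert(benchmark::State &state) {
      for (auto _ : state) {
        // ... insert a batch of edges into the structure under test ...
        benchmark::DoNotOptimize(state.iterations());
      }
    }
    BENCHMARK(BM_EdgeInsert);
    // main() is supplied by the linked benchmark_main archive.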
19 changes: 15 additions & 4 deletions src/main.cpp
@@ -17,13 +17,17 @@
#include <thread>
#include <utility>
#include <vector>
#include <functional>
#include <map>

#include "thread_pool/thread_pool.h"
#include "thread_pool_pppcsr/thread_pool_pppcsr.h"

#include <benchmark/benchmark.h>

using namespace std;

enum class Operation { READ, ADD, DELETE };
// enum class Operation { READ, ADD, DELETE };

// Reads edge list with separator
pair<vector<tuple<Operation, int, int>>, int> read_input(string filename, Operation defaultOp) {
@@ -85,6 +89,8 @@ void update_existing_graph(const vector<tuple<Operation, int, int>> &input, Thre
template <typename ThreadPool_t>
void execute(int threads, int size, const vector<tuple<Operation, int, int>> &core_graph,
const vector<tuple<Operation, int, int>> &updates, std::unique_ptr<ThreadPool_t> &thread_pool) {


// Load core graph
update_existing_graph(core_graph, thread_pool.get(), threads, core_graph.size());
// Do updates
@@ -114,6 +120,7 @@ int main(int argc, char *argv[]) {
int num_nodes = 0;
bool lock_search = true;
bool insert = true;
bool balance = true;
Version v = Version::PPPCSRNUMA;
int partitions_per_domain = 1;
vector<tuple<Operation, int, int>> core_graph;
@@ -130,6 +137,10 @@ int main(int argc, char *argv[]) {
insert = true;
} else if (s.rfind("-delete", 0) == 0) {
insert = false;
} else if (s.rfind("-balance", 0) == 0) {
balance = true;
} else if (s.rfind("-cluster", 0) == 0) {
balance = false;
} else if (s.rfind("-pppcsrnuma", 0) == 0) {
v = Version::PPPCSRNUMA;
} else if (s.rfind("-pppcsr", 0) == 0) {
@@ -174,16 +185,16 @@ int main(int argc, char *argv[]) {
}
case Version::PPPCSR: {
auto thread_pool =
make_unique<ThreadPoolPPPCSR>(threads, lock_search, num_nodes + 1, partitions_per_domain, false);
make_unique<ThreadPoolPPPCSR>(threads, lock_search, num_nodes + 1, partitions_per_domain, false, balance);
execute(threads, size, core_graph, updates, thread_pool);
break;
}
default: {
auto thread_pool =
make_unique<ThreadPoolPPPCSR>(threads, lock_search, num_nodes + 1, partitions_per_domain, true);
make_unique<ThreadPoolPPPCSR>(threads, lock_search, num_nodes + 1, partitions_per_domain, true, balance);
execute(threads, size, core_graph, updates, thread_pool);
}
}

return 0;
}
}
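Note: the new -balance and -cluster flags choose between two thread/queue distribution strategies in the thread pool. A small self-contained sketch of the queue-sizing rule they select, mirroring the ThreadPoolPPPCSR constructor further down; the helper function itself is illustrative and not part of the PR:

    #include <algorithm>
    #include <cmath>
    #include <thread>

    // Illustrative helper mirroring ThreadPoolPPPCSR's queue sizing:
    // -balance: one queue per partition, capped by the number of threads;
    // -cluster: threads are packed per NUMA domain, one queue per cluster.
    inline size_t queue_count(bool balance, int num_threads, int available_nodes,
                              int partitions_per_domain) {
      if (balance) {
        return std::min<size_t>(static_cast<size_t>(available_nodes) * partitions_per_domain,
                                static_cast<size_t>(num_threads));
      }
      auto threads_per_domain = std::thread::hardware_concurrency() / available_nodes;
      return static_cast<size_t>(std::ceil(num_threads / static_cast<double>(threads_per_domain)));
    }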
2 changes: 1 addition & 1 deletion src/pcsr/PCSR.cpp
@@ -69,7 +69,7 @@ void PCSR::resizeEdgeArray(size_t newSize) {
edges.N = newSize;
edges.logN = (1 << bsr_word(bsr_word(edges.N) * 2 + 1));
edges.H = bsr_word(edges.N / edges.logN);
std::cout << "Edges: " << edges.N << " logN: " << edges.logN << " #count: " << edges.N / edges.logN << std::endl;
// std::cout << "Edges: " << edges.N << " logN: " << edges.logN << " #count: " << edges.N / edges.logN << std::endl;
}

void PCSR::clear() {
2 changes: 2 additions & 0 deletions src/pcsr/PCSR.h
@@ -14,6 +14,8 @@ using namespace std;
#ifndef PCSR2_PCSR_H
#define PCSR2_PCSR_H

enum class Operation { READ, ADD, DELETE };

/** Types */
typedef struct _node {
// beginning and end of the associated region in the edge list
2 changes: 1 addition & 1 deletion src/pppcsr/PPPCSR.cpp
@@ -30,7 +30,7 @@ PPPCSR::PPPCSR(uint32_t init_n, uint32_t src_n, bool lock_search, int numDomain,
partitions.emplace_back(partitionSize, partitionSize, lock_search, (use_numa) ? i : -1);
}
}
cout << "Number of partitions: " << partitions.size() << std::endl;
// cout << "Number of partitions: " << partitions.size() << std::endl;
}

bool PPPCSR::edge_exists(uint32_t src, uint32_t dest) {
145 changes: 83 additions & 62 deletions src/thread_pool_pppcsr/thread_pool_pppcsr.cpp
@@ -11,39 +11,53 @@
#include <mutex>
#include <thread>
#include <vector>
#include <cmath>

using namespace std;

/**
* Initializes a pool of threads. Every thread has its own task queue.
*/
ThreadPoolPPPCSR::ThreadPoolPPPCSR(const int NUM_OF_THREADS, bool lock_search, uint32_t init_num_nodes,
int partitions_per_domain, bool use_numa)
: tasks(NUM_OF_THREADS),
finished(false),
available_nodes(std::min(numa_max_node() + 1, NUM_OF_THREADS)),
int partitions_per_domain, bool use_numa, bool balance)
: finished(false),
available_nodes(numa_max_node() + 1),
indeces(available_nodes, 0),
partitions_per_domain(partitions_per_domain),
threadToDomain(NUM_OF_THREADS),
threadToQueue(NUM_OF_THREADS),
firstThreadDomain(available_nodes, 0),
numThreadsDomain(available_nodes) {
pcsr = new PPPCSR(init_num_nodes, init_num_nodes, lock_search, available_nodes, partitions_per_domain, use_numa);

int d = available_nodes;
int minNumThreads = NUM_OF_THREADS / d;
int threshold = NUM_OF_THREADS % d;
int counter = 0;
int currentDomain = 0;

for (int i = 0; i < NUM_OF_THREADS; i++) {
threadToDomain[i] = currentDomain;
counter++;
if (counter == minNumThreads + (currentDomain < threshold)) {
numThreadsDomain[currentDomain] = counter;
firstThreadDomain[currentDomain] = i - counter + 1;
counter = 0;
currentDomain++;
}
numThreadsDomain(available_nodes),
balance(balance) {

if(balance){
numberOfQueues = min(available_nodes * partitions_per_domain, NUM_OF_THREADS);
}
else{
auto threadsPerDomain = thread::hardware_concurrency()/available_nodes;
numberOfQueues = ceil(NUM_OF_THREADS/ (double)threadsPerDomain);
}

tasks = vector<moodycamel::ConcurrentQueue<task>>(numberOfQueues);

pcsr =
new PPPCSR(init_num_nodes, init_num_nodes, lock_search, available_nodes, partitions_per_domain, use_numa);

int d = available_nodes;
int minNumThreads = NUM_OF_THREADS / d;
int threshold = NUM_OF_THREADS % d;
int counter = 0;
int currentDomain = 0;

for (int i = 0; i < NUM_OF_THREADS; i++) {
threadToDomain[i] = currentDomain;
counter++;
if (counter == minNumThreads + (currentDomain < threshold)) {
numThreadsDomain[currentDomain] = counter;
firstThreadDomain[currentDomain] = i - counter + 1;
counter = 0;
currentDomain++;
}
}
}

@@ -52,21 +66,25 @@ ThreadPoolPPPCSR::ThreadPoolPPPCSR(const int NUM_OF_THREADS, bool lock_search, u
// Finishes when finished is set to true and there are no outstanding tasks
template <bool isMasterThread>
void ThreadPoolPPPCSR::execute(const int thread_id) {
cout << "Thread " << thread_id << " has " << tasks[thread_id].size() << " tasks, runs on domain "
<< threadToDomain[thread_id] << endl;
if (numa_available() >= 0) {
numa_run_on_node(threadToDomain[thread_id]);
}
int registered = -1;
auto queue_id = threadToQueue[thread_id];
auto queueCounter = 1;

while (!tasks[queue_id].empty() || (!isMasterThread && !finished)) {
while(queueCounter <= numberOfQueues &&
tasks[queue_id].empty()){
queue_id = (queue_id + 1) % numberOfQueues;
queueCounter++;
}
if (!tasks[queue_id].empty()) {
task t = tasks[queue_id].front();

while (!tasks[thread_id].empty() || (!isMasterThread && !finished)) {
if (!tasks[thread_id].empty()) {
task t = tasks[thread_id].front();
tasks[thread_id].pop();

int currentPar = pcsr->get_partiton(t.src);
int currentPar = threadToQueue[thread_id];

if (registered != currentPar) {
if (registered != currentPar) {
if (registered != -1) {
pcsr->unregisterThread(registered);
}
@@ -81,10 +99,10 @@ void ThreadPoolPPPCSR::execute(const int thread_id) {
pcsr->read_neighbourhood(t.src);
}
} else {
if (registered != -1) {
pcsr->unregisterThread(registered);
registered = -1;
}
if (registered != -1) {
pcsr->unregisterThread(registered);
registered = -1;
}
}
}
if (registered != -1) {
@@ -94,50 +112,53 @@

// Submit an update for edge {src, target} to thread with number thread_id
void ThreadPoolPPPCSR::submit_add(int thread_id, int src, int target) {
(void)thread_id;
auto par = pcsr->get_partiton(src) / partitions_per_domain;
auto index = (indeces[par]++) % numThreadsDomain[par];
tasks[firstThreadDomain[par] + index].push(task{true, false, src, target});
auto par = pcsr->get_partiton(src);
auto queue_id = balance ? queueTurn : par % numberOfQueues;
threadToQueue[thread_id] = queue_id;
queueTurn = (queueTurn + 1) % numberOfQueues;
tasks[queue_id].push(task{true, false, src, target});
}

// Submit a delete edge task for edge {src, target} to thread with number thread_id
void ThreadPoolPPPCSR::submit_delete(int thread_id, int src, int target) {
(void)thread_id;
auto par = pcsr->get_partiton(src) / partitions_per_domain;
auto index = (indeces[par]++) % numThreadsDomain[par];
tasks[firstThreadDomain[par] + index].push(task{false, false, src, target});
auto par = pcsr->get_partiton(src);
auto queue_id = balance ? queueTurn : par % numberOfQueues;
threadToQueue[thread_id] = queue_id;
queueTurn = (queueTurn + 1) % numberOfQueues;
tasks[queue_id].push(task{false, false, src, target});
}

// Submit a read neighbourhood task for vertex src to thread with number thread_id
void ThreadPoolPPPCSR::submit_read(int thread_id, int src) {
(void)thread_id;
auto par = pcsr->get_partiton(src) / partitions_per_domain;
auto index = (indeces[par]++) % numThreadsDomain[par];
tasks[firstThreadDomain[par] + index].push(task{false, true, src, src});
auto par = pcsr->get_partiton(src);
auto queue_id = balance ? queueTurn : par % numberOfQueues;
threadToQueue[thread_id] = queue_id;
queueTurn = (queueTurn + 1) % numberOfQueues;
tasks[queue_id].push(task{false, true, src, src});
}

// starts a new number of threads
// number of threads is passed to the constructor
void ThreadPoolPPPCSR::start(int threads) {
s = chrono::steady_clock::now();
finished = false;
// finished = false;

for (int i = 1; i < threads; i++) {
thread_pool.push_back(thread(&ThreadPoolPPPCSR::execute<false>, this, i));
// Pin thread to core
// cpu_set_t cpuset;
// CPU_ZERO(&cpuset);
// CPU_SET((i * 4), &cpuset);
// if (i >= 4) {
// CPU_SET(1 + (i * 4), &cpuset);
// } else {
// CPU_SET(i * 4, &cpuset);
// }
// int rc = pthread_setaffinity_np(thread_pool.back().native_handle(),
// sizeof(cpu_set_t), &cpuset);
// if (rc != 0) {
// cout << "error pinning thread" << endl;
// }
// cpu_set_t cpuset;
// CPU_ZERO(&cpuset);
// CPU_SET((i * 4), &cpuset);
// if (i >= 4) {
// CPU_SET(1 + (i * 4), &cpuset);
// } else {
// CPU_SET(i * 4, &cpuset);
// }
// int rc = pthread_setaffinity_np(thread_pool.back().native_handle(),
// sizeof(cpu_set_t), &cpuset);
// if (rc != 0) {
// cout << "error pinning thread" << endl;
// }
}
execute<true>(0);
}
Expand All @@ -148,7 +169,7 @@ void ThreadPoolPPPCSR::stop() {
finished = true;
for (auto &&t : thread_pool) {
if (t.joinable()) t.join();
cout << "Done" << endl;
// cout << "Done" << endl;
}
end = chrono::steady_clock::now();
cout << "Elapsed wall clock time: " << chrono::duration_cast<chrono::milliseconds>(end - s).count() << endl;
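Note: with this change each worker scans all task queues round-robin rather than staying pinned to a single per-thread queue, and the queues themselves become moodycamel::ConcurrentQueue instances. A minimal sketch of that scan written against moodycamel's lock-free try_dequeue API; the task layout and the dispatch body are assumptions for illustration, not code from this PR:

    #include <cstddef>
    #include <vector>
    #include "../utility/concurrentqueue.h"  // moodycamel::ConcurrentQueue

    struct task { bool add; bool read; int src; int target; };  // assumed layout, see task.h

    // Illustrative drain loop: visit every queue once, starting from the queue
    // this thread was last routed to, and pop tasks with the lock-free API.
    void drain_round_robin(std::vector<moodycamel::ConcurrentQueue<task>> &tasks,
                           std::size_t start_queue) {
      const std::size_t n = tasks.size();
      task t;
      for (std::size_t attempt = 0; attempt < n; ++attempt) {
        std::size_t queue_id = (start_queue + attempt) % n;  // wrap over all queues
        while (tasks[queue_id].try_dequeue(t)) {
          // ... dispatch: add_edge / remove_edge / read_neighbourhood ...
        }
      }
    }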
11 changes: 9 additions & 2 deletions src/thread_pool_pppcsr/thread_pool_pppcsr.h
@@ -8,18 +8,21 @@
#include <vector>

#include "../pppcsr/PPPCSR.h"
#include "../utility/concurrentqueue.h"
#include "task.h"

using namespace std;
#ifndef PPPCSR_THREAD_POOL_H
#define PPPCSR_THREAD_POOL_H

// enum class Operation { READ, ADD, DELETE };

class ThreadPoolPPPCSR {
public:
PPPCSR *pcsr;

explicit ThreadPoolPPPCSR(const int NUM_OF_THREADS, bool lock_search, uint32_t init_num_nodes,
int partitions_per_domain, bool use_numa);
int partitions_per_domain, bool use_numa, bool balance);
~ThreadPoolPPPCSR() = default;
/** Public API */
void submit_add(int thread_id, int src, int dest); // submit task to thread {thread_id} to insert edge {src, dest}
@@ -30,7 +33,8 @@ class ThreadPoolPPPCSR {

private:
vector<thread> thread_pool;
vector<queue<task>> tasks;
vector<moodycamel::ConcurrentQueue<task>> tasks;
size_t numberOfQueues;
chrono::steady_clock::time_point s;
chrono::steady_clock::time_point end;
std::atomic_bool finished;
Expand All @@ -39,9 +43,12 @@ class ThreadPoolPPPCSR {
void execute(int);

const int available_nodes;
const bool balance;
size_t queueTurn = 0;
std::vector<unsigned> indeces;
int partitions_per_domain = 1;
std::vector<int> threadToDomain;
std::vector<int> threadToQueue;
std::vector<int> firstThreadDomain;
std::vector<int> numThreadsDomain;
};
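Note: on the submit side, balanced mode round-robins incoming tasks over all queues via the queueTurn cursor, while clustered mode keys the queue on the source vertex's partition. A compact illustrative sketch of that routing rule; the free function is not part of the PR, and the names mirror the members above:

    #include <cstddef>

    // queueTurn is the pool's round-robin cursor; partition_of_src comes from
    // pcsr->get_partiton(src).
    std::size_t route_task(bool balance, std::size_t &queueTurn,
                           std::size_t numberOfQueues, int partition_of_src) {
      std::size_t queue_id = balance ? queueTurn
                                     : static_cast<std::size_t>(partition_of_src) % numberOfQueues;
      queueTurn = (queueTurn + 1) % numberOfQueues;  // cursor advances on every submit
      return queue_id;
    }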