Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreading #4

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ find_package(Threads)
add_executable(memsetVsMadvise memsetVsMadvise.cpp)
set_property(TARGET memsetVsMadvise PROPERTY CXX_STANDARD 11)
target_link_libraries(memsetVsMadvise PRIVATE ${CMAKE_THREAD_LIBS_INIT} gflags)

add_executable(stress stress_test/Main.cpp stress_test/Producers.cpp stress_test/Mixer.cpp stress_test/ToFreeQueue.cpp)
set_property(TARGET stress PROPERTY CXX_STANDARD 14)
target_link_libraries(stress PRIVATE gflags)
60 changes: 60 additions & 0 deletions stress_test/Main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

#include <gflags/gflags.h>

#include "Mixer.h"

DEFINE_int32(num_producers, 100, "number of producers to run");
DEFINE_int32(num_threads, 1, "number of threads to run");

using std::shared_ptr;
using std::vector;

void createAndRunMixer(vector<shared_ptr<Producer>> producers, int me,
vector<shared_ptr<ToFreeQueue>> toFreeQueues) {
Mixer m(producers, FLAGS_num_producers, me, toFreeQueues);
m.run();
}

int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);

// Initialize producers
vector<shared_ptr<Producer>> producers;
producers.push_back(
shared_ptr<SimpleProducer>(new SimpleProducer(8, 100000)));
producers.push_back(shared_ptr<VectorProducer>(
new VectorProducer(100000, std::chrono::duration<double>(1.0))));

// Set up a work queue for each thread
vector<std::thread> threads;
vector<shared_ptr<ToFreeQueue>> toFreeQueues;
for (int i = 0; i < FLAGS_num_threads; i++) {
auto toFreeQ = shared_ptr<ToFreeQueue>(new ToFreeQueue());
toFreeQueues.push_back(toFreeQ);
}

for (int i = 0; i < FLAGS_num_threads; i++) {
// each thread gets an arbitrary id given by [i]
threads.push_back(
std::thread(createAndRunMixer, producers, i, toFreeQueues));
}

using namespace std::chrono;

high_resolution_clock::time_point beginTime = high_resolution_clock::now();
for (auto& t : threads) {
t.join();
}
// Cleanup any remaining memory
for (int i = 0; i < FLAGS_num_threads; i++) {
toFreeQueues[i]->freeIgnoreLifetime();
}
high_resolution_clock::time_point endTime = high_resolution_clock::now();

duration<double> span = duration_cast<duration<double>>(endTime - beginTime);
std::cout << "Elapsed time: " << span.count() << std::endl;
}
33 changes: 33 additions & 0 deletions stress_test/Mixer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include "Mixer.h"

Mixer::Mixer(vector<shared_ptr<Producer>> producers, int numProducers, int me,
vector<shared_ptr<ToFreeQueue>> toFreeQueues)
: producers_(producers), producersRemaining_(numProducers),
toFreeQueues_(toFreeQueues), me_(me),
producerIdPicker_(0, producers.size() - 1),
consumerIdPicker_(0, toFreeQueues.size() - 1) {}

// Picks next producer for the mixer to run. Currently uniform random choice
const Producer& Mixer::pickProducer() {
int producerIndex = this->producerIdPicker_(this->generator_);
return *(this->producers_[producerIndex]);
}

// Picks next producer for the mixer to run. Currently uniform random choice
ToFreeQueue& Mixer::pickConsumer() {
int consumerIndex = this->consumerIdPicker_(this->generator_);
return *(this->toFreeQueues_[consumerIndex]);
}

void Mixer::run() {
while (this->producersRemaining_ > 0) {
this->toFreeQueues_[this->me_]->free();
// otherwise run a random producer
Allocation a = this->pickProducer().run();
if (!a.isEmpty()) {
this->pickConsumer().addToFree(std::move(a));
}
producersRemaining_--;
}
// Main loop will eventually cleanup memory
}
36 changes: 36 additions & 0 deletions stress_test/Mixer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <random>
#include <vector>

#include "Producers.h"
#include "ToFreeQueue.h"

using std::shared_ptr;
using std::vector;

class Mixer {
public:
void run();
Mixer(vector<shared_ptr<Producer>> producers, int numProducers, int me,
vector<shared_ptr<ToFreeQueue>> toFreeQueues);

private:
/* maintains reverse-sorted order by lifetime; i.e. [push_back] yields the
* allocation that should be deallocated the soonest */
vector<Allocation> allocated_;
vector<shared_ptr<Producer>> producers_;
int producersRemaining_;
// the thread id that this mixer is running on
int me_;
// work queues for each thread indexed by thread number
vector<shared_ptr<ToFreeQueue>> toFreeQueues_;
// Picks next producer for the mixer to run. Currently uniform random choice.
const Producer& pickProducer();
// Picks a consumer to free memory allocated by a producer. Currently uniform
// random choice.
ToFreeQueue& pickConsumer();
std::uniform_int_distribution<int> producerIdPicker_;
std::uniform_int_distribution<int> consumerIdPicker_;
std::default_random_engine generator_;
};
81 changes: 81 additions & 0 deletions stress_test/Producers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#include "Producers.h"

#include <iostream>

// Allocation

bool Allocation::operator<(const Allocation& that) const {
return this->toFree_ < that.toFree_;
}

bool Allocation::operator>(const Allocation& that) const {
return !(*this < that);
}

bool Allocation::isEmpty() const { return this->toFree_.size() == 0; }

std::chrono::high_resolution_clock::time_point Allocation::freeAfter() const {
return this->freeAfter_;
}

Allocation::Allocation(std::vector<void *> toFree,
std::chrono::high_resolution_clock::time_point freeAfter)
: toFree_(toFree), freeAfter_(freeAfter) {}

Allocation::~Allocation() {
for (auto it = begin(this->toFree_); it != end(this->toFree_); ++it) {
free(*it);
}
}

// Simple Producer

SimpleProducer::SimpleProducer(int allocSize, int numAllocs)
: allocSize_(allocSize), numAllocs_(numAllocs) {}

Allocation SimpleProducer::run() const {
for (int i = 0; i < this->numAllocs_; i++) {
char *ptr = (char *)calloc(this->allocSize_, sizeof(char));
if (ptr == NULL) {
std::cout << "allocation failed" << std::endl;
}
free(ptr);
}
return std::move(Allocation());
}

void swap(Allocation& a1, Allocation& a2) {
a1.toFree_.swap(a2.toFree_);
std::swap(a1.freeAfter_, a2.freeAfter_);
}

// Vector Producer

VectorProducer::VectorProducer(int vectorSize,
std::chrono::duration<double> lifetime)
: vectorSize_(vectorSize), lifetime_(lifetime), shouldFree_(true) {}

VectorProducer::VectorProducer(int vectorSize)
: vectorSize_(vectorSize), lifetime_(0.0), shouldFree_(true) {}

Allocation VectorProducer::run() const {
void *ptr = malloc(1);
size_t currSize = 1;
while (currSize < this->vectorSize_) {
free(ptr);
currSize *= 2;
ptr = malloc(currSize);
}
if (this->shouldFree_) {
free(ptr);
return Allocation();
} else {

using namespace std::chrono;
high_resolution_clock::time_point t = high_resolution_clock::now();
high_resolution_clock::duration d =
duration_cast<high_resolution_clock::duration>(this->lifetime_);
t += d;
return std::move(Allocation(std::vector<void *>({ptr}), t));
}
}
69 changes: 69 additions & 0 deletions stress_test/Producers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#pragma once

#include <chrono>
#include <vector>

class Allocation {
public:
// sorts based on [freeAfter] field
bool operator<(const Allocation& that) const;
bool operator>(const Allocation& that) const;
// true iff [this->toFree_] is empty
bool isEmpty() const;
std::chrono::high_resolution_clock::time_point freeAfter() const;
Allocation(std::vector<void *> toFree,
std::chrono::high_resolution_clock::time_point freeAfter);
// makes an allocation such that [isEmpty()] is true
Allocation() = default;

// Disable copy constructor: whoever owns the Allocation should deallocate it
Allocation(Allocation const &) = delete;
Allocation &operator=(Allocation const &) = delete;

// must define a move constructor since we deleted the copy constructor
Allocation(Allocation &&) = default;
Allocation &operator=(Allocation &&) = default;

// The destructor deallocates the memory in [toFree_]
~Allocation();

// needed to sort
friend void swap(Allocation& a1, Allocation& a2);

private:
std::vector<void *> toFree_;
// absolute time after which this should be freed
std::chrono::high_resolution_clock::time_point freeAfter_;
};

class Producer {
public:
virtual Allocation run() const = 0;
};

// allocates a vector of size [sz] and then frees it
class VectorProducer : public Producer {
public:
Allocation run() const;
// allocate, and then free after [lifetime] has elapsed
VectorProducer(int vectorSize, std::chrono::duration<double> lifetime);
// allocate and then free immediately
VectorProducer(int vectorSize);

private:
int vectorSize_;
std::chrono::duration<double> lifetime_;
bool shouldFree_;
};

/* allocates a block of size [alloc_sz], and then immediately frees it. Repeats
* this [n_allocs] times. */
class SimpleProducer : public Producer {
public:
Allocation run() const;
SimpleProducer(int allocSize, int numAllocs);

private:
int allocSize_;
int numAllocs_;
};
27 changes: 27 additions & 0 deletions stress_test/ToFreeQueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include "ToFreeQueue.h"

#include <chrono>
#include <mutex>
#include <thread>

void ToFreeQueue::free() {
std::lock_guard<std::mutex> guard(this->lock_);

while (!this->q_.empty() && this->q_.top().freeAfter() <
std::chrono::high_resolution_clock::now()) {
this->q_.pop();
}
}

void ToFreeQueue::freeIgnoreLifetime() {
std::lock_guard<std::mutex> guard(this->lock_);

while (!this->q_.empty()) {
this->q_.pop();
}
}

void ToFreeQueue::addToFree(Allocation a) {
std::lock_guard<std::mutex> guard(this->lock_);
this->q_.push(std::move(a));
}
22 changes: 22 additions & 0 deletions stress_test/ToFreeQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include <queue>
#include <thread>

#include "Producers.h"

class ToFreeQueue {
public:
// frees all allocations whose lifetime has elapsed
void free();
// free all allocations, even if the lifetime hasn't expired
void freeIgnoreLifetime();
// Add an allocation to be freed after a particular time
void addToFree(Allocation a);

private:
std::mutex lock_;
std::priority_queue<Allocation, std::vector<Allocation>,
std::greater<Allocation>>
q_;
};