From 4f42dbfc3a5fbc85d08ce0d0d90c89c05df26085 Mon Sep 17 00:00:00 2001 From: Tyler Etzel Date: Thu, 31 May 2018 16:32:27 -0700 Subject: [PATCH 1/2] single threaded test framework - mixer selects which producers to run - two different producers with different allocation patterns --- CMakeLists.txt | 4 ++ stress_test/Main.cpp | 30 +++++++++++++++ stress_test/Mixer.cpp | 34 +++++++++++++++++ stress_test/Mixer.h | 25 +++++++++++++ stress_test/Producers.cpp | 77 +++++++++++++++++++++++++++++++++++++++ stress_test/Producers.h | 68 ++++++++++++++++++++++++++++++++++ 6 files changed, 238 insertions(+) create mode 100644 stress_test/Main.cpp create mode 100644 stress_test/Mixer.cpp create mode 100644 stress_test/Mixer.h create mode 100644 stress_test/Producers.cpp create mode 100644 stress_test/Producers.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c3354b..887a793 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,3 +6,7 @@ find_package(Threads) add_executable(memsetVsMadvise memsetVsMadvise.cpp) set_property(TARGET memsetVsMadvise PROPERTY CXX_STANDARD 11) target_link_libraries(memsetVsMadvise PRIVATE ${CMAKE_THREAD_LIBS_INIT} gflags) + +add_executable(stress stress_test/Main.cpp stress_test/Producers.cpp stress_test/Mixer.cpp) +set_property(TARGET stress PROPERTY CXX_STANDARD 14) +target_link_libraries(stress PRIVATE gflags) diff --git a/stress_test/Main.cpp b/stress_test/Main.cpp new file mode 100644 index 0000000..29c9c65 --- /dev/null +++ b/stress_test/Main.cpp @@ -0,0 +1,30 @@ +#include +#include +#include + +#include + +#include "Mixer.h" + +DEFINE_int32(num_producers, 100, "number of producers to run"); + +int main(int argc, char **argv) { + + gflags::ParseCommandLineFlags(&argc, &argv, true); + + vector> producers; + producers.push_back(std::move(std::make_unique(8, 100000))); + producers.push_back(std::move(std::make_unique( + 100000, std::chrono::duration(1.0)))); + + using namespace std::chrono; + + Mixer m(std::move(producers), FLAGS_num_producers); + + high_resolution_clock::time_point beginTime = high_resolution_clock::now(); + m.run(); + high_resolution_clock::time_point endTime = high_resolution_clock::now(); + + duration span = duration_cast>(endTime - beginTime); + std::cout << "Elapsed time: " << span.count() << std::endl; +} diff --git a/stress_test/Mixer.cpp b/stress_test/Mixer.cpp new file mode 100644 index 0000000..83f02b0 --- /dev/null +++ b/stress_test/Mixer.cpp @@ -0,0 +1,34 @@ +#include "Mixer.h" + +Mixer::Mixer(vector> producers, int numProducers) + : producers_(move(producers)), producersRemaining_(numProducers), + producerIndexPicker_(0, this->producers_.size() - 1) {} + +// Picks next producer for the mixer to run. Currently uniform random choice +const Producer &Mixer::pick() { + int producerIndex = this->producerIndexPicker_(this->generator_); + return *(this->producers_[producerIndex]); +} + +void Mixer::run() { + while (this->producersRemaining_ > 0) { + if (this->allocated_.size() > 0 && + this->allocated_.back().freeAfter() < + std::chrono::high_resolution_clock::now()) { + // deallocate something if it's lifetime has expired + this->allocated_.pop_back(); + } else { + // otherwise run a random producer + Allocation a = this->pick().run(); + if (!a.isEmpty()) { + this->allocated_.push_back(std::move(a)); + sort(rbegin(this->allocated_), rend(this->allocated_)); + } + producersRemaining_--; + } + } + // cleanup remaining allocated things immediately, regardless of lifetime + while (!this->allocated_.empty()) { + this->allocated_.pop_back(); + } +} diff --git a/stress_test/Mixer.h b/stress_test/Mixer.h new file mode 100644 index 0000000..d58cecc --- /dev/null +++ b/stress_test/Mixer.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include + +#include "Producers.h" + +using std::unique_ptr; +using std::vector; + +class Mixer { +public: + void run(); + Mixer(vector> producers, int numProducers); + +private: + /* maintains reverse-sorted order by lifetime; i.e. [push_back] yields the + * allocation that should be deallocated the soonest */ + vector allocated_; + vector> producers_; + int producersRemaining_; + const Producer &pick(); + std::uniform_int_distribution producerIndexPicker_; + std::default_random_engine generator_; +}; diff --git a/stress_test/Producers.cpp b/stress_test/Producers.cpp new file mode 100644 index 0000000..ace9c40 --- /dev/null +++ b/stress_test/Producers.cpp @@ -0,0 +1,77 @@ +#include "Producers.h" + +#include + +// Allocation + +bool Allocation::operator<(const Allocation &that) const { + return this->toFree_ < that.toFree_; +} + +bool Allocation::isEmpty() const { return this->toFree_.size() == 0; } + +std::chrono::high_resolution_clock::time_point Allocation::freeAfter() const { + return this->freeAfter_; +} + +Allocation::Allocation(std::vector toFree, + std::chrono::high_resolution_clock::time_point freeAfter) + : toFree_(toFree), freeAfter_(freeAfter) {} + +Allocation::~Allocation() { + for (auto it = begin(this->toFree_); it != end(this->toFree_); ++it) { + free(*it); + } +} + +// Simple Producer + +SimpleProducer::SimpleProducer(int allocSize, int numAllocs) + : allocSize_(allocSize), numAllocs_(numAllocs) {} + +Allocation SimpleProducer::run() const { + for (int i = 0; i < this->numAllocs_; i++) { + char *ptr = (char *)calloc(this->allocSize_, sizeof(char)); + if (ptr == NULL) { + std::cout << "allocation failed" << std::endl; + } + free(ptr); + } + return std::move(Allocation()); +} + +void swap(Allocation &a1, Allocation &a2) { + a1.toFree_.swap(a2.toFree_); + std::swap(a1.freeAfter_, a2.freeAfter_); +} + +// Vector Producer + +VectorProducer::VectorProducer(int vectorSize, + std::chrono::duration lifetime) + : vectorSize_(vectorSize), lifetime_(lifetime), shouldFree_(true) {} + +VectorProducer::VectorProducer(int vectorSize) + : vectorSize_(vectorSize), lifetime_(0.0), shouldFree_(true) {} + +Allocation VectorProducer::run() const { + void *ptr = malloc(1); + size_t currSize = 1; + while (currSize < this->vectorSize_) { + free(ptr); + currSize *= 2; + ptr = malloc(currSize); + } + if (this->shouldFree_) { + free(ptr); + return Allocation(); + } else { + + using namespace std::chrono; + high_resolution_clock::time_point t = high_resolution_clock::now(); + high_resolution_clock::duration d = + duration_cast(this->lifetime_); + t += d; + return std::move(Allocation(std::vector({ptr}), t)); + } +} diff --git a/stress_test/Producers.h b/stress_test/Producers.h new file mode 100644 index 0000000..0c25272 --- /dev/null +++ b/stress_test/Producers.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include + +class Allocation { +public: + // sorts based on [freeAfter] field + bool operator<(const Allocation &that) const; + // true iff [this->toFree_] is empty + bool isEmpty() const; + std::chrono::high_resolution_clock::time_point freeAfter() const; + Allocation(std::vector toFree, + std::chrono::high_resolution_clock::time_point freeAfter); + // makes an allocation such that [isEmpty()] is true + Allocation() = default; + + // Disable copy constructor: whoever owns the Allocation should deallocate it + Allocation(Allocation const &) = delete; + Allocation &operator=(Allocation const &) = delete; + + // must define a move constructor since we deleted the copy constructor + Allocation(Allocation&&) = default; + Allocation& operator=(Allocation&&) = default; + + // The destructor deallocates the memory in [toFree_] + ~Allocation(); + + // needed to sort + friend void swap(Allocation &a1, Allocation &a2); + +private: + std::vector toFree_; + // absolute time after which this should be freed + std::chrono::high_resolution_clock::time_point freeAfter_; +}; + +class Producer { +public: + virtual Allocation run() const = 0; +}; + +// allocates a vector of size [sz] and then frees it +class VectorProducer : public Producer { +public: + Allocation run() const; + // allocate, and then free after [lifetime] has elapsed + VectorProducer(int vectorSize, std::chrono::duration lifetime); + // allocate and then free immediately + VectorProducer(int vectorSize); + +private: + int vectorSize_; + std::chrono::duration lifetime_; + bool shouldFree_; +}; + +/* allocates a block of size [alloc_sz], and then immediately frees it. Repeats + * this [n_allocs] times. */ +class SimpleProducer : public Producer { +public: + Allocation run() const; + SimpleProducer(int allocSize, int numAllocs); + +private: + int allocSize_; + int numAllocs_; +}; From 59366ed85eb46c5959de2e79b2b075e507316336 Mon Sep 17 00:00:00 2001 From: Tyler Etzel Date: Tue, 5 Jun 2018 16:31:44 -0700 Subject: [PATCH 2/2] Added multithreading - Mixer creates producer consumer relations: picks a random producer, and then a random consumer to free the memory allocated in the producer. --- CMakeLists.txt | 2 +- stress_test/Main.cpp | 46 ++++++++++++++++++++++++++++++------- stress_test/Mixer.cpp | 43 +++++++++++++++++----------------- stress_test/Mixer.h | 21 +++++++++++++---- stress_test/Producers.cpp | 12 ++++++---- stress_test/Producers.h | 15 ++++++------ stress_test/ToFreeQueue.cpp | 27 ++++++++++++++++++++++ stress_test/ToFreeQueue.h | 22 ++++++++++++++++++ 8 files changed, 141 insertions(+), 47 deletions(-) create mode 100644 stress_test/ToFreeQueue.cpp create mode 100644 stress_test/ToFreeQueue.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 887a793..75682f6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,6 +7,6 @@ add_executable(memsetVsMadvise memsetVsMadvise.cpp) set_property(TARGET memsetVsMadvise PROPERTY CXX_STANDARD 11) target_link_libraries(memsetVsMadvise PRIVATE ${CMAKE_THREAD_LIBS_INIT} gflags) -add_executable(stress stress_test/Main.cpp stress_test/Producers.cpp stress_test/Mixer.cpp) +add_executable(stress stress_test/Main.cpp stress_test/Producers.cpp stress_test/Mixer.cpp stress_test/ToFreeQueue.cpp) set_property(TARGET stress PROPERTY CXX_STANDARD 14) target_link_libraries(stress PRIVATE gflags) diff --git a/stress_test/Main.cpp b/stress_test/Main.cpp index 29c9c65..9af4caf 100644 --- a/stress_test/Main.cpp +++ b/stress_test/Main.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -7,22 +8,51 @@ #include "Mixer.h" DEFINE_int32(num_producers, 100, "number of producers to run"); +DEFINE_int32(num_threads, 1, "number of threads to run"); -int main(int argc, char **argv) { +using std::shared_ptr; +using std::vector; + +void createAndRunMixer(vector> producers, int me, + vector> toFreeQueues) { + Mixer m(producers, FLAGS_num_producers, me, toFreeQueues); + m.run(); +} +int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); - vector> producers; - producers.push_back(std::move(std::make_unique(8, 100000))); - producers.push_back(std::move(std::make_unique( - 100000, std::chrono::duration(1.0)))); + // Initialize producers + vector> producers; + producers.push_back( + shared_ptr(new SimpleProducer(8, 100000))); + producers.push_back(shared_ptr( + new VectorProducer(100000, std::chrono::duration(1.0)))); + + // Set up a work queue for each thread + vector threads; + vector> toFreeQueues; + for (int i = 0; i < FLAGS_num_threads; i++) { + auto toFreeQ = shared_ptr(new ToFreeQueue()); + toFreeQueues.push_back(toFreeQ); + } + + for (int i = 0; i < FLAGS_num_threads; i++) { + // each thread gets an arbitrary id given by [i] + threads.push_back( + std::thread(createAndRunMixer, producers, i, toFreeQueues)); + } using namespace std::chrono; - Mixer m(std::move(producers), FLAGS_num_producers); - high_resolution_clock::time_point beginTime = high_resolution_clock::now(); - m.run(); + for (auto& t : threads) { + t.join(); + } + // Cleanup any remaining memory + for (int i = 0; i < FLAGS_num_threads; i++) { + toFreeQueues[i]->freeIgnoreLifetime(); + } high_resolution_clock::time_point endTime = high_resolution_clock::now(); duration span = duration_cast>(endTime - beginTime); diff --git a/stress_test/Mixer.cpp b/stress_test/Mixer.cpp index 83f02b0..6c835e4 100644 --- a/stress_test/Mixer.cpp +++ b/stress_test/Mixer.cpp @@ -1,34 +1,33 @@ #include "Mixer.h" -Mixer::Mixer(vector> producers, int numProducers) - : producers_(move(producers)), producersRemaining_(numProducers), - producerIndexPicker_(0, this->producers_.size() - 1) {} +Mixer::Mixer(vector> producers, int numProducers, int me, + vector> toFreeQueues) + : producers_(producers), producersRemaining_(numProducers), + toFreeQueues_(toFreeQueues), me_(me), + producerIdPicker_(0, producers.size() - 1), + consumerIdPicker_(0, toFreeQueues.size() - 1) {} // Picks next producer for the mixer to run. Currently uniform random choice -const Producer &Mixer::pick() { - int producerIndex = this->producerIndexPicker_(this->generator_); +const Producer& Mixer::pickProducer() { + int producerIndex = this->producerIdPicker_(this->generator_); return *(this->producers_[producerIndex]); } +// Picks next producer for the mixer to run. Currently uniform random choice +ToFreeQueue& Mixer::pickConsumer() { + int consumerIndex = this->consumerIdPicker_(this->generator_); + return *(this->toFreeQueues_[consumerIndex]); +} + void Mixer::run() { while (this->producersRemaining_ > 0) { - if (this->allocated_.size() > 0 && - this->allocated_.back().freeAfter() < - std::chrono::high_resolution_clock::now()) { - // deallocate something if it's lifetime has expired - this->allocated_.pop_back(); - } else { - // otherwise run a random producer - Allocation a = this->pick().run(); - if (!a.isEmpty()) { - this->allocated_.push_back(std::move(a)); - sort(rbegin(this->allocated_), rend(this->allocated_)); - } - producersRemaining_--; + this->toFreeQueues_[this->me_]->free(); + // otherwise run a random producer + Allocation a = this->pickProducer().run(); + if (!a.isEmpty()) { + this->pickConsumer().addToFree(std::move(a)); } + producersRemaining_--; } - // cleanup remaining allocated things immediately, regardless of lifetime - while (!this->allocated_.empty()) { - this->allocated_.pop_back(); - } + // Main loop will eventually cleanup memory } diff --git a/stress_test/Mixer.h b/stress_test/Mixer.h index d58cecc..ca5ada7 100644 --- a/stress_test/Mixer.h +++ b/stress_test/Mixer.h @@ -4,22 +4,33 @@ #include #include "Producers.h" +#include "ToFreeQueue.h" -using std::unique_ptr; +using std::shared_ptr; using std::vector; class Mixer { public: void run(); - Mixer(vector> producers, int numProducers); + Mixer(vector> producers, int numProducers, int me, + vector> toFreeQueues); private: /* maintains reverse-sorted order by lifetime; i.e. [push_back] yields the * allocation that should be deallocated the soonest */ vector allocated_; - vector> producers_; + vector> producers_; int producersRemaining_; - const Producer &pick(); - std::uniform_int_distribution producerIndexPicker_; + // the thread id that this mixer is running on + int me_; + // work queues for each thread indexed by thread number + vector> toFreeQueues_; + // Picks next producer for the mixer to run. Currently uniform random choice. + const Producer& pickProducer(); + // Picks a consumer to free memory allocated by a producer. Currently uniform + // random choice. + ToFreeQueue& pickConsumer(); + std::uniform_int_distribution producerIdPicker_; + std::uniform_int_distribution consumerIdPicker_; std::default_random_engine generator_; }; diff --git a/stress_test/Producers.cpp b/stress_test/Producers.cpp index ace9c40..d830ba5 100644 --- a/stress_test/Producers.cpp +++ b/stress_test/Producers.cpp @@ -4,10 +4,14 @@ // Allocation -bool Allocation::operator<(const Allocation &that) const { +bool Allocation::operator<(const Allocation& that) const { return this->toFree_ < that.toFree_; } +bool Allocation::operator>(const Allocation& that) const { + return !(*this < that); +} + bool Allocation::isEmpty() const { return this->toFree_.size() == 0; } std::chrono::high_resolution_clock::time_point Allocation::freeAfter() const { @@ -40,9 +44,9 @@ Allocation SimpleProducer::run() const { return std::move(Allocation()); } -void swap(Allocation &a1, Allocation &a2) { - a1.toFree_.swap(a2.toFree_); - std::swap(a1.freeAfter_, a2.freeAfter_); +void swap(Allocation& a1, Allocation& a2) { + a1.toFree_.swap(a2.toFree_); + std::swap(a1.freeAfter_, a2.freeAfter_); } // Vector Producer diff --git a/stress_test/Producers.h b/stress_test/Producers.h index 0c25272..0b93f9f 100644 --- a/stress_test/Producers.h +++ b/stress_test/Producers.h @@ -6,7 +6,8 @@ class Allocation { public: // sorts based on [freeAfter] field - bool operator<(const Allocation &that) const; + bool operator<(const Allocation& that) const; + bool operator>(const Allocation& that) const; // true iff [this->toFree_] is empty bool isEmpty() const; std::chrono::high_resolution_clock::time_point freeAfter() const; @@ -19,15 +20,15 @@ class Allocation { Allocation(Allocation const &) = delete; Allocation &operator=(Allocation const &) = delete; - // must define a move constructor since we deleted the copy constructor - Allocation(Allocation&&) = default; - Allocation& operator=(Allocation&&) = default; + // must define a move constructor since we deleted the copy constructor + Allocation(Allocation &&) = default; + Allocation &operator=(Allocation &&) = default; - // The destructor deallocates the memory in [toFree_] + // The destructor deallocates the memory in [toFree_] ~Allocation(); - // needed to sort - friend void swap(Allocation &a1, Allocation &a2); + // needed to sort + friend void swap(Allocation& a1, Allocation& a2); private: std::vector toFree_; diff --git a/stress_test/ToFreeQueue.cpp b/stress_test/ToFreeQueue.cpp new file mode 100644 index 0000000..f9ce95e --- /dev/null +++ b/stress_test/ToFreeQueue.cpp @@ -0,0 +1,27 @@ +#include "ToFreeQueue.h" + +#include +#include +#include + +void ToFreeQueue::free() { + std::lock_guard guard(this->lock_); + + while (!this->q_.empty() && this->q_.top().freeAfter() < + std::chrono::high_resolution_clock::now()) { + this->q_.pop(); + } +} + +void ToFreeQueue::freeIgnoreLifetime() { + std::lock_guard guard(this->lock_); + + while (!this->q_.empty()) { + this->q_.pop(); + } +} + +void ToFreeQueue::addToFree(Allocation a) { + std::lock_guard guard(this->lock_); + this->q_.push(std::move(a)); +} diff --git a/stress_test/ToFreeQueue.h b/stress_test/ToFreeQueue.h new file mode 100644 index 0000000..b30b78b --- /dev/null +++ b/stress_test/ToFreeQueue.h @@ -0,0 +1,22 @@ +#pragma once + +#include +#include + +#include "Producers.h" + +class ToFreeQueue { +public: + // frees all allocations whose lifetime has elapsed + void free(); + // free all allocations, even if the lifetime hasn't expired + void freeIgnoreLifetime(); + // Add an allocation to be freed after a particular time + void addToFree(Allocation a); + +private: + std::mutex lock_; + std::priority_queue, + std::greater> + q_; +};