Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better producers #5

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ find_package(Threads)
add_executable(memsetVsMadvise memsetVsMadvise.cpp)
set_property(TARGET memsetVsMadvise PROPERTY CXX_STANDARD 11)
target_link_libraries(memsetVsMadvise PRIVATE ${CMAKE_THREAD_LIBS_INIT} gflags)

SET(JEMALLOC_COMPILE_FLAGS "-L/usr/local/lib")
SET(JEMALLOC_LINK_FLAGS "-ljemalloc -lm -lpthread")

SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${JEMALLOC_COMPILE_FLAGS}")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${JEMALLOC_LINK_FLAGS}")
add_executable(stress stress_test/Main.cpp stress_test/Producers.cpp stress_test/Mixer.cpp stress_test/ToFreeQueue.cpp stress_test/Distribution.cpp)
set_property(TARGET stress PROPERTY CXX_STANDARD 14)
target_link_libraries(stress PRIVATE gflags)
44 changes: 44 additions & 0 deletions stress_test/Distribution.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include "Distribution.h"

#include <algorithm>
#include <fstream>
#include <iostream>
#include <string>
#include <sstream>

#include <iostream>

SizeClass parseSizeClass(const std::string &ln) {
std::istringstream strStream(ln);
size_t sizeClass;
double freq;
if (!(strStream >> sizeClass >> freq)) {
std::cout << "File format invalid. Failed to following line:\n\e[0;31m"
<< ln << "\e[0m" << std::endl;
exit(1);
}
if (freq > 1.0) {
std::cout << "Warning: this looks off; frequency greater than 1.0" << std::endl;
freq = 1.0;
}
return {sizeClass, freq};
}

Distribution parseDistribution(const char *fileName) {
std::string line;
std::ifstream f(fileName);

if (!f) {
std::cout << "Specified file '" << fileName << "' not found." << std::endl;
exit(1);
}

Distribution d;

while (std::getline(f, line)) {
d.push_back(parseSizeClass(line));
}

std::sort(begin(d), end(d));
return d;
}
18 changes: 18 additions & 0 deletions stress_test/Distribution.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <cstddef>
#include <vector>

using std::size_t;

struct SizeClass {
size_t size;
double freq;
bool operator<(const SizeClass &that) const {
return this->freq < that.freq;
}
};

typedef std::vector<SizeClass> Distribution;

Distribution parseDistribution(const char *fileName);
67 changes: 67 additions & 0 deletions stress_test/Main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

#include <gflags/gflags.h>
#include <jemalloc/jemalloc.h>

#include "Mixer.h"
#include "Distribution.h"

DEFINE_int32(num_producers, 1000, "number of producers to run on each thread");
DEFINE_int32(num_threads, 1, "number of threads to run");
DEFINE_bool(print_malloc_stats, false, "print out malloc stats after running");
DEFINE_string(distribution_file, "", "path to distribution file");
static bool validateDistributionFile(const char *flagName, const std::string &val) {
return val.length() != 0;
}
DEFINE_validator(distribution_file, &validateDistributionFile);

using std::shared_ptr;
using std::make_shared;
using std::vector;

void createAndRunMixer(const Distribution &distr, int me,
vector<shared_ptr<ToFreeQueue>> toFreeQueues) {
Mixer m(FLAGS_num_producers, distr, me, toFreeQueues);
m.run();
}

int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
Distribution distr = parseDistribution(FLAGS_distribution_file.c_str());

// Set up a work queue for each thread
vector<std::thread> threads;
vector<shared_ptr<ToFreeQueue>> toFreeQueues;
for (int i = 0; i < FLAGS_num_threads; i++) {
shared_ptr<ToFreeQueue> toFreeQ = make_shared<ToFreeQueue>();
toFreeQueues.push_back(toFreeQ);
}

for (int i = 0; i < FLAGS_num_threads; i++) {
// each thread gets an arbitrary id given by [i]
threads.push_back(std::thread(createAndRunMixer, distr, i, toFreeQueues));
}

using namespace std::chrono;

high_resolution_clock::time_point beginTime = high_resolution_clock::now();
for (auto it = begin(threads); it != end(threads); ++it) {
it->join();
}
// Cleanup any remaining memory
for (int i = 0; i < FLAGS_num_threads; i++) {
toFreeQueues[i]->freeIgnoreLifetime();
}
high_resolution_clock::time_point endTime = high_resolution_clock::now();
duration<double> span = duration_cast<duration<double>>(endTime - beginTime);

if (FLAGS_print_malloc_stats) {
je_malloc_stats_print(NULL, NULL, NULL);
}

std::cout << "Elapsed time: " << span.count() << std::endl;
}
77 changes: 77 additions & 0 deletions stress_test/Mixer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include "Mixer.h"

#include <gflags/gflags.h>
#include <jemalloc/jemalloc.h>

DEFINE_int32(producer_duration, 10000, "scales the length of producers. Making"
"this number higher means each producer runs for a long time.");

using std::make_unique;
using std::unique_ptr;
using std::shared_ptr;
using std::vector;

Mixer::Mixer(int numProducers, const Distribution &distr, int me,
vector<shared_ptr<ToFreeQueue>> toFreeQueues)
: producersRemaining_(numProducers),
toFreeQueues_(toFreeQueues), me_(me),
consumerIdPicker_(0, toFreeQueues.size() - 1) {
this->totalWeight_ = 0.0;
this->initProducers(distr);
this->producerWeightPicker_ = std::uniform_real_distribution<double>(0.0, this->totalWeight_);
}

void Mixer::addProducer(double weight, unique_ptr<Producer> p) {
this->totalWeight_ += weight;
this->producers_.push_back(std::move(p));
this->weightArray_.push_back(this->totalWeight_);
}

void Mixer::initProducers(const Distribution &distr) {
auto oneSecond = std::chrono::duration<double>(1.0);

std::uniform_int_distribution<int> vectorInitFuzzer(1, 100);

for (auto it = begin(distr); it != end(distr); ++it) {
addProducer(it->freq / 3.0,
std::move(make_unique<SimpleProducer>(it->size, FLAGS_producer_duration)));
// provide a bit of fuzziness to vector initial value
int vectorInit = vectorInitFuzzer(this->generator_);
addProducer(it->freq / 3.0,
std::move(make_unique<VectorProducer>(FLAGS_producer_duration, oneSecond, vectorInit)));
addProducer(it->freq / 3.0,
std::move(make_unique<LinkedListProducer>(it->size, FLAGS_producer_duration, oneSecond)));
}
}

const Producer &Mixer::pickProducer() {
double r = this->producerWeightPicker_(this->generator_);
int producerIndex;
for (producerIndex = 0; producerIndex < this->weightArray_.size(); ++producerIndex) {
if (r <= weightArray_[producerIndex]) {
break;
}
}
assert(producerIndex != this->weightArray_.size());
return *(this->producers_[producerIndex]);
}

// Picks next producer for the mixer to run. Currently uniform random choice
ToFreeQueue& Mixer::pickConsumer() {
int consumerIndex = this->consumerIdPicker_(this->generator_);
return *(this->toFreeQueues_[consumerIndex]);
}

void Mixer::run() {
while (this->producersRemaining_ > 0) {
this->toFreeQueues_[this->me_]->free();
// otherwise run a random producer
Allocation a = this->pickProducer().run();
if (!a.isEmpty()) {
this->pickConsumer().addToFree(std::move(a));
}
producersRemaining_--;
}
je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
// Main loop will eventually cleanup memory
}
39 changes: 39 additions & 0 deletions stress_test/Mixer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include <random>
#include <vector>

#include "Producers.h"
#include "ToFreeQueue.h"
#include "Distribution.h"

class Mixer {
public:
void run();
Mixer(int numProducers, const Distribution &distr, int me,
std::vector<std::shared_ptr<ToFreeQueue>> toFreeQueues);

private:
int producersRemaining_;
// the thread id that this mixer is running on
int me_;
// work queues for each thread indexed by thread number
std::vector<std::shared_ptr<ToFreeQueue>> toFreeQueues_;
// Picks next producer for the mixer to run
const Producer &pickProducer();
/* Picks a consumer to free memory allocated by a producer. Currently uniform
* random choice */
ToFreeQueue &pickConsumer();

std::uniform_int_distribution<int> consumerIdPicker_;
std::default_random_engine generator_;

// for picking producer with weighted random choice
std::vector<double> weightArray_;
double totalWeight_;
std::vector<std::unique_ptr<Producer>> producers_;
std::uniform_real_distribution<double> producerWeightPicker_;
// initializes [producers_], [totalWeight_], [weightArray_], and [producerWeightPicker_]
void initProducers(const Distribution &distr);
void addProducer(double weight, std::unique_ptr<Producer> p);
};
96 changes: 96 additions & 0 deletions stress_test/Producers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#include "Producers.h"

#include <iostream>

// Allocation

bool Allocation::operator<(const Allocation &that) const {
return this->toFree_ < that.toFree_;
}

bool Allocation::operator>(const Allocation &that) const {
return !(*this < that);
}

bool Allocation::isEmpty() const { return this->toFree_.size() == 0; }

std::chrono::high_resolution_clock::time_point Allocation::freeAfter() const {
return this->freeAfter_;
}

Allocation::Allocation(std::vector<void *> toFree,
std::chrono::high_resolution_clock::time_point freeAfter)
: toFree_(toFree), freeAfter_(freeAfter) {}

Allocation::~Allocation() {
for (auto it = begin(this->toFree_); it != end(this->toFree_); ++it) {
free(*it);
}
}

// Simple Producer

SimpleProducer::SimpleProducer(int allocSize, int numAllocs)
: allocSize_(allocSize), numAllocs_(numAllocs) {}

Allocation SimpleProducer::run() const {
for (int i = 0; i < this->numAllocs_; i++) {
char *ptr = (char *)calloc(this->allocSize_, sizeof(char));
if (ptr == NULL) {
std::cout << "allocation failed" << std::endl;
}
free(ptr);
}
return std::move(Allocation());
}

void swap(Allocation &a1, Allocation &a2) {
a1.toFree_.swap(a2.toFree_);
std::swap(a1.freeAfter_, a2.freeAfter_);
}

// Vector Producer

VectorProducer::VectorProducer(size_t vectorSize,
std::chrono::duration<double> lifetime,
size_t initialSize)
: vectorSize_(vectorSize), lifetime_(lifetime), initialSize_(initialSize) {}

std::chrono::high_resolution_clock::time_point addToNow(std::chrono::duration<double> d) {
using namespace std::chrono;
high_resolution_clock::time_point t = high_resolution_clock::now();
high_resolution_clock::duration dHighResolution =
duration_cast<high_resolution_clock::duration>(d);
t += dHighResolution;;
return t;
}

Allocation VectorProducer::run() const {

void *ptr = malloc(this->initialSize_);
size_t currSize = this->initialSize_;
while (currSize < this->vectorSize_) {
free(ptr);
currSize *= 2;
ptr = malloc(currSize);
}

return std::move(Allocation(std::vector<void *>({ptr}), addToNow(this->lifetime_)));
}

// LinkedList Producer

Allocation LinkedListProducer::run() const {

std::vector<void *> toFree;
toFree.reserve(this->numNodes_);

for (int i = 0; i < this->numNodes_; i++) {
toFree.push_back(malloc(this->nodeSize_));
}

return std::move(Allocation(toFree, addToNow(this->lifetime_)));
}
// allocate [numNodes] blocks of size [nodeSize] with lifetime [lifetime]
LinkedListProducer::LinkedListProducer(size_t nodeSize, int numNodes, std::chrono::duration<double> lifetime) :
nodeSize_(nodeSize), numNodes_(numNodes), lifetime_(lifetime) {}
Loading