Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mixer redesign #6

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ find_package(Threads)
add_executable(memsetVsMadvise memsetVsMadvise.cpp)
set_property(TARGET memsetVsMadvise PROPERTY CXX_STANDARD 11)
target_link_libraries(memsetVsMadvise PRIVATE ${CMAKE_THREAD_LIBS_INIT} gflags)

SET(JEMALLOC_COMPILE_FLAGS "-L/usr/local/lib")
SET(JEMALLOC_LINK_FLAGS "-ljemalloc -lm -lpthread")

SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${JEMALLOC_COMPILE_FLAGS}")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${JEMALLOC_LINK_FLAGS}")
add_executable(stress stress_test/Main.cpp stress_test/Producers.cpp stress_test/Mixer.cpp stress_test/ThreadObject.cpp stress_test/Distribution.cpp stress_test/Allocation.cpp)
set_property(TARGET stress PROPERTY CXX_STANDARD 14)
target_link_libraries(stress PRIVATE gflags)
20 changes: 20 additions & 0 deletions stress_test/Allocation.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "Allocation.h"

bool Allocation::operator<(const Allocation &that) const {
return this->freeAfterAbsolute < that.freeAfterAbsolute;
}

bool Allocation::operator>(const Allocation &that) const {
return this->freeAfterAbsolute > that.freeAfterAbsolute;
}

bool Allocation::isEmpty() const { return this->toFree_.size() == 0; }

Allocation::Allocation(std::vector<void *> toFree, int freeAfterArg)
: toFree_(toFree), freeAfterRelative(freeAfterArg), freeAfterAbsolute(0) {}

void Allocation::clear() const {
for (auto &ptr : this->toFree_) {
free(ptr);
}
}
31 changes: 31 additions & 0 deletions stress_test/Allocation.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <vector>

// simple wrapper that pairs a group of allocated blocks with a lifetime

class Allocation {
public:
// sorts based on [freeAfterAbsolute] field
bool operator<(const Allocation &that) const;
bool operator>(const Allocation &that) const;
// true iff [this->toFree_] is empty
bool isEmpty() const;
// free the memory stored within
void clear() const;

Allocation() = default;

/* [freeAfter] is a number of phases, according to the thread that is
* responsible for this allocation */
Allocation(std::vector<void *> toFree, int freeAfter);

// number of phases to live for relative to allocation
int freeAfterRelative;
// absolute phase number at which to free, based on a particular threads clock
int freeAfterAbsolute;

private:
std::vector<void *> toFree_;
// absolute time after which this should be freed
};
50 changes: 50 additions & 0 deletions stress_test/Distribution.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "Distribution.h"

#include <algorithm>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>

#include <gflags/gflags.h>

DEFINE_int64(max_size_class, 10 * k1KB, "max size class to allocate");

SizeClass parseSizeClass(const std::string &ln) {
std::istringstream strStream(ln);
size_t sizeClass;
double freq;
if (!(strStream >> sizeClass >> freq)) {
std::cout << "File format invalid. Failed to following line:\n\e[0;31m"
<< ln << "\e[0m" << std::endl;
exit(1);
}
if (freq > 1.0) {
std::cout << "Warning: this looks off; frequency greater than 1.0"
<< std::endl;
freq = 1.0;
}
return {sizeClass, freq};
}

Distribution parseDistribution(const char *fileName) {
std::string line;
std::ifstream f(fileName);

if (!f) {
std::cout << "Specified file '" << fileName << "' not found." << std::endl;
exit(1);
}

Distribution d;

while (std::getline(f, line)) {
SizeClass sz = parseSizeClass(line);
if (sz.size <= FLAGS_max_size_class) {
d.push_back(sz);
}
}

std::sort(begin(d), end(d));
return d;
}
15 changes: 15 additions & 0 deletions stress_test/Distribution.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <vector>

#include "SizeConstants.h"

struct SizeClass {
size_t size;
double freq;
bool operator<(const SizeClass &that) const { return this->freq < that.freq; }
};

typedef std::vector<SizeClass> Distribution;

Distribution parseDistribution(const char *fileName);
77 changes: 77 additions & 0 deletions stress_test/Main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

#include <gflags/gflags.h>
#include <jemalloc/jemalloc.h>

#include "Distribution.h"
#include "Mixer.h"

DEFINE_int32(num_threads, 1, "number of threads to run");
DEFINE_bool(print_malloc_stats, false, "print out malloc stats after running");
DEFINE_string(distribution_file, "", "path to distribution file");
static bool validateDistributionFile(const char *flagName,
const std::string &val) {
return val.length() != 0;
}
DEFINE_validator(distribution_file, &validateDistributionFile);

using std::shared_ptr;
using std::vector;

void createAndRunMixer(const Distribution *distr, int me,
vector<shared_ptr<ThreadObject>> threadObjects) {
Mixer m(distr, me, threadObjects);
m.run();
}

double run() {
initInstBurner();
Distribution distr = parseDistribution(FLAGS_distribution_file.c_str());

// Set up a work queue for each thread
vector<std::thread> threads;
vector<shared_ptr<ThreadObject>> threadObjects;
for (int i = 0; i < FLAGS_num_threads; i++) {
auto threadObject = shared_ptr<ThreadObject>(new ThreadObject());
threadObjects.push_back(threadObject);
}

for (int i = 0; i < FLAGS_num_threads; i++) {
// each thread gets an arbitrary id given by [i]
threads.push_back(std::thread(createAndRunMixer, &distr, i, threadObjects));
}

using namespace std::chrono;

high_resolution_clock::time_point beginTime = high_resolution_clock::now();
for (auto &t : threads) {
t.join();
}

// Cleanup any remaining memory
for (auto &t : threadObjects) {
t->freeIgnoreLifetime();
}
high_resolution_clock::time_point endTime = high_resolution_clock::now();
duration<double> span = duration_cast<duration<double>>(endTime - beginTime);

return span.count();
}

int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
double time = run();

if (FLAGS_print_malloc_stats) {
if (je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0)) {
std::cout << "je_mallctl failed. Exiting..." << std::endl;
}
je_malloc_stats_print(NULL, NULL, NULL);
}

std::cout << "Elapsed time: " << time << std::endl;
}
Loading