Skip to content

Commit

Permalink
Merge pull request #2 from alexKrauseTUD/rework
Browse files Browse the repository at this point in the history
PR from wip-master to master
  • Loading branch information
alexKrauseTUD authored Nov 14, 2023
2 parents af698fe + 8d39efe commit c1d3474
Show file tree
Hide file tree
Showing 11 changed files with 3,676 additions and 1,149 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ target_link_libraries(memoLib ibverbs)
include_directories(${CMAKE_SOURCE_DIR}/ext/memoRDMA ${CMAKE_SOURCE_DIR}/ext/memoRDMA/include)

add_executable(disaggDataProvider ${SOURCES} ${HEADERS})
target_link_libraries(disaggDataProvider "pthread" "memoLib" ${OpenMP_CXX_LIBRARIES})
target_link_libraries(disaggDataProvider "pthread" "memoLib" "numa" ${OpenMP_CXX_LIBRARIES})
49 changes: 49 additions & 0 deletions include/Benchmarks.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <chrono>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <functional>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include "Worker.hpp"

extern std::chrono::duration<double> waitingTime;
extern std::chrono::duration<double> workingTime;

/**
 * Singleton that holds a fixed array of Worker objects and declares the
 * benchmark routines of the application.
 *
 * The only way to obtain the object is getInstance(); the constructor is
 * private and copy/move are deleted so that no second instance can exist.
 */
class Benchmarks {
   public:
    /// Returns the single instance. It is a function-local static, so it is
    /// constructed the first time this function is called.
    static Benchmarks& getInstance() {
        static Benchmarks instance;
        return instance;
    }
    ~Benchmarks();

    // A singleton must not be duplicated or relocated: a copy would carry its
    // own `workers` array and silently diverge from the real instance.
    Benchmarks(const Benchmarks&) = delete;
    Benchmarks& operator=(const Benchmarks&) = delete;
    Benchmarks(Benchmarks&&) = delete;
    Benchmarks& operator=(Benchmarks&&) = delete;

    /// Public entry point; defined out of line.
    void executeAllBenchmarks();

    // NOTE(review): the unit of this constant is not visible in this header —
    // presumably bytes or elements per block; confirm at the use sites.
    static const size_t OPTIMAL_BLOCK_SIZE = 65536;

   private:
    Benchmarks();

    // Individual benchmark routines, all defined out of line. `logName` is
    // taken by non-const reference; `locality` is taken by value.
    void execLocalBenchmark(std::string& logName, std::string locality);
    void execRemoteBenchmark(std::string& logName, std::string locality);
    void execLocalBenchmarkMW(std::string& logName, std::string locality);
    void execRemoteBenchmarkMW(std::string& logName, std::string locality);

    template <bool filter>
    void execUPIBenchmark();

    void execRDMABenchmark();
    void execRDMAHashJoinBenchmark();
    void execRDMAHashJoinPGBenchmark();
    void execRDMAHashJoinStarBenchmark();

    // Number of Worker objects embedded in the singleton.
    static const size_t WORKER_NUMBER = 8;
    Worker workers[WORKER_NUMBER];
};
180 changes: 168 additions & 12 deletions include/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <cstring>
#include <mutex>
#include <Logger.h>
#include <numa.h>

#include "DataCatalog.h"

Expand Down Expand Up @@ -40,7 +41,7 @@ struct col_t {
(reinterpret_cast<char*>(data) == reinterpret_cast<char*>(col->current_end)) // Last readable element reached
) {
std::unique_lock<std::mutex> lk(col->iteratorLock);
// std::cout << "Stalling <" << (chunk_iterator ? "Chunked>" : "Full>") << std::endl;
LOG_DEBUG2("Stalling <" << (chunk_iterator ? "Chunked>" : "Full>") << std::endl;)
col->iterator_data_available.wait(lk, [this] { return reinterpret_cast<char*>(data) < reinterpret_cast<char*>(col->current_end); });
}
}
Expand Down Expand Up @@ -87,7 +88,8 @@ struct col_t {
std::condition_variable iterator_data_available;

// Destructor: releases the buffer that `data` points to.
// NOTE(review): as shown here, `data` is handed to both free() and numa_free(). Only one release
// call should remain, and it has to match the allocator that produced `data` — confirm against
// the checked-in file.
~col_t() {
free(data);
// numa_free() is only meant for memory obtained from the numa_alloc_* family. A buffer that was
// allocated another way (for example with aligned_alloc) must not be released through it.
numa_free(data, sizeInBytes);
}

template <typename T>
Expand All @@ -96,7 +98,16 @@ struct col_t {
size = _size;
data = aligned_alloc(alignof(T), _size * sizeof(T));
sizeInBytes = _size * sizeof(T);
// std::cout << "[col_t] Allocated " << _size * sizeof(T) << " bytes." << std::endl;
LOG_DEBUG2("[col_t] Allocated " << _size * sizeof(T) << " bytes." << std::endl;)
}
}

template <typename T>
void allocate_on_numa(size_t _size, int node) {
if (data == nullptr) {
size = _size;
data = numa_alloc_onnode(_size * sizeof(T), node);
sizeInBytes = _size * sizeof(T);
}
}

Expand Down Expand Up @@ -136,30 +147,56 @@ struct col_t {
break;
}
default: {
std::cout << "[col_t] Error allocating data: Invalid datatype submitted. Nothing was allocated." << std::endl;
LOG_ERROR("[col_t] Error allocating data: Invalid datatype submitted. Nothing was allocated." << std::endl;)
}
}

memset(reinterpret_cast<char*>(data), 0, _size);
current_end = data;
}

/**
 * Runtime-typed front end for allocate_on_numa<T>(): maps `type` to the matching
 * element type and forwards `_size` and `node` to the templated overload.
 * An unknown `type` only logs an error. In every case `current_end` is reset
 * to `data` afterwards. No explicit zero-fill is performed here.
 */
void allocate_on_numa(col_data_t type, size_t _size, int node) {
    if (type == col_data_t::gen_smallint) {
        allocate_on_numa<uint8_t>(_size, node);
    } else if (type == col_data_t::gen_bigint) {
        allocate_on_numa<uint64_t>(_size, node);
    } else if (type == col_data_t::gen_float) {
        allocate_on_numa<float>(_size, node);
    } else if (type == col_data_t::gen_double) {
        allocate_on_numa<double>(_size, node);
    } else {
        LOG_ERROR("[col_t] Error allocating data: Invalid datatype submitted. Nothing was allocated." << std::endl;)
    }

    // Nothing is readable yet, so the end marker starts at the beginning of the buffer.
    current_end = data;
}

// Asks the DataCatalog to fetch more data for this column.
// `fetch_complete_column` is forwarded unchanged to DataCatalog::fetchColStub().
// The request is dropped when the column is already complete, or when more chunks have been
// requested than received so far (requested_chunks > received_chunks).
void request_data(bool fetch_complete_column) {
// iteratorLock is held for the rest of the function, including the fetchColStub() call.
std::unique_lock<std::mutex> _lk(iteratorLock);
if (is_complete || requested_chunks > received_chunks) {
// std::cout << "<data request ignored: " << (is_complete ? "is_complete" : "not_complete") << ">" << std::endl;
LOG_DEBUG2("<data request ignored: " << (is_complete ? "is_complete" : "not_complete") << ">" << std::endl;)
// Do Nothing, ignore.
return;
}
// Count the request before it is issued.
++requested_chunks;

// std::cout << "Col is requesting a new chunk." << std::endl;
// NOTE(review): the meaning of the literal 1 is not visible here — presumably a connection or
// target id; confirm against DataCatalog::fetchColStub().
DataCatalog::getInstance().fetchColStub(1, ident, fetch_complete_column);
}

void append_chunk(size_t offset, size_t chunkSize, char* remoteData) {
if (data == nullptr) {
std::cout << "!!! Implement allocation handling in append_chunk, aborting." << std::endl;
LOG_WARNING("!!! Implement allocation handling in append_chunk, aborting." << std::endl;)
return;
}
memcpy(reinterpret_cast<char*>(data) + offset, remoteData, chunkSize);
Expand Down Expand Up @@ -216,7 +253,7 @@ struct col_t {
}
default: {
using namespace memordma;
Logger::getInstance() << LogLevel::ERROR << "Saw gen_void but its not handled." << std::endl;
LOG_ERROR("Saw gen_void but its not handled." << std::endl;)
}
}
ss << " [" << (is_remote ? "remote," : "local,") << (is_complete ? "complete" : "incomplete") << "]"
Expand Down Expand Up @@ -245,22 +282,22 @@ struct col_t {
void log_to_file(std::string logfile) const {
switch (datatype) {
case col_data_t::gen_smallint: {
std::cout << "Printing uint8_t column" << std::endl;
LOG_DEBUG1("Printing uint8_t column" << std::endl;)
log_to_file_typed<uint8_t>(logfile);
break;
}
case col_data_t::gen_bigint: {
std::cout << "Printing uint64_t column" << std::endl;
LOG_DEBUG1("Printing uint64_t column" << std::endl;)
log_to_file_typed<uint64_t>(logfile);
break;
}
case col_data_t::gen_float: {
std::cout << "Printing float column" << std::endl;
LOG_DEBUG1("Printing float column" << std::endl;)
log_to_file_typed<float>(logfile);
break;
}
case col_data_t::gen_double: {
std::cout << "Printing double column" << std::endl;
LOG_DEBUG1("Printing double column" << std::endl;)
log_to_file_typed<double>(logfile);
break;
}
Expand Down Expand Up @@ -308,4 +345,123 @@ struct col_t {
log << std::endl;
log.close();
}
};

struct table_t {
public:
std::vector<col_t*> columns;
std::string ident;
size_t numCols;
size_t numRows;
size_t onNode;
size_t bufferRatio;
bool isFactTable;

explicit table_t(std::string _ident, size_t _onNode) : ident{_ident}, numCols{0}, numRows{0}, onNode{_onNode}, bufferRatio{0}, isFactTable{true} {};

table_t(std::string _ident, size_t _numCols, size_t _numRows, size_t _onNode, size_t _bufferRatio, bool _isFactTable) : ident{_ident}, numCols{_numCols}, numRows{_numRows}, onNode{_onNode}, bufferRatio{_bufferRatio}, isFactTable{_isFactTable} {
for (size_t i = 0; i < numCols; ++i) {
columns.emplace_back(new col_t);
}

std::default_random_engine generator;

uint8_t colId = 0;

for (auto& col : columns) {
col->ident = ident + "_col_" + std::to_string(colId);
col->size = numRows;
col->sizeInBytes = numRows * sizeof(uint64_t);

if (onNode != 0) {
col->is_remote = true;
col->is_complete = false;
} else {
col->is_remote = false;
col->is_complete = true;
}

col->datatype = col_data_t::gen_bigint;
col->data = numa_alloc_onnode(col->sizeInBytes, onNode);

auto data = reinterpret_cast<uint64_t*>(col->data);

if (colId == 0) {
for (size_t i = 0; i < numRows; ++i) {
data[i] = i;
}
} else {
std::uniform_int_distribution<uint64_t> distribution;

if (isFactTable) {
distribution = std::uniform_int_distribution<uint64_t>(0, (numRows * bufferRatio * 0.01) - 1);
} else {
distribution = std::uniform_int_distribution<uint64_t>(0, 100);
}

for (size_t i = 0; i < numRows; ++i) {
data[i] = distribution(generator);
}
}

if (col->is_complete) {
col->readableOffset = numRows * sizeof(uint64_t);
}

col->current_end = col->data;
DataCatalog::getInstance().add_column(col->ident, col);

++colId;
}
}

~table_t() {
for (auto col : columns) {
delete col;
}

columns.clear();
};

void addColumn(uint64_t* data, size_t elementCount) {
if (numRows == 0) {
numRows = elementCount;
} else {
if (numRows != elementCount) {
LOG_ERROR("Dimension of column does not match with table!" << std::endl;)
return;
}
}

col_t* tmp = new col_t();

tmp->ident = ident + "_col_" + std::to_string(numCols);
tmp->size = numRows;
tmp->sizeInBytes = numRows * sizeof(uint64_t);

if (onNode != 0) {
tmp->is_remote = true;
tmp->is_complete = false;
} else {
tmp->is_remote = false;
tmp->is_complete = true;
}

tmp->datatype = col_data_t::gen_bigint;
tmp->data = numa_alloc_onnode(tmp->sizeInBytes, onNode);

std::copy(data, data + elementCount, reinterpret_cast<uint64_t*>(tmp->data));

if (tmp->is_complete) {
tmp->readableOffset = numRows * sizeof(uint64_t);
}

columns.emplace_back(tmp);

++numCols;
}

col_t* getPrimaryKeyColumn() {
return columns[0];
}
};
Loading

0 comments on commit c1d3474

Please sign in to comment.