[WIP] Support Raco User-defined Aggregates #189

Open
wants to merge 56 commits into
base: master
0aecc50
change init value to init function for hash updates
bmyerz Sep 18, 2014
47f9fcc
deal temporarily with disambiguation like this
bmyerz Sep 18, 2014
9df6a18
const ref for combine function
bmyerz Sep 19, 2014
ac74936
refactor to use c++ standard hash format
bmyerz Sep 20, 2014
212dbc7
forgot a use of unordered_map without Hash
bmyerz Sep 20, 2014
cd465a2
make ascii read compatible with the new tuple
bmyerz Sep 21, 2014
e77cefd
comment out hash join shuffle
bmyerz Sep 21, 2014
9c2d2bc
use new tuple ostream binary
bmyerz Sep 22, 2014
d91b529
generic Zero function
bmyerz Sep 22, 2014
45af573
scheme writing as zipped list
bmyerz Sep 22, 2014
eeea068
fix non initialization bug in relation_io
bmyerz Sep 29, 2014
9c8f7ac
igor for naive bayes
bmyerz Sep 29, 2014
4392f1f
sequential Output in io
bmyerz Sep 29, 2014
31b69a7
Merge branch 'master' into bmyerz/support-raco-udas
bmyerz Oct 7, 2014
db93823
add application must go after librarys added
bmyerz May 6, 2015
d589e68
update Grappa IO to handle native Radish strings
bmyerz May 13, 2015
3ec1284
add MAX as an aggregate
bmyerz May 13, 2015
bd74806
add min
bmyerz May 13, 2015
b722574
update relation tests for fix in Raco
bmyerz May 19, 2015
2f5653b
add readSplits to relation tests
bmyerz May 19, 2015
76f1373
add readSplits, hardcoded for JSON parsing for now; also fix an insid…
bmyerz May 19, 2015
082293d
make splits use hdfs style name
bmyerz Jun 9, 2015
6cee305
truncate strings to MAX_STR_LEN :(
bmyerz Jun 9, 2015
5a6f068
actually use OOP
bmyerz Jun 9, 2015
658ece9
Merge branch 'bmyerz/support-raco-udas' into bmyerz/refactor-io
bmyerz Jun 9, 2015
8ab0a16
make relation io test work with refactoring
bmyerz Jun 10, 2015
40f632f
add a schema to json parser
bmyerz Jun 16, 2015
3432925
this should be public like its parent class
bmyerz Jun 16, 2015
21bffae
Merge pull request #213 from uwsampa/bmyerz/refactor-io
bmyerz Jun 24, 2015
a570271
add io tests for string+double tuple and fix memcpy overlap bug
bmyerz Jun 24, 2015
cbbb2bb
Merge pull request #214 from uwsampa/bmyerz/fix-raco-443
bmyerz Jun 24, 2015
69e07f8
block distribution
bmyerz Jun 26, 2015
8feaf5c
enforce block size for global arrays of tuples
bmyerz Jun 29, 2015
004f632
strings.h to_array truncates now, so don't do it in file read io
bmyerz Jul 1, 2015
f0fa9e2
collection of parallel iterators
bmyerz Jul 31, 2015
8efbf42
boost dir
bmyerz Aug 3, 2015
db5d14e
add iterator based join and all the others
bmyerz Aug 11, 2015
0dced7c
Merge pull request #218 from uwsampa/bmyerz/radish-iterators
bmyerz Aug 11, 2015
507ce93
fix aggregate operator: shouldn't expose iterator to the mktuple impl…
bmyerz Aug 14, 2015
da44ca9
add version of DHT that takes update function as a member
bmyerz Aug 18, 2015
5f34251
fix bug: need to specify appropriate GCE
bmyerz Aug 19, 2015
d2fb164
add operators.hpp to sources
bmyerz Aug 19, 2015
5c70dfe
fix bug with zero key aggie
bmyerz Aug 27, 2015
11baafe
fixes for zero key aggie
bmyerz Aug 28, 2015
c4d5e6e
add broadcast cross product
bmyerz Sep 1, 2015
29c0b5b
type-o!
bmyerz Sep 1, 2015
281aab9
change signatuire of mktuple in broadcast
bmyerz Sep 1, 2015
4ab2c9b
fix the actual fix
bmyerz Sep 1, 2015
3c60cc7
forgot to count hash_tables_size
bmyerz Sep 8, 2015
ed0cb12
adding an important metric for perf comparison
bmyerz Sep 8, 2015
a3dfc50
track misses in cells
bmyerz Sep 9, 2015
bdeb17a
add partition local updates
bmyerz Sep 23, 2015
1fcf859
emulate a forall_here in forall_entries
bmyerz Sep 23, 2015
996a2ad
Merge branch 'bmyerz/support-raco-udas' of github.com:uwsampa/grappa …
bmyerz Sep 23, 2015
44c7432
iterators with local groupby optimization
bmyerz Oct 5, 2015
33ea389
fix typo in iterator groupby
bmyerz Oct 5, 2015
29 changes: 27 additions & 2 deletions applications/join/Aggregates.hpp
@@ -1,13 +1,38 @@
#pragma once
#include <algorithm>

namespace Aggregates {
template < typename State, typename UV >
-State SUM(State sofar, UV nextval) {
+State SUM(const State& sofar, const UV& nextval) {
return sofar + nextval;
}

template < typename State, typename UV >
-State COUNT(State sofar, UV nextval) {
+State COUNT(const State& sofar, const UV& nextval) {
return sofar + 1;
}

// keep MAX macro from being used here
#pragma push_macro("MAX")
#undef MAX
template <typename State, typename UV >
State MAX(const State& sofar, const UV& nextval) {
return std::max(sofar, nextval);
}
#pragma pop_macro("MAX")
// keep MIN macro from being used here
#pragma push_macro("MIN")
#undef MIN
template <typename State, typename UV >
State MIN(const State& sofar, const UV& nextval) {
return std::min(sofar, nextval);
}
#pragma pop_macro("MIN")


template <typename N>
N Zero() {
return 0;
}

}
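
Review note: the following is a minimal sketch, not code from this PR, of how combine functions with the new `const State&` / `const UV&` signatures and an init function such as `Zero` could be folded over a sequence of values. The names `sketch`, `agg_sum`, `agg_count`, `agg_max`, `fold`, `total`, `how_many`, and `largest` are hypothetical and exist only for illustration; the combine functions are renamed so that no `MAX`/`MIN` macro can interfere.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

namespace sketch {

// Local stand-ins for the combine functions shown in the diff.
template <typename State, typename UV>
State agg_sum(const State& sofar, const UV& nextval) { return sofar + nextval; }

template <typename State, typename UV>
State agg_count(const State& sofar, const UV&) { return sofar + 1; }

template <typename State, typename UV>
State agg_max(const State& sofar, const UV& nextval) { return std::max(sofar, nextval); }

// Init function, mirroring the generic Zero() from the diff.
template <typename N>
N zero() { return 0; }

// Hypothetical helper: start from Init() and apply UpF once per value.
template <typename State, typename UV,
          State (*UpF)(const State&, const UV&), State (*Init)()>
State fold(const std::vector<UV>& values) {
  State state = Init();
  for (const UV& v : values) state = UpF(state, v);
  return state;
}

// Convenience wrappers that fix the types to 64-bit integers.
inline std::int64_t total(const std::vector<std::int64_t>& v) {
  return fold<std::int64_t, std::int64_t,
              agg_sum<std::int64_t, std::int64_t>, zero<std::int64_t>>(v);
}
inline std::int64_t how_many(const std::vector<std::int64_t>& v) {
  return fold<std::int64_t, std::int64_t,
              agg_count<std::int64_t, std::int64_t>, zero<std::int64_t>>(v);
}
inline std::int64_t largest(const std::vector<std::int64_t>& v) {
  return fold<std::int64_t, std::int64_t,
              agg_max<std::int64_t, std::int64_t>, zero<std::int64_t>>(v);
}

}  // namespace sketch
```

Starting a maximum from zero only gives the right answer for non-negative inputs; the sketch does so purely to reuse one init function, and a real MAX aggregate would likely need a different initial state.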
20 changes: 17 additions & 3 deletions applications/join/CMakeLists.txt
@@ -7,6 +7,7 @@ set(QUERYLIB_SOURCES
MatchesDHT.hpp
MatchesDHT.cpp
DoubleDHT.hpp
DoubleDHT.cpp
Hypercube.hpp
Hypercube.cpp
local_graph.cpp
@@ -23,17 +24,27 @@ set(QUERYLIB_SOURCES
stats.cpp
MapReduce.cpp
MapReduce.hpp
-HashJoin.hpp
-HashJoin.cpp
Aggregates.hpp
DHT_symmetric.hpp
DHT_symmetric.cpp
DHT_symmetric_generic.hpp
Operators.hpp
)
#FIXME: these MapReduce Hash joins belong in the above sources
#HashJoin.hpp
#HashJoin.cpp


set(QUERYIO_SOURCES
relation_io.hpp
relation_io.cpp
block_distribution.hpp
block_distribution.cpp
Tuple.hpp
Tuple.cpp
relation.hpp
json/json.h
jsoncpp.cpp
)

set(INCLUDE_DIRS
@@ -98,7 +109,6 @@ endforeach()
# exe targets for generated query codes
foreach(query ${GENERATED_SOURCES})
get_filename_component(query_name ${query}, NAME_WE)
-add_grappa_application(${query_name}.exe ${query})

# Raco C++ environment
set(RACO_DIR "$ENV{RACO_HOME}")
@@ -110,6 +120,8 @@ foreach(query ${GENERATED_SOURCES})
message(FATAL_ERROR "Undefined RACO_HOME environment variable, required
for generated queries applications/join/grappa_*.cpp ")
endif()

+add_grappa_application(${query_name}.exe ${query})

target_link_libraries(${query_name}.exe generator querylib queryio racoc)
list(APPEND GENERATED_EXES "${query_name}.exe")
@@ -151,5 +163,7 @@ macro(add_check test_cpp nnode ppn target)
add_dependencies( check-all-${target}-compile-only ${test})
endmacro()

# add test for IO
add_check(Relation_io_tests.cpp 2 1 pass)
file(COPY splits_test DESTINATION .)

5 changes: 5 additions & 0 deletions applications/join/DHT_symmetric.cpp
@@ -0,0 +1,5 @@
//#include "DHT_symmetric.hpp"
#include "Metrics.hpp"

GRAPPA_DEFINE_METRIC(SimpleMetric<uint64_t>, dht_inserts, 0);
GRAPPA_DEFINE_METRIC(SimpleMetric<uint64_t>, dht_partition_inserts, 0);
59 changes: 43 additions & 16 deletions applications/join/DHT_symmetric.hpp
@@ -7,38 +7,37 @@
#include <Metrics.hpp>

#include <cmath>
#include <unordered_map>


//GRAPPA_DECLARE_METRIC(MaxMetric<uint64_t>, max_cell_length);
GRAPPA_DECLARE_METRIC(SimpleMetric<uint64_t>, hash_tables_size);
GRAPPA_DECLARE_METRIC(SummarizingMetric<uint64_t>, hash_tables_lookup_steps);

GRAPPA_DECLARE_METRIC(SimpleMetric<uint64_t>, dht_inserts);
GRAPPA_DECLARE_METRIC(SimpleMetric<uint64_t>, dht_partition_inserts);

// for naming the types scoped in DHT_symmetric
-#define DHT_symmetric_TYPE(type) typename DHT_symmetric<K,V,HF>::type
-#define DHT_symmetric_T DHT_symmetric<K,V,HF>
+#define DHT_symmetric_TYPE(type) typename DHT_symmetric<K,V,Hash>::type
+#define DHT_symmetric_T DHT_symmetric<K,V,Hash>

// Hash table for joins
// * allows multiple copies of a Key
// * lookups return all Key matches
-template <typename K, typename V, uint64_t (*HF)(K)>
+template <typename K, typename V, typename Hash>
class DHT_symmetric {

private:
// private members
GlobalAddress< DHT_symmetric_T > self;
-std::unordered_map<K, V> * local_map;
+std::unordered_map<K, V, Hash > * local_map;
size_t partitions;

-uint64_t computeIndex( K key ) {
-return HF(key) % partitions;
+size_t computeIndex( K key ) {
+return Hash()(key) % partitions;
}

// for creating local DHT_symmetric
DHT_symmetric( GlobalAddress<DHT_symmetric_T> self )
: self(self)
, partitions(Grappa::cores())
-, local_map(new std::unordered_map<K,V>())
+, local_map(new std::unordered_map<K,V,Hash>())
{}

public:
@@ -55,30 +54,44 @@ class DHT_symmetric {
return object;
}

-template< GlobalCompletionEvent * GCE, typename UV, V (*UpF)(V oldval, UV incVal), V Init, SyncMode S = SyncMode::Async >
+template< typename UV, V (*UpF)(const V& oldval, const UV& incVal), V (*Init)(void) >
+void update_partition( K key, UV val ) {
+std::pair<K,V> entry(key, Init());
+
+auto res = this->local_map->insert(entry); auto resIt = res.first; //auto resNew = res.second;
+
+// perform the update in place
+resIt->second = UpF(resIt->second, val);
+dht_partition_inserts++;
+}
+
+template< GlobalCompletionEvent * GCE, typename UV, V (*UpF)(const V& oldval, const UV& incVal), V (*Init)(void), SyncMode S = SyncMode::Async >
void update( K key, UV val ) {
-uint64_t index = computeIndex( key );
+auto index = computeIndex( key );
auto target = this->self;

Grappa::delegate::call<S,GCE>(index, [key, val, target]() {
// inserts initial value only if the key is not yet present
-std::pair<K,V> entry(key, Init);
+std::pair<K,V> entry(key, Init());

auto res = target->local_map->insert(entry); auto resIt = res.first; //auto resNew = res.second;

// perform the update in place
resIt->second = UpF(resIt->second, val);
dht_inserts++;
});
}

template< GlobalCompletionEvent * GCE, SyncMode S = SyncMode::Async >
void insert_unique( K key, V val ) {
-uint64_t index = computeIndex( key );
+auto index = computeIndex( key );
auto target = this->self;

Grappa::delegate::call<S,GCE>(index, [key, val, target]() {
// inserts initial value only if the key is not yet present
std::pair<K,V> entry(key, val);
target->local_map->insert(entry);
dht_inserts++;
});
}

@@ -89,15 +102,29 @@ class DHT_symmetric {
// TODO: cannot use forall_here because unordered_map->begin() is a forward iterator (std::advance is O(n))
// TODO: for now the serial loop is only performant if the continuation code is also in CPS
// TODO: best solution is a forall_here where loop decomposition is just linear continuation instead of divide and conquer

int64_t iter_count = 0;
auto m = target->local_map;
-for (auto it = m->begin(); it != m->end(); it++) {
+for (auto it = m->begin(); it != m->end(); it++, iter_count++) {
// continuation takes a mapping
f(*it);

// get the same effect as a forall_here that would have linear decomposition:
// specifically that there is at least one yield per FLAGS_loop_threshold iterations.
if (iter_count == FLAGS_loop_threshold) {
iter_count = 0;
Grappa::yield();
}
}
});
// TODO GCE->wait(); // block until all tasks are done
}

std::unordered_map<K,V,Hash> * get_local_map() {
return local_map;
}



} GRAPPA_BLOCK_ALIGNED;
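
Review note: the serial loop in `forall_entries` above yields periodically so that a long scan over a forward-only iterator does not starve other tasks. A minimal single-process sketch of that idea follows. It makes no Grappa calls: `for_each_with_yield` and its `yield_fn` parameter are hypothetical stand-ins (the latter for `Grappa::yield()`), and the counting is simplified to exactly one yield per `threshold` visited entries, which is close to but not identical to the counter handling in the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <unordered_map>
#include <utility>

// Visit every entry of a map serially and invoke yield_fn once for every
// `threshold` entries visited. Returns the number of yields performed.
// `threshold` must be greater than zero.
template <typename Map, typename Visit, typename Yield>
std::size_t for_each_with_yield(const Map& m, std::size_t threshold,
                                Visit visit, Yield yield_fn) {
  std::size_t since_yield = 0;  // entries visited since the last yield
  std::size_t yields = 0;       // total yields so far
  for (auto it = m.begin(); it != m.end(); ++it) {
    visit(*it);  // the continuation takes one key/value mapping
    if (++since_yield == threshold) {
      since_yield = 0;
      yield_fn();  // stand-in for handing control back to the scheduler
      ++yields;
    }
  }
  return yields;
}
```

Because the loop only advances the iterator one step at a time, it avoids the O(n) `std::advance` cost mentioned in the TODO comments, at the price of running serially on each core.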

119 changes: 119 additions & 0 deletions applications/join/DHT_symmetric_generic.hpp
@@ -0,0 +1,119 @@
#pragma once

#include <Grappa.hpp>
#include <GlobalAllocator.hpp>
#include <GlobalCompletionEvent.hpp>
#include <ParallelLoop.hpp>
#include <Metrics.hpp>

#include <cmath>
#include <unordered_map>


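GRAPPA_DECLARE_METRIC(SimpleMetric<uint64_t>, dht_inserts);
// update_partition() below increments this metric, so declare it here as well
GRAPPA_DECLARE_METRIC(SimpleMetric<uint64_t>, dht_partition_inserts);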

// for naming the types scoped in DHT_symmetric_generic
#define DHT_symmetric_generic_TYPE(type) typename DHT_symmetric_generic<K,V,UV,Hash>::type
#define DHT_symmetric_generic_T DHT_symmetric_generic<K,V,UV,Hash>

// TODO: The functionality of this class is covered by DHT_symmetric already.
// The only difference is that update_f/init_f are template parameters
// versus instance parameters. We can just add these instance parameters
// to DHT_symmetric and have two versions of update() method.

// Hash table for joins
// * allows multiple copies of a Key
// * lookups return all Key matches
template <typename K, typename V, typename UV, typename Hash>
class DHT_symmetric_generic {
public:
typedef V (*update_f)(const V& oldval, const UV& incVal);
typedef V (*init_f)(void);

private:
// private members
GlobalAddress< DHT_symmetric_generic_T > self;
std::unordered_map<K, V, Hash > * local_map;
size_t partitions;

update_f UpF;
init_f Init;

size_t computeIndex( K key ) {
return Hash()(key) % partitions;
}

// for creating local DHT_symmetric_generic
DHT_symmetric_generic( GlobalAddress<DHT_symmetric_generic_T> self, update_f upf, init_f initf )
: self(self)
, UpF(upf)
, Init(initf)
, partitions(Grappa::cores())
, local_map(new std::unordered_map<K,V,Hash>())
{}

public:
// for static construction
DHT_symmetric_generic( ) {}

static GlobalAddress<DHT_symmetric_generic_T> create_DHT_symmetric( update_f upf, init_f initf ) {
auto object = Grappa::symmetric_global_alloc<DHT_symmetric_generic_T>();

Grappa::on_all_cores( [object, upf, initf] {
new(object.pointer()) DHT_symmetric_generic_T(object, upf, initf);
});

return object;
}

void update_partition(K key, UV val) {
std::pair<K,V> entry(key, Init());

auto res = this->local_map->insert(entry); auto resIt = res.first; //auto resNew = res.second;

// perform the update in place
resIt->second = this->UpF(resIt->second, val);
dht_partition_inserts++;
}

template< GlobalCompletionEvent * GCE, SyncMode S = SyncMode::Async >
void update( K key, UV val ) {
auto index = computeIndex( key );
auto target = this->self;

Grappa::delegate::call<S,GCE>(index, [key, val, target]() {
// inserts initial value only if the key is not yet present
std::pair<K,V> entry(key, target->Init());

auto res = target->local_map->insert(entry); auto resIt = res.first; //auto resNew = res.second;

// perform the update in place
resIt->second = target->UpF(resIt->second, val);
dht_inserts++;
});
}

template < GlobalCompletionEvent * GCE, typename CF >
void forall_entries( CF f ) {
auto target = this->self;
Grappa::on_all_cores([target, f] {
// TODO: cannot use forall_here because unordered_map->begin() is a forward iterator (std::advance is O(n))
// TODO: for now the serial loop is only performant if the continuation code is also in CPS
// TODO: best solution is a forall_here where loop decomposition is just linear continuation instead of divide and conquer
auto m = target->local_map;
for (auto it = m->begin(); it != m->end(); it++) {
// continuation takes a mapping
f(*it);
}
});
// TODO GCE->wait(); // block until all tasks are done
}

std::unordered_map<K,V,Hash> * get_local_map() {
return local_map;
}



} GRAPPA_BLOCK_ALIGNED;
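
Review note: the TODO at the top of this file says the only difference from `DHT_symmetric` is whether the update and init functions are template parameters or instance parameters, and suggests one class with two versions of `update()`. A minimal single-process sketch of that suggestion follows. It is not code from this PR and makes no Grappa calls; `AggTableSketch`, `update_static`, `apply`, `add_i64`, and `zero_i64` are hypothetical names, and `computeIndex` takes the partition count as an argument here instead of reading `Grappa::cores()`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <utility>

// Stand-in for one partition of the table.
template <typename K, typename V, typename UV, typename Hash = std::hash<K>>
class AggTableSketch {
 public:
  typedef V (*update_f)(const V& oldval, const UV& incVal);
  typedef V (*init_f)(void);

  // Instance parameters, as in DHT_symmetric_generic.
  AggTableSketch(update_f upf, init_f initf) : UpF(upf), Init(initf) {}

  // Version 1: functions chosen at run time (stored members).
  void update(const K& key, const UV& val) { apply(key, val, UpF, Init); }

  // Version 2: functions chosen at compile time (template parameters),
  // as in DHT_symmetric.
  template <update_f F, init_f I>
  void update_static(const K& key, const UV& val) { apply(key, val, F, I); }

  // Partition that would own the key; `partitions` must be nonzero.
  static std::size_t computeIndex(const K& key, std::size_t partitions) {
    return Hash()(key) % partitions;
  }

  const V& at(const K& key) const { return map_.at(key); }

 private:
  // Insert init() only if the key is absent, then combine in place.
  void apply(const K& key, const UV& val, update_f f, init_f init) {
    auto res = map_.insert(std::make_pair(key, init()));
    res.first->second = f(res.first->second, val);
  }

  update_f UpF;
  init_f Init;
  std::unordered_map<K, V, Hash> map_;
};

// Example aggregate: 64-bit sum starting from zero.
inline std::int64_t add_i64(const std::int64_t& sofar, const std::int64_t& next) {
  return sofar + next;
}
inline std::int64_t zero_i64() { return 0; }
```

With this shape, a caller could write `t.update(key, val)` when the aggregate is only known at run time and `t.update_static<add_i64, zero_i64>(key, val)` when it is fixed at compile time; both paths share the same insert-then-combine logic.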

5 changes: 5 additions & 0 deletions applications/join/DoubleDHT.cpp
@@ -0,0 +1,5 @@
#include <Metrics.hpp>

GRAPPA_DEFINE_METRIC(SimpleMetric<uint64_t>, hash_matches_iterator_cell_single_misses, 0);
GRAPPA_DEFINE_METRIC(SimpleMetric<uint64_t>, hash_matches_iterator_cell_hits, 0);
GRAPPA_DEFINE_METRIC(SimpleMetric<uint64_t>, hash_matches_iterator_cell_both_misses, 0);