Skip to content

Commit

Permalink
Retain data types in aggregation service (#604)
Browse files Browse the repository at this point in the history
* Retain int types in aggregate service

* Performance improvement

* More optimizations
  • Loading branch information
daboehme authored Oct 9, 2024
1 parent f95bca8 commit f196256
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 78 deletions.
6 changes: 5 additions & 1 deletion include/caliper/common/Variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ class Variant

Variant& operator += (const Variant& val);

Variant& min(const Variant& val);
Variant& max(const Variant& val);

static void update_minmaxsum(const Variant& val, Variant& min_val, Variant& max_val, Variant& sum_val);

size_t pack(unsigned char* buf) const {
return cali_variant_pack(m_v, buf);
}
Expand Down Expand Up @@ -154,4 +159,3 @@ inline bool operator > (const Variant& lhs, const Variant& rhs) {
std::ostream& operator << (std::ostream& os, const Variant& v);

} // namespace cali

125 changes: 125 additions & 0 deletions src/common/Variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,128 @@ Variant& Variant::operator += (const Variant& val)

return *this;
}

Variant& Variant::min(const Variant& val)
{
cali_attr_type type = this->type();

if (type == val.type()) {
switch (type) {
case CALI_TYPE_DOUBLE:
m_v.value.v_double = std::min(m_v.value.v_double, val.m_v.value.v_double);
break;
case CALI_TYPE_INT:
m_v.value.v_int = std::min(m_v.value.v_int, val.m_v.value.v_int);
break;
case CALI_TYPE_UINT:
m_v.value.v_uint = std::min(m_v.value.v_uint, val.m_v.value.v_uint);
break;
default:
break;
}
} else {
switch (type) {
case CALI_TYPE_INV:
*this = val;
break;
case CALI_TYPE_DOUBLE:
m_v.value.v_double = std::min(m_v.value.v_double, val.to_double());
break;
case CALI_TYPE_INT:
m_v.value.v_int = std::min(m_v.value.v_int, val.to_int64());
break;
case CALI_TYPE_UINT:
m_v.value.v_uint = std::min(m_v.value.v_uint, val.to_uint());
break;
default:
break;
}
}

return *this;
}

Variant& Variant::max(const Variant& val)
{
cali_attr_type type = this->type();

if (type == val.type()) {
switch (type) {
case CALI_TYPE_DOUBLE:
m_v.value.v_double = std::max(m_v.value.v_double, val.m_v.value.v_double);
break;
case CALI_TYPE_INT:
m_v.value.v_int = std::max(m_v.value.v_int, val.m_v.value.v_int);
break;
case CALI_TYPE_UINT:
m_v.value.v_uint = std::max(m_v.value.v_uint, val.m_v.value.v_uint);
break;
default:
break;
}
} else {
switch (type) {
case CALI_TYPE_INV:
*this = val;
break;
case CALI_TYPE_DOUBLE:
m_v.value.v_double = std::max(m_v.value.v_double, val.to_double());
break;
case CALI_TYPE_INT:
m_v.value.v_int = std::max(m_v.value.v_int, val.to_int64());
break;
case CALI_TYPE_UINT:
m_v.value.v_uint = std::max(m_v.value.v_uint, val.to_uint());
break;
default:
break;
}
}

return *this;
}

void Variant::update_minmaxsum(const Variant& val, Variant& min_val, Variant& max_val, Variant& sum_val)
{
if (min_val.empty()) {
min_val = val;
max_val = val;
sum_val = val;
return;
}

switch (val.m_v.type_and_size & CALI_VARIANT_TYPE_MASK) {
case CALI_TYPE_DOUBLE:
{
double d = val.m_v.value.v_double;
sum_val.m_v.value.v_double += d;
if (d < min_val.m_v.value.v_double)
min_val.m_v.value.v_double = d;
else if (d > max_val.m_v.value.v_double)
max_val.m_v.value.v_double = d;
}
break;
case CALI_TYPE_INT:
{
int64_t i = val.m_v.value.v_int;
sum_val.m_v.value.v_int += i;
if (i < min_val.m_v.value.v_int)
min_val.m_v.value.v_int = i;
else if (i > max_val.m_v.value.v_int)
max_val.m_v.value.v_int = i;
}
break;
case CALI_TYPE_UINT:
{
uint64_t u = val.m_v.value.v_uint;
sum_val.m_v.value.v_uint += u;
if (u < min_val.m_v.value.v_uint)
min_val.m_v.value.v_uint = u;
else if (u > max_val.m_v.value.v_uint)
max_val.m_v.value.v_uint = u;
}
break;
default:
break;
}
}
48 changes: 4 additions & 44 deletions src/reader/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,28 +507,8 @@ class MinKernel : public AggregateKernel {
return;

for (const Entry& e : list) {
if (e.attribute() == target_attr.id() || e.attribute() == min_attr.id()) {
if (m_min.empty()) {
m_min = e.value();
} else {
switch (target_attr.type()) {
case CALI_TYPE_INT:
{
int64_t m = std::min(m_min.to_int64(), e.value().to_int64());
m_min = Variant(cali_make_variant_from_int64(m));
}
break;
case CALI_TYPE_DOUBLE:
m_min = Variant(std::min(m_min.to_double(), e.value().to_double()));
break;
case CALI_TYPE_UINT:
m_min = Variant(std::min(m_min.to_uint(), e.value().to_uint()));
break;
default:
;
}
}
}
if (e.attribute() == target_attr.id() || e.attribute() == min_attr.id())
m_min.min(e.value());
}
}

Expand Down Expand Up @@ -612,28 +592,8 @@ class MaxKernel : public AggregateKernel {
return;

for (const Entry& e : list) {
if (e.attribute() == target_attr.id() || e.attribute() == max_attr.id()) {
if (m_max.empty()) {
m_max = e.value();
} else {
switch (target_attr.type()) {
case CALI_TYPE_INT:
{
int64_t m = std::max(m_max.to_int64(), e.value().to_int64());
m_max = Variant(cali_make_variant_from_int64(m));
}
break;
case CALI_TYPE_DOUBLE:
m_max = Variant(std::max(m_max.to_double(), e.value().to_double()));
break;
case CALI_TYPE_UINT:
m_max = Variant(std::max(m_max.to_uint(), e.value().to_uint()));
break;
default:
;
}
}
}
if (e.attribute() == target_attr.id() || e.attribute() == max_attr.id())
m_max.max(e.value());
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/services/aggregate/Aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ class Aggregate
ResultAttributes res;

int prop = CALI_ATTR_ASVALUE | CALI_ATTR_SCOPE_THREAD | CALI_ATTR_SKIP_EVENTS;
cali_attr_type type = attr.type();

res.min_attr = c->create_attribute(std::string("min#") + name, CALI_TYPE_DOUBLE, prop);
res.max_attr = c->create_attribute(std::string("max#") + name, CALI_TYPE_DOUBLE, prop);
res.sum_attr = c->create_attribute(std::string("sum#") + name, CALI_TYPE_DOUBLE, prop);
res.min_attr = c->create_attribute(std::string("min#") + name, type, prop);
res.max_attr = c->create_attribute(std::string("max#") + name, type, prop);
res.sum_attr = c->create_attribute(std::string("sum#") + name, type, prop);
res.avg_attr = c->create_attribute(std::string("avg#") + name, CALI_TYPE_DOUBLE, prop);
#ifdef CALIPER_ENABLE_HISTOGRAMS
for (int jj = 0; jj < CALI_AGG_HISTOGRAM_BINS; jj++) {
Expand Down
45 changes: 15 additions & 30 deletions src/services/aggregate/AggregationDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
using namespace cali;
using namespace aggregate;

#define MAX_KEYLEN 20
#define MAX_KEYLEN 16

namespace
{

struct AggregateKernel {
double min;
double max;
double sum;
int count;
Variant min;
Variant max;
Variant sum;
unsigned count;

#ifdef CALIPER_ENABLE_HISTOGRAMS
int histogram_max;
Expand All @@ -42,18 +42,14 @@ struct AggregateKernel {
#endif

AggregateKernel()
: min(std::numeric_limits<double>::max()),
max(std::numeric_limits<double>::min()),
sum(0), count(0)
: count(0)
#ifdef CALIPER_ENABLE_HISTOGRAMS
, histogram_max(0)
#endif
{ }

void update(double val) {
min = std::min(min, val);
max = std::max(max, val);
sum += val;
inline void update(const Variant& val) {
Variant::update_minmaxsum(val, min, max, sum);
++count;

#ifdef CALIPER_ENABLE_HISTOGRAMS
Expand Down Expand Up @@ -98,18 +94,6 @@ struct AggregateEntry {
size_t next_entry_idx;
};

bool key_equal(SnapshotView lhs, SnapshotView rhs)
{
if (lhs.size() != rhs.size())
return false;

for (size_t i = 0; i < lhs.size(); ++i)
if (lhs[i] != rhs[i])
return false;

return true;
}

} // namespace [anonymous]


Expand Down Expand Up @@ -164,11 +148,12 @@ struct AggregationDB::AggregationDBImpl

AggregateEntry* find_or_create_entry(SnapshotView key, std::size_t hash, std::size_t num_aggr_attrs, bool can_alloc) {
hash = hash % m_hashmap.size();
size_t key_len = key.size();
size_t count = 0;

for (std::size_t idx = m_hashmap[hash]; idx != 0; idx = m_entries[idx].next_entry_idx) {
AggregateEntry* e = &m_entries[idx];
if (key_equal(key, SnapshotView(e->key_len, &m_keyents[e->key_idx])))
if (key_len == e->key_len && std::equal(key.begin(), key.end(), m_keyents.begin()+e->key_idx))
return e;
++count;
}
Expand All @@ -179,7 +164,7 @@ struct AggregationDB::AggregationDBImpl
if (!can_alloc) {
if (m_kernels.size() + num_aggr_attrs >= m_kernels.capacity())
return &m_entries[0];
if (m_keyents.size() + key.size() >= m_keyents.capacity())
if (m_keyents.size() + key_len >= m_keyents.capacity())
return &m_entries[0];
if (m_entries.size() + 1 >= m_entries.capacity())
return &m_entries[0];
Expand All @@ -195,7 +180,7 @@ struct AggregationDB::AggregationDBImpl

e.count = 0;
e.key_idx = key_idx;
e.key_len = key.size();
e.key_len = key_len;
e.kernels_idx = kernels_idx;
e.num_kernels = num_aggr_attrs;
e.next_entry_idx = m_hashmap[hash];
Expand Down Expand Up @@ -263,7 +248,7 @@ struct AggregationDB::AggregationDBImpl
if (e.empty())
continue;

m_kernels[entry->kernels_idx + a].update(e.value().to_double());
m_kernels[entry->kernels_idx + a].update(e.value());
}
}

Expand All @@ -286,7 +271,7 @@ struct AggregationDB::AggregationDBImpl
SnapshotView kv(entry.key_len, &m_keyents[entry.key_idx]);

std::vector<Entry> rec;
rec.reserve(kv.size() + entry.num_kernels + 2);
rec.reserve(kv.size() + 4*entry.num_kernels + 2);

std::copy(kv.begin(), kv.end(), std::back_inserter(rec));

Expand All @@ -296,7 +281,7 @@ struct AggregationDB::AggregationDBImpl
if (k->count == 0)
continue;

double avg = k->sum / k->count;
double avg = k->sum.to_double() / k->count;

rec.push_back(Entry(info.result_attrs[a].min_attr, Variant(k->min)));
rec.push_back(Entry(info.result_attrs[a].max_attr, Variant(k->max)));
Expand Down

0 comments on commit f196256

Please sign in to comment.