Skip to content

Commit

Permalink
Add conflictscan1 test.
Browse files Browse the repository at this point in the history
In this test, threads 1-N repeatedly insert items at the end of a
range and delete items from the beginning of the range while thread
0 repeatedly scans different ranges.

The test demonstrates the "conflict-aware indexes" idea we're
talking about in the VLDB paper; when prefixlen=4 performance sucks,
when prefixlen=8 it is much better.
  • Loading branch information
kohler committed Jan 2, 2020
1 parent b69edcd commit f37e79f
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 12 deletions.
37 changes: 30 additions & 7 deletions kvrow.hh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class query {
template <typename T>
void run_scan(T& table, Json& request, threadinfo& ti);
template <typename T>
void run_scan_versions(T& table, Json& request, std::vector<uint64_t>& scan_versions, threadinfo& ti);
template <typename T>
void run_rscan(T& table, Json& request, threadinfo& ti);

const loginfo::query_times& query_times() const {
Expand Down Expand Up @@ -266,8 +268,9 @@ inline void query<R>::apply_remove(R*& value, kvtimestamp_t& node_ts,
template <typename R>
class query_json_scanner {
public:
query_json_scanner(query<R> &q, lcdf::Json& request)
: q_(q), nleft_(request[3].as_i()), request_(request) {
query_json_scanner(query<R>& q, lcdf::Json& request, std::vector<uint64_t>* scan_versions)
: q_(q), nleft_(request[3].as_i()), request_(request),
scan_versions_(scan_versions) {
std::swap(request[2].value().as_s(), firstkey_);
request_.resize(2);
q_.scankeypos_ = 0;
Expand All @@ -276,7 +279,11 @@ class query_json_scanner {
return firstkey_;
}
template <typename SS, typename K>
void visit_leaf(const SS&, const K&, threadinfo&) {
void visit_leaf(const SS& scanstack, const K&, threadinfo&) {
if (scan_versions_) {
scan_versions_->push_back(reinterpret_cast<uint64_t>(scanstack.node()));
scan_versions_->push_back(scanstack.full_version_value());
}
}
bool visit_value(Str key, R* value, threadinfo& ti) {
if (row_is_marker(value)) {
Expand All @@ -301,25 +308,41 @@ class query_json_scanner {
int nleft_;
lcdf::Json& request_;
lcdf::String firstkey_;
std::vector<uint64_t>* scan_versions_;
};

template <typename R> template <typename T>
void query<R>::run_scan(T& table, Json& request, threadinfo& ti) {
assert(request[3].as_i() > 0);
f_.clear();
for (int i = 4; i != request.size(); ++i)
for (int i = 4; i != request.size(); ++i) {
f_.push_back(request[i].as_i());
}
query_json_scanner<R> scanf(*this, request, nullptr);
table.scan(scanf.firstkey(), true, scanf, ti);
}

template <typename R> template <typename T>
void query<R>::run_scan_versions(T& table, Json& request,
std::vector<uint64_t>& scan_versions,
threadinfo& ti) {
assert(request[3].as_i() > 0);
f_.clear();
for (int i = 4; i != request.size(); ++i) {
f_.push_back(request[i].as_i());
query_json_scanner<R> scanf(*this, request);
}
query_json_scanner<R> scanf(*this, request, &scan_versions);
table.scan(scanf.firstkey(), true, scanf, ti);
}

template <typename R> template <typename T>
void query<R>::run_rscan(T& table, Json& request, threadinfo& ti) {
assert(request[3].as_i() > 0);
f_.clear();
for (int i = 4; i != request.size(); ++i)
for (int i = 4; i != request.size(); ++i) {
f_.push_back(request[i].as_i());
query_json_scanner<R> scanf(*this, request);
}
query_json_scanner<R> scanf(*this, request, nullptr);
table.rscan(scanf.firstkey(), true, scanf, ti);
}

Expand Down
90 changes: 90 additions & 0 deletions kvtest.hh
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,96 @@ void kvtest_wd3(C& client, uint64_t nk_total)
client.report(result);
}

template <typename C>
void kvtest_conflictscan1(C& client)
{
unsigned prefixlen = client.param("prefixlen", 4).to_u64();
unsigned rangelen = client.param("rangelen", 4).to_u64();
unsigned keylen = prefixlen + rangelen;
unsigned rangesize = client.param("rangesize", 100).to_u64();
unsigned scansize = client.param("scansize", 20).to_u64();
assert(rangelen > 0 && rangelen <= 8);
assert(prefixlen > 0 && prefixlen <= 8);
assert(rangelen == 8 || rangesize < (1UL << (rangelen * 8)));
assert(scansize < rangesize);
using leaf_type = typename C::table_type::leaf_type;

union {
uint64_t u;
char c[8];
} x;

if (client.id() == 0) {
// scan worker
char hbuf[16];
uint64_t nscans = 0, naborts = 0;
kvrandom_uniform_int_distribution<uint64_t> ntd(1, client.nthreads() - 1);
std::vector<Str> keys, values;
client.wait_all();

while (!client.timeout(0)) {
++nscans;
memset(hbuf, 0, 16);
x.u = host_to_net_order(ntd(client.rand));
memcpy(hbuf, x.c + (8 - prefixlen), prefixlen);
while (!client.timeout(0)) {
client.scan_versions_sync(Str(hbuf, keylen), scansize, keys, values);
for (size_t i = 0; i != client.scan_versions().size(); i += 2) {
leaf_type* l = reinterpret_cast<leaf_type*>(client.scan_versions()[i]);
uint64_t uv = l->full_unlocked_version_value();
if (uv != client.scan_versions()[i + 1]) {
goto abort;
}
}
break;
abort:
++naborts;
}
}
Json result;
result.set("keylen", keylen).set("prefixlen", prefixlen).set("scansize", scansize).set("scans", nscans).set("aborts", naborts);
client.report(result);

} else {
// insert/delete worker
char hbuf[16], tbuf[16];
x.u = host_to_net_order(uint64_t(client.id()));
memcpy(hbuf, x.c + (8 - prefixlen), prefixlen);
memset(hbuf + prefixlen, 0, rangelen);
memcpy(tbuf, hbuf, 16);
char lastp = tbuf[prefixlen - 1];

// set initial values
for (unsigned i = 0; i != rangesize; ++i) {
client.insert_check(Str(tbuf, keylen), Str(tbuf, 8));
quick_istr::binary_increment_from_end(tbuf + keylen);
}
client.wait_all();

// insert/delete
uint64_t ninsert = 0, nremove = 0, cursize = rangesize;
while (cursize && !client.timeout(0)) {
if (tbuf[prefixlen - 1] == lastp
&& cursize < rangesize * 2
&& (cursize == scansize
|| client.rand() % 65536 < 32768)) {
client.insert_check(Str(tbuf, keylen), Str(tbuf + keylen - 8, 8));
quick_istr::binary_increment_from_end(tbuf + keylen);
++ninsert;
++cursize;
} else {
client.remove_check(Str(hbuf, keylen));
quick_istr::binary_increment_from_end(hbuf + keylen);
++nremove;
--cursize;
}
}
Json result;
result.set("rangesize", rangesize).set("inserts", ninsert).set("removes", nremove);
client.report(result);
}
}

// Create a range of keys [initial_pos, initial_pos + n)
// where key k == initial_pos + i has value (n - 1 - i).
// Many overwrites.
Expand Down
9 changes: 9 additions & 0 deletions misc.hh
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ struct quick_istr {
*ends = '0';
}
}
static void binary_increment_from_end(char* ends) {
while (true) {
--ends;
*ends = (char) ((unsigned char) *ends + 1);
if (*ends != 0) {
return;
}
}
}
};

struct Clp_Parser;
Expand Down
30 changes: 25 additions & 5 deletions mttest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ struct kvtest_client {
std::vector<Str>& keys, std::vector<Str>& values);
void rscan_sync(Str firstkey, int n,
std::vector<Str>& keys, std::vector<Str>& values);
void scan_versions_sync(Str firstkey, int n,
std::vector<Str>& keys, std::vector<Str>& values);
const std::vector<uint64_t>& scan_versions() const {
return scan_versions_;
}

void put(Str key, Str value);
void put(const char *key, const char *value) {
Expand Down Expand Up @@ -316,6 +321,7 @@ struct kvtest_client {
uint64_t limit_;
Json report_;
Json req_;
std::vector<uint64_t> scan_versions_;
int ncores_;
kvout *kvo_;

Expand Down Expand Up @@ -426,6 +432,16 @@ void kvtest_client<T>::rscan_sync(Str firstkey, int n,
output_scan(req_, keys, values);
}

template <typename T>
void kvtest_client<T>::scan_versions_sync(Str firstkey, int n,
std::vector<Str>& keys,
std::vector<Str>& values) {
req_ = Json::array(0, 0, firstkey, n);
scan_versions_.clear();
q_[0].run_scan_versions(table_->table(), req_, scan_versions_, *ti_);
output_scan(req_, keys, values);
}

template <typename T>
void kvtest_client<T>::output_scan(const Json& req, std::vector<Str>& keys,
std::vector<Str>& values) const {
Expand Down Expand Up @@ -581,6 +597,7 @@ MAKE_TESTRUNNER(rscan1, kvtest_rscan1(client, 0));
MAKE_TESTRUNNER(rscan1q80, kvtest_rscan1(client, 0.8));
MAKE_TESTRUNNER(splitremove1, kvtest_splitremove1(client));
MAKE_TESTRUNNER(url, kvtest_url(client));
MAKE_TESTRUNNER(conflictscan1, kvtest_conflictscan1(client));


enum {
Expand Down Expand Up @@ -659,8 +676,9 @@ struct test_thread {
++subtestno;
}
int at = fetch_and_add(&active_threads_, -1);
if (at == 1 && print_table)
if (at == 1 && print_table) {
kvtest_print(*table_, stdout, tt.client_.ti_);
}
if (at == 1 && json_stats) {
Json j;
kvtest_json_stats(*table_, j, *tt.client_.ti_);
Expand All @@ -673,16 +691,18 @@ struct test_thread {
return 0;
}
void ready_timeouts() {
for (size_t i = 0; i < arraysize(timeout); ++i)
for (size_t i = 0; i < arraysize(timeout); ++i) {
timeout[i] = false;
if (duration[0])
}
if (duration[0]) {
xalarm(duration[0]);
}
}
static T *table_;
static T* table_;
static unsigned active_threads_;
kvtest_client<T> client_;
};
template <typename T> T *test_thread<T>::table_;
template <typename T> T* test_thread<T>::table_;
template <typename T> unsigned test_thread<T>::active_threads_;

typedef test_thread<Masstree::default_table> masstree_test_thread;
Expand Down
1 change: 1 addition & 0 deletions query_masstree.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class query_table {
public:
typedef P parameters_type;
typedef node_base<P> node_type;
typedef leaf<P> leaf_type;
typedef typename P::threadinfo_type threadinfo;
typedef unlocked_tcursor<P> unlocked_cursor_type;
typedef tcursor<P> cursor_type;
Expand Down

0 comments on commit f37e79f

Please sign in to comment.