Fix end of iteration bug in Lambda Iterator
lczech committed Feb 23, 2024
1 parent 9be8ff2 commit ddb00d5
Showing 2 changed files with 107 additions and 60 deletions.
53 changes: 38 additions & 15 deletions lib/genesis/utils/containers/lambda_iterator.hpp
@@ -24,6 +24,7 @@
260 Panama Street, Stanford, CA 94305, USA
*/

#include "genesis/utils/core/options.hpp"
#include "genesis/utils/core/thread_pool.hpp"

#include <cassert>
@@ -427,6 +428,9 @@ class LambdaIterator

void begin_iteration_with_buffer_()
{
// The main class makes sure that we have a thread pool when we want to use the buffer
assert( generator_->thread_pool_ );

// Init the records and create empty VcfRecord to read into.
// The blocks have been initialized in the constructor already; assert this.
current_block_->resize( generator_->block_size_ );
@@ -444,7 +448,7 @@
// However, if the first block was fully read, we start the async worker thread
// to fill the buffer with the next block of data.
if( end_pos_ == generator_->block_size_ ) {
fill_buffer_block_();
enqueue_buffer_block_filling_();
} else if( end_pos_ == 0 ) {
// Edge case: zero elements read. We are already done then.
end_iteration_();
@@ -473,7 +477,7 @@
current_block_->size() == 1
)
);
assert( buffer_block_->size() == generator_->block_size_ );
assert( buffer_block_->size() == generator_->block_size_ );

// Run the actual increment implementation.
if( generator_->block_size_ == 0 ) {
@@ -535,21 +539,32 @@
return;
}

// Here, we know that there is more data, so we can swap the buffer,
// start reading again, and set or internal current location
// to the first element of the vector again.
// Here, we know that there is more data buffered, so we can swap the buffer.
// If that last read returned with less than a full block of items, we have reached
// the end of the input, and do not want to start reading more (that condition was
// missing before - nasty bug that only showed up in consecutive concurrent tests,
// so that a thread was started even after there was no more input. That thread
// would then access data from the next test case, which of course happened to
// occupy the same stack space. This never showed up in the read data, as that
// stray thread would not communicate with anything, and somehow did not throw
// or segfault either. Super nasty to find.
// Anyway, if we had read a full block before, we need to start reading again.
// Finally, set our internal current location to the first element of the vector again.
assert( end_pos_ > 0 && end_pos_ <= generator_->block_size_ );
std::swap( buffer_block_, current_block_ );
fill_buffer_block_();
if( end_pos_ == generator_->block_size_ ) {
enqueue_buffer_block_filling_();
}
current_pos_ = 0;
}

// Now we have moved to the next element, and potentially the next block,
// so we are ready to call the observers for that element.
assert( current_pos_ < end_pos_ );
execute_observers_( (*current_block_)[current_pos_] );
}
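
For context, the following is a minimal, self-contained sketch of the double buffering with conditional prefetch that this fix enforces. The Reader type, read_block, and the use of std::async in place of the library's utils::ThreadPool are illustrative assumptions, not the library's API; the point is that a new asynchronous fill is only scheduled when the previous read returned a full block, so no stray task can outlive the input.

// Minimal sketch of double buffering with a conditional prefetch.
// All names here (Reader, read_block) are illustrative, not the library's API.
#include <cstddef>
#include <future>
#include <iostream>
#include <utility>
#include <vector>

struct Reader
{
    // Pretend input: the numbers 0..99.
    size_t next = 0;
    size_t total = 100;

    // Fill up to block_size elements into the target; return how many were read.
    size_t read_block( std::vector<size_t>& target, size_t block_size )
    {
        size_t count = 0;
        while( count < block_size && next < total ) {
            target[ count++ ] = next++;
        }
        return count;
    }
};

int main()
{
    size_t const block_size = 16;
    Reader reader;
    std::vector<size_t> current_block( block_size );
    std::vector<size_t> buffer_block( block_size );
    std::future<size_t> future;

    // Read the first block synchronously; prefetch the next one only if it was full.
    size_t end_pos = reader.read_block( current_block, block_size );
    if( end_pos == block_size ) {
        future = std::async( std::launch::async, [ &reader, &buffer_block, block_size ](){
            return reader.read_block( buffer_block, block_size );
        });
    }

    size_t sum = 0;
    while( end_pos > 0 ) {
        for( size_t i = 0; i < end_pos; ++i ) {
            sum += current_block[ i ];
        }
        if( end_pos < block_size ) {
            // The last read was partial: the input is exhausted and no prefetch is pending.
            break;
        }
        // A prefetch is pending: wait for it, swap the buffers, and schedule another
        // fill only if that read again returned a complete block.
        end_pos = future.get();
        std::swap( current_block, buffer_block );
        if( end_pos == block_size ) {
            future = std::async( std::launch::async, [ &reader, &buffer_block, block_size ](){
                return reader.read_block( buffer_block, block_size );
            });
        }
    }
    std::cout << "sum = " << sum << "\n"; // 0 + 1 + ... + 99 = 4950
}

The missing condition described in the comment above corresponds to the guard around the second std::async call in this sketch: without it, another read would be scheduled even after a partial (final) block.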

void fill_buffer_block_()
void enqueue_buffer_block_filling_()
{
// Those shared pointers have been initialized in the constructor; let's assert this.
assert( generator_ );
@@ -569,7 +584,7 @@
current_block_->size() == 1
)
);
assert( buffer_block_->size() == generator_->block_size_ );
assert( buffer_block_->size() == generator_->block_size_ );

// In order to use lambda captures by copy for class member variables in C++11, we first
// have to make local copies, and then capture those. Capturing the class members directly
@@ -580,6 +595,10 @@

// The lambda returns the result of read_block_ call, that is, the number of records that
// have been read, and which we later (in the future_) use to see how much data we got.
// It is of course important that the input is read in the correct order of elements.
// Despite its parallel nature, we can use a thread pool here, as we are only ever
// submitting a single read task to it, so there cannot be two reads of the same lambda
// iterator in the pool.
*future_ = generator_->thread_pool_->enqueue(
[ generator, buffer_block, block_size ](){
return read_block_( generator, buffer_block, block_size );
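
As an aside, here is a small stand-alone sketch of the C++11 capture workaround mentioned in the comment above: local copies of the class members are made first and those are captured by value, so the worker never touches `this`. The BlockReader class and the use of std::async are assumptions made purely for self-containment; the actual code submits the task to the library's utils::ThreadPool, and because only one read per iterator is ever pending, the blocks still arrive in input order.

// Sketch of the C++11 capture-by-copy workaround; names are illustrative.
#include <cstddef>
#include <future>
#include <iostream>
#include <memory>
#include <vector>

class BlockReader
{
public:
    std::future<size_t> start_fill()
    {
        // C++11 lambdas cannot capture class members by copy directly, so we make
        // local copies first and capture those instead of capturing `this`.
        auto buffer = buffer_;
        auto block_size = block_size_;
        return std::async( std::launch::async, [ buffer, block_size ]() -> size_t {
            // Pretend to read one full block of data into the shared buffer.
            for( size_t i = 0; i < block_size; ++i ) {
                (*buffer)[ i ] = i;
            }
            return block_size;
        });
    }

private:
    std::shared_ptr<std::vector<size_t>> buffer_ = std::make_shared<std::vector<size_t>>( 8 );
    size_t block_size_ = 8;
};

int main()
{
    BlockReader reader;
    auto future = reader.start_fill();
    std::cout << "elements read: " << future.get() << "\n"; // prints 8
}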
@@ -743,17 +762,16 @@ class LambdaIterator
*/
LambdaIterator(
std::function<bool(value_type&)> get_element,
std::shared_ptr<utils::ThreadPool> thread_pool = {},
std::shared_ptr<utils::ThreadPool> thread_pool = nullptr,
size_t block_size = DEFAULT_BLOCK_SIZE
)
: get_element_(get_element)
, thread_pool_( thread_pool )
, block_size_( block_size )
{
if( ! thread_pool ) {
thread_pool = std::make_shared<utils::ThreadPool>( 1 );
// We only need to set the thread pool if we are going to use buffering.
if( block_size > 0 ) {
thread_pool_ = thread_pool ? thread_pool : Options::get().global_thread_pool();
}
thread_pool_ = thread_pool;
}
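
A brief sketch of the fallback used in this constructor, under the assumption that a shared default pool is available: the ThreadPool type and global_thread_pool() below are stand-ins for genesis' utils::ThreadPool and Options::get().global_thread_pool(). A pool is only assigned when buffering is requested (block_size > 0), and a caller-provided pool takes precedence over the shared default.

// Sketch of the "use the given pool, else the shared default" fallback.
// ThreadPool and global_thread_pool() are stand-in names for this example.
#include <cstddef>
#include <memory>

struct ThreadPool
{
    explicit ThreadPool( size_t num_threads ) : num_threads_( num_threads ) {}
    size_t num_threads_;
};

std::shared_ptr<ThreadPool> global_thread_pool()
{
    // A single shared default pool, created on first use.
    static auto pool = std::make_shared<ThreadPool>( 1 );
    return pool;
}

struct BufferedIterator
{
    BufferedIterator( std::shared_ptr<ThreadPool> pool, size_t block_size )
        : block_size_( block_size )
    {
        // Only buffered iteration needs a pool; unbuffered iteration leaves it unset.
        if( block_size_ > 0 ) {
            thread_pool_ = pool ? pool : global_thread_pool();
        }
    }

    std::shared_ptr<ThreadPool> thread_pool_;
    size_t block_size_;
};

int main()
{
    BufferedIterator unbuffered( nullptr, 0 );   // no pool is set at all
    BufferedIterator buffered( nullptr, 1024 );  // falls back to the shared default pool
    return ( ! unbuffered.thread_pool_ && buffered.thread_pool_ ) ? 0 : 1;
}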

/**
Expand All @@ -766,7 +784,7 @@ class LambdaIterator
LambdaIterator(
std::function<bool(value_type&)> get_element,
Data const& data,
std::shared_ptr<utils::ThreadPool> thread_pool = {},
std::shared_ptr<utils::ThreadPool> thread_pool = nullptr,
size_t block_size = DEFAULT_BLOCK_SIZE
)
: LambdaIterator( get_element, thread_pool, block_size )
Expand All @@ -782,7 +800,7 @@ class LambdaIterator
LambdaIterator(
std::function<bool(value_type&)> get_element,
Data&& data,
std::shared_ptr<utils::ThreadPool> thread_pool = {},
std::shared_ptr<utils::ThreadPool> thread_pool = nullptr,
size_t block_size = DEFAULT_BLOCK_SIZE
)
: LambdaIterator( get_element, thread_pool, block_size )
@@ -1041,6 +1059,11 @@ class LambdaIterator
"LambdaIterator: Cannot change thread pool after iteration has started."
);
}
if( !value ) {
throw std::runtime_error(
"LambdaIterator: Cannot change thread pool to empty object."
);
}
thread_pool_ = value;
return *this;
}
114 changes: 69 additions & 45 deletions test/src/utils/containers/iterators.cpp
@@ -1,6 +1,6 @@
/*
Genesis - A toolkit for working with phylogenetic data.
Copyright (C) 2014-2023 Lucas Czech
Copyright (C) 2014-2024 Lucas Czech
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -35,6 +35,7 @@
#include "genesis/utils/containers/transform_iterator.hpp"

#include <algorithm>
#include <atomic>
#include <numeric>
#include <utility>
#include <vector>
@@ -208,6 +209,7 @@ TEST( Containers, FilterIterator )

void test_lambda_iterator_( size_t num_elements, size_t block_size )
{
LOG_DBG << "====================================";
LOG_DBG << "num_elements " << num_elements << ", block_size " << block_size;

// Create data as sequence of numbers, and get their sum.
@@ -216,21 +218,29 @@
std::iota( data.begin(), data.end(), 0 );
auto expected_sum = std::accumulate( data.begin(), data.end(), size_t{0} );

using NumberLambdaIterator = LambdaIterator<size_t>;
// The input is a sequence of numbers. We use a counter in the generator to check that every element is produced in order.
std::atomic<size_t> read_counter{0};

// Set up the LambdaIterator.
using NumberLambdaIterator = LambdaIterator<size_t>;
auto beg = data.begin();
auto end = data.end();
auto generator = NumberLambdaIterator(
[beg, end]( size_t& value ) mutable {
[beg, end, &read_counter]( size_t& value ) mutable {
if( beg != end ) {
value = *beg;

// Check that the series is complete
auto const lc = read_counter.load();
EXPECT_EQ( lc, value );
++read_counter;

++beg;
return true;
} else {
return false;
}
}, {}, block_size
}, nullptr, block_size
);

// Result variables.
@@ -266,6 +276,16 @@ void test_lambda_iterator_( size_t num_elements, size_t block_size )
LOG_DBG << "at " << it;
loop_sum += it;
}

// Regression test.
// We had a bug where the lambda iterator would not check for the end of the input correctly.
// This test makes sure that after the loop is done, there is nothing in the thread pool
// any more - the iterator should have waited for the end of everything before finishing.
// We are only using the global thread pool sequentially in the tests here, so there
// cannot be anything left from other places once we are done with the iteration.
EXPECT_EQ( 0, Options::get().global_thread_pool()->currently_enqueued_tasks() );

// Check the numerical outputs as well
EXPECT_EQ( expected_sum, loop_sum );
EXPECT_EQ( expected_sum, visitor_sum );
}
@@ -279,45 +299,49 @@
// But if needed, comment this line out, and each test will report its input.
LOG_SCOPE_LEVEL( genesis::utils::Logging::kInfo );

// No elements
test_lambda_iterator_( 0, 0 );
test_lambda_iterator_( 0, 1 );
test_lambda_iterator_( 0, 2 );
test_lambda_iterator_( 0, 3 );

// Single element
test_lambda_iterator_( 1, 0 );
test_lambda_iterator_( 1, 1 );
test_lambda_iterator_( 1, 2 );
test_lambda_iterator_( 1, 3 );

// Two elements
test_lambda_iterator_( 2, 0 );
test_lambda_iterator_( 2, 1 );
test_lambda_iterator_( 2, 2 );
test_lambda_iterator_( 2, 3 );

// Three elements
test_lambda_iterator_( 3, 0 );
test_lambda_iterator_( 3, 1 );
test_lambda_iterator_( 3, 2 );
test_lambda_iterator_( 3, 3 );

// Four elements
test_lambda_iterator_( 4, 0 );
test_lambda_iterator_( 4, 1 );
test_lambda_iterator_( 4, 2 );
test_lambda_iterator_( 4, 3 );

// Many elements
test_lambda_iterator_( 100, 0 );
test_lambda_iterator_( 100, 1 );
test_lambda_iterator_( 100, 2 );
test_lambda_iterator_( 100, 3 );

// Long buffer block
test_lambda_iterator_( 0, 100 );
test_lambda_iterator_( 1, 100 );
test_lambda_iterator_( 2, 100 );
test_lambda_iterator_( 3, 100 );
// Loop a few times, to have a higher chance of finding race conditions etc. in the threading.
for( size_t i = 0; i < 250; ++i ) {

// No elements
test_lambda_iterator_( 0, 0 );
test_lambda_iterator_( 0, 1 );
test_lambda_iterator_( 0, 2 );
test_lambda_iterator_( 0, 3 );

// Single element
test_lambda_iterator_( 1, 0 );
test_lambda_iterator_( 1, 1 );
test_lambda_iterator_( 1, 2 );
test_lambda_iterator_( 1, 3 );

// Two elements
test_lambda_iterator_( 2, 0 );
test_lambda_iterator_( 2, 1 );
test_lambda_iterator_( 2, 2 );
test_lambda_iterator_( 2, 3 );

// Three elements
test_lambda_iterator_( 3, 0 );
test_lambda_iterator_( 3, 1 );
test_lambda_iterator_( 3, 2 );
test_lambda_iterator_( 3, 3 );

// Four elements
test_lambda_iterator_( 4, 0 );
test_lambda_iterator_( 4, 1 );
test_lambda_iterator_( 4, 2 );
test_lambda_iterator_( 4, 3 );

// Many elements
test_lambda_iterator_( 100, 0 );
test_lambda_iterator_( 100, 1 );
test_lambda_iterator_( 100, 2 );
test_lambda_iterator_( 100, 3 );

// Long buffer block
test_lambda_iterator_( 0, 100 );
test_lambda_iterator_( 1, 100 );
test_lambda_iterator_( 2, 100 );
test_lambda_iterator_( 3, 100 );
}
}
