Skip to content

Commit

Permalink
Construction in a background thread
Browse files Browse the repository at this point in the history
  • Loading branch information
jltsiren committed Nov 6, 2017
1 parent 4ea0e8e commit a0c3e2e
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 20 deletions.
113 changes: 93 additions & 20 deletions dynamic_gbwt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,30 +665,24 @@ DynamicGBWT::insert(text_buffer_type& text, size_type batch_size)
return;
}
if(batch_size == 0) { batch_size = text.size(); }

// Find the last endmarker in the batch, read the batch into memory, and insert the sequences.
size_type old_sequences = this->sequences();
for(size_type start_offset = 0; start_offset < text.size(); )

// Create a builder using this index.
GBWTBuilder builder(text.width(), batch_size);
builder.index.swap(*this);

// Insert all sequences.
std::vector<node_type> sequence;
for(size_type node : text)
{
size_type limit = std::min(text.size(), start_offset + batch_size);
while(limit > start_offset)
{
if(text[limit - 1] == ENDMARKER) { break; }
limit--;
}
if(limit <= start_offset)
{
std::cerr << "DynamicGBWT::insert(): Cannot find an endmarker in the batch starting from offset " << start_offset << std::endl;
std::exit(EXIT_FAILURE);
}
text_type batch(limit - start_offset, 0, text.width());
for(size_type i = start_offset; i < limit; i++) { batch[i - start_offset] = text[i]; }
gbwt::insertBatch(*this, batch, batch.size(), this->sequences() - old_sequences);
start_offset = limit;
if(node == ENDMARKER) { builder.insert(sequence); sequence.clear(); }
else { sequence.push_back(node); }
}
if(!(sequence.empty())) { builder.insert(sequence); sequence.clear(); }

// Finally sort the outgoing edges.
this->recode();
// Finish the construction and get the index contents back.
builder.finish();
this->swap(builder.index);

if(Verbosity::level >= Verbosity::BASIC)
{
Expand Down Expand Up @@ -852,4 +846,83 @@ printStatistics(const DynamicGBWT& gbwt, const std::string& name)

//------------------------------------------------------------------------------

GBWTBuilder::GBWTBuilder(size_type node_width, size_type buffer_size) :
input_buffer(buffer_size, 0, node_width), construction_buffer(buffer_size, 0, node_width),
input_tail(0), construction_tail(0),
inserted_sequences(0), batch_sequences(0)
{
}

GBWTBuilder::~GBWTBuilder()
{
// Wait for the construction thread to finish.
if(this->builder.joinable()) { this->builder.join(); }
}

void
GBWTBuilder::insert(std::vector<node_type>& sequence, bool both_orientations)
{
size_type space_required = sequence.size() + 1;
if(both_orientations) { space_required *= 2; }
if(space_required > this->input_buffer.size())
{
std::cerr << "GBWTBuilder::insert(): Sequence is too long for the buffer, skipping" << std::endl;
return;
}

// Flush the buffer if necessary.
if(this->input_tail + space_required > this->input_buffer.size())
{
this->flush();
}

// Forward orientation.
for(node_type node : sequence) { this->input_buffer[this->input_tail] = node; this->input_tail++; }
this->input_buffer[this->input_tail] = ENDMARKER; this->input_tail++;
this->batch_sequences++;

// Reverse orientation.
if(both_orientations)
{
for(auto iter = sequence.rbegin(); iter != sequence.rend(); ++iter) { this->input_buffer[this->input_tail] = Node::reverse(*iter); this->input_tail++; }
this->input_buffer[this->input_tail] = ENDMARKER; this->input_tail++;
this->batch_sequences++;
}
}

void
GBWTBuilder::finish()
{
// Flush the buffer if necessary.
this->flush();

// Wait for the construction thread to finish.
if(this->builder.joinable()) { this->builder.join(); }

// Finally recode the index to make it serializable.
this->index.recode();
}

void
GBWTBuilder::flush()
{
// Wait for the construction thread to finish.
if(this->builder.joinable()) { this->builder.join(); }

// Swap the input buffer and the construction buffer.
this->input_buffer.swap(this->construction_buffer);
this->construction_tail = this->input_tail;
this->input_tail = 0;

// Launch a new construction thread if necessary.
if(this->construction_tail > 0)
{
this->builder = std::thread(gbwt::insertBatch<text_type>, std::ref(this->index), std::cref(this->construction_buffer), this->construction_tail, this->inserted_sequences);
this->inserted_sequences += this->batch_sequences;
this->batch_sequences = 0;
}
}

//------------------------------------------------------------------------------

} // namespace gbwt
30 changes: 30 additions & 0 deletions include/gbwt/dynamic_gbwt.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#ifndef GBWT_DYNAMIC_GBWT_H
#define GBWT_DYNAMIC_GBWT_H

#include <thread>

#include <gbwt/gbwt.h>

namespace gbwt
Expand All @@ -37,6 +39,8 @@ namespace gbwt

//------------------------------------------------------------------------------

class GBWTBuilder;

class DynamicGBWT
{
public:
Expand Down Expand Up @@ -250,6 +254,8 @@ class DynamicGBWT
*/
void recode();

friend class GBWTBuilder;

//------------------------------------------------------------------------------

}; // class DynamicGBWT
Expand All @@ -258,6 +264,30 @@ void printStatistics(const DynamicGBWT& gbwt, const std::string& name);

//------------------------------------------------------------------------------

class GBWTBuilder
{
public:
GBWTBuilder(size_type node_width, size_type batch_size = DynamicGBWT::INSERT_BATCH_SIZE);
~GBWTBuilder();

void insert(std::vector<node_type>& sequence, bool both_orientations = false);
void finish();

DynamicGBWT index;
text_type input_buffer, construction_buffer;
size_type input_tail, construction_tail;
size_type inserted_sequences, batch_sequences;
std::thread builder;

GBWTBuilder(const GBWTBuilder&) = delete;
GBWTBuilder& operator= (const GBWTBuilder&) = delete;

private:
void flush();
}; // class GBWTBuilder

//------------------------------------------------------------------------------

} // namespace gbwt

#endif // GBWT_DYNAMIC_GBWT_H

0 comments on commit a0c3e2e

Please sign in to comment.