Skip to content

Commit

Permalink
Merge branch 'gpimm/read_signal_before_batch_is_complete' into 'master'
Browse files Browse the repository at this point in the history
Read content from source files prior to batch completion

See merge request minknow/pod5-file-format!312
  • Loading branch information
HalfPhoton committed Nov 13, 2023
2 parents dd2ad59 + f0b63c4 commit 25a3406
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 107 deletions.
6 changes: 6 additions & 0 deletions c++/pod5_format/file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class FileWriterImpl {

virtual ~FileWriterImpl() = default;

virtual std::string path() const = 0;

pod5::Result<EndReasonDictionaryIndex> lookup_end_reason(ReadEndReason end_reason)
{
return m_read_table_dict_writers.end_reason_writer->lookup(end_reason);
Expand Down Expand Up @@ -345,6 +347,8 @@ class CombinedFileWriterImpl : public FileWriterImpl {
{
}

std::string path() const override { return m_path; }

arrow::Status close() override
{
if (is_closed()) {
Expand Down Expand Up @@ -423,6 +427,8 @@ FileWriter::FileWriter(std::unique_ptr<FileWriterImpl> && impl) : m_impl(std::mo

FileWriter::~FileWriter() { (void)close(); }

std::string FileWriter::path() const { return m_impl->path(); }

arrow::Status FileWriter::close() { return m_impl->close(); }

arrow::Status FileWriter::add_complete_read(
Expand Down
2 changes: 2 additions & 0 deletions c++/pod5_format/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class POD5_FORMAT_EXPORT FileWriter {
FileWriter(std::unique_ptr<FileWriterImpl> && impl);
~FileWriter();

std::string path() const;

pod5::Status close();

pod5::Status add_complete_read(
Expand Down
161 changes: 88 additions & 73 deletions c++/pod5_format_pybind/repack/repack_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ struct ReadReadData {
std::vector<std::size_t> signal_row_sizes;

std::vector<boost::uuids::uuid> signal_rows_read_ids;
std::vector<std::size_t> signal_rows;
std::vector<std::uint64_t> signal_rows;
};

arrow::Result<ReadReadData> read_read_data(
Expand Down Expand Up @@ -131,21 +131,73 @@ arrow::Result<ReadReadData> read_read_data(
return result;
}

arrow::Status read_signal(
std::shared_ptr<pod5::FileReader> const & source_file,
pod5::SignalType input_compression_type,
std::uint64_t abs_signal_row,
boost::uuids::uuid read_id,
pod5::SignalType output_compression_type,
arrow::FixedSizeBinaryBuilder & read_id_builder,
pod5::SignalBuilderVariant & signal_builder,
arrow::UInt32Builder & samples_builder,
arrow::MemoryPool * pool)
{
auto signal_rows_span = gsl::make_span(&abs_signal_row, 1);

// If were using the same compression type in both files, just copy compressed:
if (input_compression_type == output_compression_type
&& output_compression_type == pod5::SignalType::VbzSignal)
{
std::vector<uint32_t> sample_counts;
ARROW_ASSIGN_OR_RAISE(
auto extracted_signal,
source_file->extract_samples_inplace(signal_rows_span, sample_counts));

assert(1 == extracted_signal.size());
assert(sample_counts.size() == extracted_signal.size());
auto signal_span =
gsl::make_span(extracted_signal.front()->data(), extracted_signal.front()->size());

ARROW_RETURN_NOT_OK(read_id_builder.Append(read_id.begin()));
ARROW_RETURN_NOT_OK(boost::apply_visitor(
pod5::visitors::append_pre_compressed_signal{signal_span}, signal_builder));
ARROW_RETURN_NOT_OK(samples_builder.Append(sample_counts.front()));
} else {
// Find the sample count of the complete read:
ARROW_ASSIGN_OR_RAISE(
auto sample_count, source_file->extract_sample_count(signal_rows_span));

std::vector<std::int16_t> signal(sample_count);
auto signal_buffer_span = gsl::make_span(signal);
ARROW_RETURN_NOT_OK(source_file->extract_samples(signal_rows_span, signal_buffer_span));

ARROW_RETURN_NOT_OK(read_id_builder.Append(read_id.begin()));
ARROW_RETURN_NOT_OK(boost::apply_visitor(
pod5::visitors::append_signal{signal_buffer_span, pool}, signal_builder));
ARROW_RETURN_NOT_OK(samples_builder.Append(sample_count));
}
return arrow::Status::OK();
}

struct RequestedSignalReads {
std::vector<states::shared_variant> complete_requests;
std::shared_ptr<states::unread_split_signal_table_batch_rows> partial_request;
std::shared_ptr<states::read_split_signal_table_batch_rows> partial_request;
};

arrow::Result<RequestedSignalReads> request_signal_reads(
std::shared_ptr<pod5::FileReader> const & source_file,
pod5::SignalType output_compression_type,
std::size_t signal_batch_size,
std::vector<boost::uuids::uuid> read_ids,
std::vector<std::size_t> signal_rows,
std::shared_ptr<states::unread_split_signal_table_batch_rows> const & partial_request,
std::shared_ptr<states::read_read_table_rows_no_signal> const & dest_read_table_rows)
std::vector<std::uint64_t> signal_rows,
std::shared_ptr<states::read_split_signal_table_batch_rows> const & partial_request,
std::shared_ptr<states::read_read_table_rows_no_signal> const & dest_read_table_rows,
arrow::MemoryPool * pool)
{
POD5_TRACE_FUNCTION();

auto const input_signal_type = source_file->signal_type();

assert(read_ids.size() == signal_rows.size());

RequestedSignalReads result;
Expand All @@ -156,28 +208,39 @@ arrow::Result<RequestedSignalReads> request_signal_reads(
std::size_t signal_rows_position = 0;
while (signal_rows_position < signal_rows.size()) {
if (!next_request) {
next_request = std::make_shared<states::unread_split_signal_table_batch_rows>();
next_request->rows.reserve(signal_batch_size);
ARROW_ASSIGN_OR_RAISE(
auto signal_builder, pod5::make_signal_builder(output_compression_type, pool));
next_request = std::make_shared<states::read_split_signal_table_batch_rows>(
std::move(signal_builder), pool);
next_request->patch_rows.reserve(signal_batch_size);
}
auto to_write = std::min(
signal_rows.size() - signal_rows_position,
signal_batch_size - next_request->rows.size());
signal_batch_size - next_request->patch_rows.size());

for (std::size_t i = 0; i < to_write; ++i) {
auto const batch_row_index = signal_rows_position + i;
assert(batch_row_index < signal_rows.size());
assert(batch_row_index < dest_read_table_rows->signal_row_indices.size());
next_request->rows.emplace_back(
auto const dest_batch_row_index = signal_rows_position + i;
assert(dest_batch_row_index < signal_rows.size());
assert(dest_batch_row_index < dest_read_table_rows->signal_row_indices.size());

ARROW_RETURN_NOT_OK(read_signal(
source_file,
dest_read_table_rows,
read_ids[signal_rows_position + i],
input_signal_type,
signal_rows[signal_rows_position + i],
batch_row_index);
read_ids[signal_rows_position + i],
output_compression_type,
*next_request->read_id_builder,
next_request->signal_builder,
next_request->samples_builder,
pool));

next_request->patch_rows.emplace_back(dest_read_table_rows, dest_batch_row_index);
}
signal_rows_position += to_write;

assert(next_request->rows.size() <= signal_batch_size);
if (next_request->rows.size() >= signal_batch_size) {
assert(next_request->row_count() <= signal_batch_size);
assert(next_request->row_count() <= signal_batch_size);
if (next_request->row_count() >= signal_batch_size) {
result.complete_requests.emplace_back(std::move(next_request));
next_request.reset();
}
Expand All @@ -193,71 +256,23 @@ struct ReadSignal {
std::vector<std::shared_ptr<arrow::Array>> columns;
};

arrow::Result<ReadSignal> read_signal_data(
states::unread_split_signal_table_batch_rows const & signal_rows,
pod5::SignalType output_compression_type,
arrow::MemoryPool * pool)
arrow::Result<ReadSignal> read_signal_data(states::read_split_signal_table_batch_rows & signal_rows)
{
POD5_TRACE_FUNCTION();

std::unique_ptr<arrow::FixedSizeBinaryBuilder> read_id_builder =
pod5::make_read_id_builder(pool);
ARROW_ASSIGN_OR_RAISE(
pod5::SignalBuilderVariant signal_builder,
pod5::make_signal_builder(output_compression_type, pool));
std::unique_ptr<arrow::UInt32Builder> samples_builder =
std::make_unique<arrow::UInt32Builder>(pool);

for (auto const row : signal_rows.rows) {
auto const input_signal_type = row.source_file->signal_type();

auto signal_rows_span = gsl::make_span(&row.source_signal_row_index, 1);

// If were using the same compression type in both files, just copy compressed:
if (input_signal_type == output_compression_type
&& output_compression_type == pod5::SignalType::VbzSignal)
{
std::vector<uint32_t> sample_counts;
ARROW_ASSIGN_OR_RAISE(
auto extracted_signal,
row.source_file->extract_samples_inplace(signal_rows_span, sample_counts));

assert(1 == extracted_signal.size());
assert(sample_counts.size() == extracted_signal.size());
auto signal_span =
gsl::make_span(extracted_signal.front()->data(), extracted_signal.front()->size());

ARROW_RETURN_NOT_OK(read_id_builder->Append(row.read_id.begin()));
ARROW_RETURN_NOT_OK(boost::apply_visitor(
pod5::visitors::append_pre_compressed_signal{signal_span}, signal_builder));
ARROW_RETURN_NOT_OK(samples_builder->Append(sample_counts.front()));
} else {
// Find the sample count of the complete read:
ARROW_ASSIGN_OR_RAISE(
auto sample_count, row.source_file->extract_sample_count(signal_rows_span));

std::vector<std::int16_t> signal(sample_count);
auto signal_buffer_span = gsl::make_span(signal);
ARROW_RETURN_NOT_OK(
row.source_file->extract_samples(signal_rows_span, signal_buffer_span));

ARROW_RETURN_NOT_OK(read_id_builder->Append(row.read_id.begin()));
ARROW_RETURN_NOT_OK(boost::apply_visitor(
pod5::visitors::append_signal{signal_buffer_span, pool}, signal_builder));
ARROW_RETURN_NOT_OK(samples_builder->Append(sample_count));
}
}

ReadSignal result;

pod5::SignalTableSchemaDescription field_locations;
result.final_batch = signal_rows.final_batch;
result.row_count = signal_rows.rows.size();
result.row_count = signal_rows.row_count();
result.columns = {nullptr, nullptr, nullptr};
ARROW_RETURN_NOT_OK(read_id_builder->Finish(&result.columns[field_locations.read_id]));
ARROW_RETURN_NOT_OK(
signal_rows.read_id_builder->Finish(&result.columns[field_locations.read_id]));
ARROW_RETURN_NOT_OK(boost::apply_visitor(
pod5::visitors::finish_column{&result.columns[field_locations.signal]}, signal_builder));
ARROW_RETURN_NOT_OK(samples_builder->Finish(&result.columns[field_locations.samples]));
pod5::visitors::finish_column{&result.columns[field_locations.signal]},
signal_rows.signal_builder));
ARROW_RETURN_NOT_OK(
signal_rows.samples_builder.Finish(&result.columns[field_locations.samples]));
return result;
}

Expand Down
31 changes: 16 additions & 15 deletions c++/pod5_format_pybind/repack/repack_output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ struct Pod5RepackerOutputState {

Pod5RepackerOutputThreadState * get_thread_state()
{
auto it = thread_states.find(std::this_thread::get_id());
if (it == thread_states.end()) {
it = thread_states.emplace(std::this_thread::get_id(), dict_manager).first;
auto ts = thread_states.synchronize();
auto it = ts->find(std::this_thread::get_id());
if (it == ts->end()) {
it = ts->emplace(std::this_thread::get_id(), dict_manager).first;
}
return &it->second;
}
Expand All @@ -80,11 +81,12 @@ struct Pod5RepackerOutputState {
std::mutex read_table_writer_mutex;
std::mutex signal_table_writer_mutex;
std::shared_ptr<ReadsTableDictionaryManager> dict_manager;
boost::synchronized_value<std::shared_ptr<states::unread_split_signal_table_batch_rows>>
boost::synchronized_value<std::shared_ptr<states::read_split_signal_table_batch_rows>>
partial_signal_batch;
std::atomic<std::size_t> reads_completed{0};

std::unordered_map<std::thread::id, Pod5RepackerOutputThreadState> thread_states;
boost::synchronized_value<std::unordered_map<std::thread::id, Pod5RepackerOutputThreadState>>
thread_states;

boost::synchronized_value<std::unordered_set<boost::uuids::uuid>> output_read_ids;
};
Expand Down Expand Up @@ -134,26 +136,25 @@ struct StateOperator : boost::static_visitor<arrow::Result<StateProgressResult>>
auto signal_request_result,
request_signal_reads(
read_result.input,
progress_state->output_file->signal_type(),
progress_state->output_file->signal_table_batch_size(),
read_result.signal_rows_read_ids,
read_result.signal_rows,
*partial_signal_batch,
read_table_rows));
read_table_rows,
progress_state->memory_pool));

*partial_signal_batch = signal_request_result.partial_request;
return StateProgressResult{std::move(signal_request_result.complete_requests)};
}
}

arrow::Result<StateProgressResult> operator()(
std::shared_ptr<states::unread_split_signal_table_batch_rows> & batch) const
std::shared_ptr<states::read_split_signal_table_batch_rows> & batch) const
{
POD5_TRACE_FUNCTION();

ARROW_ASSIGN_OR_RAISE(
auto read_signal_result,
read_signal_data(
*batch, progress_state->output_file->signal_type(), progress_state->memory_pool));
ARROW_ASSIGN_OR_RAISE(auto read_signal_result, read_signal_data(*batch));

std::pair<pod5::SignalTableRowIndex, pod5::SignalTableRowIndex> inserted_signal_rows;
{
Expand All @@ -168,13 +169,13 @@ struct StateOperator : boost::static_visitor<arrow::Result<StateProgressResult>>

std::vector<states::shared_variant> result_new_states;

for (std::size_t i = 0; i < batch->rows.size(); ++i) {
auto const & row = batch->rows[i];
for (std::size_t i = 0; i < batch->patch_rows.size(); ++i) {
auto const & row = batch->patch_rows[i];

auto const & dest_read_table = row.dest_read_table;
assert(dest_read_table);
assert(row.batch_row_index < dest_read_table->signal_row_indices.size());
dest_read_table->signal_row_indices[row.batch_row_index] =
assert(row.dest_batch_row_index < dest_read_table->signal_row_indices.size());
dest_read_table->signal_row_indices[row.dest_batch_row_index] =
inserted_signal_rows.first + i;
dest_read_table->written_row_indices += 1;

Expand Down
2 changes: 2 additions & 0 deletions c++/pod5_format_pybind/repack/repack_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class Pod5RepackerOutput {
bool check_duplicate_read_ids);
~Pod5RepackerOutput();

std::string path() const { return m_output->path(); }

std::shared_ptr<Pod5Repacker> const & repacker() const { return m_repacker; }

bool has_tasks() const;
Expand Down
Loading

0 comments on commit 25a3406

Please sign in to comment.