diff --git a/c++/pod5_format/file_writer.cpp b/c++/pod5_format/file_writer.cpp index 38eab0d..a5f37b7 100644 --- a/c++/pod5_format/file_writer.cpp +++ b/c++/pod5_format/file_writer.cpp @@ -115,6 +115,8 @@ class FileWriterImpl { virtual ~FileWriterImpl() = default; + virtual std::string path() const = 0; + pod5::Result lookup_end_reason(ReadEndReason end_reason) { return m_read_table_dict_writers.end_reason_writer->lookup(end_reason); @@ -345,6 +347,8 @@ class CombinedFileWriterImpl : public FileWriterImpl { { } + std::string path() const override { return m_path; } + arrow::Status close() override { if (is_closed()) { @@ -423,6 +427,8 @@ FileWriter::FileWriter(std::unique_ptr && impl) : m_impl(std::mo FileWriter::~FileWriter() { (void)close(); } +std::string FileWriter::path() const { return m_impl->path(); } + arrow::Status FileWriter::close() { return m_impl->close(); } arrow::Status FileWriter::add_complete_read( diff --git a/c++/pod5_format/file_writer.h b/c++/pod5_format/file_writer.h index 40c699b..2f2e633 100644 --- a/c++/pod5_format/file_writer.h +++ b/c++/pod5_format/file_writer.h @@ -91,6 +91,8 @@ class POD5_FORMAT_EXPORT FileWriter { FileWriter(std::unique_ptr && impl); ~FileWriter(); + std::string path() const; + pod5::Status close(); pod5::Status add_complete_read( diff --git a/c++/pod5_format_pybind/repack/repack_functions.h b/c++/pod5_format_pybind/repack/repack_functions.h index f31a4d7..82fee78 100644 --- a/c++/pod5_format_pybind/repack/repack_functions.h +++ b/c++/pod5_format_pybind/repack/repack_functions.h @@ -24,7 +24,7 @@ struct ReadReadData { std::vector signal_row_sizes; std::vector signal_rows_read_ids; - std::vector signal_rows; + std::vector signal_rows; }; arrow::Result read_read_data( @@ -131,21 +131,73 @@ arrow::Result read_read_data( return result; } +arrow::Status read_signal( + std::shared_ptr const & source_file, + pod5::SignalType input_compression_type, + std::uint64_t abs_signal_row, + boost::uuids::uuid read_id, + pod5::SignalType output_compression_type, + arrow::FixedSizeBinaryBuilder & read_id_builder, + pod5::SignalBuilderVariant & signal_builder, + arrow::UInt32Builder & samples_builder, + arrow::MemoryPool * pool) +{ + auto signal_rows_span = gsl::make_span(&abs_signal_row, 1); + + // If were using the same compression type in both files, just copy compressed: + if (input_compression_type == output_compression_type + && output_compression_type == pod5::SignalType::VbzSignal) + { + std::vector sample_counts; + ARROW_ASSIGN_OR_RAISE( + auto extracted_signal, + source_file->extract_samples_inplace(signal_rows_span, sample_counts)); + + assert(1 == extracted_signal.size()); + assert(sample_counts.size() == extracted_signal.size()); + auto signal_span = + gsl::make_span(extracted_signal.front()->data(), extracted_signal.front()->size()); + + ARROW_RETURN_NOT_OK(read_id_builder.Append(read_id.begin())); + ARROW_RETURN_NOT_OK(boost::apply_visitor( + pod5::visitors::append_pre_compressed_signal{signal_span}, signal_builder)); + ARROW_RETURN_NOT_OK(samples_builder.Append(sample_counts.front())); + } else { + // Find the sample count of the complete read: + ARROW_ASSIGN_OR_RAISE( + auto sample_count, source_file->extract_sample_count(signal_rows_span)); + + std::vector signal(sample_count); + auto signal_buffer_span = gsl::make_span(signal); + ARROW_RETURN_NOT_OK(source_file->extract_samples(signal_rows_span, signal_buffer_span)); + + ARROW_RETURN_NOT_OK(read_id_builder.Append(read_id.begin())); + ARROW_RETURN_NOT_OK(boost::apply_visitor( + pod5::visitors::append_signal{signal_buffer_span, pool}, signal_builder)); + ARROW_RETURN_NOT_OK(samples_builder.Append(sample_count)); + } + return arrow::Status::OK(); +} + struct RequestedSignalReads { std::vector complete_requests; - std::shared_ptr partial_request; + std::shared_ptr partial_request; }; arrow::Result request_signal_reads( std::shared_ptr const & source_file, + pod5::SignalType output_compression_type, std::size_t signal_batch_size, std::vector read_ids, - std::vector signal_rows, - std::shared_ptr const & partial_request, - std::shared_ptr const & dest_read_table_rows) + std::vector signal_rows, + std::shared_ptr const & partial_request, + std::shared_ptr const & dest_read_table_rows, + arrow::MemoryPool * pool) { POD5_TRACE_FUNCTION(); + auto const input_signal_type = source_file->signal_type(); + assert(read_ids.size() == signal_rows.size()); RequestedSignalReads result; @@ -156,28 +208,39 @@ arrow::Result request_signal_reads( std::size_t signal_rows_position = 0; while (signal_rows_position < signal_rows.size()) { if (!next_request) { - next_request = std::make_shared(); - next_request->rows.reserve(signal_batch_size); + ARROW_ASSIGN_OR_RAISE( + auto signal_builder, pod5::make_signal_builder(output_compression_type, pool)); + next_request = std::make_shared( + std::move(signal_builder), pool); + next_request->patch_rows.reserve(signal_batch_size); } auto to_write = std::min( signal_rows.size() - signal_rows_position, - signal_batch_size - next_request->rows.size()); + signal_batch_size - next_request->patch_rows.size()); for (std::size_t i = 0; i < to_write; ++i) { - auto const batch_row_index = signal_rows_position + i; - assert(batch_row_index < signal_rows.size()); - assert(batch_row_index < dest_read_table_rows->signal_row_indices.size()); - next_request->rows.emplace_back( + auto const dest_batch_row_index = signal_rows_position + i; + assert(dest_batch_row_index < signal_rows.size()); + assert(dest_batch_row_index < dest_read_table_rows->signal_row_indices.size()); + + ARROW_RETURN_NOT_OK(read_signal( source_file, - dest_read_table_rows, - read_ids[signal_rows_position + i], + input_signal_type, signal_rows[signal_rows_position + i], - batch_row_index); + read_ids[signal_rows_position + i], + output_compression_type, + *next_request->read_id_builder, + next_request->signal_builder, + next_request->samples_builder, + pool)); + + next_request->patch_rows.emplace_back(dest_read_table_rows, dest_batch_row_index); } signal_rows_position += to_write; - assert(next_request->rows.size() <= signal_batch_size); - if (next_request->rows.size() >= signal_batch_size) { + assert(next_request->row_count() <= signal_batch_size); + assert(next_request->row_count() <= signal_batch_size); + if (next_request->row_count() >= signal_batch_size) { result.complete_requests.emplace_back(std::move(next_request)); next_request.reset(); } @@ -193,71 +256,23 @@ struct ReadSignal { std::vector> columns; }; -arrow::Result read_signal_data( - states::unread_split_signal_table_batch_rows const & signal_rows, - pod5::SignalType output_compression_type, - arrow::MemoryPool * pool) +arrow::Result read_signal_data(states::read_split_signal_table_batch_rows & signal_rows) { POD5_TRACE_FUNCTION(); - std::unique_ptr read_id_builder = - pod5::make_read_id_builder(pool); - ARROW_ASSIGN_OR_RAISE( - pod5::SignalBuilderVariant signal_builder, - pod5::make_signal_builder(output_compression_type, pool)); - std::unique_ptr samples_builder = - std::make_unique(pool); - - for (auto const row : signal_rows.rows) { - auto const input_signal_type = row.source_file->signal_type(); - - auto signal_rows_span = gsl::make_span(&row.source_signal_row_index, 1); - - // If were using the same compression type in both files, just copy compressed: - if (input_signal_type == output_compression_type - && output_compression_type == pod5::SignalType::VbzSignal) - { - std::vector sample_counts; - ARROW_ASSIGN_OR_RAISE( - auto extracted_signal, - row.source_file->extract_samples_inplace(signal_rows_span, sample_counts)); - - assert(1 == extracted_signal.size()); - assert(sample_counts.size() == extracted_signal.size()); - auto signal_span = - gsl::make_span(extracted_signal.front()->data(), extracted_signal.front()->size()); - - ARROW_RETURN_NOT_OK(read_id_builder->Append(row.read_id.begin())); - ARROW_RETURN_NOT_OK(boost::apply_visitor( - pod5::visitors::append_pre_compressed_signal{signal_span}, signal_builder)); - ARROW_RETURN_NOT_OK(samples_builder->Append(sample_counts.front())); - } else { - // Find the sample count of the complete read: - ARROW_ASSIGN_OR_RAISE( - auto sample_count, row.source_file->extract_sample_count(signal_rows_span)); - - std::vector signal(sample_count); - auto signal_buffer_span = gsl::make_span(signal); - ARROW_RETURN_NOT_OK( - row.source_file->extract_samples(signal_rows_span, signal_buffer_span)); - - ARROW_RETURN_NOT_OK(read_id_builder->Append(row.read_id.begin())); - ARROW_RETURN_NOT_OK(boost::apply_visitor( - pod5::visitors::append_signal{signal_buffer_span, pool}, signal_builder)); - ARROW_RETURN_NOT_OK(samples_builder->Append(sample_count)); - } - } - ReadSignal result; pod5::SignalTableSchemaDescription field_locations; result.final_batch = signal_rows.final_batch; - result.row_count = signal_rows.rows.size(); + result.row_count = signal_rows.row_count(); result.columns = {nullptr, nullptr, nullptr}; - ARROW_RETURN_NOT_OK(read_id_builder->Finish(&result.columns[field_locations.read_id])); + ARROW_RETURN_NOT_OK( + signal_rows.read_id_builder->Finish(&result.columns[field_locations.read_id])); ARROW_RETURN_NOT_OK(boost::apply_visitor( - pod5::visitors::finish_column{&result.columns[field_locations.signal]}, signal_builder)); - ARROW_RETURN_NOT_OK(samples_builder->Finish(&result.columns[field_locations.samples])); + pod5::visitors::finish_column{&result.columns[field_locations.signal]}, + signal_rows.signal_builder)); + ARROW_RETURN_NOT_OK( + signal_rows.samples_builder.Finish(&result.columns[field_locations.samples])); return result; } diff --git a/c++/pod5_format_pybind/repack/repack_output.cpp b/c++/pod5_format_pybind/repack/repack_output.cpp index c8eff03..8761b6f 100644 --- a/c++/pod5_format_pybind/repack/repack_output.cpp +++ b/c++/pod5_format_pybind/repack/repack_output.cpp @@ -67,9 +67,10 @@ struct Pod5RepackerOutputState { Pod5RepackerOutputThreadState * get_thread_state() { - auto it = thread_states.find(std::this_thread::get_id()); - if (it == thread_states.end()) { - it = thread_states.emplace(std::this_thread::get_id(), dict_manager).first; + auto ts = thread_states.synchronize(); + auto it = ts->find(std::this_thread::get_id()); + if (it == ts->end()) { + it = ts->emplace(std::this_thread::get_id(), dict_manager).first; } return &it->second; } @@ -80,11 +81,12 @@ struct Pod5RepackerOutputState { std::mutex read_table_writer_mutex; std::mutex signal_table_writer_mutex; std::shared_ptr dict_manager; - boost::synchronized_value> + boost::synchronized_value> partial_signal_batch; std::atomic reads_completed{0}; - std::unordered_map thread_states; + boost::synchronized_value> + thread_states; boost::synchronized_value> output_read_ids; }; @@ -134,11 +136,13 @@ struct StateOperator : boost::static_visitor> auto signal_request_result, request_signal_reads( read_result.input, + progress_state->output_file->signal_type(), progress_state->output_file->signal_table_batch_size(), read_result.signal_rows_read_ids, read_result.signal_rows, *partial_signal_batch, - read_table_rows)); + read_table_rows, + progress_state->memory_pool)); *partial_signal_batch = signal_request_result.partial_request; return StateProgressResult{std::move(signal_request_result.complete_requests)}; @@ -146,14 +150,11 @@ struct StateOperator : boost::static_visitor> } arrow::Result operator()( - std::shared_ptr & batch) const + std::shared_ptr & batch) const { POD5_TRACE_FUNCTION(); - ARROW_ASSIGN_OR_RAISE( - auto read_signal_result, - read_signal_data( - *batch, progress_state->output_file->signal_type(), progress_state->memory_pool)); + ARROW_ASSIGN_OR_RAISE(auto read_signal_result, read_signal_data(*batch)); std::pair inserted_signal_rows; { @@ -168,13 +169,13 @@ struct StateOperator : boost::static_visitor> std::vector result_new_states; - for (std::size_t i = 0; i < batch->rows.size(); ++i) { - auto const & row = batch->rows[i]; + for (std::size_t i = 0; i < batch->patch_rows.size(); ++i) { + auto const & row = batch->patch_rows[i]; auto const & dest_read_table = row.dest_read_table; assert(dest_read_table); - assert(row.batch_row_index < dest_read_table->signal_row_indices.size()); - dest_read_table->signal_row_indices[row.batch_row_index] = + assert(row.dest_batch_row_index < dest_read_table->signal_row_indices.size()); + dest_read_table->signal_row_indices[row.dest_batch_row_index] = inserted_signal_rows.first + i; dest_read_table->written_row_indices += 1; diff --git a/c++/pod5_format_pybind/repack/repack_output.h b/c++/pod5_format_pybind/repack/repack_output.h index 4bb3a68..03d7f0a 100644 --- a/c++/pod5_format_pybind/repack/repack_output.h +++ b/c++/pod5_format_pybind/repack/repack_output.h @@ -22,6 +22,8 @@ class Pod5RepackerOutput { bool check_duplicate_read_ids); ~Pod5RepackerOutput(); + std::string path() const { return m_output->path(); } + std::shared_ptr const & repacker() const { return m_repacker; } bool has_tasks() const; diff --git a/c++/pod5_format_pybind/repack/repack_states.h b/c++/pod5_format_pybind/repack/repack_states.h index 5878c84..ad4b130 100644 --- a/c++/pod5_format_pybind/repack/repack_states.h +++ b/c++/pod5_format_pybind/repack/repack_states.h @@ -1,7 +1,10 @@ #pragma once #include "pod5_format/file_reader.h" +#include "pod5_format/signal_builder.h" +#include +#include #include #include @@ -38,39 +41,45 @@ class read_read_table_rows_no_signal { std::vector signal_row_indices; }; -class unread_split_signal_table_batch_rows { +class read_split_signal_table_batch_rows { public: - struct RowToRead { - RowToRead( - std::shared_ptr const & _source_file, - std::shared_ptr const & _dest_read_table, - boost::uuids::uuid _read_id, - std::uint64_t _source_signal_row_index, - std::uint64_t _batch_row_index) - : read_id(_read_id) - , source_file(_source_file) - , dest_read_table(_dest_read_table) - , source_signal_row_index(_source_signal_row_index) - , batch_row_index(_batch_row_index) + struct PatchRecord { + PatchRecord( + std::shared_ptr dest_read_table, + std::uint64_t dest_batch_row_index) + : dest_read_table(dest_read_table) + , dest_batch_row_index(dest_batch_row_index) { } - boost::uuids::uuid read_id; - std::shared_ptr source_file; std::shared_ptr dest_read_table; - std::uint64_t source_signal_row_index; - std::uint64_t batch_row_index; + std::uint64_t dest_batch_row_index; }; - std::vector rows; + read_split_signal_table_batch_rows( + pod5::SignalBuilderVariant && signal_builder, + arrow::MemoryPool * pool) + : read_id_builder(pod5::make_read_id_builder(pool)) + , signal_builder(std::move(signal_builder)) + , samples_builder(pool) + { + } + + std::unique_ptr read_id_builder; + pod5::SignalBuilderVariant signal_builder; + arrow::UInt32Builder samples_builder; + + std::vector patch_rows; bool final_batch = false; + + std::size_t row_count() const { return patch_rows.size(); } }; struct finished {}; using shared_variant = boost::variant< std::shared_ptr, - std::shared_ptr, + std::shared_ptr, std::shared_ptr, std::shared_ptr>;