diff --git a/Dockerfile.pangenome_aware_deepvariant b/Dockerfile.pangenome_aware_deepvariant index 88283c96..0f435afc 100644 --- a/Dockerfile.pangenome_aware_deepvariant +++ b/Dockerfile.pangenome_aware_deepvariant @@ -65,6 +65,7 @@ COPY --from=builder /opt/deepvariant/bazel-out/k8-opt/bin/deepvariant/postproces COPY --from=builder /opt/deepvariant/bazel-out/k8-opt/bin/deepvariant/vcf_stats_report.zip . COPY --from=builder /opt/deepvariant/bazel-out/k8-opt/bin/deepvariant/show_examples.zip . COPY --from=builder /opt/deepvariant/bazel-out/k8-opt/bin/deepvariant/runtime_by_region_vis.zip . +COPY --from=builder /opt/deepvariant/bazel-out/k8-opt/bin/deepvariant/load_gbz_into_shared_memory.zip . COPY --from=builder /opt/deepvariant/scripts/run_pangenome_aware_deepvariant.py . RUN ./run-prereq.sh @@ -98,6 +99,10 @@ RUN \ "${BASH_HEADER}" \ 'python3 /opt/deepvariant/bin/runtime_by_region_vis.zip "$@"' > \ /opt/deepvariant/bin/runtime_by_region_vis && \ + printf "%s\n%s\n" \ + "${BASH_HEADER}" \ + 'python3 /opt/deepvariant/bin/load_gbz_into_shared_memory.zip "$@"' > \ + /opt/deepvariant/bin/load_gbz_into_shared_memory && \ printf "%s\n%s\n" \ "${BASH_HEADER}" \ 'python3 -u /opt/deepvariant/bin/run_pangenome_aware_deepvariant.py "$@"' > \ @@ -108,6 +113,7 @@ RUN \ /opt/deepvariant/bin/vcf_stats_report \ /opt/deepvariant/bin/show_examples \ /opt/deepvariant/bin/runtime_by_region_vis \ + /opt/deepvariant/bin/load_gbz_into_shared_memory \ /opt/deepvariant/bin/run_pangenome_aware_deepvariant # Copy models. 
diff --git a/WORKSPACE b/WORKSPACE index 8fcc114b..747939d4 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -43,10 +43,10 @@ http_archive( http_archive( name = "gbwt", build_file = "//:third_party/gbwt.BUILD", - sha256 = "eb90732969ba646702c7490e00859ec99bb9d5fa5e017bdfd5ddd13dc0c4ddc6", - strip_prefix = "gbwt-420f0f494a4cc4b258335d29bf43c58d59cfcd2e", + sha256 = "81eb0a9dc05100195f5dce7b537732d9c0e7896b118a6dd01e5fe1ac63b5deca", + strip_prefix = "gbwt-dbd5ba7c34687184ab46dd9df884f0223fdf1e18", urls = [ - "https://github.com/mobinasri/gbwt/archive/420f0f494a4cc4b258335d29bf43c58d59cfcd2e.zip", + "https://github.com/mobinasri/gbwt/archive/dbd5ba7c34687184ab46dd9df884f0223fdf1e18.zip", ], ) @@ -73,10 +73,10 @@ http_archive( http_archive( name = "gbwtgraph", build_file = "//:third_party/gbwtgraph.BUILD", - sha256 = "74ce1e0958c094717bbcf9e0c8d820ccc13b20b2c7824f2cca0045c764554748", - strip_prefix = "gbwtgraph-98661b0253a298838d645e71f65bdb9ddedfd408", + sha256 = "40c41c34b152a1eea6991e1acfdad8875e0c738e24cd36ca22dab5187c99a910", + strip_prefix = "gbwtgraph-c96ca88b65fc40ac4bd371319a29111015d38904", urls = [ - "https://github.com/mobinasri/gbwtgraph/archive/98661b0253a298838d645e71f65bdb9ddedfd408.zip", + "https://github.com/mobinasri/gbwtgraph/archive/c96ca88b65fc40ac4bd371319a29111015d38904.zip", ], ) @@ -115,9 +115,9 @@ http_archive( # That BUILD file must be kept in sync with the version of protobuf used. http_archive( name = "com_google_protobuf", + build_file = "//:third_party/protobuf.BUILD", patch_args = ["-p1"], patches = ["//:third_party/protobuf.patch"], - build_file = "//:third_party/protobuf.BUILD", sha256 = "cfcba2df10feec52a84208693937c17a4b5df7775e1635c1e3baffc487b24c9b", # This protobuf release is based on protobuf 3.9.2. strip_prefix = "protobuf-3.9.2", @@ -137,7 +137,6 @@ http_archive( ], ) - # bazel_skylib is now a required dependency of protobuf_archive. 
http_archive( name = "bazel_skylib", diff --git a/build_release_binaries.sh b/build_release_binaries.sh index 42397758..b7561179 100755 --- a/build_release_binaries.sh +++ b/build_release_binaries.sh @@ -168,6 +168,7 @@ bazel build -c opt \ # TODO: Replace this hand-made list with a find command. fix_zip_file "bazel-out/k8-opt/bin/deepvariant/train" fix_zip_file "bazel-out/k8-opt/bin/deepvariant/call_variants" +fix_zip_file "bazel-out/k8-opt/bin/deepvariant/load_gbz_into_shared_memory" fix_zip_file "bazel-out/k8-opt/bin/deepvariant/make_examples" fix_zip_file "bazel-out/k8-opt/bin/deepvariant/make_examples_pangenome_aware_dv" fix_zip_file "bazel-out/k8-opt/bin/deepvariant/make_examples_somatic" diff --git a/deepvariant/BUILD b/deepvariant/BUILD index b739d8f7..bbf4e4ca 100644 --- a/deepvariant/BUILD +++ b/deepvariant/BUILD @@ -12,6 +12,7 @@ filegroup( name = "binaries", srcs = [ "call_variants", + "load_gbz_into_shared_memory", "make_examples", "make_examples_pangenome_aware_dv", "make_examples_somatic", @@ -712,6 +713,24 @@ py_binary( ], ) +py_binary( + name = "load_gbz_into_shared_memory", + srcs = [ + "load_gbz_into_shared_memory.py", + ], + main = "load_gbz_into_shared_memory.py", + python_version = "PY3", + deps = [ + ":logging_level", + "//deepvariant/protos:deepvariant_py_pb2", + "//third_party/nucleus/io/python:gbz_reader", + "//third_party/nucleus/io/python:hts_verbose", + "//third_party/nucleus/util:errors", + "@absl_py//absl:app", + "@absl_py//absl/flags", + ], +) + py_binary( name = "make_examples_pangenome_aware_dv", srcs = [ diff --git a/deepvariant/load_gbz_into_shared_memory.py b/deepvariant/load_gbz_into_shared_memory.py new file mode 100644 index 00000000..028affe9 --- /dev/null +++ b/deepvariant/load_gbz_into_shared_memory.py @@ -0,0 +1,146 @@ +# Copyright 2021 Google LLC. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. 
Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +"""Load the sequences of a GBZ file into shared memory, usable by multiple processes.""" + +from absl import app +from absl import flags + +from deepvariant import logging_level +from third_party.nucleus.io.python import gbz_reader +from third_party.nucleus.io.python import hts_verbose +from third_party.nucleus.util import errors + + +# Flags related to loading GBZ into shared memory. +_PANGENOME_GBZ = flags.DEFINE_string( + 'pangenome_gbz', + None, + ( + 'Required. 
Pangenome GBZ file to load into shared memory ' + '(Only the sequences are loaded into shared memory.)' + ), +) + +_NUM_SHARDS = flags.DEFINE_integer( + 'num_shards', + None, + ( + 'Required. Number of shards that will use the shared memory. It is ' + 'important to set this number correctly to make sure that the ' + 'shared memory is not deleted before all processes are done using it. ' + 'If this value is greater than required then the shared memory will ' + 'exist even after all processes are done using it, ' + 'which is not desired.' + ), +) + +_SHARED_MEMORY_NAME = flags.DEFINE_string( + 'shared_memory_name', + 'GBZ_SHARED_MEMORY', + ( + 'Name of the shared memory segment.' + ), +) + +_SHARED_MEMORY_SIZE_GB = flags.DEFINE_integer( + 'shared_memory_size_gb', + 10, + ( + 'Size of the shared memory in GB.' + ), +) + + +def load_gbz_into_shared_memory( + pangenome_gbz: str, + shared_memory_name: str, + shared_memory_size_gb: int, + num_shards: int, +): + """Loads a GBZ file into a shared memory segment.""" + sample_name = 'GRCh38' + context = 1000 + chrom_prefix = '' + create_shared_memory = True + num_processes = num_shards + + ## The only parameters that are important are: + ## - shared_memory_name, + ## - create_shared_memory, + ## - shared_memory_size_gb, + ## - num_processes + ## The rest are set to default values. This is because the shared memory will + ## keep only the sequences of the pangenome and not the rest of the + ## information. + ## Just instantiating a gbz reader with create_shared_memory=True + ## will load the sequences into shared memory. + ## If num_processes is greater than 0 then the shared memory + ## will NOT be deleted after this call (to be more precise + ## after calling ~GbzReader() here). 
+ gbz_reader.GbzReader( + pangenome_gbz, + sample_name, + context, + chrom_prefix, + shared_memory_name, + create_shared_memory, + shared_memory_size_gb, + num_processes + ) + + +def main(argv=()): + with errors.clean_commandline_error_exit(): + if len(argv) > 1: + errors.log_and_raise( + 'Command line parsing failure: load_gbz_into_shared_memory does not ' + 'accept positional arguments but some are present on ' + 'the command line: ' + '"{}".'.format(str(argv)), + errors.CommandLineError, + ) + del argv # Unused. + + logging_level.set_from_flag() + hts_verbose.set(hts_verbose.htsLogLevel.HTS_LOG_WARNING) + + load_gbz_into_shared_memory( + _PANGENOME_GBZ.value, + _SHARED_MEMORY_NAME.value, + _SHARED_MEMORY_SIZE_GB.value, + _NUM_SHARDS.value, + ) + + +if __name__ == '__main__': + flags.mark_flags_as_required([ + 'pangenome_gbz', + 'num_shards', + ]) + app.run(main) diff --git a/deepvariant/make_examples_core.py b/deepvariant/make_examples_core.py index 4fed0e04..d326a754 100644 --- a/deepvariant/make_examples_core.py +++ b/deepvariant/make_examples_core.py @@ -1476,6 +1476,8 @@ def _make_sam_readers( ref_name=self.options.ref_name_pangenome, context=self.options.allele_counter_options.partition_size, chrom_prefix=self.options.ref_chrom_prefix, + shared_memory_name=self.options.gbz_shared_memory_name, + create_shared_memory=not self.options.use_loaded_gbz_shared_memory, ref_path=self.options.reference_filename if self.options.use_ref_for_cram else None, diff --git a/deepvariant/make_examples_pangenome_aware_dv.py b/deepvariant/make_examples_pangenome_aware_dv.py index 5ad700f4..b4b37a2e 100644 --- a/deepvariant/make_examples_pangenome_aware_dv.py +++ b/deepvariant/make_examples_pangenome_aware_dv.py @@ -169,6 +169,27 @@ ), ) +_USE_LOADED_GBZ_SHARED_MEMORY = flags.DEFINE_bool( + 'use_loaded_gbz_shared_memory', + False, + ( + 'If enabled, the sequences of the gbz file are already loaded into ' + 'shared memory using load_gbz_into_shared_memory.py and the SamReader ' + 
'reads the sequences from the shared memory.' + ), +) + +_GBZ_SHARED_MEMORY_NAME = flags.DEFINE_string( + 'gbz_shared_memory_name', + 'GBZ_SHARED_MEMORY', + ( + 'Name of the shared memory segment that contains the sequences of the ' + 'gbz format. If --use_loaded_gbz_shared_memory is enabled, this flag ' + 'must be set based on the name of the shared memory created by ' + 'load_gbz_into_shared_memory.py' + ), +) + # Change any flag defaults that differ for Pangenome-aware DeepVariant. # I'm setting this to float('inf') because we don't want to include any # candidates from the non-target (i.e., pangenome) sample. @@ -294,6 +315,12 @@ def default_options(add_flags=True, flags_obj=None): options.ref_name_pangenome = flags_obj.ref_name_pangenome if flags_obj.ref_chrom_prefix: options.ref_chrom_prefix = flags_obj.ref_chrom_prefix + if flags_obj.use_loaded_gbz_shared_memory: + options.use_loaded_gbz_shared_memory = ( + flags_obj.use_loaded_gbz_shared_memory + ) + if flags_obj.gbz_shared_memory_name: + options.gbz_shared_memory_name = flags_obj.gbz_shared_memory_name if add_flags: options.bam_fname = f'{os.path.basename(flags_obj.reads)}|{os.path.basename(flags_obj.pangenome)}' diff --git a/deepvariant/protos/deepvariant.proto b/deepvariant/protos/deepvariant.proto index 0aadbf1e..69d6f66d 100644 --- a/deepvariant/protos/deepvariant.proto +++ b/deepvariant/protos/deepvariant.proto @@ -926,6 +926,14 @@ message MakeExamplesOptions { string ref_chrom_prefix = 80; bool output_phase_info = 81; + + // If true, the sequences of the gbz file are already loaded into shared + // memory and the SamReader reads the sequences from the shared memory. + bool use_loaded_gbz_shared_memory = 83; + + // The name of the shared memory segment that contains the sequences of the + // gbz file. 
+ string gbz_shared_memory_name = 84; } // Config describe information needed for a dataset that can be used for diff --git a/deepvariant/stream_examples.h b/deepvariant/stream_examples.h index da885024..f3c8e31a 100644 --- a/deepvariant/stream_examples.h +++ b/deepvariant/stream_examples.h @@ -39,6 +39,7 @@ #include "deepvariant/pileup_image_native.h" #include "deepvariant/protos/deepvariant.pb.h" #include "absl/types/span.h" + #include "boost/interprocess/managed_shared_memory.hpp" // NOLINT #include "boost/interprocess/shared_memory_object.hpp" // NOLINT #include "boost/interprocess/sync/named_mutex.hpp" // NOLINT diff --git a/scripts/run_pangenome_aware_deepvariant.py b/scripts/run_pangenome_aware_deepvariant.py index 7ab198f5..cf1a030c 100644 --- a/scripts/run_pangenome_aware_deepvariant.py +++ b/scripts/run_pangenome_aware_deepvariant.py @@ -386,6 +386,35 @@ def _set_small_model_config( ) +def load_gbz_into_shared_memory_command( + gbz, + gbz_shared_memory_name, + gbz_shared_memory_size_gb, +) -> Tuple[str, Optional[str]]: + """Returns a load_gbz_into_shared_memory (command, logfile) for subprocess. + + Args: + gbz: Input pangenome GBZ file(s). + gbz_shared_memory_name: Name of the shared memory region to create. + gbz_shared_memory_size_gb: Size of the shared memory region to create. + + Returns: + (string, string) A command to run, and a log file to output to. 
+ """ + command = ['time', '/opt/deepvariant/bin/load_gbz_into_shared_memory'] + command.extend(['--pangenome_gbz', '"{}"'.format(gbz)]) + command.extend( + ['--shared_memory_name', '"{}"'.format(gbz_shared_memory_name)] + ) + command.extend(['--shared_memory_size_gb', str(gbz_shared_memory_size_gb)]) + command.extend(['--num_shards', '"{}"'.format(_NUM_SHARDS.value)]) + + logfile = None + if _LOGGING_DIR.value: + logfile = '{}/load_gbz_into_shared_memory.log'.format(_LOGGING_DIR.value) + return (' '.join(command), logfile) + + def make_examples_pangenome_aware_dv_command( ref: str, reads: str, @@ -690,6 +719,15 @@ def create_all_commands_and_logfiles( else: runtime_by_region_path = None + # Load pangenome GBZ into shared memory. + if _PANGENOME.value is not None and _PANGENOME.value.endswith('.gbz'): + commands.append( + load_gbz_into_shared_memory_command( + gbz=_PANGENOME.value, + gbz_shared_memory_name='GBZ_SHARED_MEMORY', + gbz_shared_memory_size_gb=10, + ) + ) model_ckpt = get_model_ckpt(_MODEL_TYPE.value, _CUSTOMIZED_MODEL.value) commands.append( make_examples_pangenome_aware_dv_command( diff --git a/third_party/nucleus/io/gbz_reader.cc b/third_party/nucleus/io/gbz_reader.cc index c70bca8d..d6cac00a 100644 --- a/third_party/nucleus/io/gbz_reader.cc +++ b/third_party/nucleus/io/gbz_reader.cc @@ -18,6 +18,11 @@ // Implementation of gbz_reader.h #include "third_party/nucleus/io/gbz_reader.h" +#include "boost/interprocess/managed_shared_memory.hpp" // NOLINT +#include "boost/interprocess/shared_memory_object.hpp" // NOLINT +#include "boost/interprocess/sync/named_mutex.hpp" // NOLINT + + #include #include #include // IWYU pragma: keep @@ -31,9 +36,9 @@ #include "absl/log/log.h" #include "absl/strings/str_cat.h" #include "include/gbwt/metadata.h" -#include "include/gbwt/support.h" #include "include/gbwt/utils.h" #include "include/gbwtgraph/subgraph.h" +#include "include/gbwtgraph/gbz.h" #include "src/include/handlegraph/types.hpp" #include 
"third_party/nucleus/core/status.h" #include "third_party/nucleus/core/statusor.h" @@ -48,27 +53,141 @@ namespace nucleus { using nucleus::genomics::v1::Range; using nucleus::genomics::v1::Read; +namespace bi = boost::interprocess; + +void GbzReader::create_or_open_shared_memory(){ + if (this->shared_memory_ != nullptr){ + LOG(WARNING) << "shared memory is not null so not opening it again\n"; + return; + } + if (this->shared_memory_name_.empty()){ + LOG(FATAL) << "shared memory should have a non-empty name\n"; + return; + } + if (this->shared_memory_size_gb_ <= 0){ + LOG(FATAL) << "shared memory size (in GB) should be positive\n"; + return; + } + + if (this->create_shared_memory_) { + bi::shared_memory_object::remove(this->shared_memory_name_.c_str()); + LOG(INFO) << "Creating shared memory: " << this->shared_memory_name_; + // Create shared memory + this->shared_memory_ = new bi::managed_shared_memory( + bi::create_only, + this->shared_memory_name_.c_str(), + shared_memory_size_gb_ * 1e9); + // set the process countdown to the number of processes that will use the + // shared memory + // lock the mutex to make sure no other process is modifying the countdown + bi::named_mutex mtx_process_countdown( + bi::open_or_create, + "PROCESS_COUNTDOWN_MUTEX"); + bi::scoped_lock lock(mtx_process_countdown); + int *process_countdown = this->shared_memory_->find_or_construct + ("PROCESS_COUNTDOWN")(0); + *process_countdown = num_processes_; + LOG(INFO) << + "Number of expected processes (excluding the one that created" + " the shared memory) using " + "shared memory is set to:\n" << + *process_countdown << "\n"; + } else { + LOG(INFO) << "Opening shared memory " << this->shared_memory_name_; + this->shared_memory_ = new bi::managed_shared_memory( + bi::open_only, + this->shared_memory_name_.c_str()); + } +} + +void GbzReader::close_shared_memory(){ + if (this->shared_memory_ != nullptr){ + // lock the mutex to make sure no other process is modifying the countdown + bi::named_mutex 
mtx_process_countdown( + bi::open_or_create, + "PROCESS_COUNTDOWN_MUTEX"); + bi::scoped_lock lock(mtx_process_countdown); + int *process_countdown = this->shared_memory_->find_or_construct + ("PROCESS_COUNTDOWN")(0); + if ((*process_countdown == 0 && this->create_shared_memory_)|| + (*process_countdown == 1 && !this->create_shared_memory_)){ + // If there is no process left using the shared memory or if the process + // that created the shared memory is the only one using it then + // delete the shared memory + LOG(INFO) << + "This is the last process using shared memory so removing memory: " << + this->shared_memory_name_; + bi::shared_memory_object::remove( + this->shared_memory_name_.c_str()); + } else if (!this->create_shared_memory_){ // if this is not the process + // that created the shared memory. + // The countdown will NOT be reduced if the shared memory was created + // by this process + *process_countdown -= 1; + LOG(INFO) << "This is not the last process using shared memory " << + "(remaining: " << *process_countdown <<") so keeping memory open: "<< + this->shared_memory_name_; + } + delete this->shared_memory_; + this->shared_memory_ = nullptr; + this->shared_memory_name_ = ""; + } +} GbzReader::GbzReader(const std::string& gbz_path, const std::string& sample_name, int context, - const std::string& chrom_prefix) - : sample_name_(sample_name), context_(context), - chrom_prefix_(chrom_prefix) { + const std::string& chrom_prefix, + const std::string& shared_memory_name, + bool create_shared_memory, + int shared_memory_size_gb, + int num_processes) + : sample_name_(sample_name), + context_(context), + chrom_prefix_(chrom_prefix), + shared_memory_name_(shared_memory_name), + shared_memory_size_gb_(shared_memory_size_gb), + create_shared_memory_(create_shared_memory), + num_processes_(num_processes){ double start = gbwt::readTimer(); // Open GBZ file in read mode. 
std::ifstream in(gbz_path); + this->shared_memory_ = nullptr; + // open shared memory + this->create_or_open_shared_memory(); // Create an empty GBZ object. - this->gbz_ = gbwtgraph::GBZ(); + this->gbz_ = + std::make_unique>( + this->shared_memory_ + ); // Load the GBZ file into the GBZ object. - gbz_.simple_sds_load(in); + gbz_->simple_sds_load(in); // Create a PathIndex object. - this->path_index_ = std::make_unique(this->gbz_); - // Size of the context window from each side - this->context_ = context; - this->chrom_prefix_ = chrom_prefix; + this->path_index_ = std::make_unique(*this->gbz_); + + if (shared_memory_ == nullptr){ + LOG(FATAL) << "shared memory is NOT opened/created!\n"; + } + + if (this->create_shared_memory_ && this->shared_memory_ != nullptr){ + bool* stringarray_forward_only_loaded = + shared_memory_->find_or_construct + ("StringArray_forward_only_loaded")(); + bool* stringarray_final_loaded = + shared_memory_->find_or_construct + ("StringArray_final_loaded")(); + // We should set these two variables to true because it might not be + // the first time that find_or_construct is called for these two bool + // variables. + // When we run load_gbz_into_shared_memory first it checks the existence of + // these variables in check_existence_in_shared_memory() in + // gbwt/src/support.cpp and creates these two variables there. 
+ *stringarray_forward_only_loaded = true; + *stringarray_final_loaded = true; + LOG(INFO) << "shared memory is created and strings are loaded\n"; + } double end = gbwt::readTimer(); LOG(INFO) << "Loading GBZ file took " << end - start << " seconds"; @@ -87,7 +206,7 @@ nucleus::StatusOr> GbzReader::Query( } - const gbwt::Metadata& metadata = gbz_.index.metadata; + const gbwt::Metadata& metadata = gbz_->index.metadata; // remove the prefix from the contig name std::string contig_name_without_prefix = @@ -110,17 +229,17 @@ nucleus::StatusOr> GbzReader::Query( sample_name_)); } handlegraph::path_handle_t path = - gbz_.graph.path_to_handle(path_ids.front()); + gbz_->graph.path_to_handle(path_ids.front()); gbwtgraph::SubgraphQuery query = gbwtgraph::SubgraphQuery::path_interval( path, start_pos, end_pos, context_, gbwtgraph::SubgraphQuery::HaplotypeOutput::all_haplotypes); - gbwtgraph::Subgraph subgraph(gbz_, path_index_.get(), query); + gbwtgraph::Subgraph subgraph(*gbz_, path_index_.get(), query); const std::vector& reads = - GetReadsFromSubgraph(subgraph, gbz_, chrom_prefix_); + GetReadsFromSubgraph(subgraph, *gbz_, chrom_prefix_); updateCache(reads); @@ -165,9 +284,10 @@ void GbzReader::updateCache( last_read->aligned_sequence().size(); } +template std::vector GbzReader::GetReadsFromSubgraph( const gbwtgraph::Subgraph& subgraph, - const gbwtgraph::GBZ& gbz, + const gbwtgraph::GBZ& gbz, const std::string& chrom_prefix) { // W-lines: reference path std::string contig_name = "unknown"; @@ -277,8 +397,9 @@ std::vector GbzReader::GetCigarElements(const std::string& input) { return results; } +template std::string GbzReader::GetBases(const gbwt::vector_type& path, - const gbwtgraph::GBZ& gbz) { + const gbwtgraph::GBZ& gbz) { std::string bases(""); for (gbwt::node_type node : path) { handlegraph::handle_t handle = gbz.graph.get_handle(gbwt::Node::id(node)); diff --git a/third_party/nucleus/io/gbz_reader.h b/third_party/nucleus/io/gbz_reader.h index 764c696e..312e919a 
100644 --- a/third_party/nucleus/io/gbz_reader.h +++ b/third_party/nucleus/io/gbz_reader.h @@ -36,6 +36,7 @@ #include #include +#include "include/gbwt/utils.h" #include "include/gbwtgraph/gbz.h" #include "include/gbwtgraph/subgraph.h" #include "third_party/nucleus/core/statusor.h" @@ -44,6 +45,11 @@ #include "third_party/nucleus/protos/range.pb.h" #include "third_party/nucleus/protos/reads.pb.h" +#include "boost/interprocess/managed_shared_memory.hpp" // NOLINT +#include "boost/interprocess/shared_memory_object.hpp" // NOLINT +#include "boost/interprocess/sync/named_mutex.hpp" // NOLINT + + namespace nucleus { using SamIterable = Iterable; @@ -68,20 +74,27 @@ class GbzReader : public Reader { GbzReader(const std::string& gbz_path, const std::string& sample_name, int context, - const std::string& chrom_prefix); + const std::string& chrom_prefix = "", + const std::string& shared_memory_name = "GBZ_SHARED_MEMORY", + bool create_shared_memory = true, + int shared_memory_size_gb = 10, + int num_processes = 0); + + ~GbzReader(){this->close_shared_memory();} nucleus::StatusOr> Query( const nucleus::genomics::v1::Range& range); nucleus::StatusOr> Iterate() const; + private: // The filename of the GBZ file. std::string gbz_filename_; // The sample name of the sample for query. std::string sample_name_; // The GBZ object. - gbwtgraph::GBZ gbz_; + std::unique_ptr> gbz_; // The PathIndex object. std::unique_ptr path_index_; // context size @@ -89,14 +102,32 @@ class GbzReader : public Reader { // chrom prefix std::string chrom_prefix_; + // shared memory + boost::interprocess::managed_shared_memory* shared_memory_; + // shared memory name + std::string shared_memory_name_; + // shared memory size + int shared_memory_size_gb_; + // shared memory status + bool create_shared_memory_; + // number of processes that will use shared memory excluding the one that + // created it. 
+ // it is used to make sure that the shared memory is not deleted before all + // processes are done using it. + int num_processes_; + + void create_or_open_shared_memory(); + void close_shared_memory(); + std::vector reads_cache_; int cache_start_pos_ = 0; int cache_end_pos_ = 0; void updateCache(const std::vector& reads); + template static std::string GetBases(const gbwt::vector_type& path, - const gbwtgraph::GBZ& gbz); + const gbwtgraph::GBZ& gbz); static std::string GetReverseComplement(const std::string& sequence); static nucleus::genomics::v1::Read MakeRead( const std::string& chr, const int start, const std::string& bases, @@ -104,9 +135,11 @@ class GbzReader : public Reader { const std::string& read_name); static genomics::v1::CigarUnit_Operation ParseCigarOpStr(const char op); static std::vector GetCigarElements(const std::string& input); + + template static std::vector GetReadsFromSubgraph( const gbwtgraph::Subgraph& subgraph, - const gbwtgraph::GBZ& gbz, + const gbwtgraph::GBZ& gbz, const std::string& chrom_prefix); }; diff --git a/third_party/nucleus/io/python/gbz_reader_pybind.cc b/third_party/nucleus/io/python/gbz_reader_pybind.cc index 5c5dfe87..a0c40791 100644 --- a/third_party/nucleus/io/python/gbz_reader_pybind.cc +++ b/third_party/nucleus/io/python/gbz_reader_pybind.cc @@ -51,6 +51,13 @@ PYBIND11_MODULE(gbz_reader, m) { using namespace ::nucleus; // NOLINT py::classh(m, "GbzReader") - .def(py::init()) + .def(py::init()) .def("query", &GbzReader::Query, py::arg("region")); } diff --git a/third_party/nucleus/io/sam.py b/third_party/nucleus/io/sam.py index 1fd3d169..3314f762 100644 --- a/third_party/nucleus/io/sam.py +++ b/third_party/nucleus/io/sam.py @@ -262,9 +262,22 @@ def _native_reader(self, ref_name='', context=1000, chrom_prefix='', + shared_memory_name='GBZ_SHARED_MEMORY', + create_shared_memory=True, + shared_memory_size_gb=10, + num_processes=0, **kwargs): if input_path.endswith('.gbz'): - return gbz_reader.GbzReader(input_path, 
ref_name, context, chrom_prefix) + # GBZ pangenome input: build a GbzReader with the shared-memory settings. + return gbz_reader.GbzReader(input_path, + ref_name, + context, + chrom_prefix, + shared_memory_name, + create_shared_memory, + shared_memory_size_gb, + num_processes) + return NativeSamReader(input_path, **kwargs) def _record_proto(self):