From eeaa8e8371996541c0a4e1fc01d35cc26b3467f7 Mon Sep 17 00:00:00 2001 From: koles Date: Thu, 30 May 2024 12:39:31 -0700 Subject: [PATCH] Move fast_pipeline shared object naming into shared functions. PiperOrigin-RevId: 638748189 --- build_and_test.sh | 2 +- build_release_binaries.sh | 2 +- deepvariant/BUILD | 12 +++++ deepvariant/fast_pipeline.cc | 34 +++++++++----- deepvariant/fast_pipeline.h | 8 ++++ deepvariant/fast_pipeline_utils.h | 67 +++++++++++++++++++++++++++ deepvariant/stream_examples.cc | 9 ++-- deepvariant/stream_examples_kernel.cc | 17 +++---- 8 files changed, 123 insertions(+), 28 deletions(-) create mode 100644 deepvariant/fast_pipeline_utils.h diff --git a/build_and_test.sh b/build_and_test.sh index 6690ab54..9bdc676c 100755 --- a/build_and_test.sh +++ b/build_and_test.sh @@ -54,7 +54,7 @@ g++ -std=c++14 -shared \ -o deepvariant/examples_from_stream.so \ -fPIC \ -l:libtensorflow_framework.so.2 \ - -I/home/koles/deepvariant \ + -I. \ ${TF_CFLAGS[@]} \ ${TF_LFLAGS[@]} \ -D_GLIBCXX_USE_CXX11_ABI=1 \ diff --git a/build_release_binaries.sh b/build_release_binaries.sh index 2f3ec91b..42397758 100755 --- a/build_release_binaries.sh +++ b/build_release_binaries.sh @@ -114,7 +114,7 @@ g++ -std=c++14 -shared \ -o deepvariant/examples_from_stream.so \ -fPIC \ -l:libtensorflow_framework.so.2 \ - -I/home/koles/deepvariant \ + -I. 
\ ${TF_CFLAGS[@]} \ ${TF_LFLAGS[@]} \ -D_GLIBCXX_USE_CXX11_ABI=1 \ diff --git a/deepvariant/BUILD b/deepvariant/BUILD index e9b81cfa..f76bc702 100644 --- a/deepvariant/BUILD +++ b/deepvariant/BUILD @@ -1566,6 +1566,7 @@ cc_library( hdrs = ["make_examples_native.h"], deps = [ ":alt_aligned_pileup_lib", + ":fast_pipeline_utils", ":pileup_image_native", ":stream_examples", "//deepvariant/protos:deepvariant_cc_pb2", @@ -1664,6 +1665,7 @@ cc_library( srcs = ["stream_examples.cc"], hdrs = ["stream_examples.h"], deps = [ + ":fast_pipeline_utils", ":pileup_image_native", "//deepvariant/protos:deepvariant_cc_pb2", "//third_party/nucleus/protos:variants_cc_pb2", @@ -1681,9 +1683,19 @@ cc_binary( "fast_pipeline.h", ], deps = [ + ":fast_pipeline_utils", "@com_google_absl//absl/flags:flag", "@com_google_absl//absl/flags:parse", "@com_google_absl//absl/log", "@com_google_absl//absl/strings", ], ) + +cc_library( + name = "fast_pipeline_utils", + srcs = ["fast_pipeline_utils.h"], + hdrs = ["fast_pipeline_utils.h"], + deps = [ + "@com_google_absl//absl/strings", + ], +) diff --git a/deepvariant/fast_pipeline.cc b/deepvariant/fast_pipeline.cc index 1480495c..47a3e8f0 100644 --- a/deepvariant/fast_pipeline.cc +++ b/deepvariant/fast_pipeline.cc @@ -39,6 +39,7 @@ #include +#include "deepvariant/fast_pipeline_utils.h" #include "absl/flags/flag.h" #include "absl/flags/parse.h" #include "absl/log/log.h" @@ -54,9 +55,6 @@ #include "boost/process.hpp" // NOLINT #include "boost/process/search_path.hpp" // NOLINT -namespace bp = boost::process; -namespace bi = boost::interprocess; - ABSL_FLAG(std::string, make_example_flags, "", "file containing make_examples flags"); ABSL_FLAG(std::string, call_variants_flags, "", @@ -66,6 +64,13 @@ ABSL_FLAG(int, num_shards, 0, "number of make_examples shards"); ABSL_FLAG(int, buffer_size, 10485760, "Shared memory buffer size for each shard, default is 10MB"); +namespace learning { +namespace genomics { +namespace deepvariant { + +namespace bp = 
boost::process; +namespace bi = boost::interprocess; + FastPipeline::FastPipeline(int num_shards, int buffer_size, absl::string_view shm_prefix, absl::string_view path_to_make_examples_flags, @@ -106,7 +111,7 @@ FastPipeline::FastPipeline(int num_shards, int buffer_size, void FastPipeline::SetGlobalObjects() { for (int shard = 0; shard < num_shards_; ++shard) { // Create shared memory buffers. - std::string shard_shm_name = absl::StrCat(shm_prefix_, "_shm_", shard); + std::string shard_shm_name = GetShmBufferName(shm_prefix_, shard); shm_[shard] = std::make_unique(bi::shared_memory_object( bi::open_or_create, shard_shm_name.data(), bi::read_write)); @@ -115,17 +120,17 @@ void FastPipeline::SetGlobalObjects() { LOG(INFO) << "Creating buffer_empty mutex"; buffer_empty_[shard] = std::make_unique( bi::open_or_create, - absl::StrCat(shm_prefix_, "_buffer_empty_", shard).data()); + GetBufferEmptyMutexName(shm_prefix_, shard).data()); // Create mutex signalling that items are available in the buffer. LOG(INFO) << "Creating items_available mutex"; items_available_[shard] = std::make_unique( bi::open_or_create, - absl::StrCat(shm_prefix_, "_items_available_", shard).data()); + GetItemsAvailableMutexName(shm_prefix_, shard).data()); // Create mutex signalling that shard is finished. 
LOG(INFO) << "Creating shard_finished mutex"; make_examples_shard_finished_[shard] = std::make_unique( bi::open_or_create, - absl::StrCat(shm_prefix_, "_shard_finished_", shard).data()); + GetShardFinishedMutexName(shm_prefix_, shard).data()); } } @@ -135,13 +140,13 @@ void FastPipeline::ClearGlobalObjects() { buffer_empty_[shard].release(); items_available_[shard].release(); make_examples_shard_finished_[shard].release(); - shm_[shard]->remove(absl::StrCat(shm_prefix_, "_shm_", shard).data()); + shm_[shard]->remove(GetShmBufferName(shm_prefix_, shard).data()); buffer_empty_[shard]->remove( - absl::StrCat(shm_prefix_, "_buffer_empty_", shard).data()); + GetBufferEmptyMutexName(shm_prefix_, shard).data()); items_available_[shard]->remove( - absl::StrCat(shm_prefix_, "_items_available_", shard).data()); + GetItemsAvailableMutexName(shm_prefix_, shard).data()); make_examples_shard_finished_[shard]->remove( - absl::StrCat(shm_prefix_, "_shard_finished_", shard).data()); + GetShardFinishedMutexName(shm_prefix_, shard).data()); } } @@ -206,6 +211,10 @@ void RunFastPipeline(absl::string_view dv_bin_path) { fast_pipeline.ClearGlobalObjects(); } +} // namespace deepvariant +} // namespace genomics +} // namespace learning + int main(int argc, char** argv) { absl::ParseCommandLine(argc, argv); @@ -224,5 +233,6 @@ int main(int argc, char** argv) { // 5. call_variants_flags file exists // 6. No SHM files with the same prefix exist. 
- RunFastPipeline(dv_bin_path); + learning::genomics::deepvariant::RunFastPipeline(dv_bin_path); + return EXIT_SUCCESS; } diff --git a/deepvariant/fast_pipeline.h b/deepvariant/fast_pipeline.h index b64bc597..4dbe4f58 100644 --- a/deepvariant/fast_pipeline.h +++ b/deepvariant/fast_pipeline.h @@ -40,6 +40,10 @@ #include "boost/interprocess/sync/named_mutex.hpp" // NOLINT #include "boost/process.hpp" // NOLINT +namespace learning { +namespace genomics { +namespace deepvariant { + class FastPipeline { public: FastPipeline(int num_shards, int buffer_size, absl::string_view shm_prefix, @@ -72,4 +76,8 @@ class FastPipeline { std::vector call_variants_flags_; }; +} // namespace deepvariant +} // namespace genomics +} // namespace learning + #endif // LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_H_ diff --git a/deepvariant/fast_pipeline_utils.h b/deepvariant/fast_pipeline_utils.h new file mode 100644 index 00000000..0e263ad6 --- /dev/null +++ b/deepvariant/fast_pipeline_utils.h @@ -0,0 +1,67 @@ +/* + * Copyright 2024 Google LLC. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_UTILS_H_ +#define LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_UTILS_H_ + +#include <string> + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" + +namespace learning { +namespace genomics { +namespace deepvariant { + +inline std::string GetShmBufferName(absl::string_view prefix, int shard) { + return absl::StrCat(prefix, "_shm_", shard); +} + +inline std::string GetBufferEmptyMutexName(absl::string_view prefix, + int shard) { + return absl::StrCat(prefix, "_buffer_empty_", shard); +} + +inline std::string GetItemsAvailableMutexName(absl::string_view prefix, + int shard) { + return absl::StrCat(prefix, "_items_available_", shard); +} + +inline std::string GetShardFinishedMutexName(absl::string_view prefix, + int shard) { + return absl::StrCat(prefix, "_shard_finished_", shard); +} + +} // namespace deepvariant +} // namespace genomics +} // namespace learning + +#endif // LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_UTILS_H_ diff --git a/deepvariant/stream_examples.cc b/deepvariant/stream_examples.cc index d93d7d4b..ff8dfc5b 100644 --- a/deepvariant/stream_examples.cc +++ b/deepvariant/stream_examples.cc 
@@ -41,6 +41,7 @@ #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "third_party/nucleus/protos/variants.pb.h" +#include "deepvariant/fast_pipeline_utils.h" namespace learning { namespace genomics { @@ -61,7 +62,7 @@ StreamExamples::StreamExamples( } absl::string_view shm_prefix = options.shm_prefix(); // Name of the shared memory buffer. - shm_name_ = absl::StrCat(shm_prefix, "_shm_", options_.task_id()); + shm_name_ = GetShmBufferName(shm_prefix, options_.task_id()); shm_buffer_size_ = options_.shm_buffer_size(); // Open an existing shared memory buffer. @@ -78,16 +79,16 @@ StreamExamples::StreamExamples( LOG(INFO) << "Creating buffer_empty mutex"; buffer_empty_ = std::make_unique( boost::interprocess::open_only, - absl::StrCat(shm_prefix, "_buffer_empty_", options_.task_id()).data()); + GetBufferEmptyMutexName(shm_prefix, options_.task_id()).data()); LOG(INFO) << "Creating items_available mutex"; items_available_ = std::make_unique( boost::interprocess::open_only, - absl::StrCat(shm_prefix, "_items_available_", options_.task_id()).data()); + GetItemsAvailableMutexName(shm_prefix, options_.task_id()).data()); items_available_->lock(); LOG(INFO) << "Creating shard_finished mutex"; shard_finished_ = std::make_unique( boost::interprocess::open_only, - absl::StrCat(shm_prefix, "_shard_finished_", options_.task_id()).data()); + GetShardFinishedMutexName(shm_prefix, options_.task_id()).data()); shard_finished_->lock(); } diff --git a/deepvariant/stream_examples_kernel.cc b/deepvariant/stream_examples_kernel.cc index 06e66bb0..707417c6 100644 --- a/deepvariant/stream_examples_kernel.cc +++ b/deepvariant/stream_examples_kernel.cc @@ -61,13 +61,14 @@ #include "tensorflow/core/platform/types.h" #include "tensorflow/tsl/platform/errors.h" #include "tensorflow/tsl/platform/thread_annotations.h" +#include "deepvariant/fast_pipeline_utils.h" namespace learning { namespace genomics { namespace deepvariant { // This class implements a custom 
TensorFlow op that reads data from shared -// memory files and returns a batch of examples. Batches are variable length. +// memory buffers and returns a batch of examples. Batches are variable length. // Examples are read from shared memory buffers (one for each shard). // Examples are written in the following format: // int: length of alt_indices_encoded @@ -98,11 +99,10 @@ class StreamExamplesResource : public tensorflow::ResourceBase { make_examples_shard_finished_.resize(num_shards); num_shards_ = num_shards; for (int shard = 0; shard < num_shards; shard++) { - // TODO Move name generation into a shared function. - std::string shm_name = absl::StrCat(shm_prefix, "_shm_", shard); shm_[shard] = std::make_unique( boost::interprocess::shared_memory_object( - boost::interprocess::open_only, shm_name.data(), + boost::interprocess::open_only, + GetShmBufferName(shm_prefix, shard).data(), boost::interprocess::read_write)); shm_region_[shard] = std::make_unique( *shm_[shard], boost::interprocess::read_write); @@ -112,18 +112,15 @@ class StreamExamplesResource : public tensorflow::ResourceBase { // Init mutexes buffer_empty_[shard] = std::make_unique( boost::interprocess::open_only, - // TODO Move name generation into a shared function. - absl::StrCat(shm_prefix, "_buffer_empty_", shard).data()); + GetBufferEmptyMutexName(shm_prefix, shard).data()); items_available_[shard] = std::make_unique( boost::interprocess::open_only, - // TODO Move name generation into a shared function. - absl::StrCat(shm_prefix, "_items_available_", shard).data()); + GetItemsAvailableMutexName(shm_prefix, shard).data()); make_examples_shard_finished_[shard] = std::make_unique( boost::interprocess::open_only, - // TODO Move name generation into a shared function. - absl::StrCat(shm_prefix, "_shard_finished_", shard).data()); + GetShardFinishedMutexName(shm_prefix, shard).data()); } return tensorflow::Status();