Move fast_pipeline shared object naming into shared functions.
PiperOrigin-RevId: 638748189
akolesnikov authored and copybara-github committed May 30, 2024
1 parent 8cbd49f commit eeaa8e8
Showing 8 changed files with 123 additions and 28 deletions.
2 changes: 1 addition & 1 deletion build_and_test.sh
@@ -54,7 +54,7 @@ g++ -std=c++14 -shared \
-o deepvariant/examples_from_stream.so \
-fPIC \
-l:libtensorflow_framework.so.2 \
-I/home/koles/deepvariant \
-I. \
${TF_CFLAGS[@]} \
${TF_LFLAGS[@]} \
-D_GLIBCXX_USE_CXX11_ABI=1 \
2 changes: 1 addition & 1 deletion build_release_binaries.sh
@@ -114,7 +114,7 @@ g++ -std=c++14 -shared \
-o deepvariant/examples_from_stream.so \
-fPIC \
-l:libtensorflow_framework.so.2 \
-I/home/koles/deepvariant \
-I. \
${TF_CFLAGS[@]} \
${TF_LFLAGS[@]} \
-D_GLIBCXX_USE_CXX11_ABI=1 \
12 changes: 12 additions & 0 deletions deepvariant/BUILD
@@ -1566,6 +1566,7 @@ cc_library(
hdrs = ["make_examples_native.h"],
deps = [
":alt_aligned_pileup_lib",
":fast_pipeline_utils",
":pileup_image_native",
":stream_examples",
"//deepvariant/protos:deepvariant_cc_pb2",
@@ -1664,6 +1665,7 @@ cc_library(
srcs = ["stream_examples.cc"],
hdrs = ["stream_examples.h"],
deps = [
":fast_pipeline_utils",
":pileup_image_native",
"//deepvariant/protos:deepvariant_cc_pb2",
"//third_party/nucleus/protos:variants_cc_pb2",
@@ -1681,9 +1683,19 @@ cc_binary(
"fast_pipeline.h",
],
deps = [
":fast_pipeline_utils",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/log",
"@com_google_absl//absl/strings",
],
)

cc_library(
name = "fast_pipeline_utils",
srcs = ["fast_pipeline_utils.h"],
hdrs = ["fast_pipeline_utils.h"],
deps = [
"@com_google_absl//absl/strings",
],
)
34 changes: 22 additions & 12 deletions deepvariant/fast_pipeline.cc
@@ -39,6 +39,7 @@
#include <string>


#include "deepvariant/fast_pipeline_utils.h"
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/log.h"
@@ -54,9 +55,6 @@
#include "boost/process.hpp" // NOLINT
#include "boost/process/search_path.hpp" // NOLINT

namespace bp = boost::process;
namespace bi = boost::interprocess;

ABSL_FLAG(std::string, make_example_flags, "",
"file containing make_examples flags");
ABSL_FLAG(std::string, call_variants_flags, "",
@@ -66,6 +64,13 @@ ABSL_FLAG(int, num_shards, 0, "number of make_examples shards");
ABSL_FLAG(int, buffer_size, 10485760,
"Shared memory buffer size for each shard, default is 10MB");

namespace learning {
namespace genomics {
namespace deepvariant {

namespace bp = boost::process;
namespace bi = boost::interprocess;

FastPipeline::FastPipeline(int num_shards, int buffer_size,
absl::string_view shm_prefix,
absl::string_view path_to_make_examples_flags,
@@ -106,7 +111,7 @@ FastPipeline::FastPipeline(int num_shards, int buffer_size,
void FastPipeline::SetGlobalObjects() {
for (int shard = 0; shard < num_shards_; ++shard) {
// Create shared memory buffers.
std::string shard_shm_name = absl::StrCat(shm_prefix_, "_shm_", shard);
std::string shard_shm_name = GetShmBufferName(shm_prefix_, shard);
shm_[shard] =
std::make_unique<bi::shared_memory_object>(bi::shared_memory_object(
bi::open_or_create, shard_shm_name.data(), bi::read_write));
@@ -115,17 +120,17 @@ void FastPipeline::SetGlobalObjects() {
LOG(INFO) << "Creating buffer_empty mutex";
buffer_empty_[shard] = std::make_unique<bi::named_mutex>(
bi::open_or_create,
absl::StrCat(shm_prefix_, "_buffer_empty_", shard).data());
GetBufferEmptyMutexName(shm_prefix_, shard).data());
// Create mutex signalling that items are available in the buffer.
LOG(INFO) << "Creating items_available mutex";
items_available_[shard] = std::make_unique<bi::named_mutex>(
bi::open_or_create,
absl::StrCat(shm_prefix_, "_items_available_", shard).data());
GetItemsAvailableMutexName(shm_prefix_, shard).data());
// Create mutex signalling that shard is finished.
LOG(INFO) << "Creating shard_finished mutex";
make_examples_shard_finished_[shard] = std::make_unique<bi::named_mutex>(
bi::open_or_create,
absl::StrCat(shm_prefix_, "_shard_finished_", shard).data());
GetShardFinishedMutexName(shm_prefix_, shard).data());
}
}

@@ -135,13 +140,13 @@ void FastPipeline::ClearGlobalObjects() {
buffer_empty_[shard].release();
items_available_[shard].release();
make_examples_shard_finished_[shard].release();
shm_[shard]->remove(absl::StrCat(shm_prefix_, "_shm_", shard).data());
shm_[shard]->remove(GetShmBufferName(shm_prefix_, shard).data());
buffer_empty_[shard]->remove(
absl::StrCat(shm_prefix_, "_buffer_empty_", shard).data());
GetBufferEmptyMutexName(shm_prefix_, shard).data());
items_available_[shard]->remove(
absl::StrCat(shm_prefix_, "_items_available_", shard).data());
GetItemsAvailableMutexName(shm_prefix_, shard).data());
make_examples_shard_finished_[shard]->remove(
absl::StrCat(shm_prefix_, "_shard_finished_", shard).data());
GetShardFinishedMutexName(shm_prefix_, shard).data());
}
}

@@ -206,6 +211,10 @@ void RunFastPipeline(absl::string_view dv_bin_path) {
fast_pipeline.ClearGlobalObjects();
}

} // namespace deepvariant
} // namespace genomics
} // namespace learning

int main(int argc, char** argv) {

absl::ParseCommandLine(argc, argv);
@@ -224,5 +233,6 @@ int main(int argc, char** argv) {
// 5. call_variants_flags file exists
// 6. No SHM files with the same prefix exist.

RunFastPipeline(dv_bin_path);
learning::genomics::deepvariant::RunFastPipeline(dv_bin_path);
return EXIT_SUCCESS;
}
8 changes: 8 additions & 0 deletions deepvariant/fast_pipeline.h
@@ -40,6 +40,10 @@
#include "boost/interprocess/sync/named_mutex.hpp" // NOLINT
#include "boost/process.hpp" // NOLINT

namespace learning {
namespace genomics {
namespace deepvariant {

class FastPipeline {
public:
FastPipeline(int num_shards, int buffer_size, absl::string_view shm_prefix,
@@ -72,4 +76,8 @@ class FastPipeline {
std::vector<std::string> call_variants_flags_;
};

} // namespace deepvariant
} // namespace genomics
} // namespace learning

#endif // LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_H_
67 changes: 67 additions & 0 deletions deepvariant/fast_pipeline_utils.h
@@ -0,0 +1,67 @@
/*
* Copyright 2024 Google LLC.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_UTILS_H_
#define LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_UTILS_H_

#include <string>

#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"

namespace learning {
namespace genomics {
namespace deepvariant {

inline std::string GetShmBufferName(absl::string_view prefix, int shard) {
return absl::StrCat(prefix, "_shm_", shard);
}

inline std::string GetBufferEmptyMutexName(absl::string_view prefix,
int shard) {
return absl::StrCat(prefix, "_buffer_empty_", shard);
}

inline std::string GetItemsAvailableMutexName(absl::string_view prefix,
int shard) {
return absl::StrCat(prefix, "_items_available_", shard);
}

inline std::string GetShardFinishedMutexName(absl::string_view prefix,
int shard) {
return absl::StrCat(prefix, "_shard_finished_", shard);
}

} // namespace deepvariant
} // namespace genomics
} // namespace learning

#endif // LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_UTILS_H_
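
For illustration only (not part of this commit): a minimal C++ sketch of how the producer side (fast_pipeline) and the consumer side (stream_examples and the stream_examples op) both derive their interprocess object names from these helpers, so the strings cannot drift apart. The helper and namespace names come from this diff; the ProducerSide/ConsumerSide wrappers are hypothetical.

#include "absl/strings/string_view.h"
#include "deepvariant/fast_pipeline_utils.h"
#include "boost/interprocess/shared_memory_object.hpp"  // NOLINT
#include "boost/interprocess/sync/named_mutex.hpp"      // NOLINT

namespace bi = boost::interprocess;
using ::learning::genomics::deepvariant::GetBufferEmptyMutexName;
using ::learning::genomics::deepvariant::GetShmBufferName;

// Hypothetical producer: creates the shared memory buffer and mutex for one
// shard, as FastPipeline::SetGlobalObjects() does after this change.
void ProducerSide(absl::string_view prefix, int shard) {
  bi::shared_memory_object shm(bi::open_or_create,
                               GetShmBufferName(prefix, shard).data(),
                               bi::read_write);
  bi::named_mutex buffer_empty(bi::open_or_create,
                               GetBufferEmptyMutexName(prefix, shard).data());
}

// Hypothetical consumer: rebuilds the same names with the same helpers and
// opens the existing objects (open_only), as StreamExamples does.
void ConsumerSide(absl::string_view prefix, int shard) {
  bi::shared_memory_object shm(bi::open_only,
                               GetShmBufferName(prefix, shard).data(),
                               bi::read_write);
  bi::named_mutex buffer_empty(bi::open_only,
                               GetBufferEmptyMutexName(prefix, shard).data());
}
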
9 changes: 5 additions & 4 deletions deepvariant/stream_examples.cc
@@ -41,6 +41,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "third_party/nucleus/protos/variants.pb.h"
#include "deepvariant/fast_pipeline_utils.h"

namespace learning {
namespace genomics {
@@ -61,7 +62,7 @@ StreamExamples::StreamExamples(
}
absl::string_view shm_prefix = options.shm_prefix();
// Name of the shared memory buffer.
shm_name_ = absl::StrCat(shm_prefix, "_shm_", options_.task_id());
shm_name_ = GetShmBufferName(shm_prefix, options_.task_id());
shm_buffer_size_ = options_.shm_buffer_size();

// Open an existing shared memory buffer.
@@ -78,16 +79,16 @@
LOG(INFO) << "Creating buffer_empty mutex";
buffer_empty_ = std::make_unique<boost::interprocess::named_mutex>(
boost::interprocess::open_only,
absl::StrCat(shm_prefix, "_buffer_empty_", options_.task_id()).data());
GetBufferEmptyMutexName(shm_prefix, options_.task_id()).data());
LOG(INFO) << "Creating items_available mutex";
items_available_ = std::make_unique<boost::interprocess::named_mutex>(
boost::interprocess::open_only,
absl::StrCat(shm_prefix, "_items_available_", options_.task_id()).data());
GetItemsAvailableMutexName(shm_prefix, options_.task_id()).data());
items_available_->lock();
LOG(INFO) << "Creating shard_finished mutex";
shard_finished_ = std::make_unique<boost::interprocess::named_mutex>(
boost::interprocess::open_only,
absl::StrCat(shm_prefix, "_shard_finished_", options_.task_id()).data());
GetShardFinishedMutexName(shm_prefix, options_.task_id()).data());
shard_finished_->lock();
}

17 changes: 7 additions & 10 deletions deepvariant/stream_examples_kernel.cc
@@ -61,13 +61,14 @@
#include "tensorflow/core/platform/types.h"
#include "tensorflow/tsl/platform/errors.h"
#include "tensorflow/tsl/platform/thread_annotations.h"
#include "deepvariant/fast_pipeline_utils.h"

namespace learning {
namespace genomics {
namespace deepvariant {

// This class implements a custom TensorFlow op that reads data from shared
// memory files and returns a batch of examples. Batches are variable length.
// memory buffers and returns a batch of examples. Batches are variable length.
// Examples are read from shared memory buffers (one for each shard).
// Examples are written in the following format:
// int: length of alt_indices_encoded
@@ -98,11 +99,10 @@ class StreamExamplesResource : public tensorflow::ResourceBase {
make_examples_shard_finished_.resize(num_shards);
num_shards_ = num_shards;
for (int shard = 0; shard < num_shards; shard++) {
// TODO Move name generation into a shared function.
std::string shm_name = absl::StrCat(shm_prefix, "_shm_", shard);
shm_[shard] = std::make_unique<boost::interprocess::shared_memory_object>(
boost::interprocess::shared_memory_object(
boost::interprocess::open_only, shm_name.data(),
boost::interprocess::open_only,
GetShmBufferName(shm_prefix, shard).data(),
boost::interprocess::read_write));
shm_region_[shard] = std::make_unique<boost::interprocess::mapped_region>(
*shm_[shard], boost::interprocess::read_write);
@@ -112,18 +112,15 @@ class StreamExamplesResource : public tensorflow::ResourceBase {
// Init mutexes
buffer_empty_[shard] = std::make_unique<boost::interprocess::named_mutex>(
boost::interprocess::open_only,
// TODO Move name generation into a shared function.
absl::StrCat(shm_prefix, "_buffer_empty_", shard).data());
GetBufferEmptyMutexName(shm_prefix, shard).data());
items_available_[shard] =
std::make_unique<boost::interprocess::named_mutex>(
boost::interprocess::open_only,
// TODO Move name generation into a shared function.
absl::StrCat(shm_prefix, "_items_available_", shard).data());
GetItemsAvailableMutexName(shm_prefix, shard).data());
make_examples_shard_finished_[shard] =
std::make_unique<boost::interprocess::named_mutex>(
boost::interprocess::open_only,
// TODO Move name generation into a shared function.
absl::StrCat(shm_prefix, "_shard_finished_", shard).data());
GetShardFinishedMutexName(shm_prefix, shard).data());
}

return tensorflow::Status();