Skip to content

Commit

Permalink
don't flush all requests at end of PA
Browse files Browse the repository at this point in the history
  • Loading branch information
tgerdesnv committed May 29, 2024
1 parent 7df8f67 commit 3604a7d
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 26 deletions.
8 changes: 8 additions & 0 deletions src/c++/perf_analyzer/client_backend/mock_client_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,14 @@ class NaggyMockClientBackend : public ClientBackend {
});
}

~NaggyMockClientBackend()
{
// Make sure no requests carry over to the next test
while (stats_->num_active_infer_calls) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}

MOCK_METHOD(
Error, ModelConfig,
(rapidjson::Document*, const std::string&, const std::string&),
Expand Down
8 changes: 4 additions & 4 deletions src/c++/perf_analyzer/concurrency_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ ConcurrencyWorker::HandleExecuteOff()
// Wait if no request should be sent and it is not exiting
thread_config_->is_paused_ = true;
std::unique_lock<std::mutex> lock(wake_mutex_);
wake_signal_.wait(lock, [this]() { return early_exit || execute_; });
wake_signal_.wait(lock, [this]() { return exiting_ || execute_; });

// TODO REFACTOR TMA-1043 - memory manager should be handling this instead
// of here
Expand All @@ -131,10 +131,10 @@ ConcurrencyWorker::HandleNoConcurrency()
// Wait if no request should be sent and it is not exiting
std::unique_lock<std::mutex> lock(wake_mutex_);
wake_signal_.wait(lock, [this]() {
return early_exit || (thread_config_->concurrency_ > 0);
return exiting_ || (thread_config_->concurrency_ > 0);
});
// Stop executing if concurrency is 0 and early exit is requested
if (early_exit && thread_config_->concurrency_ == 0) {
if (exiting_ && thread_config_->concurrency_ == 0) {
return true;
}
}
Expand Down Expand Up @@ -181,7 +181,7 @@ ConcurrencyWorker::WaitForResponses()
std::unique_lock<std::mutex> lk(cb_mtx_);
thread_stat_->idle_timer.Start();
cb_cv_.wait(lk, [this] {
if (notified_) {
if (notified_ || exiting_) {
notified_ = false;
return true;
}
Expand Down
6 changes: 5 additions & 1 deletion src/c++/perf_analyzer/infer_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ InferContext::SendSequenceInferRequest(uint32_t seq_stat_index, bool delayed)
// This also helps in reporting the realistic latencies.
std::lock_guard<std::mutex> guard(
sequence_manager_->GetMutex(seq_stat_index));
if (!early_exit && execute_) {
if (!exiting_ && execute_) {
sequence_manager_->SetInferSequenceOptions(
seq_stat_index, infer_data_.options_);

Expand Down Expand Up @@ -298,6 +298,10 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
// Add the request record to thread request records vector with
// proper locking
std::lock_guard<std::mutex> lock(thread_stat_->mu_);
if (exiting_) {
return;
}

thread_stat_->cb_status_ = result_ptr->RequestStatus();
if (thread_stat_->cb_status_.IsOk()) {
std::string request_id;
Expand Down
4 changes: 4 additions & 0 deletions src/c++/perf_analyzer/infer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ class InferContext {
// Initialize the context. Must be done before any inferences are sent
void Init();

// Signal to the context to stop working and exit
void Exit() { exiting_ = true; }

// Send a single inference request to the server
void SendInferRequest(bool delayed = false);

Expand Down Expand Up @@ -192,6 +195,7 @@ class InferContext {

const uint32_t id_{0};
const size_t thread_id_{0};
bool exiting_{false};

size_t GetNumActiveThreads() { return num_active_threads_; }

Expand Down
1 change: 1 addition & 0 deletions src/c++/perf_analyzer/iworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace triton { namespace perfanalyzer {
class IWorker {
public:
virtual void Infer() = 0;
virtual void Exit() = 0;
};

}} // namespace triton::perfanalyzer
12 changes: 9 additions & 3 deletions src/c++/perf_analyzer/load_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,15 @@ LoadManager::InitManagerInputs(
void
LoadManager::StopWorkerThreads()
{
early_exit = true;
// wake up all threads
wake_signal_.notify_all();
// FIXME do I need to acquire the lock first?
for (auto& worker : workers_) {
worker->Exit();
}

{
std::unique_lock<std::mutex> lock(wake_mutex_);
wake_signal_.notify_all();
}

size_t cnt = 0;
for (auto& thread : threads_) {
Expand Down
19 changes: 16 additions & 3 deletions src/c++/perf_analyzer/load_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@

namespace triton { namespace perfanalyzer {

void
LoadWorker::Exit()
{
for (auto ctx : ctxs_) {
ctx->Exit();
}

exiting_ = true;

{
std::lock_guard<std::mutex> lk(cb_mtx_);
cb_cv_.notify_all();
}
}

bool
LoadWorker::ShouldExit()
{
Expand All @@ -44,16 +59,14 @@ LoadWorker::ShouldExit()
thread_config_->num_requests_ != 0 &&
thread_stat_->num_sent_requests_ >= thread_config_->num_requests_;

return early_exit || bad_status || done_with_request_count;
return exiting_ || bad_status || done_with_request_count;
}

bool
LoadWorker::HandleExitConditions()
{
if (ShouldExit()) {
CompleteOngoingSequences();
thread_stat_->idle_timer.Start();
WaitForOngoingRequests();
return true;
}
return false;
Expand Down
4 changes: 4 additions & 0 deletions src/c++/perf_analyzer/load_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class LoadWorker : public IWorker {

virtual ~LoadWorker() = default;

virtual void Exit() override;

protected:
// Return the total number of async requests that have started and not
// finished
Expand Down Expand Up @@ -117,6 +119,8 @@ class LoadWorker : public IWorker {

void AsyncCallbackFinalize(uint32_t ctx_id);

bool exiting_ = false;

uint32_t id_;

std::vector<std::shared_ptr<InferContext>> ctxs_;
Expand Down
7 changes: 5 additions & 2 deletions src/c++/perf_analyzer/request_rate_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ RequestRateWorker::Infer()
HandleExecuteOff();

bool is_delayed = SleepIfNecessary();
if (HandleExitConditions()) {
return;
}
uint32_t ctx_id = GetCtxId();
SendInferRequest(ctx_id, is_delayed);
RestoreFreeCtxId(ctx_id);
Expand Down Expand Up @@ -119,7 +122,7 @@ RequestRateWorker::HandleExecuteOff()
// Wait if no request should be sent and it is not exiting
thread_config_->is_paused_ = true;
std::unique_lock<std::mutex> lock(wake_mutex_);
wake_signal_.wait(lock, [this]() { return early_exit || execute_; });
wake_signal_.wait(lock, [this]() { return exiting_ || execute_; });
}

thread_config_->is_paused_ = false;
Expand Down Expand Up @@ -155,7 +158,7 @@ RequestRateWorker::WaitForFreeCtx()
std::unique_lock<std::mutex> lk(cb_mtx_);
thread_stat_->idle_timer.Start();
cb_cv_.wait(lk, [this] {
if (notified_) {
if (notified_ || exiting_) {
notified_ = false;
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions src/c++/perf_analyzer/test_concurrency_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ TEST_CASE("concurrency_free_ctx_ids")

std::this_thread::sleep_for(std::chrono::milliseconds(15));

early_exit = true;
worker->Exit();
infer_future.get();

// The first sequence should only be called two times, once at the very start,
Expand Down Expand Up @@ -590,7 +590,7 @@ TEST_CASE("Concurrency - shared memory infer input calls")

std::this_thread::sleep_for(std::chrono::milliseconds(18));

early_exit = true;
worker->Exit();
infer_future.get();

const auto& actual_append_raw_calls{tcm.stats_->num_append_raw_calls};
Expand Down
6 changes: 0 additions & 6 deletions src/c++/perf_analyzer/test_load_manager_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ class TestLoadManagerBase {
is_sequence_model, is_decoupled_model);
}

~TestLoadManagerBase()
{
// Reset early_exit in case any test sets it to true during execution.
early_exit = false;
}

// Helper function to process custom json data in testing
// Creates a model tensor to pass to a mock parser which is consumed by the
// mock data loader
Expand Down
8 changes: 3 additions & 5 deletions src/c++/perf_analyzer/test_request_rate_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ class TestRequestRateManager : public TestLoadManagerBase,
REQUIRE(timestamp.count() == expected_current_timestamp.count());
}
}
early_exit = true;
}

void TestCreateSchedule(
Expand All @@ -168,7 +167,6 @@ class TestRequestRateManager : public TestLoadManagerBase,
total_num_seqs += w->thread_config_->num_sequences_;
worker_schedule_sizes.push_back(w->schedule_->intervals.size());
}
early_exit = true;

CHECK(num_of_sequences_ == total_num_seqs);
for (int i = 0; i < worker_schedule_sizes.size() - 1; i++) {
Expand Down Expand Up @@ -977,7 +975,7 @@ TEST_CASE("request_rate_streaming: test that streaming-specific logic works")
std::dynamic_pointer_cast<IScheduler>(worker)->SetSchedule(schedule);
std::future<void> infer_future{std::async(&IWorker::Infer, worker)};

early_exit = true;
worker->Exit();
infer_future.get();

CHECK(
Expand Down Expand Up @@ -1827,7 +1825,7 @@ TEST_CASE("Request rate - Shared memory infer input calls")

std::this_thread::sleep_for(milliseconds(18));

early_exit = true;
worker->Exit();
infer_future.get();

const auto& actual_append_raw_calls{trrm.stats_->num_append_raw_calls};
Expand Down Expand Up @@ -2184,7 +2182,7 @@ TEST_CASE("request rate create schedule")
params.max_trials = 10;
bool is_sequence_model = false;
bool is_decoupled = false;
bool use_mock_infer = false;
bool use_mock_infer = true;
double rate = 10;
std::vector<uint32_t> expected_worker_ratio;

Expand Down

0 comments on commit 3604a7d

Please sign in to comment.