Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure MonitorStage returns the cursor back to the end of output #2121

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,40 @@ class ProgressBarContextManager
m_stdout_os << "\033[" << m_progress_bars.size() << "A";
}

void mark_pbar_as_completed(size_t bar_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_progress_bars[bar_id]->mark_as_completed();

if (!m_is_completed)
{
bool all_pbars_completed = true;
for (auto& pbar : m_progress_bars)
{
if (!pbar->is_completed())
{
all_pbars_completed = false;
break;
}
}
if (all_pbars_completed)
{
// Move the cursor down to the bottom of the last progress bar
// Doing this here instead of the destructor to avoid a race condition with the pipeline's
// "====Pipeline Complete====" log message.
// Using a string stream to ensure other logs are not interleaved.
std::ostringstream new_lines;
for (std::size_t i = 0; i < m_progress_bars.size(); ++i)
{
new_lines << "\n";
}

m_stdout_os << new_lines.str() << std::flush;
m_is_completed = true;
}
}
}

private:
ProgressBarContextManager() : m_stdout_streambuf(std::cout.rdbuf()), m_stdout_os(m_stdout_streambuf)
{
Expand Down Expand Up @@ -170,6 +204,7 @@ class ProgressBarContextManager
std::streambuf* m_stdout_streambuf; // Stores the original std::cout.rdbuf()
std::ostream m_stdout_os;
boost::iostreams::filtering_ostreambuf m_log_streambuf;
bool m_is_completed{false};
};

/**
Expand Down Expand Up @@ -201,7 +236,7 @@ class MonitorController
static std::string format_duration(std::chrono::seconds duration);
static std::string format_throughput(std::chrono::seconds duration, size_t count, const std::string& unit);

int m_bar_id;
size_t m_bar_id;
const std::string m_unit;
std::optional<std::function<size_t(MessageT)>> m_determine_count_fn;
size_t m_count{0};
Expand Down Expand Up @@ -257,9 +292,7 @@ template <typename MessageT>
void MonitorController<MessageT>::sink_on_completed()
{
auto& manager = ProgressBarContextManager::get_instance();
auto& pbar = manager.progress_bars()[m_bar_id];

pbar->mark_as_completed();
manager.mark_pbar_as_completed(m_bar_id);
}

template <typename MessageT>
Expand Down
Loading