Skip to content

Commit

Permalink
Several fixes & improvements of C++ MonitorStage (#2170)
Browse files Browse the repository at this point in the history
- For some short-running pipelines, `MonitorStage` now ensures there's at least one line of progress bar output with throughput shown.
- For pipelines need explicitly stopped with `Ctrl+C`, `MonitorStage` now ensures to output the progress bars again when the pipeline is completed, avoids the progress bars from being covered by other logs.
- Using `microseconds` when calculating the throughput to avoid `inf` throughput
- Each completed progress bar will turn into green with text `[Completed]`

Closes #2148

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Yuchen Zhang (https://github.com/yczhang-nv)

Approvers:
  - Will Killian (https://github.com/willkill07)

URL: #2170
  • Loading branch information
yczhang-nv authored Feb 6, 2025
1 parent 159fc7d commit 2226afe
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,22 @@ class ProgressBarContextManager
{
std::lock_guard<std::mutex> lock(m_mutex);

// To avoid display_all() being executed after calling mark_pbar_as_completed() in some race conditions
// If the progress bars needs to be updated after completion, move the cursor up to the beginning
if (m_is_completed)
{
return;
move_cursor_up(m_progress_bars.size());
}

// A bit of hack here to make the font settings work. Indicators enables the font options only if the bars are
// output to standard streams (see is_colorized() in <indicators/termcolor.hpp>), but since we are still using
// the ostream (m_stdout_os) that is connected to the console terminal, the font options should be enabled.
// The internal function here is used to manually enable the font display.
m_stdout_os.iword(termcolor::_internal::colorize_index()) = 1;
display_all_impl();

for (auto& pbar : m_progress_bars)
// If all the progress bars are completed, keep the cursor position as it is
if (m_is_completed)
{
pbar->print_progress(true);
m_stdout_os << termcolor::reset; // The font option only works for the current bar
m_stdout_os << std::endl;
return;
}

// After each round of display, move cursor up ("\033[A") to the beginning of the first bar
m_stdout_os << "\033[" << m_progress_bars.size() << "A" << std::flush;
// Otherwise, move cursor up to the beginning after each round of display
move_cursor_up(m_progress_bars.size());
}

void mark_pbar_as_completed(size_t bar_id)
Expand All @@ -145,17 +140,9 @@ class ProgressBarContextManager
}
if (all_pbars_completed)
{
// Move the cursor down to the bottom of the last progress bar
// Doing this here instead of the destructor to avoid a race condition with the pipeline's
// "====Pipeline Complete====" log message.
// Using a string stream to ensure other logs are not interleaved.
std::ostringstream new_lines;
for (std::size_t i = 0; i < m_progress_bars.size(); ++i)
{
new_lines << "\n";
}
// Display again when completed to avoid progress bars being covered by other logs
display_all_impl();

m_stdout_os << new_lines.str() << std::flush;
m_is_completed = true;
}
}
Expand Down Expand Up @@ -199,6 +186,34 @@ class ProgressBarContextManager
return std::move(progress_bar);
}

void display_all_impl()
{
// A bit of hack here to make the font settings work. Indicators enables the font options only if the bars are
// output to standard streams (see is_colorized() in <indicators/termcolor.hpp>), but since we are still using
// the ostream (m_stdout_os) that is connected to the console terminal, the font options should be enabled.
// The internal function here is used to manually enable the font display.
m_stdout_os.iword(termcolor::_internal::colorize_index()) = 1;

for (auto& pbar : m_progress_bars)
{
pbar->print_progress(true);
m_stdout_os << termcolor::reset; // The font option only works for the current bar
m_stdout_os << std::endl;
}
}

void move_cursor_up(size_t lines)
{
// "\033[<n>A" means moving the cursor up for n lines
m_stdout_os << "\033[" << lines << "A" << std::flush;
}

void move_cursor_down(size_t lines)
{
// "\033[<n>B" means moving the cursor down for n lines
m_stdout_os << "\033[" << lines << "B" << std::flush;
}

indicators::DynamicProgress<indicators::IndeterminateProgressBar> m_dynamic_progress_bars;
std::vector<std::unique_ptr<indicators::IndeterminateProgressBar>> m_progress_bars;
std::mutex m_mutex;
Expand Down Expand Up @@ -227,8 +242,8 @@ class MonitorController
* @param unit : the unit of message count
* @param determine_count_fn : A function that computes the count for each incoming message
*/
MonitorController(const std::string& description,
std::string unit = "messages",
MonitorController(const std::string& description = "Progress",
const std::string& unit = "messages",
indicators::Color text_color = indicators::Color::cyan,
indicators::FontStyle font_style = indicators::FontStyle::bold,
std::optional<std::function<size_t(MessageT)>> determine_count_fn = std::nullopt);
Expand All @@ -239,23 +254,26 @@ class MonitorController
void sink_on_completed();

private:
static std::string format_duration(std::chrono::seconds duration);
static std::string format_throughput(std::chrono::seconds duration, size_t count, const std::string& unit);
static std::string format_duration(std::chrono::microseconds duration);
static std::string format_throughput(std::chrono::microseconds duration, size_t count, const std::string& unit);

size_t m_bar_id;
const std::string m_description;
const std::string m_unit;
std::optional<std::function<size_t(MessageT)>> m_determine_count_fn;
size_t m_count{0};
time_point_t m_start_time;
bool m_is_started{false}; // Set to true after the first call to progress_sink()
bool m_is_completed{false};
};

template <typename MessageT>
MonitorController<MessageT>::MonitorController(const std::string& description,
std::string unit,
const std::string& unit,
indicators::Color text_color,
indicators::FontStyle font_style,
std::optional<std::function<size_t(MessageT)>> determine_count_fn) :
m_description(std::move(description)),
m_unit(std::move(unit)),
m_determine_count_fn(determine_count_fn)
{
Expand All @@ -268,7 +286,7 @@ MonitorController<MessageT>::MonitorController(const std::string& description,
}
}

m_bar_id = ProgressBarContextManager::get_instance().add_progress_bar(description, text_color, font_style);
m_bar_id = ProgressBarContextManager::get_instance().add_progress_bar(m_description, text_color, font_style);
}

template <typename MessageT>
Expand All @@ -280,13 +298,15 @@ MessageT MonitorController<MessageT>::progress_sink(MessageT msg)
m_is_started = true;
}
m_count += (*m_determine_count_fn)(msg);
auto duration = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - m_start_time);
auto duration =
std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now() - m_start_time);

auto& manager = ProgressBarContextManager::get_instance();
auto& pbar = manager.progress_bars()[m_bar_id];

// Update the progress bar
pbar->set_option(indicators::option::PostfixText{format_throughput(duration, m_count, m_unit)});
pbar->set_option(indicators::option::PrefixText{m_description});
pbar->tick();

manager.display_all();
Expand All @@ -298,14 +318,19 @@ template <typename MessageT>
void MonitorController<MessageT>::sink_on_completed()
{
auto& manager = ProgressBarContextManager::get_instance();
auto& pbar = manager.progress_bars()[m_bar_id];

pbar->set_option(indicators::option::PrefixText{"[Completed]" + m_description});
pbar->set_option(indicators::option::ForegroundColor{indicators::Color::green});

manager.mark_pbar_as_completed(m_bar_id);
}

template <typename MessageT>
std::string MonitorController<MessageT>::format_duration(std::chrono::seconds duration)
std::string MonitorController<MessageT>::format_duration(std::chrono::microseconds duration)
{
auto minutes = std::chrono::duration_cast<std::chrono::minutes>(duration);
auto seconds = duration - minutes;
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration - minutes);

std::ostringstream oss;
oss << std::setw(2) << std::setfill('0') << minutes.count() << "m:" << std::setw(2) << std::setfill('0')
Expand All @@ -314,11 +339,12 @@ std::string MonitorController<MessageT>::format_duration(std::chrono::seconds du
}

template <typename MessageT>
std::string MonitorController<MessageT>::format_throughput(std::chrono::seconds duration,
std::string MonitorController<MessageT>::format_throughput(std::chrono::microseconds duration,
size_t count,
const std::string& unit)
{
double throughput = static_cast<double>(count) / duration.count();
double time_in_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(duration).count();
double throughput = static_cast<double>(count) / time_in_seconds;
std::ostringstream oss;
oss << count << " " << unit << " in " << format_duration(duration) << ", "
<< "Throughput: " << std::fixed << std::setprecision(2) << throughput << " " << unit << "/s";
Expand Down
2 changes: 0 additions & 2 deletions python/morpheus/morpheus/stages/general/monitor_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) ->
self._mc._font_style,
self._mc._determine_count_fn)

node.launch_options.pe_count = self._config.num_threads

else:
# Use a component so we track progress using the upstream progress engine. This will provide more accurate
# results
Expand Down

0 comments on commit 2226afe

Please sign in to comment.