-
Notifications
You must be signed in to change notification settings - Fork 602
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
datalake: introduced datalake backlog controller
Added datalake backlog controller to dynamically adjust the `datalake` scheduling group shares. A simple proportional controller tracks the average size of the partition translation backlog and calculates how far it is from the desired value. The number of shares given to the `datalake` scheduling group is proportional to the difference between the desired and current backlog values. The rationale behind the controller is that datalake translation is almost always CPU bound, and the translation rate (translated bytes per second) can be controlled by giving the translator more CPU time when required. Signed-off-by: Michał Maślanka <[email protected]>
- Loading branch information
1 parent
6d8e573
commit 3f2293e
Showing
6 changed files
with
189 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/* | ||
* Copyright 2025 Redpanda Data, Inc. | ||
* | ||
* Licensed as a Redpanda Enterprise file under the Redpanda Community | ||
* License (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md | ||
*/ | ||
#include "datalake/backlog_controller.h" | ||
|
||
#include "base/vlog.h" | ||
#include "config/configuration.h" | ||
#include "datalake/logger.h" | ||
#include "metrics/prometheus_sanitize.h" | ||
|
||
#include <seastar/core/coroutine.hh> | ||
#include <seastar/core/metrics.hh> | ||
#include <seastar/core/reactor.hh> | ||
using namespace std::chrono_literals; // NOLINT | ||
|
||
namespace datalake { | ||
|
||
backlog_controller::backlog_controller( | ||
sampling_fn sampling_fn, ss::scheduling_group sg) | ||
: _sampling_f(std::move(sampling_fn)) | ||
, _scheduling_group(sg) | ||
, _proportional_coeff( | ||
config::shard_local_cfg().iceberg_backlog_controller_p_coeff.bind()) | ||
, _setpoint(config::shard_local_cfg().iceberg_target_backlog_size.bind()) | ||
, _sampling_interval(5s) {} | ||
|
||
/**
 * Registers metrics and starts periodic sampling.
 *
 * Installs a timer callback that runs update() and then re-arms the timer for
 * another `_sampling_interval`, unless an abort has been requested on `_as`.
 * The first tick fires one `_sampling_interval` after this call.
 */
ss::future<> backlog_controller::start() {
    setup_metrics();

    auto on_tick = [this] {
        update();
        // Once stop() has requested an abort the timer must not be re-armed.
        if (_as.abort_requested()) {
            return;
        }
        _sampling_timer.arm(_sampling_interval);
    };
    _sampling_timer.set_callback(std::move(on_tick));

    _sampling_timer.arm(_sampling_interval);
    co_return;
}
|
||
/**
 * Stops periodic sampling.
 *
 * Cancels the pending timer and requests an abort on `_as`; the timer
 * callback installed by start() checks that flag before re-arming itself.
 */
ss::future<> backlog_controller::stop() {
    // Drop the tick that is currently scheduled, if any.
    _sampling_timer.cancel();
    // Mark the controller as stopped for the timer callback.
    _as.request_abort();
    co_return;
}
|
||
void backlog_controller::update() { | ||
using namespace std::chrono_literals; | ||
|
||
_current_sample = _sampling_f(); | ||
|
||
auto current_err = _setpoint() - _current_sample; | ||
auto update = _proportional_coeff() * current_err; | ||
|
||
update = std::clamp(static_cast<int>(update), _min_shares, _max_shares); | ||
|
||
vlog( | ||
datalake_log.trace, | ||
"state update: {{setpoint: {}, current_backlog: {:2f}, current_error: " | ||
"{:2f}, shares_update: {:2f}, current_share: {}}}", | ||
_setpoint(), | ||
_current_sample, | ||
current_err, | ||
|
||
update, | ||
update); | ||
|
||
_scheduling_group.set_shares(static_cast<float>(update)); | ||
} | ||
|
||
void backlog_controller::setup_metrics() { | ||
if (config::shard_local_cfg().disable_metrics()) { | ||
return; | ||
} | ||
namespace sm = ss::metrics; | ||
_metrics.add_group( | ||
prometheus_sanitize::metrics_name("iceberg:backlog:controller"), | ||
{ | ||
sm::make_gauge( | ||
"backlog_size", | ||
[this] { return _current_sample; }, | ||
sm::description("Iceberg controller current backlog - averaged size " | ||
"of the backlog per partition")), | ||
|
||
}); | ||
} | ||
|
||
} // namespace datalake |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright 2025 Redpanda Data, Inc. | ||
* | ||
* Licensed as a Redpanda Enterprise file under the Redpanda Community | ||
* License (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md | ||
*/ | ||
|
||
#pragma once | ||
#include "base/seastarx.h" | ||
#include "config/property.h" | ||
#include "metrics/metrics.h" | ||
|
||
#include <seastar/core/gate.hh> | ||
#include <seastar/core/io_priority_class.hh> | ||
#include <seastar/core/metrics_registration.hh> | ||
#include <seastar/core/timer.hh> | ||
#include <seastar/util/log.hh> | ||
|
||
namespace datalake { | ||
|
||
class backlog_controller { | ||
public: | ||
using sampling_fn = ss::noncopyable_function<long double()>; | ||
backlog_controller(sampling_fn, ss::scheduling_group sg); | ||
|
||
ss::future<> start(); | ||
ss::future<> stop(); | ||
|
||
private: | ||
void update(); | ||
void setup_metrics(); | ||
|
||
sampling_fn _sampling_f; | ||
ss::scheduling_group _scheduling_group; | ||
|
||
config::binding<double> _proportional_coeff; | ||
config::binding<uint32_t> _setpoint; | ||
std::chrono::steady_clock::duration _sampling_interval; | ||
ss::timer<> _sampling_timer; | ||
long double _current_sample{0}; | ||
|
||
int _min_shares{1}; | ||
int _max_shares{1000}; | ||
ss::abort_source _as; | ||
metrics::internal_metric_groups _metrics; | ||
}; | ||
} // namespace datalake |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters