datalake: introduced datalake backlog controller
Added a datalake backlog controller to dynamically adjust the `datalake`
scheduling group shares. A simple proportional controller tracks the
average size of the partition translation backlog and calculates how far
it is from the desired value. The number of shares given to the `datalake`
scheduling group is proportional to the difference between the desired
and current backlog values.

The rationale behind the controller is that datalake translation is
almost always CPU bound, so the translation rate (translated bytes per
second) can be controlled by giving the translator more CPU time when
required.
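
For illustration, below is a minimal, self-contained sketch of the control law
described above. The function name, coefficient, setpoint, and backlog values
are hypothetical placeholders (in the actual controller they come from the
iceberg_backlog_controller_p_coeff and iceberg_target_backlog_size properties
and the measured per-partition backlog); a negative coefficient is used here
purely so that the share count grows as the backlog rises above the setpoint,
in line with the rationale above.

#include <algorithm>
#include <cstdio>

// Proportional control law: the share count is proportional to
// (setpoint - measured backlog), clamped to the allowed share range
// of the scheduling group.
int proportional_shares(
  double p_coeff,
  double setpoint,
  double avg_backlog,
  int min_shares,
  int max_shares) {
    const double error = setpoint - avg_backlog;
    const double update = p_coeff * error;
    return std::clamp(static_cast<int>(update), min_shares, max_shares);
}

int main() {
    // Hypothetical values, not the shipped defaults.
    const double p_coeff = -0.5;
    const double setpoint = 1000.0; // desired average backlog per partition
    const double backlogs[] = {500.0, 1000.0, 1500.0, 5000.0};
    for (double backlog : backlogs) {
        std::printf(
          "backlog=%.0f -> shares=%d\n",
          backlog,
          proportional_shares(p_coeff, setpoint, backlog, 1, 1000));
    }
    return 0;
}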

Signed-off-by: Michał Maślanka <[email protected]>
mmaslankaprv committed Jan 31, 2025
1 parent 9a14105 commit f9e0982
Showing 6 changed files with 189 additions and 1 deletion.
20 changes: 20 additions & 0 deletions src/v/datalake/BUILD
@@ -588,3 +588,23 @@ redpanda_cc_library(
        "//src/v/model",
    ],
)

redpanda_cc_library(
    name = "backlog_controller",
    srcs = [
        "backlog_controller.cc",
    ],
    hdrs = [
        "backlog_controller.h",
    ],
    include_prefix = "datalake",
    visibility = [":__subpackages__"],
    deps = [
        ":logger",
        "//src/v/base",
        "//src/v/config",
        "//src/v/metrics",
        "//src/v/model",
        "@seastar",
    ],
)
1 change: 1 addition & 0 deletions src/v/datalake/CMakeLists.txt
@@ -15,6 +15,7 @@ v_cc_library(
  NAME datalake_manager
  SRCS
    datalake_manager.cc
    backlog_controller.cc
  DEPS
    v::datalake_translation
    v::datalake_coordinator
93 changes: 93 additions & 0 deletions src/v/datalake/backlog_controller.cc
@@ -0,0 +1,93 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/backlog_controller.h"

#include "base/vlog.h"
#include "config/configuration.h"
#include "datalake/logger.h"
#include "metrics/prometheus_sanitize.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/reactor.hh>

#include <algorithm>

using namespace std::chrono_literals; // NOLINT

namespace datalake {

backlog_controller::backlog_controller(
  sampling_fn sampling_fn, ss::scheduling_group sg)
  : _sampling_f(std::move(sampling_fn))
  , _scheduling_group(sg)
  , _proportional_coeff(
      config::shard_local_cfg().iceberg_backlog_controller_p_coeff.bind())
  , _setpoint(config::shard_local_cfg().iceberg_target_backlog_size.bind())
  , _sampling_interval(5s) {}

ss::future<> backlog_controller::start() {
    setup_metrics();
    _sampling_timer.set_callback([this] {
        update();
        if (!_as.abort_requested()) {
            _sampling_timer.arm(_sampling_interval);
        }
    });

    _sampling_timer.arm(_sampling_interval);
    co_return;
}

ss::future<> backlog_controller::stop() {
    _sampling_timer.cancel();
    _as.request_abort();
    co_return;
}

void backlog_controller::update() {
    _current_sample = _sampling_f();

    auto current_err = _setpoint() - _current_sample;
    auto update = _proportional_coeff() * current_err;

    update = std::clamp(static_cast<int>(update), _min_shares, _max_shares);

    vlog(
      datalake_log.trace,
      "state update: {{setpoint: {}, current_backlog: {:.2f}, current_error: "
      "{:.2f}, shares_update: {:.2f}, current_share: {}}}",
      _setpoint(),
      _current_sample,
      current_err,
      update,
      update);

    _scheduling_group.set_shares(static_cast<float>(update));
}

void backlog_controller::setup_metrics() {
    if (config::shard_local_cfg().disable_metrics()) {
        return;
    }
    namespace sm = ss::metrics;
    _metrics.add_group(
      prometheus_sanitize::metrics_name("iceberg:backlog:controller"),
      {
        sm::make_gauge(
          "backlog_size",
          [this] { return _current_sample; },
          sm::description(
            "Iceberg controller current backlog - averaged size "
            "of the backlog per partition")),
      });
}

} // namespace datalake
50 changes: 50 additions & 0 deletions src/v/datalake/backlog_controller.h
@@ -0,0 +1,50 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once
#include "base/seastarx.h"
#include "config/property.h"
#include "metrics/metrics.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/log.hh>
#include <seastar/util/noncopyable_function.hh>

#include <chrono>

namespace datalake {

class backlog_controller {
public:
    using sampling_fn = ss::noncopyable_function<long double()>;

    backlog_controller(sampling_fn, ss::scheduling_group sg);

    ss::future<> start();
    ss::future<> stop();

private:
    void update();
    void setup_metrics();

    sampling_fn _sampling_f;
    ss::scheduling_group _scheduling_group;

    config::binding<double> _proportional_coeff;
    config::binding<uint32_t> _setpoint;
    std::chrono::steady_clock::duration _sampling_interval;
    ss::timer<> _sampling_timer;
    long double _current_sample{0};

    int _min_shares{1};
    int _max_shares{1000};
    ss::abort_source _as;
    metrics::internal_metric_groups _metrics;
};
} // namespace datalake
22 changes: 22 additions & 0 deletions src/v/datalake/datalake_manager.cc
@@ -13,6 +13,7 @@
#include "cluster/partition_manager.h"
#include "cluster/topic_table.h"
#include "cluster/types.h"
#include "datalake/backlog_controller.h"
#include "datalake/catalog_schema_manager.h"
#include "datalake/cloud_data_io.h"
#include "datalake/coordinator/catalog_factory.h"
@@ -116,6 +117,23 @@ datalake_manager::datalake_manager(
}
datalake_manager::~datalake_manager() = default;

double datalake_manager::average_translation_backlog() {
    if (_translators.empty()) {
        return 0;
    }
    size_t total_lag = 0;
    for (const auto& [_, translator] : _translators) {
        auto backlog_size = translator->translation_backlog();
        // skip over translators that are not yet ready to report anything
        if (!backlog_size) {
            continue;
        }
        total_lag += backlog_size.value();
    }

    // cast before dividing to avoid integer truncation of the average
    return static_cast<double>(total_lag) / _translators.size();
}

ss::future<> datalake_manager::start() {
    _catalog = co_await _catalog_factory->create_catalog();
    _schema_mgr = std::make_unique<catalog_schema_manager>(*_catalog);
@@ -184,10 +202,14 @@ ss::future<> datalake_manager::start() {
});
});
    _schema_cache->start();
    _backlog_controller = std::make_unique<backlog_controller>(
      [this] { return average_translation_backlog(); }, _sg);
    co_await _backlog_controller->start();
}

ss::future<> datalake_manager::stop() {
    auto f = _gate.close();
    co_await _backlog_controller->stop();
    _deregistrations.clear();
    co_await ss::max_concurrent_for_each(
      _translators, 32, [](auto& entry) mutable {
4 changes: 3 additions & 1 deletion src/v/datalake/datalake_manager.h
@@ -15,6 +15,7 @@
#include "cluster/fwd.h"
#include "config/property.h"
#include "container/chunked_hash_map.h"
#include "datalake/backlog_controller.h"
#include "datalake/fwd.h"
#include "datalake/record_schema_resolver.h"
#include "datalake/translation/partition_translator.h"
@@ -77,7 +78,7 @@ class datalake_manager : public ss::peering_sharded_service<datalake_manager> {
    void start_translator(
      ss::lw_shared_ptr<cluster::partition>, model::iceberg_mode);
    void stop_translator(const model::ntp&);

    double average_translation_backlog();

    model::node_id _self;
    ss::sharded<raft::group_manager>* _group_mgr;
    ss::sharded<cluster::partition_manager>* _partition_mgr;
@@ -94,6 +95,7 @@ class datalake_manager : public ss::peering_sharded_service<datalake_manager> {
    std::unique_ptr<datalake::schema_manager> _schema_mgr;
    std::unique_ptr<datalake::type_resolver> _type_resolver;
    std::unique_ptr<datalake::schema_cache> _schema_cache;
    std::unique_ptr<backlog_controller> _backlog_controller;
    ss::sharded<ss::abort_source>* _as;
    ss::scheduling_group _sg;
    ss::gate _gate;
