Skip to content

Commit

Permalink
storage: schedule compaction by dirty_ratio
Browse files Browse the repository at this point in the history
  • Loading branch information
WillemKauf committed Feb 1, 2025
1 parent a67c4bf commit 7a6aab3
Showing 1 changed file with 45 additions and 13 deletions.
58 changes: 45 additions & 13 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
* compaction is already sequential when this will be unified with
* compaction, the whole task could be made concurrent
*/
absl::btree_map<double, model::ntp, std::greater<>>
ntp_by_compaction_heuristic;
while (!_logs_list.empty()
&& is_not_set(_logs_list.front().flags, bflags::lifetime_checked)) {
if (_abort_source.abort_requested()) {
Expand All @@ -248,6 +250,37 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
// prevents the removal of the parent object. this makes awaiting
// apply_segment_ms safe against removal of segments from _logs_list
co_await current_log.handle->apply_segment_ms();

auto should_compact_log = [](ss::shared_ptr<log> l) {
// Consider the dirty ratio.
const auto min_cleanable_dirty_ratio
= l->config().min_cleanable_dirty_ratio().value_or(0.0) / 100.0;
const auto dirty_ratio = l->dirty_ratio();
vlog(
gclog.info,
"{} comparing dirty ratio of log {} against min cleanable {}",
l->config().ntp(),
dirty_ratio,
min_cleanable_dirty_ratio);
if (dirty_ratio >= min_cleanable_dirty_ratio) {
return true;
}

return false;
};

const auto compact_log = should_compact_log(current_log.handle);

if (compact_log) {
// Order ntps by compaction heuristic.
// Currently, this is just the dirty ratio.
auto compute_compaction_heuristic =
[](ss::shared_ptr<log> l) -> double { return l->dirty_ratio(); };
auto compaction_heuristic_weight = compute_compaction_heuristic(
current_log.handle);
ntp_by_compaction_heuristic.emplace(
compaction_heuristic_weight, current_log.handle->config().ntp());
}
}

if (
Expand All @@ -260,29 +293,28 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
co_await compaction_map->initialize(compaction_mem_bytes);
_compaction_hash_key_map = std::move(compaction_map);
}
while (!_logs_list.empty()
&& is_not_set(_logs_list.front().flags, bflags::compacted)) {
if (_abort_source.abort_requested()) {
co_return;
}

auto& current_log = _logs_list.front();
for (const auto& [_, ntp] : ntp_by_compaction_heuristic) {
auto current_log = get(ntp);

_logs_list.shift_forward();
if (!current_log) {
continue;
}

current_log.flags |= bflags::compacted;
current_log.last_compaction = ss::lowres_clock::now();
if (_abort_source.abort_requested()) {
co_return;
}

auto ntp_sanitizer_cfg = _config.maybe_get_ntp_sanitizer_config(
current_log.handle->config().ntp());
current_log->config().ntp());
// NOTE: housekeeping holds _compaction_housekeeping_gate, that prevents
// the removal of the parent object. this makes awaiting housekeeping
// safe against removal of segments from _logs_list
co_await current_log.handle->housekeeping(housekeeping_config(
co_await current_log->housekeeping(housekeeping_config(
collection_threshold,
_config.retention_bytes(),
current_log.handle->stm_manager()->max_collectible_offset(),
current_log.handle->config().tombstone_retention_ms(),
current_log->stm_manager()->max_collectible_offset(),
current_log->config().tombstone_retention_ms(),
_config.compaction_priority,
_abort_source,
std::move(ntp_sanitizer_cfg),
Expand Down

0 comments on commit 7a6aab3

Please sign in to comment.