From 0409aefa46f3e4cc9575176113adf9e97a2ee3a1 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Thu, 1 May 2025 00:28:45 +0000 Subject: [PATCH] ttljob: add cluster setting to control concurrency Each processor of the TTL job creates a number of goroutines that operate concurrently to scan for expired rows and delete them. Previously, the concurrency was always equal to GOMAXPROCS. This new setting allows it to be overriden. Release note: None --- pkg/sql/ttl/ttlbase/ttl_helpers.go | 20 ++++++++++++++++++++ pkg/sql/ttl/ttljob/ttljob_processor.go | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/sql/ttl/ttlbase/ttl_helpers.go b/pkg/sql/ttl/ttlbase/ttl_helpers.go index e41dacdc2795..608945858d63 100644 --- a/pkg/sql/ttl/ttlbase/ttl_helpers.go +++ b/pkg/sql/ttl/ttlbase/ttl_helpers.go @@ -76,6 +76,14 @@ var ( false, settings.WithPublic, ) + processorConcurrencyOverride = settings.RegisterIntSetting( + settings.ApplicationLevel, + "sql.ttl.processor_concurrency", + "override for the TTL job processor concurrency (0 means use default based on GOMAXPROCS, "+ + "and any value greater than GOMAXPROCS will be capped at GOMAXPROCS)", + 0, + settings.NonNegativeInt, + ) ) var ( @@ -160,6 +168,18 @@ func GetChangefeedReplicationDisabled( return changefeedReplicationDisabled.Get(settingsValues) } +// GetProcessorConcurrency returns the concurrency to use for TTL job processors. +// If the cluster setting is 0 (default), it will return the provided default value. +// If the cluster setting is greater than 0, it will return the minimum of the +// cluster setting and the default value. +func GetProcessorConcurrency(settingsValues *settings.Values, defaultConcurrency int64) int64 { + override := processorConcurrencyOverride.Get(settingsValues) + if override > 0 { + return min(override, defaultConcurrency) + } + return defaultConcurrency +} + // BuildScheduleLabel returns a string value intended for use as the // schedule_name/label column for the scheduled job created by row level TTL. func BuildScheduleLabel(tbl *tabledesc.Mutable) string { diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go index d8d785b792ac..3cbd173dd20d 100644 --- a/pkg/sql/ttl/ttljob/ttljob_processor.go +++ b/pkg/sql/ttl/ttljob/ttljob_processor.go @@ -162,7 +162,7 @@ func (t *ttlProcessor) work(ctx context.Context) error { group := ctxgroup.WithContext(ctx) processorSpanCount := int64(len(ttlSpec.Spans)) - processorConcurrency := int64(runtime.GOMAXPROCS(0)) + processorConcurrency := ttlbase.GetProcessorConcurrency(&flowCtx.Cfg.Settings.SV, int64(runtime.GOMAXPROCS(0))) if processorSpanCount < processorConcurrency { processorConcurrency = processorSpanCount }