From fb4f66a6f4a78518e18da0cf8bfef8555f2d0aef Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Thu, 1 May 2025 00:28:45 +0000 Subject: [PATCH] ttljob: add cluster setting to control concurrency Each processor of the TTL job creates a number of goroutines that operate concurrently to scan for expired rows and delete them. Previously, the concurrency was always equal to GOMAXPROCS. This new setting allows it to be overriden. Release note: None --- pkg/sql/ttl/ttlbase/ttl_helpers.go | 20 ++++++++++++++++++++ pkg/sql/ttl/ttljob/ttljob_processor.go | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/sql/ttl/ttlbase/ttl_helpers.go b/pkg/sql/ttl/ttlbase/ttl_helpers.go index 6c9058a3c20..4ba8af9ea9d 100644 --- a/pkg/sql/ttl/ttlbase/ttl_helpers.go +++ b/pkg/sql/ttl/ttlbase/ttl_helpers.go @@ -75,6 +75,14 @@ var ( false, settings.WithPublic, ) + processorConcurrencyOverride = settings.RegisterIntSetting( + settings.ApplicationLevel, + "sql.ttl.processor_concurrency", + "override for the TTL job processor concurrency (0 means use default based on GOMAXPROCS, "+ + "and any value greater than GOMAXPROCS will be capped at GOMAXPROCS)", + 0, + settings.NonNegativeInt, + ) ) var ( @@ -159,6 +167,18 @@ func GetChangefeedReplicationDisabled( return changefeedReplicationDisabled.Get(settingsValues) } +// GetProcessorConcurrency returns the concurrency to use for TTL job processors. +// If the cluster setting is 0 (default), it will return the provided default value. +// If the cluster setting is greater than 0, it will return the minimum of the +// cluster setting and the default value. +func GetProcessorConcurrency(settingsValues *settings.Values, defaultConcurrency int64) int64 { + override := processorConcurrencyOverride.Get(settingsValues) + if override > 0 { + return min(override, defaultConcurrency) + } + return defaultConcurrency +} + // BuildScheduleLabel returns a string value intended for use as the // schedule_name/label column for the scheduled job created by row level TTL. func BuildScheduleLabel(tbl *tabledesc.Mutable) string { diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go index d7dfca3cabf..ff62cc28896 100644 --- a/pkg/sql/ttl/ttljob/ttljob_processor.go +++ b/pkg/sql/ttl/ttljob/ttljob_processor.go @@ -151,7 +151,7 @@ func (t *ttlProcessor) work(ctx context.Context) error { group := ctxgroup.WithContext(ctx) processorSpanCount := int64(len(ttlSpec.Spans)) - processorConcurrency := int64(runtime.GOMAXPROCS(0)) + processorConcurrency := ttlbase.GetProcessorConcurrency(&flowCtx.Cfg.Settings.SV, int64(runtime.GOMAXPROCS(0))) if processorSpanCount < processorConcurrency { processorConcurrency = processorSpanCount }