diff --git a/pkg/sql/ttl/ttlbase/ttl_helpers.go b/pkg/sql/ttl/ttlbase/ttl_helpers.go index 6c9058a3c20c..4ba8af9ea9de 100644 --- a/pkg/sql/ttl/ttlbase/ttl_helpers.go +++ b/pkg/sql/ttl/ttlbase/ttl_helpers.go @@ -75,6 +75,14 @@ var ( false, settings.WithPublic, ) + processorConcurrencyOverride = settings.RegisterIntSetting( + settings.ApplicationLevel, + "sql.ttl.processor_concurrency", + "override for the TTL job processor concurrency (0 means use default based on GOMAXPROCS, "+ + "and any value greater than GOMAXPROCS will be capped at GOMAXPROCS)", + 0, + settings.NonNegativeInt, + ) ) var ( @@ -159,6 +167,18 @@ func GetChangefeedReplicationDisabled( return changefeedReplicationDisabled.Get(settingsValues) } +// GetProcessorConcurrency returns the concurrency to use for TTL job processors. +// If the cluster setting is 0 (default), it will return the provided default value. +// If the cluster setting is greater than 0, it will return the minimum of the +// cluster setting and the default value. +func GetProcessorConcurrency(settingsValues *settings.Values, defaultConcurrency int64) int64 { + override := processorConcurrencyOverride.Get(settingsValues) + if override > 0 { + return min(override, defaultConcurrency) + } + return defaultConcurrency +} + // BuildScheduleLabel returns a string value intended for use as the // schedule_name/label column for the scheduled job created by row level TTL. func BuildScheduleLabel(tbl *tabledesc.Mutable) string { diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go index d7dfca3cabff..ff62cc288965 100644 --- a/pkg/sql/ttl/ttljob/ttljob_processor.go +++ b/pkg/sql/ttl/ttljob/ttljob_processor.go @@ -151,7 +151,7 @@ func (t *ttlProcessor) work(ctx context.Context) error { group := ctxgroup.WithContext(ctx) processorSpanCount := int64(len(ttlSpec.Spans)) - processorConcurrency := int64(runtime.GOMAXPROCS(0)) + processorConcurrency := ttlbase.GetProcessorConcurrency(&flowCtx.Cfg.Settings.SV, int64(runtime.GOMAXPROCS(0))) if processorSpanCount < processorConcurrency { processorConcurrency = processorSpanCount }