Skip to content

Commit

Permalink
feat: added sink batch size config for sink concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
ravisuhag committed Oct 31, 2024
1 parent 0d02ea1 commit f2b32ed
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 4 deletions.
6 changes: 3 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/raystack/salt/log"
)

const defaultBatchSize = 1

// TimerFn of function type
type TimerFn func() func() int

Expand All @@ -32,6 +30,7 @@ type Agent struct {
retrier *retrier
stopOnSinkError bool
timerFn TimerFn
sinkBatchSize int
}

// NewAgent returns an Agent with plugin factories.
Expand All @@ -53,6 +52,7 @@ func NewAgent(config Config) *Agent {
logger: config.Logger,
retrier: retrier,
timerFn: timerFn,
sinkBatchSize: config.SinkBatchSize,
}
}

Expand Down Expand Up @@ -313,7 +313,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s

r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipeName)
return nil
}, defaultBatchSize)
}, r.sinkBatchSize)

stream.onClose(func() {
if err := sink.Close(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ func TestAgentRun(t *testing.T) {
Monitor: monitor,
MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
SinkBatchSize: 1,
})
run := r.Run(ctx, validRecipe)
assert.NoError(t, run.Error)
Expand Down
1 change: 1 addition & 0 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ type Config struct {
RetryInitialInterval time.Duration
StopOnSinkError bool
TimerFn TimerFn
SinkBatchSize int
}
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func RunCmd() *cobra.Command {
MaxRetries: cfg.MaxRetries,
RetryInitialInterval: time.Duration(cfg.RetryInitialIntervalSeconds) * time.Second,
StopOnSinkError: cfg.StopOnSinkError,
SinkBatchSize: cfg.SinkBatchSize,
})

recipes, err := recipe.NewReader(lg, pathToConfig).Read(args[0])
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"`
OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"`
OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"`
SinkBatchSize int `mapstructure:"SINK_BATCH_SIZE" default:"1"`
}

func Load(configFile string) (Config, error) {
Expand Down
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestLoad(t *testing.T) {
MaxRetries: 5,
RetryInitialIntervalSeconds: 5,
StopOnSinkError: false,
SinkBatchSize: 1,
},
},
{
Expand All @@ -46,6 +47,7 @@ func TestLoad(t *testing.T) {
OtelTraceSampleProbability: 1,
MaxRetries: 5,
RetryInitialIntervalSeconds: 5,
SinkBatchSize: 1,
},
expectedErr: "",
},
Expand Down
3 changes: 2 additions & 1 deletion config/meteor.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ STOP_ON_SINK_ERROR: false
APP_NAME: meteor
OTEL_ENABLED: false
OTEL_COLLECTOR_ADDR: "localhost:4317"
OTEL_TRACE_SAMPLE_PROBABILITY: 1
OTEL_TRACE_SAMPLE_PROBABILITY: 1
SINK_BATCH_SIZE: 10

0 comments on commit f2b32ed

Please sign in to comment.