Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre release #79

Merged
merged 12 commits into from
Mar 28, 2024
19 changes: 5 additions & 14 deletions cmd/otel-collector/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/parquetexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter"
Expand All @@ -27,7 +26,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/zipkinencodingextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarderextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/dockerobserver"
Expand Down Expand Up @@ -59,8 +58,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
Expand Down Expand Up @@ -88,7 +85,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/expvarreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filereceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filestatsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/fluentforwardreceiver"
Expand Down Expand Up @@ -154,11 +150,10 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/forwardconnector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/exporter/debugexporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/extension/zpagesextension"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor"
Expand Down Expand Up @@ -187,7 +182,7 @@ func components() (otelcol.Factories, error) {
bearertokenauthextension.NewFactory(),
headerssetterextension.NewFactory(),
healthcheckextension.NewFactory(),
httpforwarder.NewFactory(),
httpforwarderextension.NewFactory(),
jaegerremotesampling.NewFactory(),
oauth2clientauthextension.NewFactory(),
ecsobserver.NewFactory(),
Expand Down Expand Up @@ -243,7 +238,6 @@ func components() (otelcol.Factories, error) {
expvarreceiver.NewFactory(),
filelogreceiver.NewFactory(),
filestatsreceiver.NewFactory(),
filereceiver.NewFactory(),
flinkmetricsreceiver.NewFactory(),
fluentforwardreceiver.NewFactory(),
googlecloudpubsubreceiver.NewFactory(),
Expand Down Expand Up @@ -323,7 +317,6 @@ func components() (otelcol.Factories, error) {
kafkaexporter.NewFactory(),
loadbalancingexporter.NewFactory(),
opencensusexporter.NewFactory(),
parquetexporter.NewFactory(),
prometheusexporter.NewFactory(),
prometheusremotewriteexporter.NewFactory(),
zipkinexporter.NewFactory(),
Expand Down Expand Up @@ -353,8 +346,6 @@ func components() (otelcol.Factories, error) {
resourceprocessor.NewFactory(),
routingprocessor.NewFactory(),
schemaprocessor.NewFactory(),
servicegraphprocessor.NewFactory(),
spanmetricsprocessor.NewFactory(),
spanprocessor.NewFactory(),
tailsamplingprocessor.NewFactory(),
transformprocessor.NewFactory(),
Expand All @@ -372,6 +363,7 @@ func components() (otelcol.Factories, error) {
datadogconnector.NewFactory(),
exceptionsconnector.NewFactory(),
routingconnector.NewFactory(),
// it is previously in processor now it is in connector
servicegraphconnector.NewFactory(),
spanmetricsconnector.NewFactory(),
}
Expand All @@ -394,7 +386,6 @@ func CoreComponents() (

extensions, err := extension.MakeFactoryMap(
zpagesextension.NewFactory(),
ballastextension.NewFactory(),
)
errs = multierr.Append(errs, err)

Expand All @@ -404,7 +395,7 @@ func CoreComponents() (
errs = multierr.Append(errs, err)

exporters, err := exporter.MakeFactoryMap(
loggingexporter.NewFactory(),
debugexporter.NewFactory(),
otlpexporter.NewFactory(),
otlphttpexporter.NewFactory(),
)
Expand Down
15 changes: 1 addition & 14 deletions exporter/clickhouseprofileexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,17 @@ import (
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
QueueSettings `mapstructure:"sending_queue"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

// DSN is the ClickHouse server Data Source Name.
// For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
// For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn).
Dsn string `mapstructure:"dsn"`
}

type QueueSettings struct {
// Length of the sending queue
QueueSize int `mapstructure:"queue_size"`
}

var _ component.Config = (*Config)(nil)

// Checks that the receiver configuration is valid
func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) enforceQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: cfg.QueueSettings.QueueSize,
}
}
6 changes: 3 additions & 3 deletions exporter/clickhouseprofileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: QueueSettings{QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize},
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
Dsn: defaultDsn,
}
Expand All @@ -37,7 +37,7 @@ func createLogsExporter(ctx context.Context, set exporter.CreateSettings, cfg co
cfg,
exp.send,
exporterhelper.WithShutdown(exp.Shutdown),
exporterhelper.WithQueue(c.enforceQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
Expand All @@ -46,7 +46,7 @@ func createLogsExporter(ctx context.Context, set exporter.CreateSettings, cfg co
// Creates a factory for the clickhouse profile exporter.
func NewFactory() exporter.Factory {
return exporter.NewFactory(
typeStr,
component.MustNewType(typeStr),
createDefaultConfig,
exporter.WithLogs(createLogsExporter, component.StabilityLevelAlpha),
)
Expand Down
18 changes: 1 addition & 17 deletions exporter/qrynexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ const (
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
// QueueSettings is a subset of exporterhelper.QueueSettings,
// because only QueueSize is user-settable.
QueueSettings QueueSettings `mapstructure:"sending_queue"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`

Expand All @@ -45,12 +43,6 @@ type Config struct {
Metrics MetricsConfig `mapstructure:"metrics"`
}

// QueueSettings is a subset of exporterhelper.QueueSettings.
type QueueSettings struct {
// QueueSize set the length of the sending queue
QueueSize int `mapstructure:"queue_size"`
}

// LogsConfig holds the configuration for log data.
type LogsConfig struct {
// AttributeLabels is the string representing attribute labels.
Expand All @@ -74,11 +66,3 @@ var _ component.Config = (*Config)(nil)
func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) enforcedQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: cfg.QueueSettings.QueueSize,
}
}
7 changes: 5 additions & 2 deletions exporter/qrynexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ func TestLoadConfig(t *testing.T) {
RandomizationFactor: 0.5,
Multiplier: 1.5,
},
QueueSettings: QueueSettings{
QueueSize: 100,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
QueueSize: 100,
NumConsumers: 10,
StorageID: nil,
},
},
},
Expand Down
20 changes: 10 additions & 10 deletions exporter/qrynexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
// NewFactory creates a factory for Logging exporter
func NewFactory() exporter.Factory {
return exporter.NewFactory(
typeStr,
component.MustNewType(typeStr),
createDefaultConfig,
exporter.WithTraces(createTracesExporter, stability),
exporter.WithLogs(createLogsExporter, stability),
Expand All @@ -42,7 +42,7 @@ func NewFactory() exporter.Factory {
func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: QueueSettings{QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize},
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
DSN: defaultDSN,
}
Expand All @@ -52,23 +52,23 @@ func createDefaultConfig() component.Config {
// Traces are directly insert into clickhouse.
func createTracesExporter(
ctx context.Context,
params exporter.CreateSettings,
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
c := cfg.(*Config)
oce, err := newTracesExporter(params.Logger, c)
oce, err := newTracesExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn traces exporter: %w", err)
}

return exporterhelper.NewTracesExporter(
ctx,
params,
set,
cfg,
oce.pushTraceData,
exporterhelper.WithShutdown(oce.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.enforcedQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
Expand All @@ -81,7 +81,7 @@ func createLogsExporter(
cfg component.Config,
) (exporter.Logs, error) {
c := cfg.(*Config)
exporter, err := newLogsExporter(set.Logger, c)
exporter, err := newLogsExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err)
}
Expand All @@ -93,7 +93,7 @@ func createLogsExporter(
exporter.pushLogsData,
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.enforcedQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
Expand All @@ -106,7 +106,7 @@ func createMetricsExporter(
cfg component.Config,
) (exporter.Metrics, error) {
c := cfg.(*Config)
exporter, err := newMetricsExporter(set.Logger, c)
exporter, err := newMetricsExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err)
}
Expand All @@ -118,7 +118,7 @@ func createMetricsExporter(
exporter.pushMetricsData,
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.enforcedQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
20 changes: 15 additions & 5 deletions exporter/qrynexporter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/go-logfmt/logfmt"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand All @@ -30,6 +32,7 @@ const (

type logsExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn

Expand All @@ -39,7 +42,7 @@ type logsExporter struct {
cluster bool
}

func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
func newLogsExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*logsExporter, error) {
opts, err := clickhouse.ParseDSN(cfg.DSN)
if err != nil {
return nil, err
Expand All @@ -48,14 +51,20 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
if err != nil {
return nil, err
}
return &logsExporter{
exp := &logsExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
format: cfg.Logs.Format,
attributeLabels: cfg.Logs.AttributeLabels,
resourceLabels: cfg.Logs.ResourceLabels,
cluster: cfg.ClusteredClickhouse,
}, nil
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
return exp, err
}
return exp, nil
}

// Shutdown will shutdown the exporter.
Expand Down Expand Up @@ -433,11 +442,12 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}

otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeLogs)))
e.logger.Debug("pushLogsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String()))

return nil
}

Expand Down
Loading
Loading