Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Consumer Worker #320

Merged
merged 39 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
10d9970
feat(sink): worker
hekike Oct 7, 2023
35b489c
feat(worker): clickhouse insert
hekike Oct 7, 2023
89d429a
feat(sink): run
hekike Oct 8, 2023
8914b74
feat(sink): get meters from ch
hekike Oct 9, 2023
f28454a
feat(sink): periodic namespace fetch
hekike Oct 9, 2023
87c1b77
feat(sink): implement deadletter
hekike Oct 9, 2023
b14d781
refactor(sink): consolidate deduplicator
hekike Oct 10, 2023
0bc570f
feat(sink): manual offset tracking
hekike Oct 12, 2023
cd9f093
feat(sink): remove comment
hekike Oct 12, 2023
dbcd2c8
feat(sink): interval
hekike Oct 12, 2023
fe19eb4
feat(sink): consumer rebelancing
hekike Oct 13, 2023
52aa322
fix(sink): offset
hekike Oct 14, 2023
9a4f48c
docs(sink): offset
hekike Oct 14, 2023
d818b74
feat(cmd): server and sink-worker
hekike Oct 16, 2023
721a090
feat(sink): lint
hekike Oct 16, 2023
e6400f6
test(dedupe): fix
hekike Oct 16, 2023
9015c32
chore(docker): fix image
hekike Oct 16, 2023
c773dc3
chore(ci): fix quickstart
hekike Oct 16, 2023
52ad13b
chore(ci): fix quickstart
hekike Oct 16, 2023
ceaa18a
feat(sink): configure via viper
hekike Oct 16, 2023
98cae6b
test(config): fix
hekike Oct 16, 2023
854b1d8
feat(sink): optional deduplicator
hekike Oct 16, 2023
e22596c
fix(sink): add run group
hekike Oct 16, 2023
a8834a6
chore(docker): one image
hekike Oct 17, 2023
742be90
refactor(sink): pr comments
hekike Oct 17, 2023
a57a811
chore(ci): fix quickstart
hekike Oct 17, 2023
8391d39
feat(sink): sink specific dedupe
hekike Oct 17, 2023
a476c15
docs(sink): pr comment
hekike Oct 17, 2023
27b499e
refactor(sink): organize code
hekike Oct 17, 2023
61f54bb
chore(make): assign different teleemtry port to sink worker
hekike Oct 17, 2023
a34c850
feat(sink): use meter repository
hekike Oct 17, 2023
c5de155
feat(sink): make group id configurable
hekike Oct 17, 2023
3031383
feat(sink): export sink in openmeter
hekike Oct 17, 2023
d49ac35
test(sink): fix config test
hekike Oct 17, 2023
0a5a2ed
test(sink): add
hekike Oct 18, 2023
2a67bc9
test(sink): add tests
hekike Oct 18, 2023
0d922ef
test(sink): fix flaky tests
hekike Oct 18, 2023
8f403b9
fix(server): add back dedupe
hekike Oct 18, 2023
b8d35e2
feat(config): only pickup namespace from namespace.default config
hekike Oct 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ jobs:
- name: Prepare Nix shell
run: nix develop --impure .#ci

- name: Build
run: nix develop --impure .#ci -c make build
- name: Build Server
run: nix develop --impure .#ci -c make build-server

- name: Build Sink Worker
run: nix develop --impure .#ci -c make build-sink-worker

test:
name: Test
Expand Down Expand Up @@ -159,8 +162,12 @@ jobs:
run: docker compose -f docker-compose.yaml -f docker-compose.ci.yaml up -d
working-directory: quickstart

- name: Wait for OpenMeter to become ready
run: curl --retry 10 --retry-max-time 120 --retry-all-errors http://localhost:8888/version
- name: Wait for worker to become ready
run: curl --retry 10 --retry-max-time 120 --retry-all-errors http://localhost:10000/healthz

# This is probably overkill but it can happen that OpenMeter creates namespaces slower
- name: Wait to pickup namespace
run: sleep 5

- name: Ingest sample data
run: ./ingest.sh
Expand Down
10 changes: 9 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,20 @@ ARG VERSION

# See https://github.com/confluentinc/confluent-kafka-go#librdkafka
# See https://github.com/confluentinc/confluent-kafka-go#static-builds-on-linux
RUN go build -ldflags "-linkmode external -extldflags \"-static\" -X main.version=${VERSION}" -tags musl -o /usr/local/bin/openmeter .
# Build server binary (default)
RUN go build -ldflags "-linkmode external -extldflags \"-static\" -X main.version=${VERSION}" -tags musl -o /usr/local/bin/openmeter ./cmd/server
hekike marked this conversation as resolved.
Show resolved Hide resolved
RUN xx-verify /usr/local/bin/openmeter

# Build sink-worker binary
RUN go build -ldflags "-linkmode external -extldflags \"-static\" -X main.version=${VERSION}" -tags musl -o /usr/local/bin/openmeter-sink-worker ./cmd/sink-worker
RUN xx-verify /usr/local/bin/openmeter-sink-worker

FROM gcr.io/distroless/base-debian11:latest@sha256:b31a6e02605827e77b7ebb82a0ac9669ec51091edd62c2c076175e05556f4ab9 AS distroless

COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/

COPY --from=builder /usr/local/bin/openmeter /usr/local/bin/
COPY --from=builder /usr/local/bin/openmeter-sink-worker /usr/local/bin/
COPY --from=builder /usr/local/src/openmeter/go.* /usr/local/src/openmeter/

CMD openmeter
Expand All @@ -44,6 +50,7 @@ FROM redhat/ubi8-micro:8.8-7@sha256:6fa456671239c7ac791dac2537425d1ba5612df36355
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/

COPY --from=builder /usr/local/bin/openmeter /usr/local/bin/
COPY --from=builder /usr/local/bin/openmeter-sink-worker /usr/local/bin/
COPY --from=builder /usr/local/src/openmeter/go.* /usr/local/src/openmeter/

CMD openmeter
Expand All @@ -55,6 +62,7 @@ RUN apk add --update --no-cache ca-certificates tzdata bash
SHELL ["/bin/bash", "-c"]

COPY --from=builder /usr/local/bin/openmeter /usr/local/bin/
COPY --from=builder /usr/local/bin/openmeter-sink-worker /usr/local/bin/
COPY --from=builder /usr/local/src/openmeter/go.* /usr/local/src/openmeter/

CMD openmeter
23 changes: 17 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,30 @@ generate: ## Generate code
$(call print-target)
go generate ./...

.PHONY: build
build: ## Build binary
.PHONY: build-server
build-server: ## Build server binary
$(call print-target)
go build -tags dynamic -o build/ .
go build -tags dynamic -o build/server ./cmd/server

.PHONY: build-sink-worker
build-sink-worker: ## Build binary
$(call print-target)
go build -tags dynamic -o build/sink-worker ./cmd/sink-worker

config.yaml:
cp config.example.yaml config.yaml

run: config.yaml
run: ## Run OpenMeter
.PHONY: server
server: ## Run sink-worker
@ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi
$(call print-target)
air -c ./cmd/server/.air.toml

.PHONY: sink-worker
sink-worker: ## Run sink-worker
@ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi
$(call print-target)
air
air -c ./cmd/sink-worker/.air.toml

.PHONY: test
test: ## Run tests
Expand Down
6 changes: 3 additions & 3 deletions .air.toml → cmd/server/.air.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ testdata_dir = "testdata"
tmp_dir = "tmp"

[build]
args_bin = ["--config", "config.yaml"]
bin = "./tmp/openmeter"
cmd = "go build -o ./tmp/openmeter ."
args_bin = ["--config", "./config.yaml"]
bin = "./tmp/openmeter-server"
cmd = "go build -o ./tmp/openmeter-server ./cmd/server"
delay = 0
exclude_dir = ["assets", "ci", "deploy", "docs", "examples", "testdata", "quickstart", "tmp", "vendor", "api/client", "node_modules"]
exclude_file = []
Expand Down
48 changes: 10 additions & 38 deletions main.go → cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ import (
"github.com/openmeterio/openmeter/internal/namespace"
"github.com/openmeterio/openmeter/internal/server"
"github.com/openmeterio/openmeter/internal/server/router"
"github.com/openmeterio/openmeter/internal/sink"
"github.com/openmeterio/openmeter/internal/streaming"
"github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector"
"github.com/openmeterio/openmeter/pkg/gosundheit"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
"github.com/openmeterio/openmeter/pkg/kafkaconnect"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
)
Expand Down Expand Up @@ -178,15 +176,6 @@ func main() {
return *meter
}))

// Initialize Kafka Connect sink
if conf.Sink.KafkaConnect.Enabled {
err := initKafkaConnect(conf.Sink.KafkaConnect, logger)
if err != nil {
logger.Error("failed to initialize kafka connect sink", "error", err)
os.Exit(1)
}
}

// Initialize ClickHouse Aggregation
clickhouseStreamingConnector, err := initClickHouseStreaming(conf, meterRepository, logger)
if err != nil {
Expand Down Expand Up @@ -359,7 +348,7 @@ func initKafkaIngest(ctx context.Context, config config.Configuration, logger *s
return collector, namespaceHandler, nil
}

func initClickHouseStreaming(config config.Configuration, meterRepository meter.Repository, logger *slog.Logger) (*clickhouse_connector.ClickhouseConnector, error) {
func initClickHouseClient(config config.Configuration) (clickhouse.Conn, error) {
options := &clickhouse.Options{
Addr: []string{config.Aggregation.ClickHouse.Address},
Auth: clickhouse.Auth{
Expand All @@ -386,6 +375,15 @@ func initClickHouseStreaming(config config.Configuration, meterRepository meter.
return nil, fmt.Errorf("init clickhouse client: %w", err)
}

return clickHouseClient, nil
}

func initClickHouseStreaming(config config.Configuration, meterRepository meter.Repository, logger *slog.Logger) (*clickhouse_connector.ClickhouseConnector, error) {
clickHouseClient, err := initClickHouseClient(config)
if err != nil {
return nil, fmt.Errorf("init clickhouse client: %w", err)
}

streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
ClickHouse: clickHouseClient,
Expand All @@ -399,32 +397,6 @@ func initClickHouseStreaming(config config.Configuration, meterRepository meter.
return streamingConnector, nil
}

func initKafkaConnect(config config.KafkaConnectSinkConfiguration, logger *slog.Logger) error {
client, err := kafkaconnect.NewClient(config.URL)
if err != nil {
return fmt.Errorf("init kafka connect client: %w", err)
}

kafkaConnectSink := sink.KafkaConnect{
Client: client,
Logger: logger.With(slog.String("subsystem", "sink.kafkaconnect")),
}

for _, connector := range config.Connectors {
connectorConfig, err := connector.ConnectorConfig()
if err != nil {
return fmt.Errorf("create kafka connector config %q: %w", connector.Name, err)
}

err = kafkaConnectSink.ConfigureConnector(context.Background(), connector.Name, connectorConfig)
if err != nil {
return fmt.Errorf("init kafka connector %q: %w", connector.Name, err)
}
}

return nil
}

func initNamespace(config config.Configuration, namespaces ...namespace.Handler) (*namespace.Manager, error) {
namespaceManager, err := namespace.NewManager(namespace.ManagerConfig{
Handlers: namespaces,
Expand Down
File renamed without changes.
44 changes: 44 additions & 0 deletions cmd/sink-worker/.air.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
root = "."
testdata_dir = "testdata"
tmp_dir = "tmp"

[build]
args_bin = ["--config", "./config.yaml", "--telemetry-address", ":1001"]
bin = "./tmp/openmeter-sink-worker"
cmd = "go build -o ./tmp/openmeter-sink-worker ./cmd/sink-worker"
delay = 0
exclude_dir = ["assets", "ci", "deploy", "docs", "examples", "testdata", "quickstart", "tmp", "vendor", "api/client", "node_modules"]
exclude_file = []
exclude_regex = ["_test.go"]
exclude_unchanged = false
follow_symlink = false
full_bin = ""
include_dir = []
include_ext = ["go", "tpl", "tmpl", "html", "yml", "yaml", "sql", "json"]
include_file = []
kill_delay = "0s"
log = "build-errors.log"
poll = false
poll_interval = 0
rerun = false
rerun_delay = 500
send_interrupt = false
stop_on_error = false

[color]
app = ""
build = "yellow"
main = "magenta"
runner = "green"
watcher = "cyan"

[log]
main_only = false
time = false

[misc]
clean_on_exit = false

[screen]
clear_on_rebuild = false
keep_scroll = true
Loading
Loading