Skip to content

Commit

Permalink
Merge pull request #15 from ihippik/add_kafka
Browse files Browse the repository at this point in the history
Add kafka
  • Loading branch information
ihippik authored Aug 1, 2023
2 parents f8911c8 + ea557ec commit ef2c226
Show file tree
Hide file tree
Showing 17 changed files with 367 additions and 162 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.20.1-bullseye as builder
FROM golang:1.20.6-bullseye as builder
LABEL stage=builder
LABEL maintainer="Konstantin Makarov <[email protected]>"

Expand Down
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ Then we filter out only the events we need and publish them in the queue

### Event publishing

NATS JetStream is used as a message broker.
As the message broker will be used is of your choice:
NATS JetStream [`type=nats`] or Apache Kafka [`type=kafka`].

Service publishes the following structure.
The name of the topic for subscription to receive messages is formed from the prefix of the topic,
the name of the database and the name of the table `prefix + schema_table`.
Expand All @@ -42,7 +44,7 @@ the name of the database and the name of the table `prefix + schema_table`.
}
```

Messages are published to NATS (JetStream) at least once!
Messages are published to the broker at least once!

### Filter configuration example

Expand Down Expand Up @@ -108,10 +110,11 @@ database:
user: postgres
password: postgres
debug: false
nats:
address: localhost:4222
streamName: "wal_listener"
topicPrefix: ""
publisher:
type: nats
address: localhost:4222
topic: "wal_listener"
topicPrefix: ""
monitoring:
sentryDSN: "dsn string"
promAddr: ":2112"
Expand Down
45 changes: 41 additions & 4 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/spf13/viper"

"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/publisher"
)

// logger log levels.
Expand Down Expand Up @@ -69,8 +70,8 @@ func initMetrics(addr string, logger *logrus.Entry) {
}
}

// initLogger init logrus preferences.
func initLogger(cfg config.LoggerCfg, version string) *logrus.Entry {
// initLogger init Logrus preferences.
func initLogger(cfg *config.LoggerCfg, version string) *logrus.Entry {
logger := logrus.New()

logger.SetReportCaller(cfg.Caller)
Expand Down Expand Up @@ -143,7 +144,7 @@ func initSentry(dsn string, logger *logrus.Entry) {
}

// initPgxConnections initialise db and replication connections.
func initPgxConnections(cfg config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn, error) {
func initPgxConnections(cfg *config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn, error) {
pgxConf := pgx.ConnConfig{
// TODO logger
LogLevel: pgx.LogLevelInfo,
Expand All @@ -170,6 +171,42 @@ func initPgxConnections(cfg config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn

type pgxLogger struct{}

func (l pgxLogger) Log(level pgx.LogLevel, msg string, data map[string]any) {
func (l pgxLogger) Log(_ pgx.LogLevel, msg string, _ map[string]any) {
logrus.Debugln(msg)
}

type eventPublisher interface {
Publish(string, publisher.Event) error
}

// factoryPublisher represents a factory function for creating a eventPublisher.
func factoryPublisher(cfg *config.PublisherCfg, logger *logrus.Entry) (eventPublisher, error) {
switch cfg.Type {
case config.PublisherTypeKafka:
producer, err := publisher.NewProducer(cfg)
if err != nil {
return nil, fmt.Errorf("kafka producer: %w", err)
}

return publisher.NewKafkaPublisher(producer), nil
case config.PublisherTypeNats:
natsConn, err := nats.Connect(cfg.Address)
if err != nil {
return nil, fmt.Errorf("nats connection: %w", err)
}
defer natsConn.Close()

js, err := natsConn.JetStream()
if err != nil {
return nil, fmt.Errorf("jet stream: %w", err)
}

if err := createStream(logger, js, cfg.Topic); err != nil {
return nil, fmt.Errorf("create Nats stream: %w", err)
}

return publisher.NewNatsPublisher(js), nil
default:
return nil, fmt.Errorf("unknown publisher type: %s", cfg.Type)
}
}
27 changes: 8 additions & 19 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"os"

"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

Expand All @@ -22,8 +21,8 @@ func main() {
version := getVersion()

app := &cli.App{
Name: "Wal-Listener",
Usage: "listen postgres events",
Name: "WAL-Listener",
Usage: "listen PostgreSQL events",
Version: version,
Flags: []cli.Flag{
&cli.StringFlag{
Expand All @@ -49,32 +48,22 @@ func main() {

go initMetrics(cfg.Monitoring.PromAddr, logger)

natsConn, err := nats.Connect(cfg.Nats.Address)
if err != nil {
return fmt.Errorf("nats connection: %w", err)
}
defer natsConn.Close()

js, err := natsConn.JetStream()
conn, rConn, err := initPgxConnections(cfg.Database)
if err != nil {
return fmt.Errorf("jet stream: %w", err)
}

if err := createStream(logger, js, cfg.Nats.StreamName); err != nil {
return fmt.Errorf("create Nats stream: %w", err)
return fmt.Errorf("pgx connection: %w", err)
}

conn, rConn, err := initPgxConnections(cfg.Database)
pub, err := factoryPublisher(cfg.Publisher, logger)
if err != nil {
return fmt.Errorf("pgx connection: %w", err)
return fmt.Errorf("factory publisher: %w", err)
}

service := listener.NewWalListener(
cfg,
logger,
listener.NewRepository(conn),
rConn,
listener.NewNatsPublisher(js),
pub,
listener.NewBinaryParser(binary.BigEndian),
)

Expand All @@ -87,6 +76,6 @@ func main() {
}

if err := app.Run(os.Args); err != nil {
logrus.Fatal(err)
logrus.Errorln(err)
}
}
30 changes: 21 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@ import (
"github.com/asaskevich/govalidator"
)

type PublisherType string

const (
PublisherTypeNats PublisherType = "nats"
PublisherTypeKafka PublisherType = "kafka"
)

// Config for wal-listener/
type Config struct {
Listener ListenerCfg
Database DatabaseCfg
Nats NatsCfg
Logger LoggerCfg
Monitoring MonitoringCfg
Listener *ListenerCfg `valid:"required"`
Database *DatabaseCfg `valid:"required"`
Publisher *PublisherCfg `valid:"required"`
Logger *LoggerCfg `valid:"required"`
Monitoring *MonitoringCfg
}

// ListenerCfg path of the listener config.
Expand All @@ -25,11 +32,16 @@ type ListenerCfg struct {
TopicsMap map[string]string
}

// NatsCfg path of the NATS config.
type NatsCfg struct {
Address string `valid:"required"`
StreamName string `valid:"required"`
// PublisherCfg represent configuration for any types pulisher.
type PublisherCfg struct {
Type PublisherType `valid:"required"`
Address string `valid:"required"`
Topic string `valid:"required"`
TopicPrefix string
EnableTLS bool `json:"enable_tls"`
ClientCert string `json:"client_cert"`
ClientKey string `json:"client_key"`
CACert string `json:"ca_cert"`
}

// MonitoringCfg monitoring configuration.
Expand Down
87 changes: 41 additions & 46 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (

func TestConfig_Validate(t *testing.T) {
type fields struct {
Listener ListenerCfg
Database DatabaseCfg
Nats NatsCfg
Logger *LoggerCfg
Listener *ListenerCfg
Database *DatabaseCfg
Publisher *PublisherCfg
}
tests := []struct {
name string
Expand All @@ -21,22 +22,26 @@ func TestConfig_Validate(t *testing.T) {
{
name: "success",
fields: fields{
Listener: ListenerCfg{
Logger: &LoggerCfg{
Level: "info",
},
Listener: &ListenerCfg{
SlotName: "slot",
AckTimeout: 10,
RefreshConnection: 10,
HeartbeatInterval: 10,
},
Database: DatabaseCfg{
Database: &DatabaseCfg{
Host: "host",
Port: 10,
Name: "db",
User: "usr",
Password: "pass",
},
Nats: NatsCfg{
Publisher: &PublisherCfg{
Type: "kafka",
Address: "addr",
StreamName: "stream",
Topic: "stream",
TopicPrefix: "prefix",
},
},
Expand All @@ -45,20 +50,24 @@ func TestConfig_Validate(t *testing.T) {
{
name: "bad listener cfg",
fields: fields{
Listener: ListenerCfg{
Logger: &LoggerCfg{
Level: "info",
},
Listener: &ListenerCfg{
SlotName: "",
HeartbeatInterval: 10,
},
Database: DatabaseCfg{
Database: &DatabaseCfg{
Host: "host",
Port: 10,
Name: "db",
User: "usr",
Password: "pass",
},
Nats: NatsCfg{
Publisher: &PublisherCfg{
Type: "kafka",
Address: "addr",
StreamName: "stream",
Topic: "stream",
TopicPrefix: "prefix",
},
},
Expand All @@ -67,78 +76,64 @@ func TestConfig_Validate(t *testing.T) {
{
name: "bad db cfg",
fields: fields{
Listener: ListenerCfg{
Logger: &LoggerCfg{
Level: "info",
},
Listener: &ListenerCfg{
SlotName: "slot",
AckTimeout: 10,
RefreshConnection: 10,
HeartbeatInterval: 10,
},
Database: DatabaseCfg{
Database: &DatabaseCfg{
Name: "db",
User: "usr",
Password: "pass",
},
Nats: NatsCfg{
Publisher: &PublisherCfg{
Type: "kafka",
Address: "addr",
StreamName: "stream",
Topic: "stream",
TopicPrefix: "prefix",
},
},
wantErr: errors.New("Database.Host: non zero value required;Database.Port: non zero value required"),
},
{
name: "empty nats addr cfg",
name: "empty publisher kind",
fields: fields{
Listener: ListenerCfg{
SlotName: "slot",
AckTimeout: 10,
RefreshConnection: 10,
HeartbeatInterval: 10,
Logger: &LoggerCfg{
Level: "info",
},
Database: DatabaseCfg{
Host: "host",
Port: 10,
Name: "db",
User: "usr",
Password: "pass",
},
Nats: NatsCfg{
StreamName: "stream",
TopicPrefix: "prefix",
},
},
wantErr: errors.New("Nats.Address: non zero value required"),
},
{
name: "empty nats stream cfg",
fields: fields{
Listener: ListenerCfg{
Listener: &ListenerCfg{
SlotName: "slot",
AckTimeout: 10,
RefreshConnection: 10,
HeartbeatInterval: 10,
},
Database: DatabaseCfg{
Database: &DatabaseCfg{
Host: "host",
Port: 10,
Name: "db",
User: "usr",
Password: "pass",
},
Nats: NatsCfg{
Address: "addr",
Publisher: &PublisherCfg{
Topic: "stream",
TopicPrefix: "prefix",
},
},
wantErr: errors.New("Nats.StreamName: non zero value required"),
wantErr: errors.New("Publisher.Address: non zero value required;Publisher.Type: non zero value required"),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := Config{
Listener: tt.fields.Listener,
Database: tt.fields.Database,
Nats: tt.fields.Nats,
Logger: tt.fields.Logger,
Listener: tt.fields.Listener,
Database: tt.fields.Database,
Publisher: tt.fields.Publisher,
}
err := c.Validate()
if err != nil && assert.Error(t, tt.wantErr, err.Error()) {
Expand Down
Loading

0 comments on commit ef2c226

Please sign in to comment.