Skip to content

Commit

Permalink
Fix NATS connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ihippik committed Oct 7, 2023
1 parent 409092a commit f3a30bb
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 35 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.PHONE: build
build:
docker buildx build --platform linux/amd64,linux/arm64 --push -t ihippik/wal-listener .
43 changes: 11 additions & 32 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,12 @@ import (
"fmt"
"log/slog"

"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/publisher"
"github.com/jackc/pgx"
"github.com/nats-io/nats.go"
)

// createStream creates a stream by using JetStreamContext. We can do it manually.
func createStream(logger *slog.Logger, js nats.JetStreamContext, streamName string) error {
stream, err := js.StreamInfo(streamName)
if err != nil {
logger.Warn("failed to get stream info", "err", err)
}

if stream == nil {
var streamSubjects = streamName + ".*"

if _, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamSubjects},
}); err != nil {
return fmt.Errorf("add stream: %w", err)
}

logger.Info("stream not exists, created", slog.String("subjects", streamSubjects))
}

return nil
}
"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/publisher"
)

// initPgxConnections initialise db and replication connections.
func initPgxConnections(cfg *config.DatabaseCfg, logger *slog.Logger) (*pgx.Conn, *pgx.ReplicationConn, error) {
Expand Down Expand Up @@ -63,12 +41,14 @@ type pgxLogger struct {
logger *slog.Logger
}

// Log DB message.
func (l pgxLogger) Log(_ pgx.LogLevel, msg string, _ map[string]any) {
l.logger.Debug(msg)
}

type eventPublisher interface {
Publish(string, publisher.Event) error
Close() error
}

// factoryPublisher represents a factory function for creating a eventPublisher.
Expand All @@ -82,22 +62,21 @@ func factoryPublisher(cfg *config.PublisherCfg, logger *slog.Logger) (eventPubli

return publisher.NewKafkaPublisher(producer), nil
case config.PublisherTypeNats:
natsConn, err := nats.Connect(cfg.Address)
conn, err := nats.Connect(cfg.Address)
if err != nil {
return nil, fmt.Errorf("nats connection: %w", err)
}
defer natsConn.Close()

js, err := natsConn.JetStream()
pub, err := publisher.NewNatsPublisher(conn, logger)
if err != nil {
return nil, fmt.Errorf("jet stream: %w", err)
return nil, fmt.Errorf("new nats publisher: %w", err)
}

if err := createStream(logger, js, cfg.Topic); err != nil {
return nil, fmt.Errorf("create Nats stream: %w", err)
if err := pub.CreateStream(cfg.Topic); err != nil {
return nil, fmt.Errorf("create stream: %w", err)
}

return publisher.NewNatsPublisher(js), nil
return pub, err
default:
return nil, fmt.Errorf("unknown publisher type: %s", cfg.Type)
}
Expand Down
6 changes: 6 additions & 0 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ func main() {
return fmt.Errorf("factory publisher: %w", err)
}

defer func() {
if err := pub.Close(); err != nil {
slog.Error("close publisher failed", "err", err.Error())
}
}()

service := listener.NewWalListener(
cfg,
logger,
Expand Down
5 changes: 5 additions & 0 deletions publisher/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (p *KafkaPublisher) Publish(s string, event Event) error {
return err
}

// Close connection close.
func (p *KafkaPublisher) Close() error {
return p.producer.Close()
}

// NewProducer return new Kafka producer instance.
func NewProducer(pCfg *config.PublisherCfg) (sarama.SyncProducer, error) {
cfg := sarama.NewConfig()
Expand Down
44 changes: 41 additions & 3 deletions publisher/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,33 @@ package publisher

import (
"fmt"
"log/slog"

"github.com/goccy/go-json"
"github.com/nats-io/nats.go"
)

// NatsPublisher represent event publisher.
type NatsPublisher struct {
js nats.JetStreamContext
conn *nats.Conn
js nats.JetStreamContext
logger *slog.Logger
}

// NewNatsPublisher return new NatsPublisher instance.
func NewNatsPublisher(js nats.JetStreamContext) *NatsPublisher {
return &NatsPublisher{js: js}
func NewNatsPublisher(conn *nats.Conn, logger *slog.Logger) (*NatsPublisher, error) {
js, err := conn.JetStream()
if err != nil {
return nil, fmt.Errorf("jet stream: %w", err)
}

return &NatsPublisher{conn: conn, js: js, logger: logger}, nil
}

// Close connection.
func (n NatsPublisher) Close() error {
n.conn.Close()
return nil
}

// Publish serializes the event and publishes it on the bus.
Expand All @@ -29,3 +44,26 @@ func (n NatsPublisher) Publish(subject string, event Event) error {

return nil
}

// CreateStream creates a stream by using JetStreamContext. We can do it manually.
func (n NatsPublisher) CreateStream(streamName string) error {
stream, err := n.js.StreamInfo(streamName)
if err != nil {
n.logger.Warn("failed to get stream info", "err", err)
}

if stream == nil {
var streamSubjects = streamName + ".*"

if _, err = n.js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamSubjects},
}); err != nil {
return fmt.Errorf("add stream: %w", err)
}

n.logger.Info("stream not exists, created", slog.String("subjects", streamSubjects))
}

return nil
}

0 comments on commit f3a30bb

Please sign in to comment.