Skip to content

Commit

Permalink
Merge pull request #17 from ihippik/new_golang
Browse files Browse the repository at this point in the history
Bump Golang version
  • Loading branch information
ihippik authored Sep 17, 2023
2 parents ef2c226 + 201c33b commit f233f9c
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 297 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,8 @@ listener:
topicsMap:
schema_table_name: "notifier"
logger:
caller: false
level: info
format: json
fmt: json
database:
host: localhost
port: 5432
Expand Down
136 changes: 14 additions & 122 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,109 +2,19 @@ package main

import (
"fmt"
"net/http"
"runtime/debug"

"github.com/evalphobia/logrus_sentry"
"github.com/jackc/pgx"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"log/slog"

"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/publisher"
"github.com/jackc/pgx"
"github.com/nats-io/nats.go"
)

// logger log levels.
const (
warningLoggerLevel = "warning"
errorLoggerLevel = "error"
fatalLoggerLevel = "fatal"
infoLoggerLevel = "info"
)

func getVersion() string {
var version = "unknown"

info, ok := debug.ReadBuildInfo()
if ok {
for _, item := range info.Settings {
if item.Key == "vcs.revision" {
version = item.Value[:4]
}
}
}

return version
}

// getConf load config from file.
func getConf(path string) (*config.Config, error) {
var cfg config.Config

viper.SetConfigFile(path)

if err := viper.ReadInConfig(); err != nil {
return nil, fmt.Errorf("error reading config: %w", err)
}

if err := viper.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("unable to decode into config struct: %w", err)
}

return &cfg, nil
}

func initMetrics(addr string, logger *logrus.Entry) {
if len(addr) == 0 {
return
}

logger.WithField("addr", addr).Infoln("metrics handler")

http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(addr, nil); err != nil {
logger.WithError(err).Errorln("init metrics handler")
return
}
}

// initLogger init Logrus preferences.
func initLogger(cfg *config.LoggerCfg, version string) *logrus.Entry {
logger := logrus.New()

logger.SetReportCaller(cfg.Caller)

if cfg.Format == "json" {
logger.SetFormatter(&logrus.JSONFormatter{})
}

var level logrus.Level

switch cfg.Level {
case warningLoggerLevel:
level = logrus.WarnLevel
case errorLoggerLevel:
level = logrus.ErrorLevel
case fatalLoggerLevel:
level = logrus.FatalLevel
case infoLoggerLevel:
level = logrus.InfoLevel
default:
level = logrus.DebugLevel
}

logger.SetLevel(level)

return logger.WithField("version", version)
}

// createStream creates a stream by using JetStreamContext. We can do it manually.
func createStream(logger *logrus.Entry, js nats.JetStreamContext, streamName string) error {
func createStream(logger *slog.Logger, js nats.JetStreamContext, streamName string) error {
stream, err := js.StreamInfo(streamName)
if err != nil {
logger.WithError(err).Warnln("stream info")
logger.Warn("failed to get stream info", "err", err)
}

if stream == nil {
Expand All @@ -114,41 +24,21 @@ func createStream(logger *logrus.Entry, js nats.JetStreamContext, streamName str
Name: streamName,
Subjects: []string{streamSubjects},
}); err != nil {
return err
return fmt.Errorf("add stream: %w", err)
}

logger.WithField("subjects", streamSubjects).Infoln("stream not exists, created..")
logger.Info("stream not exists, created", slog.String("subjects", streamSubjects))
}

return nil
}

func initSentry(dsn string, logger *logrus.Entry) {
if len(dsn) == 0 {
logger.Warnln("empty Sentry DSN")
return
}

hook, err := logrus_sentry.NewSentryHook(
dsn,
[]logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
},
)

if err == nil {
logger.Logger.AddHook(hook)
}
}

// initPgxConnections initialise db and replication connections.
func initPgxConnections(cfg *config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn, error) {
func initPgxConnections(cfg *config.DatabaseCfg, logger *slog.Logger) (*pgx.Conn, *pgx.ReplicationConn, error) {
pgxConf := pgx.ConnConfig{
// TODO logger
LogLevel: pgx.LogLevelInfo,
Logger: pgxLogger{},
Logger: pgxLogger{logger},
Host: cfg.Host,
Port: cfg.Port,
Database: cfg.Name,
Expand All @@ -169,18 +59,20 @@ func initPgxConnections(cfg *config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationCon
return pgConn, rConnection, nil
}

type pgxLogger struct{}
type pgxLogger struct {
logger *slog.Logger
}

func (l pgxLogger) Log(_ pgx.LogLevel, msg string, _ map[string]any) {
logrus.Debugln(msg)
l.logger.Debug(msg)
}

type eventPublisher interface {
Publish(string, publisher.Event) error
}

// factoryPublisher represents a factory function for creating a eventPublisher.
func factoryPublisher(cfg *config.PublisherCfg, logger *logrus.Entry) (eventPublisher, error) {
func factoryPublisher(cfg *config.PublisherCfg, logger *slog.Logger) (eventPublisher, error) {
switch cfg.Type {
case config.PublisherTypeKafka:
producer, err := publisher.NewProducer(cfg)
Expand Down
22 changes: 13 additions & 9 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package main
import (
"encoding/binary"
"fmt"
"log/slog"
"os"

"github.com/sirupsen/logrus"
scfg "github.com/ihippik/config"
"github.com/urfave/cli/v2"

"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/listener"
)

Expand All @@ -18,7 +20,7 @@ func main() {
Usage: "print only the version",
}

version := getVersion()
version := scfg.GetVersion()

app := &cli.App{
Name: "WAL-Listener",
Expand All @@ -33,7 +35,7 @@ func main() {
},
},
Action: func(c *cli.Context) error {
cfg, err := getConf(c.String("config"))
cfg, err := config.InitConfig(c.String("config"))
if err != nil {
return fmt.Errorf("get config: %w", err)
}
Expand All @@ -42,13 +44,15 @@ func main() {
return fmt.Errorf("validate config: %w", err)
}

logger := initLogger(cfg.Logger, version)
if err := scfg.InitSentry(cfg.Monitoring.SentryDSN, version); err != nil {
return fmt.Errorf("init sentry: %w", err)
}

initSentry(cfg.Monitoring.SentryDSN, logger)
logger := scfg.InitSlog(cfg.Logger, version, cfg.Monitoring.SentryDSN != "")

go initMetrics(cfg.Monitoring.PromAddr, logger)
go scfg.InitMetrics(cfg.Monitoring.PromAddr, logger)

conn, rConn, err := initPgxConnections(cfg.Database)
conn, rConn, err := initPgxConnections(cfg.Database, logger)
if err != nil {
return fmt.Errorf("pgx connection: %w", err)
}
Expand All @@ -64,7 +68,7 @@ func main() {
listener.NewRepository(conn),
rConn,
pub,
listener.NewBinaryParser(binary.BigEndian),
listener.NewBinaryParser(logger, binary.BigEndian),
)

if err := service.Process(c.Context); err != nil {
Expand All @@ -76,6 +80,6 @@ func main() {
}

if err := app.Run(os.Args); err != nil {
logrus.Errorln(err)
slog.Error("service error", "error", err)
}
}
38 changes: 23 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package config

import (
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/spf13/viper"

cfg "github.com/ihippik/config"
)

type PublisherType string
Expand All @@ -18,8 +22,8 @@ type Config struct {
Listener *ListenerCfg `valid:"required"`
Database *DatabaseCfg `valid:"required"`
Publisher *PublisherCfg `valid:"required"`
Logger *LoggerCfg `valid:"required"`
Monitoring *MonitoringCfg
Logger *cfg.Logger `valid:"required"`
Monitoring *cfg.Monitoring
}

// ListenerCfg path of the listener config.
Expand All @@ -44,19 +48,6 @@ type PublisherCfg struct {
CACert string `json:"ca_cert"`
}

// MonitoringCfg monitoring configuration.
type MonitoringCfg struct {
SentryDSN string
PromAddr string
}

// LoggerCfg path of the logger config.
type LoggerCfg struct {
Caller bool
Level string
Format string
}

// DatabaseCfg path of the PostgreSQL DB config.
type DatabaseCfg struct {
Host string `valid:"required"`
Expand All @@ -76,3 +67,20 @@ func (c Config) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
}

// InitConfig load config from file.
func InitConfig(path string) (*Config, error) {
var conf Config

viper.SetConfigFile(path)

if err := viper.ReadInConfig(); err != nil {
return nil, fmt.Errorf("error reading config: %w", err)
}

if err := viper.Unmarshal(&conf); err != nil {
return nil, fmt.Errorf("unable to decode into config struct: %w", err)
}

return &conf, nil
}
11 changes: 6 additions & 5 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"errors"
"testing"

scfg "github.com/ihippik/config"
"github.com/stretchr/testify/assert"
)

func TestConfig_Validate(t *testing.T) {
type fields struct {
Logger *LoggerCfg
Logger *scfg.Logger
Listener *ListenerCfg
Database *DatabaseCfg
Publisher *PublisherCfg
Expand All @@ -22,7 +23,7 @@ func TestConfig_Validate(t *testing.T) {
{
name: "success",
fields: fields{
Logger: &LoggerCfg{
Logger: &scfg.Logger{
Level: "info",
},
Listener: &ListenerCfg{
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestConfig_Validate(t *testing.T) {
{
name: "bad listener cfg",
fields: fields{
Logger: &LoggerCfg{
Logger: &scfg.Logger{
Level: "info",
},
Listener: &ListenerCfg{
Expand All @@ -76,7 +77,7 @@ func TestConfig_Validate(t *testing.T) {
{
name: "bad db cfg",
fields: fields{
Logger: &LoggerCfg{
Logger: &scfg.Logger{
Level: "info",
},
Listener: &ListenerCfg{
Expand All @@ -102,7 +103,7 @@ func TestConfig_Validate(t *testing.T) {
{
name: "empty publisher kind",
fields: fields{
Logger: &LoggerCfg{
Logger: &scfg.Logger{
Level: "info",
},
Listener: &ListenerCfg{
Expand Down
3 changes: 1 addition & 2 deletions config_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ listener:
topicsMap:
schema_table_name: "notifier"
logger:
caller: false
level: info
format: json
fmt: json
database:
host: localhost
port: 5432
Expand Down
Loading

0 comments on commit f233f9c

Please sign in to comment.