Skip to content

Commit

Permalink
Merge pull request #18 from ihippik/new_golang
Browse files Browse the repository at this point in the history
Refactoring
  • Loading branch information
ihippik authored Sep 17, 2023
2 parents f233f9c + 087fc87 commit cb360ef
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 78 deletions.
1 change: 1 addition & 0 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func main() {
rConn,
pub,
listener.NewBinaryParser(logger, binary.BigEndian),
config.NewMetrics(),
)

if err := service.Process(c.Context); err != nil {
Expand Down
41 changes: 41 additions & 0 deletions config/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package config

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Metrics Prometheus metrics.
type Metrics struct {
filterSkippedEvents, publishedEvents *prometheus.CounterVec
}

// NewMetrics create and initialize new Prometheus metrics.
func NewMetrics() *Metrics {
return &Metrics{
publishedEvents: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "published_events",
Help: "The total number of published events",
},
[]string{"app", "subject", "table"},
),
filterSkippedEvents: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "filter_skipped_events",
Help: "The total number of skipped events",
},
[]string{"app", "table"},
),
}
}

const appName = "wal-listener"

// IncPublishedEvents increment published events counter.
func (m Metrics) IncPublishedEvents(subject, table string) {
m.publishedEvents.With(prometheus.Labels{"app": appName, "subject": subject, "table": table}).Inc()
}

// IncFilterSkippedEvents increment skipped by filter events counter.
func (m Metrics) IncFilterSkippedEvents(table string) {
m.filterSkippedEvents.With(prometheus.Labels{"app": appName, "table": table}).Inc()
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/magiconair/properties v1.8.7
github.com/nats-io/nats.go v1.24.0
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.0
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.24.4
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/afero v1.9.4 h1:Sd43wM1IWz/s1aVXdOBkjJvuP8UdyqioeE4AmM0QsBs=
github.com/spf13/afero v1.9.4/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
Expand Down Expand Up @@ -429,7 +427,6 @@ golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
Expand Down
49 changes: 20 additions & 29 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import (
"sync"
"time"

"github.com/jackc/pgx"

"github.com/ihippik/wal-listener/v2/config"
"github.com/ihippik/wal-listener/v2/publisher"
"github.com/jackc/pgx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const errorBufferSize = 100
Expand Down Expand Up @@ -47,10 +46,16 @@ type repository interface {
Close() error
}

type monitor interface {
IncPublishedEvents(subject, table string)
IncFilterSkippedEvents(table string)
}

// Listener main service struct.
type Listener struct {
cfg *config.Config
log *slog.Logger
monitor monitor
mu sync.RWMutex
slotName string
publisher eventPublisher
Expand All @@ -61,22 +66,6 @@ type Listener struct {
errChannel chan error
}

var (
publishedEvents = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "published_events",
Help: "The total number of published events",
},
[]string{"subject", "table"},
)

filterSkippedEvents = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "filter_skipped_events",
Help: "The total number of skipped events",
},
[]string{"table"},
)
)

// NewWalListener create and initialize new service instance.
func NewWalListener(
cfg *config.Config,
Expand All @@ -85,9 +74,11 @@ func NewWalListener(
repl replication,
publ eventPublisher,
parser parser,
monitor monitor,
) *Listener {
return &Listener{
log: log,
monitor: monitor,
slotName: cfg.Listener.SlotName,
cfg: cfg,
publisher: publ,
Expand Down Expand Up @@ -157,12 +148,12 @@ ProcessLoop:
return err
}

l.log.Error("received error", "err", err)
logger.Error("listener: received error", "err", err)
case <-ctx.Done():
logger.Debug("context was canceled")
logger.Debug("listener: context was canceled")

if err := l.Stop(); err != nil {
logger.Error("listener stop error", "err", err)
logger.Error("listener: stop error", "err", err)
}

break ProcessLoop
Expand All @@ -176,7 +167,7 @@ ProcessLoop:
func (l *Listener) slotIsExists() (bool, error) {
restartLSNStr, err := l.repository.GetSlotLSN(l.slotName)
if err != nil {
return false, err
return false, fmt.Errorf("get slot lsn: %w", err)
}

if len(restartLSNStr) == 0 {
Expand Down Expand Up @@ -215,11 +206,11 @@ func (l *Listener) Stream(ctx context.Context) {

go l.SendPeriodicHeartbeats(ctx)

tx := NewWalTransaction()
tx := NewWalTransaction(l.log, l.monitor)

for {
if err := ctx.Err(); err != nil {
l.errChannel <- newListenerError("context canceled", err)
l.errChannel <- newListenerError("stream: context canceled", err)
break
}

Expand All @@ -234,8 +225,8 @@ func (l *Listener) Stream(ctx context.Context) {
l.log.Debug("receive wal message", slog.Uint64("wal", msg.WalMessage.WalStart))

if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil {
l.log.Error("msg parse failed", "err", err)
l.errChannel <- fmt.Errorf("unmarshal wal message: %w", err)
l.log.Error("message parse failed", "err", err)
l.errChannel <- fmt.Errorf("parse wal message: %w", err)

continue
}
Expand All @@ -251,7 +242,7 @@ func (l *Listener) Stream(ctx context.Context) {
continue
}

publishedEvents.With(prometheus.Labels{"subject": subjectName, "table": event.Table}).Inc()
l.monitor.IncPublishedEvents(subjectName, event.Table)

l.log.Info(
"event was sent",
Expand Down Expand Up @@ -321,7 +312,7 @@ func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) {
for {
select {
case <-ctx.Done():
l.log.Info("periodic heartbeats: context was canceled")
l.log.Warn("periodic heartbeats: context was canceled")
return
case <-heart.C:
if err := l.SendStandbyStatus(); err != nil {
Expand Down
27 changes: 22 additions & 5 deletions listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ var (
)

func TestListener_slotIsExists(t *testing.T) {
repo := new(repositoryMock)
type fields struct {
slotName string
}

repo := new(repositoryMock)

setGetSlotLSN := func(slotName, lsn string, err error) {
repo.On("GetSlotLSN", slotName).
Return(lsn, err).
Expand Down Expand Up @@ -204,6 +205,8 @@ func TestListener_SendStandbyStatus(t *testing.T) {
Once()
}

logger := slog.New(slog.NewJSONHandler(io.Discard, nil))

tests := []struct {
name string
setup func()
Expand Down Expand Up @@ -249,17 +252,21 @@ func TestListener_SendStandbyStatus(t *testing.T) {
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.setup()

w := &Listener{
log: slog.New(slog.NewJSONHandler(io.Discard, nil)),
log: logger,
replicator: repl,
lsn: tt.fields.restartLSN,
}

if err := w.SendStandbyStatus(); (err != nil) != tt.wantErr {
t.Errorf("SendStandbyStatus() error = %v, wantErr %v", err, tt.wantErr)
}

repl.AssertExpectations(t)
})
}
Expand Down Expand Up @@ -355,11 +362,13 @@ func TestListener_AckWalMessage(t *testing.T) {
},
}

logger := slog.New(slog.NewJSONHandler(io.Discard, nil))

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.setup()
w := &Listener{
log: slog.New(slog.NewJSONHandler(io.Discard, nil)),
log: logger,
replicator: repl,
lsn: tt.fields.restartLSN,
}
Expand Down Expand Up @@ -442,6 +451,9 @@ func TestListener_Stream(t *testing.T) {

uuid.SetRand(bytes.NewReader(make([]byte, 512)))

logger := slog.New(slog.NewJSONHandler(io.Discard, nil))
metrics := new(monitorMock)

tests := []struct {
name string
setup func()
Expand Down Expand Up @@ -482,6 +494,8 @@ func TestListener_Stream(t *testing.T) {
setParseWalMessageOnce(
[]byte(`some bytes`),
&WalTransaction{
monitor: metrics,
log: logger,
LSN: 0,
BeginTime: nil,
CommitTime: nil,
Expand Down Expand Up @@ -674,6 +688,8 @@ func TestListener_Stream(t *testing.T) {
setParseWalMessageOnce(
[]byte(`some bytes`),
&WalTransaction{
monitor: metrics,
log: logger,
LSN: 0,
BeginTime: nil,
CommitTime: nil,
Expand Down Expand Up @@ -753,6 +769,8 @@ func TestListener_Stream(t *testing.T) {
setParseWalMessageOnce(
[]byte(`some bytes`),
&WalTransaction{
monitor: metrics,
log: logger,
LSN: 0,
BeginTime: nil,
CommitTime: nil,
Expand Down Expand Up @@ -808,15 +826,14 @@ func TestListener_Stream(t *testing.T) {
},
}

logger := slog.New(slog.NewJSONHandler(io.Discard, nil))

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.setup()

ctx, cancel := context.WithTimeout(context.Background(), tt.args.timeout)
w := &Listener{
log: logger,
monitor: metrics,
cfg: tt.fields.config,
slotName: tt.fields.slotName,
publisher: publ,
Expand Down
7 changes: 7 additions & 0 deletions listener/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package listener

type monitorMock struct{}

func (m *monitorMock) IncPublishedEvents(subject, table string) {}

func (m *monitorMock) IncFilterSkippedEvents(table string) {}
Loading

0 comments on commit cb360ef

Please sign in to comment.