-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontroller.go
49 lines (40 loc) · 1.07 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package notifier
import (
"fmt"
"time"
"github.com/lib/pq"
"github.com/segmentio/kafka-go"
"github.com/yolkhovyy/go-otelw/otelw/slogw"
"github.com/yolkhovyy/go-userv/internal/contract/server"
"github.com/yolkhovyy/go-userv/internal/storage/postgres"
)
// Controller bundles the two clients the notifier works with: a PostgreSQL
// listener and a Kafka writer. Both fields are populated by New.
type Controller struct {
// pqListener is the Listener taken from the value returned by Connect in New.
pqListener *pq.Listener
// kafkaWriter is the writer created in New for the configured brokers.
// NOTE(review): presumably used to publish the captured changes — confirm
// against the methods of Controller, which are not visible here.
kafkaWriter *kafka.Writer
}
// New builds the notifier Controller and returns it as a server.Contract.
//
// It first opens the storage listener via Connect using config, then creates
// a Kafka writer for kafkaConfig.Brokers. An error from Connect is returned
// wrapped with the "storage listener" prefix; that is the only error this
// function returns.
//
//nolint:ireturn
func New(config postgres.Config, kafkaConfig Config) (server.Contract, error) {
	// TODO: make kafka params configurable.
	const (
		writerBatchSize    = 1e6
		writerBatchBytes   = 1e6
		writerBatchTimeout = 100 * time.Millisecond
	)

	// The storage listener is what captures user changes.
	storageListener, err := Connect(config)
	if err != nil {
		return nil, fmt.Errorf("storage listener: %w", err)
	}

	slogw.DefaultLogger().Debug("notifier connected to database")

	writerConfig := kafka.WriterConfig{
		Brokers:      kafkaConfig.Brokers,
		BatchSize:    writerBatchSize,
		BatchBytes:   writerBatchBytes,
		BatchTimeout: writerBatchTimeout,
	}

	// Create the writer before touching storageListener's fields so the
	// sequence of calls stays connect -> log -> writer -> assemble.
	writer := kafka.NewWriter(writerConfig)

	return &Controller{
		pqListener:  storageListener.Listener,
		kafkaWriter: writer,
	}, nil
}