This repository has been archived by the owner on Aug 1, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
heroku.go
107 lines (85 loc) · 2.59 KB
/
heroku.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package heroku
import (
"crypto/tls"
"os"
"strings"
"github.com/Shopify/sarama"
)
// NewConsumer creates a github.com/Shopify/sarama.Consumer configured from the
// standard Heroku Kafka environment.
func NewConsumer(cfg *sarama.Config) (sarama.Consumer, error) {
herokuCfg, err := NewConfig()
if err != nil {
return nil, err
}
if cfg == nil {
cfg = sarama.NewConfig()
}
cfg.Net.TLS.Enable = herokuCfg.TLS()
cfg.Net.TLS.Config = herokuCfg.TLSConfig()
consumer, err := sarama.NewConsumer(herokuCfg.Brokers(), cfg)
if err != nil {
return nil, err
}
return consumer, nil
}
// NewAsyncProducer creates a github.com/Shopify/sarama.AsyncProducer
// configured from the standard Heroku Kafka environment. When publishing
// messages to Multitenant Kafka all topics need to start with KAFKA_PREFIX
// which is best added using AppendPrefixTo.
func NewAsyncProducer(cfg *sarama.Config) (sarama.AsyncProducer, error) {
herokuCfg, err := NewConfig()
if err != nil {
return nil, err
}
if cfg == nil {
cfg = sarama.NewConfig()
}
cfg.Net.TLS.Enable = herokuCfg.TLS()
cfg.Net.TLS.Config = herokuCfg.TLSConfig()
return sarama.NewAsyncProducer(herokuCfg.Brokers(), cfg)
}
// NewSyncProducer creates a github.com/Shopify/sarama.SyncProducer configured
// from the standard Heroku Kafka environment. When publishing messages to
// Multitenant Kafka all topics need to start with KAFKA_PREFIX which is best
// added using AppendPrefixTo.
func NewSyncProducer(cfg *sarama.Config) (sarama.SyncProducer, error) {
herokuCfg, err := NewConfig()
if err != nil {
return nil, err
}
if cfg == nil {
cfg = sarama.NewConfig()
}
cfg.Net.TLS.Enable = herokuCfg.TLS()
cfg.Net.TLS.Config = herokuCfg.TLSConfig()
return sarama.NewSyncProducer(herokuCfg.Brokers(), cfg)
}
// AppendPrefixTo adds the env variable KAFKA_PREFIX to the given string if
// necessary. Heroku requires prefixing topics and consumer group names with
// the prefix on multi-tenant plans. It is safe to use on dedicated clusters if
// KAFKA_PREFIX is not set.
func AppendPrefixTo(name string) string {
prefix := os.Getenv("KAFKA_PREFIX")
if strings.HasPrefix(name, prefix) {
return name
}
return prefix + name
}
// Create the TLS context, using the key and certificates provided.
func TLSConfig() (*tls.Config, error) {
config, err := NewConfig()
if err != nil {
return nil, err
}
return config.TLSConfig(), nil
}
// Brokers returns a list of host:port addresses for the Kafka brokers set in
// KAFKA_URL.
func Brokers() ([]string, error) {
config, err := NewConfig()
if err != nil {
return nil, err
}
return config.Brokers(), nil
}