forked from getAlby/lndhub.go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
353 lines (310 loc) · 10.7 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
package main
import (
"context"
"embed"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"time"
"github.com/getAlby/lndhub.go/rabbitmq"
cache "github.com/SporkHubr/echo-http-cache"
"github.com/SporkHubr/echo-http-cache/adapter/memory"
"github.com/getAlby/lndhub.go/db"
"github.com/getAlby/lndhub.go/db/migrations"
"github.com/getAlby/lndhub.go/docs"
"github.com/getAlby/lndhub.go/lib"
"github.com/getAlby/lndhub.go/lib/responses"
"github.com/getAlby/lndhub.go/lib/security"
"github.com/getAlby/lndhub.go/lib/service"
"github.com/getAlby/lndhub.go/lib/tokens"
"github.com/getAlby/lndhub.go/lnd"
"github.com/getsentry/sentry-go"
sentryecho "github.com/getsentry/sentry-go/echo"
"github.com/go-playground/validator/v10"
"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
"github.com/labstack/echo-contrib/prometheus"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/lightningnetwork/lnd/lnrpc"
echoSwagger "github.com/swaggo/echo-swagger"
"github.com/uptrace/bun/migrate"
"github.com/ziflex/lecho/v3"
"golang.org/x/time/rate"
ddEcho "gopkg.in/DataDog/dd-trace-go.v1/contrib/labstack/echo.v4"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
// indexHtml holds the contents of templates/index.html, embedded into the
// binary at build time.
//
//go:embed templates/index.html
var indexHtml string
// staticContent is an embedded file system containing the files matched by
// the static/* pattern, embedded into the binary at build time.
//
//go:embed static/*
var staticContent embed.FS
// main is the LndHub.go entry point.
//
// It loads the configuration, opens and migrates the database, connects to
// LND (and to RabbitMQ when a URI is configured), registers the HTTP
// endpoints, starts the background routines and the HTTP server(s), and then
// blocks until an interrupt signal arrives. On interrupt it shuts the servers
// down and waits for the background routines to finish.
//
// @title LndHub.go
// @version 0.9.0
// @description Accounting wrapper for the Lightning Network providing separate accounts for end-users.
// @contact.name Alby
// @contact.url https://seedhypermedia.com
// @contact.email [email protected]
// @license.name GNU GPLv3
// @license.url https://www.gnu.org/licenses/gpl-3.0.en.html
// @BasePath /
// @securitydefinitions.oauth2.password OAuth2Password
// @tokenUrl /auth
// @schemes https http
func main() {
	c := &service.Config{}

	// Load configuration from environment variables. A missing .env file is
	// only reported, not fatal: the process environment is still processed.
	err := godotenv.Load(".env")
	if err != nil {
		fmt.Println("Failed to load .env file")
	}
	err = envconfig.Process("", c)
	if err != nil {
		log.Fatalf("Error loading environment variables: %v", err)
	}

	// Setup logging to STDOUT or a configured log file
	logger := lib.Logger(c.LogFilePath)

	// Open a DB connection based on the configured DATABASE_URI
	dbConn, err := db.Open(c)
	if err != nil {
		logger.Fatalf("Error initializing db connection: %v", err)
	}

	// Migrate the DB
	//Todo: use timeout for startupcontext
	startupCtx := context.Background()
	migrator := migrate.NewMigrator(dbConn, migrations.Migrations)
	err = migrator.Init(startupCtx)
	if err != nil {
		logger.Fatalf("Error initializing db migrator: %v", err)
	}
	_, err = migrator.Migrate(startupCtx)
	if err != nil {
		logger.Fatalf("Error migrating database: %v", err)
	}

	// Setup exception tracking with Sentry if configured
	// sentry init needs to happen before the echo middlewares are added
	if c.SentryDSN != "" {
		if err = sentry.Init(sentry.ClientOptions{
			Dsn:              c.SentryDSN,
			IgnoreErrors:     []string{"401"},
			EnableTracing:    c.SentryTracesSampleRate > 0,
			TracesSampleRate: c.SentryTracesSampleRate,
		}); err != nil {
			logger.Errorf("sentry init error: %v", err)
		}
	}

	// New Echo app
	e := echo.New()
	e.HideBanner = true
	e.HTTPErrorHandler = responses.HTTPErrorHandler
	e.Validator = &lib.CustomValidator{Validator: validator.New()}

	//if Datadog is configured, add datadog middleware
	if c.DatadogAgentUrl != "" {
		tracer.Start(tracer.WithAgentAddr(c.DatadogAgentUrl))
		defer tracer.Stop()
		e.Use(ddEcho.Middleware(ddEcho.WithServiceName("lndhub.go")))
	}

	e.Use(middleware.Recover())
	// Absolute body limit
	e.Use(middleware.BodyLimit("250K"))
	e.Logger = logger
	e.Use(middleware.RequestID())
	e.Use(lecho.Middleware(lecho.Config{
		Logger: logger,
	}))

	// Attach the Sentry middleware if configured (the Sentry client itself
	// was initialised above, before any middleware was added).
	if c.SentryDSN != "" {
		e.Use(sentryecho.New(sentryecho.Options{}))
	}

	// Init new LND client
	lndClient, err := lnd.NewLNDclient(lnd.LNDoptions{
		Address:      c.LNDAddress,
		MacaroonFile: c.LNDMacaroonFile,
		MacaroonHex:  c.LNDMacaroonHex,
		CertFile:     c.LNDCertFile,
		CertHex:      c.LNDCertHex,
	})
	if err != nil {
		e.Logger.Fatalf("Error initializing the LND connection: %v", err)
	}
	getInfo, err := lndClient.GetInfo(startupCtx, &lnrpc.GetInfoRequest{})
	if err != nil {
		e.Logger.Fatalf("Error getting node info: %v", err)
	}
	logger.Infof("Connected to LND: %s - %s", getInfo.Alias, getInfo.IdentityPubkey)

	// If no RABBITMQ_URI was provided we will not attempt to create a client
	// No rabbitmq features will be available in this case.
	var rabbitmqClient rabbitmq.Client
	if c.RabbitMQUri != "" {
		rabbitmqClient, err = rabbitmq.Dial(c.RabbitMQUri,
			rabbitmq.WithLogger(logger),
			rabbitmq.WithLndInvoiceExchange(c.RabbitMQLndInvoiceExchange),
			rabbitmq.WithLndHubInvoiceExchange(c.RabbitMQLndhubInvoiceExchange),
			rabbitmq.WithLndInvoiceConsumerQueueName(c.RabbitMQInvoiceConsumerQueueName),
		)
		if err != nil {
			logger.Fatal(err)
		}
		// close the connection gently at the end of the runtime
		defer rabbitmqClient.Close()
	}

	svc := &service.LndhubService{
		Config:         c,
		DB:             dbConn,
		LndClient:      lndClient,
		RabbitMQClient: rabbitmqClient,
		Logger:         logger,
		IdentityPubkey: getInfo.IdentityPubkey,
		InvoicePubSub:  service.NewPubsub(),
	}

	// create user collecting remainder of splitting payments
	if _, err := svc.CreateUser(startupCtx, svc.Config.HouseUser, "", ""); err != nil {
		e.Logger.Fatalf("Error Creating House User: %v", err)
	}

	strictRateLimitPerMinMW := createRateLimitMiddleware(c.StrictRateLimitPerMin, 1*time.Minute)
	strictRateLimitPerSecMW := createRateLimitMiddleware(c.StrictRateLimitPerSec, 1*time.Second)
	regularRateLimitPerMinMW := createRateLimitMiddleware(c.DefaultRateLimitPerMin, 1*time.Minute)
	regularRateLimitPerSecMW := createRateLimitMiddleware(c.DefaultRateLimitPerSec, 1*time.Second)
	tokenMW := tokens.Middleware(c.JWTSecret)
	RegisterLegacyEndpoints(svc, e, tokenMW, strictRateLimitPerMinMW, strictRateLimitPerSecMW, regularRateLimitPerMinMW, regularRateLimitPerSecMW, tokens.AdminTokenMiddleware(c.AdminToken))
	RegisterV2Endpoints(svc, e, tokenMW, strictRateLimitPerMinMW, strictRateLimitPerSecMW, regularRateLimitPerMinMW, regularRateLimitPerSecMW, security.SignatureMiddleware(svc.Config.LoginMessage))

	//Swagger API spec
	docs.SwaggerInfo.Host = c.Host
	e.GET("/swagger/*", echoSwagger.WrapHandler)

	var backgroundWg sync.WaitGroup
	// backGroundCtx is cancelled on the first interrupt signal. stop releases
	// the signal registration once main returns.
	backGroundCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Subscribe to LND invoice updates in the background.
	// Every goroutine below uses its own local err: sharing main's err
	// variable across goroutines would be a data race.
	backgroundWg.Add(1)
	go func() {
		defer backgroundWg.Done()
		switch svc.Config.SubscriptionConsumerType {
		case "rabbitmq":
			// The client is nil when no RabbitMQ URI was configured; fail
			// with a clear message instead of a nil-interface panic.
			if svc.RabbitMQClient == nil {
				svc.Logger.Fatal("rabbitmq subscription consumer selected but no RabbitMQ client is configured")
			}
			err := svc.RabbitMQClient.SubscribeToLndInvoices(backGroundCtx, svc.ProcessInvoiceUpdate)
			if err != nil && err != context.Canceled {
				// in case of an error in this routine, we want to restart LNDhub
				sentry.CaptureException(err)
				svc.Logger.Fatal(err)
			}
		case "grpc":
			err := svc.InvoiceUpdateSubscription(backGroundCtx)
			if err != nil && err != context.Canceled {
				// in case of an error in this routine, we want to restart LNDhub
				svc.Logger.Fatal(err)
			}
		default:
			svc.Logger.Fatalf("Unrecognized subscription consumer type %s", svc.Config.SubscriptionConsumerType)
		}
		svc.Logger.Info("Invoice routine done")
	}()

	// Check the status of all pending outgoing payments
	// A goroutine will be spawned for each one
	backgroundWg.Add(1)
	go func() {
		defer backgroundWg.Done()
		if err := svc.CheckAllPendingOutgoingPayments(backGroundCtx); err != nil {
			svc.Logger.Error(err)
		}
		svc.Logger.Info("Pending payment check routines done")
	}()

	//Start webhook subscription
	if svc.Config.WebhookUrl != "" {
		backgroundWg.Add(1)
		go func() {
			defer backgroundWg.Done()
			svc.StartWebhookSubscription(backGroundCtx, svc.Config.WebhookUrl)
			svc.Logger.Info("Webhook routine done")
		}()
	}

	//Start rabbit publisher
	if svc.RabbitMQClient != nil {
		backgroundWg.Add(1)
		go func() {
			defer backgroundWg.Done()
			err := svc.RabbitMQClient.StartPublishInvoices(backGroundCtx,
				svc.SubscribeIncomingOutgoingInvoices,
				svc.EncodeInvoiceWithUserLogin,
			)
			if err != nil {
				svc.Logger.Error(err)
				sentry.CaptureException(err)
			}
			svc.Logger.Info("Rabbit invoice publisher done")
		}()
	}

	//Start Prometheus server if necessary
	var echoPrometheus *echo.Echo
	if svc.Config.EnablePrometheus {
		// Create Prometheus server and Middleware
		echoPrometheus = echo.New()
		echoPrometheus.HideBanner = true
		// Set the logger before the goroutine starts so the instance is not
		// mutated concurrently with main.
		echoPrometheus.Logger = logger
		prom := prometheus.NewPrometheus("echo", nil)
		// Scrape metrics from Main Server
		e.Use(prom.HandlerFunc)
		// Setup metrics endpoint at another server
		prom.SetMetricsPath(echoPrometheus)
		go func() {
			echoPrometheus.Logger.Infof("Starting prometheus on port %d", svc.Config.PrometheusPort)
			// Start returns http.ErrServerClosed after a graceful Shutdown;
			// that must not be treated as fatal or the process would exit
			// in the middle of the shutdown sequence below.
			if err := echoPrometheus.Start(fmt.Sprintf(":%d", svc.Config.PrometheusPort)); err != nil && err != http.ErrServerClosed {
				echoPrometheus.Logger.Fatal(err)
			}
		}()
	}

	// Start server
	go func() {
		if err := e.Start(fmt.Sprintf(":%v", c.Port)); err != nil && err != http.ErrServerClosed {
			e.Logger.Fatal("shutting down the server. ", err.Error())
		}
	}()

	// Block until an interrupt is received.
	<-backGroundCtx.Done()

	// Give the HTTP servers up to 10 seconds (shared) to shut down.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := e.Shutdown(ctx); err != nil {
		e.Logger.Fatal(err)
	}
	if echoPrometheus != nil {
		if err := echoPrometheus.Shutdown(ctx); err != nil {
			e.Logger.Fatal(err)
		}
	}

	//Wait for graceful shutdown of background routines
	backgroundWg.Wait()
	svc.Logger.Info("LNDhub exiting gracefully. Goodbye.")
}
// createRateLimitMiddleware returns an Echo rate-limiting middleware backed by
// an in-memory store that refills at `requests` per `interval`.
//
// Requests over the limit are rejected with HTTP 429 and a message naming the
// configured limit; the limiter's error is kept as the HTTPError's Internal
// error.
func createRateLimitMiddleware(requests int, interval time.Duration) echo.MiddlewareFunc {
	limit := rate.Limit(float64(requests) / interval.Seconds())

	// Echo's memory store falls back to Burst = int(Rate) when Burst is left
	// at zero. For limits below one request per second (e.g. 30 per minute)
	// that floor is 0, and a limiter with a zero burst rejects every request.
	// Guarantee a burst of at least one for any positive limit; limits of one
	// request per second or more keep the burst they had before.
	burst := int(limit)
	if burst < 1 && requests > 0 {
		burst = 1
	}

	config := middleware.RateLimiterConfig{
		Store: middleware.NewRateLimiterMemoryStoreWithConfig(
			middleware.RateLimiterMemoryStoreConfig{
				Rate:  limit,
				Burst: burst,
			},
		),
		// The echo.Context parameter is unused; it is left unnamed so it does
		// not shadow the imported context package.
		DenyHandler: func(_ echo.Context, identifier string, err error) error {
			return &echo.HTTPError{
				Code:     http.StatusTooManyRequests,
				Message:  fmt.Sprintf("rate limit [%d/%s] exceeded", requests, interval.String()),
				Internal: err,
			}
		},
	}
	return middleware.RateLimiterWithConfig(config)
}
// createCacheClient builds an HTTP cache client on top of an in-memory LRU
// adapter. Failure to construct either the adapter or the client terminates
// the process via log.Fatalf.
func createCacheClient() *cache.Client {
	const (
		adapterCapacity = 10000000
		entryTTL        = 10 * time.Minute
		refreshKey      = "opn"
	)

	// In-memory storage with LRU eviction.
	lruAdapter, adapterErr := memory.NewAdapter(
		memory.AdapterWithAlgorithm(memory.LRU),
		memory.AdapterWithCapacity(adapterCapacity),
	)
	if adapterErr != nil {
		log.Fatalf("Error creating cache client memory adapter: %v", adapterErr)
	}

	client, clientErr := cache.NewClient(
		cache.ClientWithAdapter(lruAdapter),
		cache.ClientWithTTL(entryTTL),
		cache.ClientWithRefreshKey(refreshKey),
	)
	if clientErr != nil {
		log.Fatalf("Error creating cache client: %v", clientErr)
	}

	return client
}