Skip to content

Commit d77ba3c

Browse files
committedMar 15, 2025
feat: add worker to get yield metrics
1 parent 5b30b3c commit d77ba3c

19 files changed

+964
-309
lines changed
 

‎cmd/api/server/server.go

+21
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/zuni-lab/yexus-api/pkg/openobserve"
1919
"github.com/zuni-lab/yexus-api/pkg/swap"
2020
"github.com/zuni-lab/yexus-api/pkg/utils"
21+
"github.com/zuni-lab/yexus-api/pkg/worker"
2122
sdktrace "go.opentelemetry.io/otel/sdk/trace"
2223
)
2324

@@ -28,6 +29,7 @@ type Server struct {
2829
ctx context.Context
2930
cancel context.CancelFunc
3031
wg sync.WaitGroup
32+
workers []*worker.Scheduler
3133
}
3234

3335
func New() *Server {
@@ -59,13 +61,16 @@ func New() *Server {
5961
setupRoute(e)
6062
setupValidator(e)
6163

64+
workers := setupWorkers()
65+
6266
loadSvcs(ctx)
6367

6468
return &Server{
6569
Raw: e,
6670
traceProvider: tp,
6771
ctx: ctx,
6872
cancel: cancel,
73+
workers: workers,
6974
}
7075
}
7176

@@ -77,6 +82,7 @@ func (s *Server) Start() error {
7782

7883
s.printRoutes()
7984
s.startTwapMatcher()
85+
s.startWorkers()
8086

8187
log.Info().Msgf("🥪 Environment loaded: %+v", config.Env)
8288

@@ -130,6 +136,20 @@ func (s *Server) startTwapMatcher() {
130136
}()
131137
}
132138

139+
func (s *Server) startWorkers() {
140+
var wg sync.WaitGroup
141+
for _, w := range s.workers {
142+
wg.Add(1)
143+
go func(worker *worker.Scheduler) {
144+
defer wg.Done()
145+
worker.Start()
146+
}(w)
147+
}
148+
wg.Wait()
149+
150+
log.Info().Msg("All workers started")
151+
}
152+
133153
func (s *Server) Close() {
134154
s.cancel() // Signal all goroutines to stop
135155
s.wg.Wait() // Wait for all goroutines to finish
@@ -151,6 +171,7 @@ func loadSvcs(ctx context.Context) {
151171
db.Init(ctx, config.Env.PostgresUrl, config.Env.MigrationUrl)
152172
openai.Init()
153173
swap.InitPoolInfo()
174+
worker.Init()
154175
}
155176

156177
func closeSvcs() {

‎cmd/api/server/setup.go

+18
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
package server
22

33
import (
4+
"context"
45
"os"
56
"sort"
67
"strings"
78

89
"github.com/jedib0t/go-pretty/v6/table"
910
"github.com/labstack/echo/v4"
1011
"github.com/labstack/echo/v4/middleware"
12+
"github.com/rs/zerolog/log"
1113
"github.com/zuni-lab/yexus-api/config"
1214
"github.com/zuni-lab/yexus-api/pkg/openobserve"
1315
"github.com/zuni-lab/yexus-api/pkg/utils"
16+
"github.com/zuni-lab/yexus-api/pkg/worker"
1417
)
1518

1619
type RouteInfo struct {
@@ -84,6 +87,21 @@ func setupValidator(e *echo.Echo) {
8487
e.Validator = utils.NewValidator()
8588
}
8689

90+
func setupWorkers() []*worker.Scheduler {
91+
workers := make([]*worker.Scheduler, 0)
92+
yieldMetricsAt := config.Env.YieldMetricsRunAt
93+
w := worker.NewScheduler(yieldMetricsAt, "fetch-yield-metrics")
94+
w.AddJob(func() {
95+
log.Info().Msg("Fetching yield metrics")
96+
err := worker.FetchAndUpdateYieldMetrics(context.Background())
97+
if err != nil {
98+
log.Err(err).Msg("Error fetching yield metrics")
99+
}
100+
})
101+
workers = append(workers, w)
102+
return workers
103+
}
104+
87105
func (s *Server) printRoutes() {
88106
t := table.NewWriter()
89107
t.SetOutputMirror(os.Stdout)

‎config/env.go

+19-6
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,17 @@ type RealtimeManagerConfig struct {
5454
Address common.Address
5555
}
5656

57-
// Indexer-specific configuration
58-
type IndexerConfig struct {
59-
ChunkSize uint64 `validate:"min=1"`
60-
Concurrency int `validate:"min=1"`
61-
StartBlock uint64 `validate:"min=1"`
62-
FetchInterval time.Duration `validate:"min=1s"`
57+
type WorkerConfig struct {
58+
YieldMetricsSource string `validate:"url"`
59+
YieldMetricsRunAt time.Time `validate:"required"`
6360
}
6461

6562
// ServerEnv combines all configurations
6663
type ServerEnv struct {
6764
CommonConfig
6865
ServerConfig
6966
RealtimeManagerConfig
67+
WorkerConfig
7068
}
7169

7270
var Env ServerEnv
@@ -102,10 +100,13 @@ func loadEnv() {
102100

103101
managerConfig := loadRealtimeManagerConfig()
104102

103+
workerConfig := loadWorkerConfig()
104+
105105
Env = ServerEnv{
106106
CommonConfig: commonConfig,
107107
ServerConfig: serverConfig,
108108
RealtimeManagerConfig: managerConfig,
109+
WorkerConfig: workerConfig,
109110
}
110111

111112
validate := validator.New()
@@ -179,6 +180,18 @@ func loadRealtimeManagerConfig() RealtimeManagerConfig {
179180
}
180181
}
181182

183+
func loadWorkerConfig() WorkerConfig {
184+
runAt, err := time.Parse("15:04", os.Getenv("YIELD_METRICS_RUN_AT"))
185+
if err != nil {
186+
log.Fatal().Msgf("Error parsing YIELD_METRICS_AT: %s", err)
187+
}
188+
189+
return WorkerConfig{
190+
YieldMetricsSource: os.Getenv("YIELD_METRICS_SOURCE"),
191+
YieldMetricsRunAt: runAt,
192+
}
193+
}
194+
182195
// Helper functions for environment variable parsing
183196
func getEnvDuration(key, defaultValue string) time.Duration {
184197
value := os.Getenv(key)

‎go.mod

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ toolchain go1.23.3
77
require (
88
github.com/cenkalti/backoff/v4 v4.3.0
99
github.com/ethereum/go-ethereum v1.15.2
10+
github.com/go-co-op/gocron v1.37.0
1011
github.com/go-playground/validator/v10 v10.24.0
1112
github.com/golang-migrate/migrate/v4 v4.18.2
1213
github.com/jackc/pgx/v5 v5.7.2
@@ -58,6 +59,7 @@ require (
5859
github.com/mattn/go-runewidth v0.0.16 // indirect
5960
github.com/mmcloughlin/addchain v0.4.0 // indirect
6061
github.com/rivo/uniseg v0.4.7 // indirect
62+
github.com/robfig/cron/v3 v3.0.1 // indirect
6163
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
6264
github.com/supranational/blst v0.3.14 // indirect
6365
github.com/tidwall/gjson v1.14.4 // indirect
@@ -72,7 +74,7 @@ require (
7274
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect
7375
go.opentelemetry.io/otel/metric v1.34.0 // indirect
7476
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
75-
go.uber.org/atomic v1.7.0 // indirect
77+
go.uber.org/atomic v1.9.0 // indirect
7678
golang.org/x/crypto v0.32.0 // indirect
7779
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect
7880
golang.org/x/net v0.34.0 // indirect

‎go.sum

+24-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a h1:W8mUrRp6NOV
4141
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a/go.mod h1:sTwzHBvIzm2RfVCGNEBZgRyjwK40bVoun3ZnGOCafNM=
4242
github.com/crate-crypto/go-kzg-4844 v1.1.0 h1:EN/u9k2TF6OWSHrCCDBBU6GLNMq88OspHHlMnHfoyU4=
4343
github.com/crate-crypto/go-kzg-4844 v1.1.0/go.mod h1:JolLjpSff1tCCJKaJx4psrlEdlXuJEC996PL3tTAFks=
44+
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
4445
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4546
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4647
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -74,6 +75,8 @@ github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3G
7475
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
7576
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
7677
github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
78+
github.com/go-co-op/gocron v1.37.0 h1:ZYDJGtQ4OMhTLKOKMIch+/CY70Brbb1dGdooLEhh7b0=
79+
github.com/go-co-op/gocron v1.37.0/go.mod h1:3L/n6BkO7ABj+TrfSVXLRzsP26zmikL4ISkLQ0O8iNY=
7780
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
7881
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
7982
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -108,6 +111,7 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
108111
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
109112
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
110113
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
114+
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
111115
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
112116
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
113117
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
@@ -147,8 +151,13 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
147151
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
148152
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
149153
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
154+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
155+
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
156+
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
150157
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
151158
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
159+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
160+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
152161
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
153162
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
154163
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
@@ -204,6 +213,7 @@ github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N
204213
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
205214
github.com/pion/transport/v3 v3.0.1 h1:gDTlPJwROfSfz6QfSi0ZmeCSkFcnWWiiR9ES0ouANiM=
206215
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
216+
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
207217
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
208218
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
209219
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -219,6 +229,10 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
219229
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
220230
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
221231
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
232+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
233+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
234+
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
235+
github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
222236
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
223237
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
224238
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
@@ -231,8 +245,13 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
231245
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU=
232246
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
233247
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
248+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
249+
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
234250
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
235251
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
252+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
253+
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
254+
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
236255
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
237256
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
238257
github.com/supranational/blst v0.3.14 h1:xNMoHRJOTwMn63ip6qoWJ2Ymgvj7E2b9jY2FAwY+qRo=
@@ -281,8 +300,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
281300
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
282301
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
283302
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
284-
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
285-
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
303+
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
304+
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
286305
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
287306
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
288307
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ=
@@ -314,6 +333,9 @@ google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7
314333
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
315334
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
316335
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
336+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
337+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
338+
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
317339
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
318340
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
319341
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=

‎pkg/db/create_batch_yeild_metrics.go

+233
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package db
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"strings"
8+
9+
"github.com/jackc/pgx/v5/pgtype"
10+
"github.com/zuni-lab/yexus-api/pkg/utils"
11+
)
12+
13+
type YieldMetricData struct {
14+
Pool string `json:"pool"`
15+
Chain string `json:"chain"`
16+
Project string `json:"project"`
17+
Symbol string `json:"symbol"`
18+
TvlUsd float64 `json:"tvlUsd"`
19+
ApyBase float64 `json:"apyBase"`
20+
ApyReward float64 `json:"apyReward"`
21+
Apy float64 `json:"apy"`
22+
RewardTokens []string `json:"rewardTokens"`
23+
ApyPct1d float64 `json:"apyPct1D"`
24+
ApyPct7d float64 `json:"apyPct7D"`
25+
ApyPct30d float64 `json:"apyPct30D"`
26+
Stablecoin bool `json:"stablecoin"`
27+
IlRisk string `json:"ilRisk"`
28+
Exposure string `json:"exposure"`
29+
Predictions json.RawMessage `json:"predictions"`
30+
PoolMeta string `json:"poolMeta"`
31+
UnderlyingTokens []string `json:"underlyingTokens"`
32+
Il7d float64 `json:"il7d"`
33+
ApyBase7d float64 `json:"apyBase7d"`
34+
ApyMean30d float64 `json:"apyMean30d"`
35+
VolumeUsd1d float64 `json:"volumeUsd1d"`
36+
VolumeUsd7d float64 `json:"volumeUsd7d"`
37+
ApyBaseInception float64 `json:"apyBaseInception"`
38+
}
39+
40+
type CreateBatchYieldMetricsTxResult struct {
41+
RowsAffected int64
42+
}
43+
44+
func (store *SqlStore) CreateBatchYieldMetricsTx(ctx context.Context, yieldMetrics []*YieldMetricData) (CreateBatchYieldMetricsTxResult, error) {
45+
var result CreateBatchYieldMetricsTxResult
46+
47+
// Calculate max records per batch (65535 / 24 parameters per record)
48+
const maxParamsPerQuery = 65535
49+
const paramsPerRecord = 24
50+
const batchSize = maxParamsPerQuery / paramsPerRecord
51+
52+
// Process in batches
53+
for i := 0; i < len(yieldMetrics); i += batchSize {
54+
end := i + batchSize
55+
if end > len(yieldMetrics) {
56+
end = len(yieldMetrics)
57+
}
58+
59+
batchMetrics := yieldMetrics[i:end]
60+
61+
conn, err := store.connPool.Acquire(ctx)
62+
if err != nil {
63+
return result, fmt.Errorf("acquire connection: %w", err)
64+
}
65+
66+
// Build values portion of query for this batch
67+
valueStrings := make([]string, 0, len(batchMetrics))
68+
valueArgs := make([]interface{}, 0, len(batchMetrics)*paramsPerRecord)
69+
70+
for j, metric := range batchMetrics {
71+
tvlUsd, err := scanNumeric(metric.TvlUsd, "tvl_usd")
72+
if err != nil {
73+
conn.Release()
74+
return result, fmt.Errorf("failed to scan tvl_usd: %w", err)
75+
}
76+
77+
apyBase, err := scanNumeric(metric.ApyBase, "apy_base")
78+
if err != nil {
79+
conn.Release()
80+
return result, fmt.Errorf("failed to scan apy_base: %w", err)
81+
}
82+
83+
apyReward, err := scanNumeric(metric.ApyReward, "apy_reward")
84+
if err != nil {
85+
conn.Release()
86+
return result, fmt.Errorf("failed to scan apy_reward: %w", err)
87+
}
88+
89+
apy, err := scanNumeric(metric.Apy, "apy")
90+
if err != nil {
91+
conn.Release()
92+
return result, fmt.Errorf("failed to scan apy: %w", err)
93+
}
94+
95+
apyPct1d, err := scanNumeric(metric.ApyPct1d, "apy_pct_1d")
96+
if err != nil {
97+
conn.Release()
98+
return result, fmt.Errorf("failed to scan apy_pct_1d: %w", err)
99+
}
100+
101+
apyPct7d, err := scanNumeric(metric.ApyPct7d, "apy_pct_7d")
102+
if err != nil {
103+
conn.Release()
104+
return result, fmt.Errorf("failed to scan apy_pct_7d: %w", err)
105+
}
106+
107+
apyPct30d, err := scanNumeric(metric.ApyPct30d, "apy_pct_30d")
108+
if err != nil {
109+
conn.Release()
110+
return result, fmt.Errorf("failed to scan apy_pct_30d: %w", err)
111+
}
112+
113+
stablecoin, err := scanBool(metric.Stablecoin, "stablecoin")
114+
if err != nil {
115+
conn.Release()
116+
return result, fmt.Errorf("failed to scan stablecoin: %w", err)
117+
}
118+
119+
ilRisk, err := scanString(metric.IlRisk, "il_risk")
120+
if err != nil {
121+
conn.Release()
122+
return result, fmt.Errorf("failed to scan il_risk: %w", err)
123+
}
124+
125+
exposure, err := scanString(metric.Exposure, "exposure")
126+
if err != nil {
127+
conn.Release()
128+
return result, fmt.Errorf("failed to scan exposure: %w", err)
129+
}
130+
131+
poolMeta, err := scanString(metric.PoolMeta, "pool_meta")
132+
if err != nil {
133+
conn.Release()
134+
return result, fmt.Errorf("failed to scan pool_meta: %w", err)
135+
}
136+
137+
il7d, err := scanNumeric(metric.Il7d, "il_7d")
138+
if err != nil {
139+
conn.Release()
140+
return result, fmt.Errorf("failed to scan il_7d: %w", err)
141+
}
142+
143+
apyBase7d, err := scanNumeric(metric.ApyBase7d, "apy_base_7d")
144+
if err != nil {
145+
conn.Release()
146+
return result, fmt.Errorf("failed to scan apy_base_7d: %w", err)
147+
}
148+
149+
apyMean30d, err := scanNumeric(metric.ApyMean30d, "apy_mean_30d")
150+
if err != nil {
151+
conn.Release()
152+
return result, fmt.Errorf("failed to scan apy_mean_30d: %w", err)
153+
}
154+
155+
volumeUsd1d, err := scanNumeric(metric.VolumeUsd1d, "volume_usd_1d")
156+
if err != nil {
157+
conn.Release()
158+
return result, fmt.Errorf("failed to scan volume_usd_1d: %w", err)
159+
}
160+
161+
volumeUsd7d, err := scanNumeric(metric.VolumeUsd7d, "volume_usd_7d")
162+
if err != nil {
163+
conn.Release()
164+
return result, fmt.Errorf("failed to scan volume_usd_7d: %w", err)
165+
}
166+
167+
apyBaseInception, err := scanNumeric(metric.ApyBaseInception, "apy_base_inception")
168+
if err != nil {
169+
conn.Release()
170+
return result, fmt.Errorf("failed to scan apy_base_inception: %w", err)
171+
}
172+
173+
// Create placeholders for this row
174+
offset := j * paramsPerRecord
175+
valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)",
176+
offset+1, offset+2, offset+3, offset+4, offset+5, offset+6, offset+7, offset+8, offset+9, offset+10,
177+
offset+11, offset+12, offset+13, offset+14, offset+15, offset+16, offset+17, offset+18, offset+19, offset+20,
178+
offset+21, offset+22, offset+23, offset+24))
179+
180+
valueArgs = append(valueArgs,
181+
metric.Pool, metric.Chain, metric.Project, metric.Symbol,
182+
tvlUsd, apyBase, apyReward, apy,
183+
metric.RewardTokens, apyPct1d, apyPct7d, apyPct30d,
184+
stablecoin, ilRisk, exposure, metric.Predictions,
185+
poolMeta, metric.UnderlyingTokens, il7d, apyBase7d,
186+
apyMean30d, volumeUsd1d, volumeUsd7d, apyBaseInception)
187+
}
188+
189+
query := fmt.Sprintf(`
190+
INSERT INTO yield_metrics (
191+
pool, chain, project, symbol, tvl_usd, apy_base, apy_reward, apy,
192+
reward_tokens, apy_pct_1d, apy_pct_7d, apy_pct_30d, stablecoin,
193+
il_risk, exposure, predictions, pool_meta, underlying_tokens,
194+
il_7d, apy_base_7d, apy_mean_30d, volume_usd_1d, volume_usd_7d,
195+
apy_base_inception
196+
) VALUES %s`, strings.Join(valueStrings, ","))
197+
198+
tag, err := conn.Exec(ctx, query, valueArgs...)
199+
conn.Release()
200+
201+
if err != nil {
202+
return result, fmt.Errorf("executing bulk insert: %w", err)
203+
}
204+
205+
result.RowsAffected += tag.RowsAffected()
206+
}
207+
208+
return result, nil
209+
}
210+
211+
func scanNumeric(value float64, field string) (*pgtype.Numeric, error) {
212+
val, err := utils.ScanNumericValue(fmt.Sprintf("%.18f", value))
213+
if err != nil {
214+
return nil, fmt.Errorf("failed to scan %s: %w", field, err)
215+
}
216+
return val, nil
217+
}
218+
219+
func scanString(value string, field string) (*pgtype.Text, error) {
220+
val, err := utils.ScanStringValue(value)
221+
if err != nil {
222+
return nil, fmt.Errorf("failed to scan %s: %w", field, err)
223+
}
224+
return val, nil
225+
}
226+
227+
func scanBool(value bool, field string) (*pgtype.Bool, error) {
228+
val, err := utils.ScanBoolValue(fmt.Sprintf("%t", value))
229+
if err != nil {
230+
return nil, fmt.Errorf("failed to scan %s: %w", field, err)
231+
}
232+
return val, nil
233+
}

‎pkg/db/errors.go

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package db
2+
3+
import (
4+
"errors"
5+
6+
"github.com/jackc/pgx/v5"
7+
"github.com/jackc/pgx/v5/pgconn"
8+
)
9+
10+
const (
11+
ForeignKeyViolation = "23503"
12+
UniqueViolation = "23505"
13+
)
14+
15+
var ErrRecordNotFound = pgx.ErrNoRows
16+
17+
var ErrUniqueViolation = &pgconn.PgError{
18+
Code: UniqueViolation,
19+
}
20+
21+
func ErrorCode(err error) string {
22+
var pgErr *pgconn.PgError
23+
if errors.As(err, &pgErr) {
24+
return pgErr.Code
25+
}
26+
return ""
27+
}

‎pkg/db/migration/000001_init.up.sql

+45-12
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,6 @@ INSERT INTO pools (id, token0_id, token1_id) VALUES
9898
LOWER('0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'),
9999
LOWER('0x7ceB23fD6bC0adD59E62ac25578270cFf1b9f619'));
100100

101-
-- -- For tracking processed blocks
102-
CREATE TABLE block_processing_state (
103-
pool_address VARCHAR(42) NOT NULL,
104-
last_processed_block BIGINT NOT NULL,
105-
is_backfill BOOLEAN NOT NULL,
106-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
107-
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
108-
PRIMARY KEY (pool_address, is_backfill)
109-
);
110-
111-
112101
--- For user chat ---
113102
CREATE TABLE IF NOT EXISTS chat_threads (
114103
id BIGSERIAL PRIMARY KEY,
@@ -120,4 +109,48 @@ CREATE TABLE IF NOT EXISTS chat_threads (
120109
is_deleted BOOLEAN NOT NULL DEFAULT FALSE
121110
);
122111

123-
CREATE UNIQUE INDEX IF NOT EXISTS idx_chat_threads_thread_id_user_address ON chat_threads(thread_id, user_address) WHERE NOT is_deleted;
112+
CREATE UNIQUE INDEX IF NOT EXISTS idx_chat_threads_thread_id_user_address ON chat_threads(thread_id, user_address) WHERE NOT is_deleted;
113+
114+
CREATE TABLE IF NOT EXISTS yield_metrics (
115+
id BIGSERIAL PRIMARY KEY,
116+
pool TEXT NOT NULL,
117+
chain TEXT NOT NULL,
118+
project TEXT NOT NULL,
119+
symbol TEXT NOT NULL,
120+
tvl_usd NUMERIC(20,2),
121+
apy_base NUMERIC(10,3),
122+
apy_reward NUMERIC(10,3),
123+
apy NUMERIC(10,3),
124+
reward_tokens TEXT[],
125+
apy_pct_1d NUMERIC(10,3),
126+
apy_pct_7d NUMERIC(10,3),
127+
apy_pct_30d NUMERIC(10,3),
128+
stablecoin BOOLEAN DEFAULT false,
129+
il_risk TEXT,
130+
exposure TEXT,
131+
predictions JSONB,
132+
pool_meta TEXT,
133+
mu NUMERIC(10,5),
134+
sigma NUMERIC(10,5),
135+
count INTEGER,
136+
outlier BOOLEAN DEFAULT false,
137+
underlying_tokens TEXT[],
138+
il_7d NUMERIC(10,3),
139+
apy_base_7d NUMERIC(10,3),
140+
apy_mean_30d NUMERIC(10,3),
141+
volume_usd_1d NUMERIC(20,2),
142+
volume_usd_7d NUMERIC(20,2),
143+
apy_base_inception NUMERIC(10,3),
144+
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
145+
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
146+
);
147+
148+
DROP INDEX IF EXISTS idx_yield_metrics_pool_created_at;
149+
150+
-- Create unique index
151+
CREATE UNIQUE INDEX idx_yield_metrics_pool_created_at ON yield_metrics(pool, created_at);
152+
153+
-- Create index on frequently queried columns
154+
CREATE INDEX idx_yield_metrics_pool ON yield_metrics(pool);
155+
CREATE INDEX idx_yield_metrics_chain ON yield_metrics(chain);
156+
CREATE INDEX idx_yield_metrics_project ON yield_metrics(project);

‎pkg/db/models.go

+34-8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pkg/db/querier.go

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pkg/db/query/yield_metrics.sql

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
-- name: GetYieldMetrics :many
2+
SELECT * FROM yield_metrics
3+
WHERE pool = $1
4+
ORDER BY created_at DESC
5+
LIMIT $2 OFFSET $3;
6+
7+
-- name: GetLatestYieldMetric :one
8+
SELECT * FROM yield_metrics
9+
ORDER BY created_at DESC
10+
LIMIT 1;
11+
12+
-- name: GetYieldMetricsForChat :many
13+
WITH latest_metrics AS (
14+
SELECT DISTINCT ON (pool) *
15+
FROM yield_metrics
16+
WHERE project = ANY($1::text[])
17+
ORDER BY pool, created_at DESC
18+
)
19+
SELECT
20+
pool,
21+
chain,
22+
project,
23+
symbol,
24+
apy,
25+
apy_base as "apyBase",
26+
apy_reward as "apyReward",
27+
reward_tokens as "rewardTokens",
28+
underlying_tokens as "underlyingTokens"
29+
FROM latest_metrics
30+
ORDER BY symbol ASC;

‎pkg/db/tx_create_pool.go

-122
This file was deleted.

‎pkg/db/yield_metrics.sql.go

+185
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pkg/utils/converter.go

-71
This file was deleted.

‎pkg/utils/converter_test.go

-87
This file was deleted.

‎pkg/utils/scan.go

+92
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package utils
22

33
import (
4+
"fmt"
45
"math/big"
56

67
"github.com/jackc/pgx/v5/pgtype"
@@ -17,3 +18,94 @@ func BigFloatToNumeric(f *big.Float) (pgtype.Numeric, error) {
1718
}
1819
return numeric, nil
1920
}
21+
22+
func ConvertNumericToDecimals(num *pgtype.Numeric, decimals uint8) (*big.Int, error) {
23+
if !num.Valid {
24+
return nil, fmt.Errorf("invalid numeric value")
25+
}
26+
if num.NaN {
27+
return nil, fmt.Errorf("NaN value not supported")
28+
}
29+
if num.InfinityModifier != 0 {
30+
return nil, fmt.Errorf("infinity not supported")
31+
}
32+
33+
result := new(big.Int).Set(num.Int)
34+
35+
expDiff := int64(num.Exp) + int64(decimals)
36+
37+
if expDiff == 0 {
38+
return result, nil
39+
}
40+
41+
if expDiff < 0 {
42+
divisor := new(big.Int).Exp(
43+
big.NewInt(10),
44+
big.NewInt(-expDiff),
45+
nil,
46+
)
47+
return result.Div(result, divisor), nil
48+
}
49+
50+
// expDiff > 0
51+
multiplier := new(big.Int).Exp(
52+
big.NewInt(10),
53+
big.NewInt(expDiff),
54+
nil,
55+
)
56+
result.Mul(result, multiplier)
57+
58+
return result, nil
59+
}
60+
61+
func ConvertDecimalsToWei(num *pgtype.Numeric) (*big.Int, error) {
62+
return ConvertNumericToDecimals(num, 18)
63+
}
64+
65+
func ConvertFloat8ToDecimals(num pgtype.Float8, decimals uint64) (*big.Int, error) {
66+
if !num.Valid {
67+
return nil, fmt.Errorf("invalid float8 value")
68+
}
69+
70+
bigFloat := new(big.Float).SetFloat64(num.Float64)
71+
72+
multiplier := new(big.Int).Exp(
73+
big.NewInt(10),
74+
big.NewInt(int64(decimals)),
75+
nil,
76+
)
77+
78+
bigFloat.Mul(bigFloat, new(big.Float).SetInt(multiplier))
79+
80+
result := new(big.Int)
81+
bigFloat.Int(result)
82+
return result, nil
83+
}
84+
85+
// ScanValue is a generic function that converts a string value to a pgtype type
86+
func ScanNumericValue(value string) (*pgtype.Numeric, error) {
87+
var numeric pgtype.Numeric
88+
if err := numeric.Scan(value); err != nil {
89+
return nil, fmt.Errorf("failed to scan value %q: %w", value, err)
90+
}
91+
92+
return &numeric, nil
93+
}
94+
95+
func ScanStringValue(value string) (*pgtype.Text, error) {
96+
var text pgtype.Text
97+
if err := text.Scan(value); err != nil {
98+
return nil, fmt.Errorf("failed to scan value %q: %w", value, err)
99+
}
100+
101+
return &text, nil
102+
}
103+
104+
func ScanBoolValue(value string) (*pgtype.Bool, error) {
105+
var bool pgtype.Bool
106+
if err := bool.Scan(value); err != nil {
107+
return nil, fmt.Errorf("failed to scan value %q: %w", value, err)
108+
}
109+
110+
return &bool, nil
111+
}

‎pkg/utils/scan_test.go

+79
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"math/big"
77
"testing"
88

9+
"github.com/jackc/pgx/v5/pgtype"
910
"github.com/zuni-lab/yexus-api/pkg/utils"
1011
)
1112

@@ -46,3 +47,81 @@ func TestFloatTextLarge(t *testing.T) {
4647

4748
fmt.Println(priceFloat.Text('f', -1))
4849
}
50+
51+
func TestNumericConversion(t *testing.T) {
52+
testCases := []struct {
53+
name string
54+
input pgtype.Numeric
55+
decimals uint8
56+
expected string
57+
}{
58+
{
59+
name: "USDC",
60+
input: pgtype.Numeric{
61+
Int: func() *big.Int {
62+
n, _ := new(big.Int).SetString("19005000000000000000000", 10)
63+
return n
64+
}(),
65+
Exp: -19,
66+
Valid: true,
67+
},
68+
decimals: 18,
69+
expected: "1900500000000000000000", // 1900.5 * 10^18
70+
},
71+
{
72+
name: "WBTC",
73+
input: pgtype.Numeric{
74+
Int: func() *big.Int {
75+
n, _ := new(big.Int).SetString("19005000000000000000000", 10)
76+
return n
77+
}(),
78+
Exp: -17,
79+
Valid: true,
80+
},
81+
decimals: 18,
82+
expected: "190050000000000000000000",
83+
},
84+
{
85+
name: "WSOL",
86+
input: pgtype.Numeric{
87+
Int: func() *big.Int {
88+
n, _ := new(big.Int).SetString("5", 10)
89+
return n
90+
}(),
91+
Exp: 5,
92+
Valid: true,
93+
},
94+
decimals: 8,
95+
expected: "50000000000000", // 5*10^13
96+
},
97+
}
98+
99+
for _, tc := range testCases {
100+
t.Run(tc.name, func(t *testing.T) {
101+
result, err := utils.ConvertNumericToDecimals(&tc.input, tc.decimals)
102+
if err != nil {
103+
t.Fatalf("ConvertNumericToDecimals failed: %v", err)
104+
}
105+
106+
t.Logf("result: %+v\n", result)
107+
108+
if result.String() != tc.expected {
109+
t.Errorf("Expected %s, got %s", tc.expected, result.String())
110+
}
111+
})
112+
}
113+
}
114+
115+
func TestFloatToWei(t *testing.T) {
116+
float := pgtype.Float8{
117+
Float64: 0.1,
118+
Valid: true,
119+
}
120+
121+
result, err := utils.ConvertFloat8ToDecimals(float, 6)
122+
if err != nil {
123+
t.Fatalf("ConvertFloat8ToWei failed: %v", err)
124+
}
125+
126+
t.Logf("result: %+v\n", result)
127+
}

‎pkg/worker/fetch_yield_metrics.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package worker
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
9+
"github.com/rs/zerolog/log"
10+
"github.com/zuni-lab/yexus-api/config"
11+
"github.com/zuni-lab/yexus-api/pkg/db"
12+
)
13+
14+
type YieldMetricsResponse struct {
15+
Data []*db.YieldMetricData `json:"data"`
16+
}
17+
18+
var httpClient = &http.Client{}
19+
20+
func FetchAndUpdateYieldMetrics(ctx context.Context) error {
21+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, config.Env.YieldMetricsSource, nil)
22+
if err != nil {
23+
return fmt.Errorf("failed to create request: %w", err)
24+
}
25+
26+
// Use cached client to make request
27+
resp, err := httpClient.Do(req)
28+
if err != nil {
29+
return fmt.Errorf("failed to fetch yield metrics: %w", err)
30+
}
31+
defer resp.Body.Close()
32+
33+
var yieldMetrics YieldMetricsResponse
34+
if err := json.NewDecoder(resp.Body).Decode(&yieldMetrics); err != nil {
35+
return fmt.Errorf("failed to decode yield metrics: %w", err)
36+
}
37+
38+
_, err = db.DB.CreateBatchYieldMetricsTx(ctx, yieldMetrics.Data)
39+
if err != nil {
40+
return fmt.Errorf("failed to create batch yield metrics: %w", err)
41+
}
42+
43+
log.Info().
44+
Int("count", len(yieldMetrics.Data)).
45+
Msg("Successfully updated yield metrics")
46+
47+
return nil
48+
}

‎pkg/worker/init.go

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package worker
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"time"
8+
9+
"github.com/go-co-op/gocron"
10+
"github.com/rs/zerolog"
11+
"github.com/rs/zerolog/log"
12+
"github.com/zuni-lab/yexus-api/pkg/db"
13+
"github.com/zuni-lab/yexus-api/pkg/utils"
14+
)
15+
16+
type Scheduler struct {
17+
raw *gocron.Scheduler
18+
atUtc time.Time
19+
jobs []func()
20+
log *zerolog.Event
21+
}
22+
23+
func NewScheduler(at time.Time, name ...string) *Scheduler {
24+
nameStr := "Scheduler"
25+
if len(name) > 0 {
26+
nameStr = name[0]
27+
}
28+
29+
atUtc := at.UTC()
30+
31+
log.Info().Msgf("Scheduler %s at %s", nameStr, atUtc.String())
32+
33+
return &Scheduler{
34+
raw: gocron.NewScheduler(time.UTC),
35+
atUtc: atUtc,
36+
log: log.Info().Func(func(e *zerolog.Event) {
37+
e.Any(utils.YellowMsg("scheduler"), nameStr).Any(utils.BlueMsg("at"), atUtc.String())
38+
}),
39+
}
40+
}
41+
42+
func (s *Scheduler) Start() {
43+
for _, job := range s.jobs {
44+
s.raw.Every(1).Day().At(s.atUtc).Do(job)
45+
}
46+
s.raw.StartAsync()
47+
}
48+
49+
func (s *Scheduler) Shutdown() {
50+
s.raw.Stop()
51+
}
52+
53+
func (s *Scheduler) AddJob(job func()) {
54+
// Wrap the original job with panic recovery
55+
wrappedJob := func() {
56+
defer func() {
57+
if r := recover(); r != nil {
58+
log.Error().
59+
Interface("panic", r).
60+
Msg("Recovered from panic in scheduled job")
61+
}
62+
}()
63+
job()
64+
}
65+
s.jobs = append(s.jobs, wrappedJob)
66+
}
67+
68+
func (s *Scheduler) Len() int {
69+
return len(s.jobs)
70+
}
71+
72+
// Run all the jobs at once
73+
func Init() {
74+
var wg sync.Once
75+
wg.Do(func() {
76+
ctx := context.Background()
77+
78+
var shouldFetch = false
79+
80+
latestYieldMetric, err := db.DB.GetLatestYieldMetric(ctx)
81+
if err != nil {
82+
if errors.Is(err, db.ErrRecordNotFound) {
83+
shouldFetch = true
84+
} else {
85+
log.Error().Msgf("failed to get latest yield metric: %s", err)
86+
return
87+
}
88+
}
89+
90+
if latestYieldMetric.CreatedAt.Time.Add(time.Hour * 24).Before(time.Now()) {
91+
log.Info().Msg("Data is more than 24 hours old, fetching and updating yield metrics")
92+
shouldFetch = true
93+
}
94+
95+
if !shouldFetch {
96+
log.Info().Msg("No need to fetch and update yield metrics")
97+
return
98+
}
99+
100+
FetchAndUpdateYieldMetrics(ctx)
101+
102+
})
103+
}

0 commit comments

Comments
 (0)
Please sign in to comment.