Skip to content

Commit c81be71

Browse files
committed
added base implementation
1 parent 7f9c77c commit c81be71

File tree

8 files changed

+602
-0
lines changed

8 files changed

+602
-0
lines changed

app/lts/main.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"github.com/ardanlabs/conf"
9+
"github.com/elastic/go-elasticsearch/v8"
10+
"github.com/pkg/errors"
11+
"github.com/qubic/go-archiver/lts/tx"
12+
"github.com/qubic/go-archiver/protobuff"
13+
"go.uber.org/zap"
14+
"go.uber.org/zap/zapcore"
15+
"google.golang.org/grpc"
16+
"google.golang.org/grpc/credentials/insecure"
17+
"log"
18+
"net/http"
19+
"os"
20+
"time"
21+
)
22+
23+
const prefix = "QUBIC_ARCHIVER"
24+
25+
func main() {
26+
if err := run(); err != nil {
27+
log.Fatalf("main: exited with error: %s", err.Error())
28+
}
29+
}
30+
31+
func run() error {
32+
config := zap.NewProductionConfig()
33+
// this is just for sugar, to display a readable date instead of an epoch time
34+
config.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout(time.DateTime)
35+
36+
logger, err := config.Build()
37+
if err != nil {
38+
fmt.Errorf("creating logger: %v", err)
39+
}
40+
defer logger.Sync()
41+
sLogger := logger.Sugar()
42+
43+
var cfg struct {
44+
InternalStoreFolder string `conf:"default:store"`
45+
ArchiverHost string `conf:"default:127.0.0.1:6001"`
46+
ArchiverReadTimeout time.Duration `conf:"default:10s"`
47+
ElasticSearchAddress string `conf:"default:http://127.0.0.1:9200"`
48+
ElasticSearchWriteTimeout time.Duration `conf:"default:5m"`
49+
BatchSize int `conf:"default:10000"`
50+
NrWorkers int `conf:"default:20"`
51+
}
52+
53+
if err := conf.Parse(os.Args[1:], prefix, &cfg); err != nil {
54+
switch err {
55+
case conf.ErrHelpWanted:
56+
usage, err := conf.Usage(prefix, &cfg)
57+
if err != nil {
58+
return fmt.Errorf("generating config usage: %v", err)
59+
}
60+
fmt.Println(usage)
61+
return nil
62+
case conf.ErrVersionWanted:
63+
version, err := conf.VersionString(prefix, &cfg)
64+
if err != nil {
65+
return fmt.Errorf("generating config version: %v", err)
66+
}
67+
fmt.Println(version)
68+
return nil
69+
}
70+
return fmt.Errorf("parsing config: %v", err)
71+
}
72+
73+
out, err := conf.String(&cfg)
74+
if err != nil {
75+
return fmt.Errorf("generating config for output: %v", err)
76+
}
77+
log.Printf("main: Config :\n%v\n", out)
78+
79+
store, err := tx.NewProcessorStore(cfg.InternalStoreFolder)
80+
if err != nil {
81+
return fmt.Errorf("creating processor store: %v", err)
82+
}
83+
84+
archiverConn, err := grpc.NewClient(cfg.ArchiverHost, grpc.WithTransportCredentials(insecure.NewCredentials()))
85+
if err != nil {
86+
return fmt.Errorf("creating archiver connection: %v", err)
87+
}
88+
89+
archiverClient := protobuff.NewArchiveServiceClient(archiverConn)
90+
91+
esInserter, err := NewElasticSearchTxsInserter(cfg.ElasticSearchAddress, "transactions", cfg.ElasticSearchWriteTimeout)
92+
if err != nil {
93+
return fmt.Errorf("creating elasticsearch tx inserter: %v", err)
94+
}
95+
96+
proc, err := tx.NewProcessor(store, archiverClient, esInserter, cfg.BatchSize, sLogger, cfg.ArchiverReadTimeout, cfg.ElasticSearchWriteTimeout)
97+
if err != nil {
98+
return fmt.Errorf("creating processor: %v", err)
99+
}
100+
101+
err = proc.Start(cfg.NrWorkers)
102+
if err != nil {
103+
return fmt.Errorf("starting processor: %v", err)
104+
}
105+
106+
return nil
107+
}
108+
109+
type ElasticSearchTxsInserter struct {
110+
port string
111+
index string
112+
esClient *elasticsearch.Client
113+
}
114+
115+
func NewElasticSearchTxsInserter(address, index string, timeout time.Duration) (*ElasticSearchTxsInserter, error) {
116+
cfg := elasticsearch.Config{
117+
Addresses: []string{address},
118+
Transport: &http.Transport{
119+
MaxIdleConnsPerHost: 10,
120+
ResponseHeaderTimeout: timeout,
121+
},
122+
}
123+
124+
esClient, err := elasticsearch.NewClient(cfg)
125+
if err != nil {
126+
return nil, fmt.Errorf("creating elasticsearch client: %v", err)
127+
}
128+
129+
return &ElasticSearchTxsInserter{
130+
index: index,
131+
esClient: esClient,
132+
}, nil
133+
}
134+
135+
func (es *ElasticSearchTxsInserter) PushSingleTx(ctx context.Context, tx tx.Tx) error {
136+
return errors.New("not implemented")
137+
}
138+
139+
func (es *ElasticSearchTxsInserter) PushMultipleTx(ctx context.Context, txs []tx.Tx) error {
140+
var buf bytes.Buffer
141+
142+
for _, tx := range txs {
143+
// Metadata line for each document
144+
meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%s" } }%s`, es.index, tx.TxID, "\n"))
145+
buf.Write(meta)
146+
147+
// Serialize the transaction to JSON
148+
data, err := json.Marshal(tx)
149+
if err != nil {
150+
return fmt.Errorf("error serializing transaction: %w", err)
151+
}
152+
buf.Write(data)
153+
buf.Write([]byte("\n")) // Add a newline between documents
154+
}
155+
156+
// Send the bulk request
157+
res, err := es.esClient.Bulk(bytes.NewReader(buf.Bytes()), es.esClient.Bulk.WithRefresh("true"))
158+
if err != nil {
159+
return fmt.Errorf("bulk request failed: %w", err)
160+
}
161+
defer res.Body.Close()
162+
163+
// Check response for errors
164+
if res.IsError() {
165+
return fmt.Errorf("bulk request error: %s", res.String())
166+
}
167+
168+
return nil
169+
}

go.mod

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/ardanlabs/conf v1.5.0
77
github.com/cloudflare/circl v1.5.0
88
github.com/cockroachdb/pebble v1.1.2
9+
github.com/elastic/go-elasticsearch/v8 v8.17.1
910
github.com/google/go-cmp v0.6.0
1011
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0
1112
github.com/pkg/errors v0.9.1
@@ -30,7 +31,10 @@ require (
3031
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
3132
github.com/consensys/gnark-crypto v0.14.0 // indirect
3233
github.com/davecgh/go-spew v1.1.1 // indirect
34+
github.com/elastic/elastic-transport-go/v8 v8.6.1 // indirect
3335
github.com/getsentry/sentry-go v0.27.0 // indirect
36+
github.com/go-logr/logr v1.4.2 // indirect
37+
github.com/go-logr/stdr v1.2.2 // indirect
3438
github.com/gogo/protobuf v1.3.2 // indirect
3539
github.com/golang/protobuf v1.5.3 // indirect
3640
github.com/golang/snappy v0.0.4 // indirect
@@ -47,6 +51,9 @@ require (
4751
github.com/rogpeppe/go-internal v1.13.1 // indirect
4852
github.com/silenceper/pool v1.0.0 // indirect
4953
github.com/sirupsen/logrus v1.9.0 // indirect
54+
go.opentelemetry.io/otel v1.28.0 // indirect
55+
go.opentelemetry.io/otel/metric v1.28.0 // indirect
56+
go.opentelemetry.io/otel/trace v1.28.0 // indirect
5057
go.uber.org/multierr v1.10.0 // indirect
5158
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
5259
golang.org/x/net v0.28.0 // indirect

go.sum

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
8282
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8383
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
8484
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
85+
github.com/elastic/elastic-transport-go/v8 v8.6.1 h1:h2jQRqH6eLGiBSN4eZbQnJLtL4bC5b4lfVFRjw2R4e4=
86+
github.com/elastic/elastic-transport-go/v8 v8.6.1/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
87+
github.com/elastic/go-elasticsearch/v8 v8.17.1 h1:bOXChDoCMB4TIwwGqKd031U8OXssmWLT3UrAr9EGs3Q=
88+
github.com/elastic/go-elasticsearch/v8 v8.17.1/go.mod h1:MVJCtL+gJJ7x5jFeUmA20O7rvipX8GcQmo5iBcmaJn4=
8589
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
8690
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
8791
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -99,6 +103,11 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
99103
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
100104
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
101105
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
106+
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
107+
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
108+
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
109+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
110+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
102111
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
103112
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
104113
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
@@ -266,6 +275,14 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
266275
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
267276
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
268277
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
278+
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
279+
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
280+
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
281+
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
282+
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
283+
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
284+
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
285+
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
269286
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
270287
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
271288
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=

lts/tx/models.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package tx
2+
3+
import (
4+
"encoding/base64"
5+
"encoding/hex"
6+
"fmt"
7+
"github.com/qubic/go-archiver/protobuff"
8+
)
9+
10+
type Tx struct {
11+
TxID string `json:"txID"`
12+
SourceID string `json:"sourceID"`
13+
DestID string `json:"destID"`
14+
Amount int64 `json:"amount"`
15+
TickNumber uint32 `json:"tickNumber"`
16+
InputType uint32 `json:"inputType"`
17+
InputSize uint32 `json:"inputSize"`
18+
Input string `json:"input"`
19+
Signature string `json:"signature"`
20+
Timestamp uint64 `json:"timestamp"`
21+
MoneyFlew bool `json:"moneyFlew"`
22+
}
23+
24+
func ArchiveTxToLtsTx(archiveTx *protobuff.TransactionData) (Tx, error) {
25+
inputBytes, err := hex.DecodeString(archiveTx.Transaction.InputHex)
26+
if err != nil {
27+
return Tx{}, fmt.Errorf("decoding input hex: %v", err)
28+
}
29+
sigBytes, err := hex.DecodeString(archiveTx.Transaction.SignatureHex)
30+
if err != nil {
31+
return Tx{}, fmt.Errorf("decoding signature hex: %v", err)
32+
}
33+
34+
return Tx{
35+
TxID: archiveTx.Transaction.TxId,
36+
SourceID: archiveTx.Transaction.SourceId,
37+
DestID: archiveTx.Transaction.DestId,
38+
Amount: archiveTx.Transaction.Amount,
39+
TickNumber: archiveTx.Transaction.TickNumber,
40+
InputType: archiveTx.Transaction.InputType,
41+
InputSize: archiveTx.Transaction.InputSize,
42+
Input: base64.StdEncoding.EncodeToString(inputBytes),
43+
Signature: base64.StdEncoding.EncodeToString(sigBytes),
44+
Timestamp: archiveTx.Timestamp,
45+
MoneyFlew: archiveTx.MoneyFlew,
46+
}, nil
47+
}

0 commit comments

Comments
 (0)