Skip to content

Commit

Permalink
support custom tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
dylenfu committed Jun 26, 2018
1 parent 856a2fa commit 356518c
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 39 deletions.
10 changes: 6 additions & 4 deletions config/extractor.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ title = "extractor"

owner = "Loopring corporation"

# aws cloudwatch switch
# push heart beat message, proposed false
watch_cloud_open = false

# zap log
[log]
level = "debug"
Expand Down Expand Up @@ -76,3 +72,9 @@ watch_cloud_open = false

[kafka]
brokers = ["127.0.0.1:9092"]

# aws cloudwatch switch
# push heart beat message, proposed false
[cloud_watch]
enabled = false
region = "ap-northeast-1"
33 changes: 19 additions & 14 deletions extractor/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,6 @@ func (processor *AbiProcessor) GetMethodName(tx *ethtyp.Transaction) string {
return contract.METHOD_UNKNOWN
}

// SupportedContract judge protocol have ever been load
func (processor *AbiProcessor) SupportedContract(protocol common.Address) bool {
_, ok := processor.protocols[protocol]
return ok
}

// SupportedEvents supported contract events and unsupported erc20 events
func (processor *AbiProcessor) SupportedEvents(receipt *ethtyp.TransactionReceipt) bool {
if receipt == nil || len(receipt.Logs) == 0 {
Expand Down Expand Up @@ -131,6 +125,13 @@ func (processor *AbiProcessor) SupportedEvents(receipt *ethtyp.TransactionReceip
// SupportedMethod only supported contracts method
func (processor *AbiProcessor) SupportedMethod(tx *ethtyp.Transaction) bool {
protocol := common.HexToAddress(tx.To)
tokens, _ := util.GetAllCustomTokenList()
for _, v := range tokens {
if v.Address == protocol {

}
}

if _, ok := processor.protocols[protocol]; !ok {
return false
}
Expand All @@ -148,8 +149,7 @@ func (processor *AbiProcessor) loadProtocolAddress() {
processor.protocols[v.Protocol] = v.Symbol
log.Infof("extractor,contract protocol %s->%s", v.Symbol, v.Protocol.Hex())
}

processor.loadPersonalTokens()
processor.AddCustomTokens()

for _, v := range lpraccessor.ProtocolAddresses() {
protocolSymbol := "loopring"
Expand Down Expand Up @@ -331,12 +331,17 @@ func (processor *AbiProcessor) loadTokenTransferDelegateProtocol() {
}
}

// todo: consume kafka message and add token in protocols
func (process *AbiProcessor) addToken(token *types.Token) error {
return nil
func (processor *AbiProcessor) IsTokenSupported(token common.Address) bool {
_, ok := processor.protocols[token]
return ok
}

// todo: get tokens from redis, and delete duplicated tokens
func (process *AbiProcessor) loadPersonalTokens() error {
return nil
func (processor *AbiProcessor) AddCustomTokens() {
tokens, _ := util.GetAllCustomTokenList()
for _, v := range tokens {
if !processor.IsTokenSupported(v.Address) {
processor.protocols[v.Address] = v.Symbol
log.Infof("extractor,contract protocol %s->%s", v.Symbol, v.Address.Hex())
}
}
}
15 changes: 7 additions & 8 deletions extractor/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
ex "github.com/Loopring/relay-lib/extractor"
"github.com/Loopring/relay-lib/kafka"
"github.com/Loopring/relay-lib/log"
"github.com/Loopring/relay-lib/types"
"github.com/Loopring/relay-lib/zklock"
)

Expand Down Expand Up @@ -55,16 +54,16 @@ func RegistryEmitter(zkOpt zklock.ZkLockConfig, producerOpt, consumerOpt kafka.K
if len(consumerOpt.Brokers) < 1 {
return fmt.Errorf("kafka consumer brokers should not be empty")
}

register = &kafka.ConsumerRegister{}
register.Initialize(consumerOpt.Brokers)
if err := register.RegisterTopicAndHandler(kafka.Kafka_Topic_Extractor_PendingTransaction, kafka.Kafka_Group_Extractor_PendingTransaction, ethtyp.Transaction{}, service.WatchingPendingTransaction); err != nil {
return err
}
if err := register.RegisterTopicAndHandler(kafka.Kafka_Topic_Extractor_AddToken, kafka.Kafka_Group_Extractor_AddToken, types.Token{}, service.WatchingAddToken); err != nil {
return err
}
err := register.RegisterTopicAndHandler(
kafka.Kafka_Topic_Extractor_PendingTransaction,
kafka.Kafka_Group_Extractor_PendingTransaction,
ethtyp.Transaction{},
service.WatchingPendingTransaction)

return nil
return err
}

func UnRegistryEmitter() {
Expand Down
10 changes: 3 additions & 7 deletions extractor/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type ExtractorService interface {
Stop()
ForkProcess(block *types.Block) error
WatchingPendingTransaction(input interface{}) error
WatchingAddToken(input interface{}) error
}

type ExtractorServiceImpl struct {
Expand Down Expand Up @@ -172,12 +171,6 @@ func (l *ExtractorServiceImpl) WatchingPendingTransaction(input interface{}) err
return nil
}

// todo
func (l *ExtractorServiceImpl) WatchingAddToken(input interface{}) error {
token := input.(*types.Token)
return l.processor.addToken(token)
}

func (l *ExtractorServiceImpl) ProcessBlock() error {
inter, err := l.iterator.Next()
if err != nil {
Expand Down Expand Up @@ -217,6 +210,9 @@ func (l *ExtractorServiceImpl) ProcessBlock() error {
blockEvent.IsFinished = false
Produce(blockEvent)

// add custom tokens
l.processor.AddCustomTokens()

if len(block.Transactions) > 0 {
for idx, transaction := range block.Transactions {
receipt := block.Receipts[idx]
Expand Down
4 changes: 3 additions & 1 deletion node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/Loopring/extractor/extractor"
"github.com/Loopring/relay-lib/cache/redis"
"github.com/Loopring/relay-lib/cloudwatch"
libdao "github.com/Loopring/relay-lib/dao"
"github.com/Loopring/relay-lib/eth/accessor"
lpraccssor "github.com/Loopring/relay-lib/eth/loopringaccessor"
Expand Down Expand Up @@ -52,6 +53,7 @@ func LoadConfig(file string) *GlobalConfig {
panic(err)
}

println(c.CloudWatch.Enabled, c.CloudWatch.Region)
return c
}

Expand All @@ -67,7 +69,7 @@ type GlobalConfig struct {
Accessor accessor.AccessorOptions
LoopringProtocol lpraccssor.LoopringProtocolOptions
Market util.MarketOptions
WatchCloudOpen bool // default false
CloudWatch cloudwatch.CloudWatchConfig
}

func Validator(cv reflect.Value) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,5 @@ func (n *Node) registerEmitter() {
}

func (n *Node) registerCloudWatch() {
watch.Initialize(n.globalConfig.WatchCloudOpen)
watch.Initialize(n.globalConfig.CloudWatch)
}
9 changes: 5 additions & 4 deletions watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ const (

var watchOpen bool

func Initialize(open bool) {
if !open {
func Initialize(conf cloudwatch.CloudWatchConfig) {
if !conf.Enabled {
watchOpen = false
return
}

if err := cloudwatch.Initialize(); err != nil {
if err := cloudwatch.Initialize(conf); err != nil {
log.Fatalf("node start, register cloud watch error:%s", err.Error())
}

watchOpen = open
watchOpen = true
}

func ReportHeartBeat() {
Expand Down

0 comments on commit 356518c

Please sign in to comment.