Skip to content

Commit

Permalink
Batching mapping values sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Sulejman committed Aug 3, 2024
1 parent c3e3859 commit df95b3f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 36 deletions.
7 changes: 6 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"protocol-state-cacher/pkgs/prost"
"protocol-state-cacher/pkgs/redis"
"protocol-state-cacher/pkgs/utils"
"sync"
)

func main() {
Expand All @@ -15,5 +16,9 @@ func main() {
prost.ConfigureContractInstance()
redis.RedisClient = redis.NewRedisClient()

prost.ColdSync()
wg := &sync.WaitGroup{}
wg.Add(2)
go prost.ColdSyncMappings()
go prost.ColdSyncValues()
wg.Wait()
}
95 changes: 60 additions & 35 deletions pkgs/prost/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
listenerCommon "protocol-state-cacher/pkgs/common"
"protocol-state-cacher/pkgs/contract"
"protocol-state-cacher/pkgs/redis"
"sync"
"time"
)

Expand Down Expand Up @@ -58,53 +59,77 @@ func MustQuery[K any](ctx context.Context, call func(opts *bind.CallOpts) (val K
return val, err
}

func ColdSync() {
func ColdSyncMappings() {
for {
PopulateStateVars()
coldSyncAllSlots()
time.Sleep((time.Millisecond * 500) * BlockTime)
time.Sleep(1 * time.Minute)
}

}

func coldSyncAllSlots() {
var allSlots []string
func ColdSyncValues() {
go func() {
for {
PopulateStateVars()
time.Sleep((time.Millisecond * 500) * BlockTime)
}
}()
}

func coldSyncAllSlots() {
if slotCount, err := Instance.SlotCounter(&bind.CallOpts{}); slotCount != nil && err == nil {
PersistState(context.Background(), redis.ContractStateVariable(pkgs.SlotCounter), slotCount.String())

for i := int64(0); i <= slotCount.Int64(); i++ {
slot, err := Instance.GetSlotInfo(&bind.CallOpts{}, DataMarket, big.NewInt(i))

if slot == (contract.PowerloomDataMarketSlotInfo{}) || err != nil {
log.Debugln("Error getting slot info: ", i)
continue
var allSlots []string
var mu sync.Mutex

// TODO: MAKE THIS COUNT TO SLOT NUMBER
for i := int64(0); i <= 5000; i += 20 {
var wg sync.WaitGroup

for j := i; j < i+20 && j <= 5000; j++ {
wg.Add(1)
go func(slotIndex int64) {
defer wg.Done()
slot, err := Instance.GetSlotInfo(&bind.CallOpts{}, DataMarket, big.NewInt(slotIndex))

if slot == (contract.PowerloomDataMarketSlotInfo{}) || err != nil {
log.Debugln("Error getting slot info: ", slotIndex)
return
}

slotMarshalled, err := json.Marshal(slot)
if err != nil {
log.Debugln("Error marshalling slot info: ", slotIndex)
return
}

PersistState(
context.Background(),
redis.SlotInfo(slot.SlotId.String()),
string(slotMarshalled),
)

mu.Lock()
allSlots = append(allSlots, redis.SlotInfo(slot.SlotId.String()))
mu.Unlock()

log.Debugln("Slot info: ", slotIndex, string(slotMarshalled), slot.SlotId.String())
}(j)
}

slotMarshalled, err := json.Marshal(slot)

if err != nil {
log.Debugln("Error marshalling slot info: ", i)
continue
wg.Wait()

if len(allSlots) > 0 {
mu.Lock()
err := redis.AddToSet(context.Background(), "AllSlotsInfo", allSlots...)
mu.Unlock()
if err != nil {
log.Errorln("Error adding slots to set: ", err)
listenerCommon.SendFailureNotification("ColdSync", err.Error(), time.Now().String(), "ERROR")
}
allSlots = nil // reset batch
}

PersistState(
context.Background(),
redis.SlotInfo(slot.SlotId.String()),
string(slotMarshalled),
)

allSlots = append(
allSlots,
redis.SlotInfo(slot.SlotId.String()),
)

log.Debugln("Slot info: ", i, string(slotMarshalled), slot.SlotId.String())
}

err := redis.AddToSet(context.Background(), "AllSlotsInfo", allSlots...)
if err != nil {
log.Errorln("Error adding slots to set: ", err)
listenerCommon.SendFailureNotification("ColdSync", err.Error(), time.Now().String(), "ERROR")
}
} else {
log.Errorln("Error getting slot counter: ", err)
Expand Down

0 comments on commit df95b3f

Please sign in to comment.