diff --git a/cmd/main.go b/cmd/main.go index 51d9fb9..be11108 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -5,6 +5,7 @@ import ( "protocol-state-cacher/pkgs/prost" "protocol-state-cacher/pkgs/redis" "protocol-state-cacher/pkgs/utils" + "sync" ) func main() { @@ -15,5 +16,9 @@ func main() { prost.ConfigureContractInstance() redis.RedisClient = redis.NewRedisClient() - prost.ColdSync() + wg := &sync.WaitGroup{} + wg.Add(2) + go prost.ColdSyncMappings() + go prost.ColdSyncValues() + wg.Wait() } diff --git a/pkgs/prost/contract.go b/pkgs/prost/contract.go index 2a66277..185d885 100644 --- a/pkgs/prost/contract.go +++ b/pkgs/prost/contract.go @@ -15,6 +15,7 @@ import ( listenerCommon "protocol-state-cacher/pkgs/common" "protocol-state-cacher/pkgs/contract" "protocol-state-cacher/pkgs/redis" + "sync" "time" ) @@ -58,53 +59,77 @@ func MustQuery[K any](ctx context.Context, call func(opts *bind.CallOpts) (val K return val, err } -func ColdSync() { +func ColdSyncMappings() { for { - PopulateStateVars() coldSyncAllSlots() - time.Sleep((time.Millisecond * 500) * BlockTime) + time.Sleep(1 * time.Minute) } + } -func coldSyncAllSlots() { - var allSlots []string +func ColdSyncValues() { + go func() { + for { + PopulateStateVars() + time.Sleep((time.Millisecond * 500) * BlockTime) + } + }() +} +func coldSyncAllSlots() { if slotCount, err := Instance.SlotCounter(&bind.CallOpts{}); slotCount != nil && err == nil { PersistState(context.Background(), redis.ContractStateVariable(pkgs.SlotCounter), slotCount.String()) - for i := int64(0); i <= slotCount.Int64(); i++ { - slot, err := Instance.GetSlotInfo(&bind.CallOpts{}, DataMarket, big.NewInt(i)) - - if slot == (contract.PowerloomDataMarketSlotInfo{}) || err != nil { - log.Debugln("Error getting slot info: ", i) - continue + var allSlots []string + var mu sync.Mutex + + // TODO: MAKE THIS COUNT TO SLOT NUMBER + for i := int64(0); i <= 5000; i += 20 { + var wg sync.WaitGroup + + for j := i; j < i+20 && j <= 5000; j++ { + wg.Add(1) + go func(slotIndex int64) { + defer wg.Done() + slot, err := Instance.GetSlotInfo(&bind.CallOpts{}, DataMarket, big.NewInt(slotIndex)) + + if slot == (contract.PowerloomDataMarketSlotInfo{}) || err != nil { + log.Debugln("Error getting slot info: ", slotIndex) + return + } + + slotMarshalled, err := json.Marshal(slot) + if err != nil { + log.Debugln("Error marshalling slot info: ", slotIndex) + return + } + + PersistState( + context.Background(), + redis.SlotInfo(slot.SlotId.String()), + string(slotMarshalled), + ) + + mu.Lock() + allSlots = append(allSlots, redis.SlotInfo(slot.SlotId.String())) + mu.Unlock() + + log.Debugln("Slot info: ", slotIndex, string(slotMarshalled), slot.SlotId.String()) + }(j) } - slotMarshalled, err := json.Marshal(slot) - - if err != nil { - log.Debugln("Error marshalling slot info: ", i) - continue + wg.Wait() + + if len(allSlots) > 0 { + mu.Lock() + err := redis.AddToSet(context.Background(), "AllSlotsInfo", allSlots...) + mu.Unlock() + if err != nil { + log.Errorln("Error adding slots to set: ", err) + listenerCommon.SendFailureNotification("ColdSync", err.Error(), time.Now().String(), "ERROR") + } + allSlots = nil // reset batch } - - PersistState( - context.Background(), - redis.SlotInfo(slot.SlotId.String()), - string(slotMarshalled), - ) - - allSlots = append( - allSlots, - redis.SlotInfo(slot.SlotId.String()), - ) - - log.Debugln("Slot info: ", i, string(slotMarshalled), slot.SlotId.String()) - } - - err := redis.AddToSet(context.Background(), "AllSlotsInfo", allSlots...) - if err != nil { - log.Errorln("Error adding slots to set: ", err) - listenerCommon.SendFailureNotification("ColdSync", err.Error(), time.Now().String(), "ERROR") } } else { log.Errorln("Error getting slot counter: ", err)