This repository has been archived by the owner on Oct 26, 2020. It is now read-only.

Added sync mode
vaibhavchellani committed Jul 20, 2020
1 parent 450f5dd commit 33c7b71
Showing 14 changed files with 346 additions and 193 deletions.
65 changes: 9 additions & 56 deletions aggregator/aggregator.go
@@ -35,7 +35,7 @@ type Aggregator struct {
}

// NewAggregator returns a new aggregator object
func NewAggregator(db core.DB) *Aggregator {
func NewAggregator() *Aggregator {
// create logger
logger := common.Logger.With("module", AggregatingService)
LoadedBazooka, err := core.NewPreLoadedBazooka()
@@ -44,7 +44,11 @@ func NewAggregator(db core.DB) *Aggregator {
}
aggregator := &Aggregator{}
aggregator.BaseService = *core.NewBaseService(logger, AggregatingService, aggregator)
aggregator.DB = db
DB, err := core.NewDB()
if err != nil {
panic(err)
}
aggregator.DB = DB
aggregator.LoadedBazooka = LoadedBazooka
return aggregator
}
@@ -64,7 +68,7 @@ func (a *Aggregator) OnStart() error {
// OnStop stops all necessary go routines
func (a *Aggregator) OnStop() {
a.BaseService.OnStop() // Always call the overridden method.

a.DB.Close()
// cancel ack process
a.cancelAggregating()
}
@@ -133,8 +137,9 @@ func (a *Aggregator) ProcessTx(txs []core.Tx) error {
return err
}

fromAccProof, toAccProof, PDAproof, err := a.GetTxVerificationData(tx)
fromAccProof, toAccProof, PDAproof, err := tx.GetVerificationData()
if err != nil {
a.Logger.Error("Unable to create verification data", "error", err)
return err
}

@@ -157,57 +162,5 @@ func (a *Aggregator) ProcessTx(txs []core.Tx) error {

currentRoot = updatedRoot
}

return nil
}

// GetTxVerificationData fetches all the data required to prove the validity of a transaction
func (a *Aggregator) GetTxVerificationData(tx core.Tx) (fromMerkleProof, toMerkleProof core.AccountMerkleProof, PDAProof core.PDAMerkleProof, err error) {
fromAcc, err := a.DB.GetAccountByID(tx.From)
if err != nil {
return
}

fromSiblings, err := a.DB.GetSiblings(fromAcc.Path)
if err != nil {
return
}
fromMerkleProof = core.NewAccountMerkleProof(fromAcc, fromSiblings)

toAcc, err := a.DB.GetAccountByID(tx.To)
if err != nil {
return
}
var toSiblings []core.UserAccount

mysqlTx := a.DB.Instance.Begin()
defer func() {
if r := recover(); r != nil {
mysqlTx.Rollback()
}
}()
dbCopy, _ := core.NewDB()
dbCopy.Instance = mysqlTx

updatedFromAccountBytes, _, err := a.LoadedBazooka.ApplyTx(fromMerkleProof, tx)
if err != nil {
return
}

fromAcc.Data = updatedFromAccountBytes
err = dbCopy.UpdateAccount(fromAcc)
if err != nil {
return
}

// TODO add a check to ensure that DB copy of state matches the one returned by ApplyTransferTx
toSiblings, err = dbCopy.GetSiblings(toAcc.Path)
if err != nil {
return
}

toMerkleProof = core.NewAccountMerkleProof(toAcc, toSiblings)
PDAProof = core.NewPDAProof(fromAcc.Path, fromAcc.PublicKey, fromSiblings)
mysqlTx.Rollback()
return fromMerkleProof, toMerkleProof, PDAProof, nil
}
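
For context, a minimal usage sketch (not part of this commit) of the reworked constructor; the import path and the Start/Stop wrappers from the embedded BaseService are assumptions, not shown in this diff.

package example // illustrative only

import (
	agg "github.com/BOPR/aggregator" // hypothetical import path
)

func runAggregator() error {
	// NewAggregator no longer takes a DB handle: it opens its own connection
	// via core.NewDB() and panics if that fails.
	aggregator := agg.NewAggregator()

	// Start/Stop are assumed to come from the embedded core.BaseService.
	if err := aggregator.Start(); err != nil {
		return err
	}

	// ... node runs until shutdown ...

	// OnStop now also closes the aggregator's own DB connection before
	// cancelling the aggregation loop.
	aggregator.Stop()
	return nil
}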
3 changes: 2 additions & 1 deletion cmd/start.go
@@ -44,7 +44,7 @@ func StartCmd() *cobra.Command {
//

// create aggregator service
aggregator := agg.NewAggregator(core.DBInstance)
aggregator := agg.NewAggregator()

// create the syncer service
syncer := listener.NewSyncer()
@@ -189,6 +189,7 @@ func LoadGenesisData(genesis config.Genesis) {
newParams := core.Params{StakeAmount: genesis.StakeAmount, MaxDepth: genesis.MaxTreeDepth, MaxDepositSubTreeHeight: genesis.MaxDepositSubTreeHeight}
core.DBInstance.UpdateStakeAmount(newParams.StakeAmount)
core.DBInstance.UpdateMaxDepth(newParams.MaxDepth)
core.DBInstance.UpdateFinalisationTimePerBatch(40320)
core.DBInstance.UpdateDepositSubTreeHeight(newParams.MaxDepositSubTreeHeight)

// load sync status
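The hard-coded UpdateFinalisationTimePerBatch(40320) above presumably encodes a one-week challenge window measured in Ethereum blocks at ~15-second block times: 7 * 24 * 60 * 60 / 15 = 40320 blocks.
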
38 changes: 37 additions & 1 deletion contracts/rolluputils/rolluputils.abi
@@ -392,6 +392,42 @@
"stateMutability": "pure",
"type": "function"
},
{
"constant": true,
"inputs": [
{
"internalType": "bytes",
"name": "txBytes",
"type": "bytes"
}
],
"name": "DecompressTx",
"outputs": [
{
"internalType": "uint256",
"name": "from",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "to",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
},
{
"internalType": "bytes",
"name": "sig",
"type": "bytes"
}
],
"payable": false,
"stateMutability": "pure",
"type": "function"
},
{
"constant": true,
"inputs": [
@@ -781,4 +817,4 @@
"stateMutability": "view",
"type": "function"
}
]
]
46 changes: 45 additions & 1 deletion contracts/rolluputils/rolluputils.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion core/DepositTree.go
@@ -106,7 +106,6 @@ func (db *DB) FinaliseDepositsAndAddBatch(accountsRoot ByteArray, pathToDepositS

func (db *DB) FinaliseDeposits(pendingAccs []UserAccount, pathToDepositSubTree uint64, maxTreeDepth uint64) error {
var accounts []UserAccount

// fetch 2**DepositSubTree inactive accounts ordered by path
err := db.Instance.Limit(len(pendingAccs)).Order("path").Where("status = ?", STATUS_NON_INITIALIZED).Find(&accounts).Error
if err != nil {
103 changes: 18 additions & 85 deletions core/batch.go
@@ -1,95 +1,21 @@
package core

import (
"fmt"
"math/big"

"encoding/json"
)

// Batch represents a batch of transactions that is submitted on-chain periodically
type Batch struct {
Index uint64
StateRoot ByteArray
BatchID uint64
StateRoot string
Committer string
TxRoot ByteArray
TxRoot string
StakeAmount uint64
FinalisesOn big.Int
SubmissionHash string
TransactionsIncluded [][]byte
TransactionsIncluded []byte `gorm:"size:1000000"`
BatchType uint64
}

func (b *Batch) DBModel() (BatchModel, error) {
encodedTxs, err := json.Marshal(b.TransactionsIncluded)
if err != nil {
return BatchModel{}, err
}
encodedStateRoot, err := json.Marshal(b.StateRoot)
if err != nil {
return BatchModel{}, err
}
encodedTxRoot, err := json.Marshal(b.TxRoot)
if err != nil {
return BatchModel{}, err
}
finalisationBlockBytes := b.FinalisesOn.Bytes()
newBatchModel := BatchModel{
Index: b.Index,
StateRoot: encodedStateRoot,
Committer: b.Committer,
TxRoot: encodedTxRoot,
StakeAmount: b.StakeAmount,
FinalisesOn: finalisationBlockBytes,
SubmissionHash: b.SubmissionHash,
TransactionsIncluded: encodedTxs,
}
return newBatchModel, nil
}

// BatchModel represents what is actually stored in the DB
// We encode the whole struct because it saves operations: in some cases we can
// read the model directly and only decode the encoded data when we need it
type BatchModel struct {
Index uint64
StateRoot []byte
Committer string
TxRoot []byte
StakeAmount uint64
FinalisesOn []byte
SubmissionHash string
TransactionsIncluded []byte `gorm:"size:10000"`
}

func (b *BatchModel) Batch() (Batch, error) {
var decodedTxs [][]byte
err := json.Unmarshal(b.TransactionsIncluded, &decodedTxs)
if err != nil {
return Batch{}, err
}
var decodedStateRoot ByteArray
err = json.Unmarshal(b.StateRoot, &decodedStateRoot)
if err != nil {
return Batch{}, err
}
var decodedTxRoot ByteArray
err = json.Unmarshal(b.TxRoot, &decodedTxRoot)
if err != nil {
return Batch{}, err
}
finalisationBlockBN := big.NewInt(0)
finalisationBlockBN.SetBytes(b.FinalisesOn)
newBatch := Batch{
Index: b.Index,
StateRoot: decodedStateRoot,
Committer: b.Committer,
TxRoot: decodedTxRoot,
StakeAmount: b.StakeAmount,
FinalisesOn: *finalisationBlockBN,
SubmissionHash: b.SubmissionHash,
TransactionsIncluded: decodedTxs,
}
return newBatch, nil
Status uint64
}

func (db *DB) GetAllBatches() (batches []Batch, err error) {
@@ -103,8 +29,8 @@ func (db *DB) GetAllBatches() (batches []Batch, err error) {
}

func (db *DB) GetLatestBatch() (batch Batch, err error) {
if err := db.Instance.First(&batch).Error; err != nil {
return batch, ErrRecordNotFound(fmt.Sprintf("unable to find latest batch"))
if err := db.Instance.Order("batch_id desc").First(&batch).Error; err != nil {
return batch, err
}
return batch, nil
}
@@ -116,9 +42,16 @@ func (db *DB) GetBatchCount() (int, error) {
}

func (db *DB) AddNewBatch(batch Batch) error {
batchModel, err := batch.DBModel()
if err != nil {
return err
return db.Instance.Create(batch).Error
}

func (db *DB) GetBatchByIndex(index uint64) (batch Batch, err error) {
if err := db.Instance.Where("batch_id = ?", index).Find(&batch).Error; err != nil {
return batch, err
}
return db.Instance.Create(batchModel).Error
return batch, nil
}

func (db *DB) CommitBatch(batch Batch) error {
return db.Instance.Update(batch).Error
}
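
A minimal sketch (not from this commit) of the reworked Batch accessors; it assumes core.DBInstance has already been initialised by the start-up path in cmd/start.go, and the import path is hypothetical.

package example // illustrative only

import (
	"fmt"

	"github.com/BOPR/core" // hypothetical import path
)

func printLatestBatch() error {
	// GetLatestBatch now orders by batch_id desc instead of taking the first
	// row, and returns the raw gorm error rather than ErrRecordNotFound.
	latest, err := core.DBInstance.GetLatestBatch()
	if err != nil {
		return err
	}

	// GetBatchByIndex is new in this commit: it looks a batch up by batch_id.
	batch, err := core.DBInstance.GetBatchByIndex(latest.BatchID)
	if err != nil {
		return err
	}

	fmt.Println(batch.StateRoot, batch.Committer, batch.Status)
	return nil
}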
49 changes: 34 additions & 15 deletions core/bazooka.go
@@ -244,6 +244,21 @@ func (b *Bazooka) CompressTransferTx(tx Tx) ([]byte, error) {
return b.RollupUtils.CompressTxWithMessage(&opts, tx.Data, sigBytes)
}

func (b *Bazooka) DecompressTransferTxs(compressedTxs [][]byte) (from, to, amount []*big.Int, sig [][]byte, err error) {
for _, compressedTx := range compressedTxs {
opts := bind.CallOpts{From: config.OperatorAddress}
decompressedTx, err := b.RollupUtils.DecompressTx(&opts, compressedTx)
if err != nil {
return from, to, amount, sig, err
}
from = append(from, decompressedTx.From)
to = append(to, decompressedTx.To)
amount = append(amount, decompressedTx.Amount)
sig = append(sig, decompressedTx.Sig)
}
return from, to, amount, sig, nil
}

func (b *Bazooka) EncodeTransferTx(from, to, token, nonce, amount, txType int64) ([]byte, error) {
opts := bind.CallOpts{From: config.OperatorAddress}
return b.RollupUtils.BytesFromTxDeconstructed(&opts, big.NewInt(from), big.NewInt(to), big.NewInt(token), big.NewInt(nonce), big.NewInt(txType), big.NewInt(amount))
@@ -275,20 +290,7 @@ func (b *Bazooka) GetGenesisAccounts() (genesisAccount []UserAccount, err error)
if err != nil {
return
}
for _, account := range accounts {
ID, _, _, _, _ := b.DecodeAccount(account)
genesisAccount = append(genesisAccount, *NewUserAccount(ID.Uint64(), STATUS_ACTIVE, "", UintToString(ID.Uint64()), account))
}
return
}

func (b *Bazooka) GetZeroValue() (genesisAccount []UserAccount, err error) {
opts := bind.CallOpts{From: config.OperatorAddress}
// get genesis accounts
accounts, err := b.RollupUtils.GetGenesisDataBlocks(&opts)
if err != nil {
return
}
for _, account := range accounts {
ID, _, _, _, _ := b.DecodeAccount(account)
genesisAccount = append(genesisAccount, *NewUserAccount(ID.Uint64(), STATUS_ACTIVE, "", UintToString(ID.Uint64()), account))
@@ -380,7 +382,6 @@ func (b *Bazooka) SubmitBatch(updatedRoot ByteArray, txs []Tx) error {
"txs",
len(txs),
)

var compressedTxs [][]byte
for _, tx := range txs {
compressedTx, err := tx.Compress()
@@ -411,16 +412,34 @@ func (b *Bazooka) SubmitBatch(updatedRoot ByteArray, txs []Tx) error {
return err
}

latestBatch, err := DBInstance.GetLatestBatch()
if err != nil {
return err
}

newBatch := Batch{
BatchID: latestBatch.BatchID + 1,
StateRoot: updatedRoot.String(),
Committer: config.OperatorAddress.String(),
Status: BATCH_BROADCASTED,
}
b.log.Info("Broadcasting a new batch", "newBatch", newBatch)
err = DBInstance.AddNewBatch(newBatch)
if err != nil {
return err
}
tx, err := b.RollupContract.SubmitBatch(auth, compressedTxs, updatedRoot)
if err != nil {
return err
}

b.log.Info("Sent a new batch!", "txHash", tx.Hash().String())
return nil
}

func GetTxsFromInput(input map[string]interface{}) (txs [][]byte) {
return input["_txs"].([][]byte)
data := input["_txs"].([][]byte)
return data
}

func (b *Bazooka) GenerateAuthObj(client *ethclient.Client, callMsg ethereum.CallMsg) (auth *bind.TransactOpts, err error) {
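
A hedged sketch of how the new DecompressTransferTxs and GetTxsFromInput helpers above could be combined by the sync path this commit is named after; the syncer itself lives in files not shown here, and the package and import path below are assumptions.

package syncer // hypothetical package, not part of the files shown here

import (
	"math/big"

	"github.com/BOPR/core" // hypothetical import path
)

// rebuildTransfers reconstructs per-field transfer data from the decoded
// calldata of an on-chain SubmitBatch call.
func rebuildTransfers(bazooka core.Bazooka, decodedInput map[string]interface{}) (from, to, amount []*big.Int, sigs [][]byte, err error) {
	// GetTxsFromInput extracts the compressed "_txs" argument from the
	// decoded SubmitBatch calldata.
	compressed := core.GetTxsFromInput(decodedInput)

	// DecompressTransferTxs calls the RollupUtils contract's new DecompressTx
	// view function once per compressed transaction.
	return bazooka.DecompressTransferTxs(compressed)
}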
