Skip to content

Commit

Permalink
Merge pull request #156 from vulcanize/schema_updates
Browse files Browse the repository at this point in the history
Schema updates
  • Loading branch information
i-norden authored Nov 20, 2021
2 parents 898e64b + b9a82f6 commit ac6ef33
Show file tree
Hide file tree
Showing 20 changed files with 961 additions and 326 deletions.
6 changes: 3 additions & 3 deletions statediff/indexer/database/dump/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode)

var baseFee *int64
var baseFee *string
if header.BaseFee != nil {
baseFee = new(int64)
*baseFee = header.BaseFee.Int64()
baseFee = new(string)
*baseFee = header.BaseFee.String()
}

headerID := header.Hash().String()
Expand Down
12 changes: 12 additions & 0 deletions statediff/indexer/database/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,15 @@ type Config struct {
// Type returns the database type identifier for this configuration:
// shared.FILE, i.e. the backend that writes SQL statements to a file
// rather than executing them against a live database.
func (c Config) Type() shared.DBType {
	return shared.FILE
}

// TestConfig is the Config used by this package's unit tests. It directs the
// indexer's SQL output to a local file and describes a mock node with
// chain ID 1 (the genesis hash here presumably matches mainnet — the tests
// only compare it as an opaque string).
var TestConfig = Config{
	FilePath: "./statediffing_test_file.sql",
	NodeInfo: node.Info{
		GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
		NetworkID:    "1",
		ChainID:      1,
		ID:           "mockNodeID",
		ClientName:   "go-ethereum",
	},
}
54 changes: 27 additions & 27 deletions statediff/indexer/database/file/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var (

// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
writer *SQLWriter
fileWriter *SQLWriter
chainConfig *params.ChainConfig
nodeID string
wg *sync.WaitGroup
Expand All @@ -79,7 +79,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
w.upsertNode(config.NodeInfo)
w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{})
return &StateDiffIndexer{
writer: w,
fileWriter: w,
chainConfig: chainConfig,
nodeID: config.NodeInfo.ID,
wg: wg,
Expand Down Expand Up @@ -133,7 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
sdi.writer.Flush()
sdi.fileWriter.Flush()
tDiff = time.Since(t)
indexerMetrics.tPostgresCommit.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
Expand Down Expand Up @@ -189,15 +189,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// processHeader write a header IPLD insert SQL stmt to a file
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
sdi.writer.upsertIPLDNode(headerNode)
sdi.fileWriter.upsertIPLDNode(headerNode)

var baseFee *int64
var baseFee *string
if header.BaseFee != nil {
baseFee = new(int64)
*baseFee = header.BaseFee.Int64()
baseFee = new(string)
*baseFee = header.BaseFee.String()
}
headerID := header.Hash().String()
sdi.writer.upsertHeaderCID(models.HeaderModel{
sdi.fileWriter.upsertHeaderCID(models.HeaderModel{
NodeID: sdi.nodeID,
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
Expand All @@ -221,15 +221,15 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) {
// publish and index uncles
for _, uncleNode := range uncleNodes {
sdi.writer.upsertIPLDNode(uncleNode)
sdi.fileWriter.upsertIPLDNode(uncleNode)
var uncleReward *big.Int
// in PoA networks uncle reward is 0
if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0)
} else {
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
}
sdi.writer.upsertUncleCID(models.UncleModel{
sdi.fileWriter.upsertUncleCID(models.UncleModel{
HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
Expand Down Expand Up @@ -261,10 +261,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
for i, receipt := range args.receipts {
for _, logTrieNode := range args.logTrieNodes[i] {
sdi.writer.upsertIPLDNode(logTrieNode)
sdi.fileWriter.upsertIPLDNode(logTrieNode)
}
txNode := args.txNodes[i]
sdi.writer.upsertIPLDNode(txNode)
sdi.fileWriter.upsertIPLDNode(txNode)

// index tx
trx := args.txs[i]
Expand All @@ -285,7 +285,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
}
sdi.writer.upsertTransactionCID(txModel)
sdi.fileWriter.upsertTransactionCID(txModel)

// index access list if this is one
for j, accessListElement := range trx.AccessList() {
Expand All @@ -299,7 +299,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
Address: accessListElement.Address.Hex(),
StorageKeys: storageKeys,
}
sdi.writer.upsertAccessListElement(accessListElementModel)
sdi.fileWriter.upsertAccessListElement(accessListElementModel)
}

// this is the contract address if this receipt is for a contract creation tx
Expand Down Expand Up @@ -327,7 +327,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
} else {
rctModel.PostState = common.Bytes2Hex(receipt.PostState)
}
sdi.writer.upsertReceiptCID(rctModel)
sdi.fileWriter.upsertReceiptCID(rctModel)

// index logs
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
Expand All @@ -354,13 +354,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
Topic3: topicSet[3],
}
}
sdi.writer.upsertLogCID(logDataSet)
sdi.fileWriter.upsertLogCID(logDataSet)
}

// publish trie nodes, these aren't indexed directly
for i, n := range args.txTrieNodes {
sdi.writer.upsertIPLDNode(n)
sdi.writer.upsertIPLDNode(args.rctTrieNodes[i])
sdi.fileWriter.upsertIPLDNode(n)
sdi.fileWriter.upsertIPLDNode(args.rctTrieNodes[i])
}

return nil
Expand All @@ -380,10 +380,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
}
sdi.writer.upsertStateCID(stateModel)
sdi.fileWriter.upsertStateCID(stateModel)
return nil
}
stateCIDStr, stateMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
Expand All @@ -396,7 +396,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
NodeType: stateNode.NodeType.Int(),
}
// index the state node
sdi.writer.upsertStateCID(stateModel)
sdi.fileWriter.upsertStateCID(stateModel)
// if we have a leaf, decode and index the account data
if stateNode.NodeType == sdtypes.Leaf {
var i []interface{}
Expand All @@ -418,7 +418,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
CodeHash: account.CodeHash,
StorageRoot: account.Root.String(),
}
sdi.writer.upsertStateAccount(accountModel)
sdi.fileWriter.upsertStateAccount(accountModel)
}
// if there are any storage nodes associated with this node, publish and index them
for _, storageNode := range stateNode.StorageNodes {
Expand All @@ -434,10 +434,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
}
sdi.writer.upsertStorageCID(storageModel)
sdi.fileWriter.upsertStorageCID(storageModel)
continue
}
storageCIDStr, storageMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
if err != nil {
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
Expand All @@ -450,7 +450,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
}
sdi.writer.upsertStorageCID(storageModel)
sdi.fileWriter.upsertStorageCID(storageModel)
}

return nil
Expand All @@ -463,11 +463,11 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
if err != nil {
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
}
sdi.writer.upsertIPLDDirect(mhKey, codeAndCodeHash.Code)
sdi.fileWriter.upsertIPLDDirect(mhKey, codeAndCodeHash.Code)
return nil
}

// Close satisfies io.Closer; it delegates to the underlying SQL file
// writer's Close method.
func (sdi *StateDiffIndexer) Close() error {
	return sdi.fileWriter.Close()
}
132 changes: 132 additions & 0 deletions statediff/indexer/database/file/indexer_legacy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package file_test

import (
"context"
"errors"
"os"
"testing"

"github.com/ipfs/go-cid"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
)

var (
	// legacyData holds the shared mock fixtures for a legacy block (its header
	// carries a nil BaseFee, as asserted in TestFileIndexerLegacy).
	legacyData = mocks.NewLegacyData()
	// mockLegacyBlock is the block under test; assigned from legacyData in setupLegacy.
	mockLegacyBlock *types.Block
	// legacyHeaderCID is the expected CID of the mock header, computed in setupLegacy.
	legacyHeaderCID cid.Cid
)

// setupLegacy prepares the legacy-block fixtures, runs the file indexer over
// the mock block (writing its SQL statements to file.TestConfig.FilePath),
// and opens the package-level sqlx connection used by dumpData/tearDown.
func setupLegacy(t *testing.T) {
	mockLegacyBlock = legacyData.MockBlock
	// Compute the CID the indexer is expected to derive for the mock header.
	// (Previously the error here was silently discarded with `_`.)
	var err error
	legacyHeaderCID, err = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
	require.NoError(t, err)
	// Remove any SQL file left behind by a previous run so the writer starts clean.
	if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
		err := os.Remove(file.TestConfig.FilePath)
		require.NoError(t, err)
	}
	ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.TestConfig)
	require.NoError(t, err)
	var tx interfaces.Batch
	tx, err = ind.PushBlock(
		mockLegacyBlock,
		legacyData.MockReceipts,
		legacyData.MockBlock.Difficulty())
	require.NoError(t, err)

	// Submit the batch and close the indexer when this function returns, so the
	// SQL file is fully flushed before dumpData reads it.
	// NOTE(review): the deferred tx.Submit(err) captures the function-scoped err,
	// whose value at defer time is whatever the last assignment below left in
	// it (the sqlx.Connect result on the happy path) — confirm this is intended.
	defer func() {
		if err := tx.Submit(err); err != nil {
			t.Fatal(err)
		}
		if err := ind.Close(); err != nil {
			t.Fatal(err)
		}
	}()
	for _, node := range legacyData.StateDiffs {
		err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
		require.NoError(t, err)
	}

	test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())

	connStr := postgres.DefaultConfig.DbConnectionString()

	// sqlxdb is a package-level handle shared with dumpData and tearDown.
	sqlxdb, err = sqlx.Connect("postgres", connStr)
	if err != nil {
		t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
	}
}

// dumpData replays the SQL statements produced by the file indexer against
// the test database: it reads the generated .sql file and executes its whole
// contents in one Exec call on the shared connection.
func dumpData(t *testing.T) {
	stmts, err := os.ReadFile(file.TestConfig.FilePath)
	require.NoError(t, err)

	_, err = sqlxdb.Exec(string(stmts))
	require.NoError(t, err)
}

// tearDown clears the database tables, deletes the generated SQL file, and
// releases the shared sqlx connection — in that order.
func tearDown(t *testing.T) {
	file.TearDownDB(t, sqlxdb)
	require.NoError(t, os.Remove(file.TestConfig.FilePath))
	require.NoError(t, sqlxdb.Close())
}

// expectTrue fails the test immediately unless value is true.
func expectTrue(t *testing.T, value bool) {
	if value {
		return
	}
	t.Fatalf("Assertion failed")
}

// TestFileIndexerLegacy exercises the file indexer against a legacy block
// (nil BaseFee): the header row must be indexed with a NULL base_fee.
func TestFileIndexerLegacy(t *testing.T) {
	t.Run("Publish and index header IPLDs", func(t *testing.T) {
		setupLegacy(t)
		dumpData(t)
		defer tearDown(t)
		// Fetch the single header row the indexer should have produced.
		pgStr := `SELECT cid, td, reward, block_hash, base_fee
				FROM eth.header_cids
				WHERE block_number = $1`
		// check header was properly indexed
		type res struct {
			CID       string
			TD        string
			Reward    string
			BlockHash string  `db:"block_hash"`
			BaseFee   *string `db:"base_fee"`
		}
		header := new(res)
		// NOTE(review): err is a package-level variable presumably declared in a
		// sibling test file of this package — confirm no parallel tests race on it.
		err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header)
		require.NoError(t, err)

		test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
		test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
		// Expected reward comes from the fixture data; verify the constant against
		// mocks.NewLegacyData if the fixture changes.
		test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
		// Legacy headers carry no base fee: nil in the fixture and NULL in the DB.
		require.Nil(t, legacyData.MockHeader.BaseFee)
		require.Nil(t, header.BaseFee)
	})
}
Loading

0 comments on commit ac6ef33

Please sign in to comment.