eth, eth/downloader: remove references to LightChain, LightSync #2776

Merged · 1 commit · Nov 28, 2024
core/forkchoice.go (3 changes: 1 addition & 2 deletions)
@@ -31,8 +31,7 @@ import (
 )
 
 // ChainReader defines a small collection of methods needed to access the local
-// blockchain during header verification. It's implemented by both blockchain
-// and lightchain.
+// blockchain during header verification. It's implemented by blockchain.
 type ChainReader interface {
    // Config retrieves the header chain's chain configuration.
    Config() *params.ChainConfig
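
With lightchain gone, the comment now names blockchain as the interface's only implementer. A compile-time assertion of that contract could look like the following sketch (illustrative only, not part of this diff):

package core

// Illustrative compile-time check (not in this PR): after the change,
// *BlockChain is the sole expected implementer of ChainReader.
var _ ChainReader = (*BlockChain)(nil)
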
eth/downloader/downloader.go (137 changes: 45 additions & 92 deletions)
@@ -46,11 +46,10 @@ var (
    MaxReceiptFetch = 256 // Number of transaction receipts to allow fetching per request
    MaxStateFetch   = 384 // Number of node state values to allow fetching per request
 
-   maxQueuedHeaders            = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
-   maxHeadersProcess           = 2048      // Number of header download results to import at once into the chain
-   maxResultsProcess           = 2048      // Number of content download results to import at once into the chain
-   FullMaxForkAncestry  uint64 = params.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
-   lightMaxForkAncestry uint64 = 30000                            // Maximum chain reorganisation (locally redeclared so tests can reduce it)
+   maxQueuedHeaders           = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+   maxHeadersProcess          = 2048      // Number of header download results to import at once into the chain
+   maxResultsProcess          = 2048      // Number of content download results to import at once into the chain
+   FullMaxForkAncestry uint64 = params.FullImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
 
    reorgProtThreshold   = 48 // Threshold number of recent blocks to disable mini reorg protection
    reorgProtHeaderDelay = 2  // Number of headers to delay delivering to cover mini reorgs
@@ -96,9 +95,8 @@ type Downloader struct {
    mode atomic.Uint32  // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
    mux  *event.TypeMux // Event multiplexer to announce sync operation events
 
-   genesis uint64   // Genesis block number to limit sync to (e.g. light client CHT)
-   queue   *queue   // Scheduler for selecting the hashes to download
-   peers   *peerSet // Set of active peers from which download can proceed
+   queue *queue   // Scheduler for selecting the hashes to download
+   peers *peerSet // Set of active peers from which download can proceed
 
    stateDB ethdb.Database // Database to state sync into (and deduplicate via)
 
@@ -107,7 +105,6 @@ type Downloader struct {
    syncStatsChainHeight uint64       // Highest block number known when syncing started
    syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
 
-   lightchain LightChain
    blockchain BlockChain
 
    // Callbacks
@@ -151,8 +148,8 @@ type Downloader struct {
    syncLogTime time.Time // Time instance when status was last reported
 }
 
-// LightChain encapsulates functions required to synchronise a light chain.
-type LightChain interface {
+// BlockChain encapsulates functions required to sync a (full or snap) blockchain.
+type BlockChain interface {
    // HasHeader verifies a header's presence in the local chain.
    HasHeader(common.Hash, uint64) bool
 
@@ -170,11 +167,6 @@ type LightChain interface {
 
    // SetHead rewinds the local chain to a new head.
    SetHead(uint64) error
-}
-
-// BlockChain encapsulates functions required to sync a (full or snap) blockchain.
-type BlockChain interface {
-   LightChain
 
    // HasBlock verifies a block's presence in the local chain.
    HasBlock(common.Hash, uint64) bool
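
Read together with the previous hunk, the old LightChain methods fold directly into BlockChain. Assembled from only the lines visible in this diff, the merged interface reads roughly as follows (an abridged sketch; the collapsed regions hide further methods):

// Abridged reconstruction from the visible hunks only; the real
// declaration carries more methods behind the collapsed diff regions.
type BlockChain interface {
    // HasHeader verifies a header's presence in the local chain.
    HasHeader(common.Hash, uint64) bool

    // SetHead rewinds the local chain to a new head.
    SetHead(uint64) error

    // HasBlock verifies a block's presence in the local chain.
    HasBlock(common.Hash, uint64) bool

    // ... (remaining methods elided)
}
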
@@ -217,17 +209,13 @@ type BlockChain interface {
 type DownloadOption func(downloader *Downloader) *Downloader
 
 // New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, _ func()) *Downloader {
-   if lightchain == nil {
-       lightchain = chain
-   }
+func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, _ func()) *Downloader {
    dl := &Downloader{
        stateDB:        stateDb,
        mux:            mux,
        queue:          newQueue(blockCacheMaxItems, blockCacheInitialItems),
        peers:          newPeerSet(),
        blockchain:     chain,
-       lightchain:     lightchain,
        dropPeer:       dropPeer,
        headerProcCh:   make(chan *headerTask, 1),
        quitCh:         make(chan struct{}),
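
For callers, the only visible change is the dropped lightchain argument; full-sync call sites that previously passed nil for it simply remove the parameter. A hypothetical call site (the surrounding variable names are illustrative, not from this diff):

// Before (hypothetical full-sync call site, lightchain passed as nil):
//   d := downloader.New(chainDb, eventMux, blockchain, nil, removePeer, success)
// After this PR:
d := downloader.New(chainDb, eventMux, blockchain, removePeer, success)
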
@@ -254,15 +242,13 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
 
    current := uint64(0)
    mode := d.getMode()
-   switch {
-   case d.blockchain != nil && mode == FullSync:
+   switch mode {
+   case FullSync:
        current = d.blockchain.CurrentBlock().Number.Uint64()
-   case d.blockchain != nil && mode == SnapSync:
+   case SnapSync:
        current = d.blockchain.CurrentSnapBlock().Number.Uint64()
-   case d.lightchain != nil:
-       current = d.lightchain.CurrentHeader().Number.Uint64()
    default:
-       log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode)
+       log.Error("Unknown downloader mode", "mode", mode)
    }
    progress, pending := d.SnapSyncer.Progress()
 
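
Progress now resolves the reported head purely from the sync mode. A hypothetical consumer of the result (CurrentBlock and HighestBlock are fields of the public ethereum.SyncProgress struct; the caller shown here is illustrative):

// Hypothetical caller of the simplified Progress.
p := d.Progress()
log.Info("Sync progress", "current", p.CurrentBlock, "highest", p.HighestBlock)
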
@@ -455,7 +441,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
        if err != nil {
            d.mux.Post(FailedEvent{err})
        } else {
-           latest := d.lightchain.CurrentHeader()
+           latest := d.blockchain.CurrentHeader()
            d.mux.Post(DoneEvent{latest})
        }
    }()
@@ -492,7 +478,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
    case SnapSync:
        localHeight = d.blockchain.CurrentSnapBlock().Number.Uint64()
    default:
-       localHeight = d.lightchain.CurrentHeader().Number.Uint64()
+       localHeight = d.blockchain.CurrentHeader().Number.Uint64()
    }
 
    origin, err := d.findAncestor(p, localHeight, remoteHeader)
@@ -502,8 +488,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
 
    if localHeight >= remoteHeight {
        // if remoteHeader does not exist in local chain, will move on to insert it as a side chain.
-       if d.blockchain.GetBlockByHash(remoteHeader.Hash()) != nil ||
-           (mode == LightSync && d.blockchain.GetHeaderByHash(remoteHeader.Hash()) != nil) {
+       if d.blockchain.GetBlockByHash(remoteHeader.Hash()) != nil {
            p.log.Warn("syncWithPeer", "local", localHeight, "remote", remoteHeight, "mode", mode, "err", errLaggingPeer)
            p.peer.MarkLagging()
            return errLaggingPeer
@@ -570,7 +555,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
    }
    // Rewind the ancient store and blockchain if reorg happens.
    if origin+1 < frozen {
-       if err := d.lightchain.SetHead(origin); err != nil {
+       if err := d.blockchain.SetHead(origin); err != nil {
            return err
        }
        log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin)
@@ -786,16 +771,13 @@ func (d *Downloader) findAncestor(p *peerConnection, localHeight uint64, remoteH
    case SnapSync:
        localHeight = d.blockchain.CurrentSnapBlock().Number.Uint64()
    default:
-       localHeight = d.lightchain.CurrentHeader().Number.Uint64()
+       localHeight = d.blockchain.CurrentHeader().Number.Uint64()
    }
    */
    p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)
 
    // Recap floor value for binary search
    maxForkAncestry := FullMaxForkAncestry
-   if d.getMode() == LightSync {
-       maxForkAncestry = lightMaxForkAncestry
-   }
    if localHeight >= maxForkAncestry {
        // We're above the max reorg threshold, find the earliest fork point
        floor = int64(localHeight - maxForkAncestry)
@@ -805,26 +787,6 @@ func (d *Downloader) findAncestor(p *peerConnection, localHeight uint64, remoteH
        floor = int64(tail)
    }
 
-   // If we're doing a light sync, ensure the floor doesn't go below the CHT, as
-   // all headers before that point will be missing.
-   if mode == LightSync {
-       // If we don't know the current CHT position, find it
-       if d.genesis == 0 {
-           header := d.lightchain.CurrentHeader()
-           for header != nil {
-               d.genesis = header.Number.Uint64()
-               if floor >= int64(d.genesis)-1 {
-                   break
-               }
-               header = d.lightchain.GetHeaderByHash(header.ParentHash)
-           }
-       }
-       // We already know the "genesis" block number, cap floor to that
-       if floor < int64(d.genesis)-1 {
-           floor = int64(d.genesis) - 1
-       }
-   }
-
    ancestor, err := d.findAncestorSpanSearch(p, mode, remoteHeight, localHeight, floor)
    if err == nil {
        return ancestor, nil
@@ -885,7 +847,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re
        case SnapSync:
            known = d.blockchain.HasFastBlock(h, n)
        default:
-           known = d.lightchain.HasHeader(h, n)
+           known = d.blockchain.HasHeader(h, n)
        }
        if known {
            number, hash = n, h
@@ -938,13 +900,13 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
            case SnapSync:
                known = d.blockchain.HasFastBlock(h, n)
            default:
-               known = d.lightchain.HasHeader(h, n)
+               known = d.blockchain.HasHeader(h, n)
            }
            if !known {
                end = check
                continue
            }
-           header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
+           header := d.blockchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
            if header == nil {
                p.log.Error("header not found", "hash", h, "request", check)
                return 0, fmt.Errorf("%w: header no found (%s)", errBadPeer, h)
@@ -982,7 +944,6 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
        skeleton = true  // Skeleton assembly phase or finishing up
        pivoting = false // Whether the next request is pivot verification
        ancestor = from
-       mode     = d.getMode()
    )
    for {
        // Pull the next batch of headers, it either:
@@ -1129,13 +1090,9 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
            if n := len(headers); n > 0 {
                // Retrieve the current head we're at
                var head uint64
-               if mode == LightSync {
-                   head = d.lightchain.CurrentHeader().Number.Uint64()
-               } else {
-                   head = d.blockchain.CurrentSnapBlock().Number.Uint64()
-                   if full := d.blockchain.CurrentBlock().Number.Uint64(); head < full {
-                       head = full
-                   }
+               head = d.blockchain.CurrentSnapBlock().Number.Uint64()
+               if full := d.blockchain.CurrentBlock().Number.Uint64(); head < full {
+                   head = full
                }
                // If the head is below the common ancestor, we're actually deduplicating
                // already existing chain segments, so use the ancestor as the fake head.
@@ -1275,11 +1232,9 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
            // L: Sync begins, and finds common ancestor at 11
            // L: Request new headers up from 11 (R's TD was higher, it must have something)
            // R: Nothing to give
-           if mode != LightSync {
-               head := d.blockchain.CurrentBlock()
-               if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
-                   return errStallingPeer
-               }
+           head := d.blockchain.CurrentBlock()
+           if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
+               return errStallingPeer
            }
            // If snap or light syncing, ensure promised headers are indeed delivered. This is
            // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
@@ -1288,9 +1243,9 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
            // This check cannot be executed "as is" for full imports, since blocks may still be
            // queued for processing when the header download completes. However, as long as the
            // peer gave us something useful, we're already happy/progressed (above check).
-           if mode == SnapSync || mode == LightSync {
-               head := d.lightchain.CurrentHeader()
-               if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
+           if mode == SnapSync {
+               head := d.blockchain.CurrentHeader()
+               if td.Cmp(d.blockchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
                    return errStallingPeer
                }
            }
@@ -1317,31 +1272,29 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
                chunkHashes := hashes[:limit]
 
                // In case of header only syncing, validate the chunk immediately
-               if mode == SnapSync || mode == LightSync {
+               if mode == SnapSync {
                    if len(chunkHeaders) > 0 {
-                       if n, err := d.lightchain.InsertHeaderChain(chunkHeaders); err != nil {
+                       if n, err := d.blockchain.InsertHeaderChain(chunkHeaders); err != nil {
                            log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
                            return fmt.Errorf("%w: %v", errInvalidChain, err)
                        }
                    }
                }
-               // Unless we're doing light chains, schedule the headers for associated content retrieval
-               if mode == FullSync || mode == SnapSync {
-                   // If we've reached the allowed number of pending headers, stall a bit
-                   for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
-                       timer.Reset(time.Second)
-                       select {
-                       case <-d.cancelCh:
-                           return errCanceled
-                       case <-timer.C:
-                       }
-                   }
-                   // Otherwise insert the headers for content retrieval
-                   inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
-                   if len(inserts) != len(chunkHeaders) {
-                       return fmt.Errorf("%w: stale headers", errBadPeer)
+               // If we've reached the allowed number of pending headers, stall a bit
+               for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
+                   timer.Reset(time.Second)
+                   select {
+                   case <-d.cancelCh:
+                       return errCanceled
+                   case <-timer.C:
+                   }
+               }
+               // Otherwise insert the headers for content retrieval
+               inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
+               if len(inserts) != len(chunkHeaders) {
+                   return fmt.Errorf("%w: stale headers", errBadPeer)
                }
 
                headers = headers[limit:]
                hashes = hashes[limit:]
                origin += uint64(limit)
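
With LightSync gone, the downloader's mode space collapses to full and snap sync. Assuming SyncMode remains the small integer enum it was before this PR (modes.go is not part of the hunks shown above, so treat this as an assumption), the surviving set would read:

// Sketch of the surviving sync modes; modes.go is outside this diff.
const (
    FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
    SnapSync                 // Download the chain and the state via compact snapshots
)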