Skip to content

prove log spam fix #519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: synapse
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions tasks/pdp/cleanup_roots.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package pdp

import (
"context"
"database/sql"
"errors"
"math/big"

"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/xerrors"

"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/pdp/contract"
)

// CleanupScheduledRemovals processes any scheduled root removals for a proofset
// This should be called before nextProvingPeriod to ensure deletions aren't lost
func CleanupScheduledRemovals(ctx context.Context, db *harmonydb.DB, ethClient *ethclient.Client, proofSetID int64) error {
pdpVerifier, err := contract.NewPDPVerifier(contract.ContractAddresses().PDPVerifier, ethClient)
if err != nil {
return xerrors.Errorf("failed to instantiate PDPVerifier contract: %w", err)
}

removals, err := pdpVerifier.GetScheduledRemovals(nil, big.NewInt(proofSetID))
if err != nil {
return xerrors.Errorf("failed to get scheduled removals: %w", err)
}

if len(removals) == 0 {
// No removals to process
return nil
}

// Execute cleanup in a transaction
ok, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
for _, removeID := range removals {
log.Debugw("cleanupScheduledRemovals", "proofSetID", proofSetID, "removeID", removeID)

// Get the pdp_pieceref ID for the root before deleting
var pdpPieceRefID int64
err := tx.QueryRow(`
SELECT pdp_pieceref
FROM pdp_proofset_roots
WHERE proofset = $1 AND root_id = $2
`, proofSetID, removeID.Int64()).Scan(&pdpPieceRefID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
// Root already deleted, skip
continue
}
return false, xerrors.Errorf("failed to get piece ref for root %d: %w", removeID, err)
}

// Delete the parked piece ref, this will cascade to the pdp piece ref too
_, err = tx.Exec(`
DELETE FROM parked_piece_refs
WHERE ref_id = $1
`, pdpPieceRefID)
if err != nil {
return false, xerrors.Errorf("failed to delete parked piece ref %d: %w", pdpPieceRefID, err)
}

// Delete the root entry
_, err = tx.Exec(`
DELETE FROM pdp_proofset_roots
WHERE proofset = $1 AND root_id = $2
`, proofSetID, removeID)
if err != nil {
return false, xerrors.Errorf("failed to delete root %d: %w", removeID, err)
}
}

return true, nil
}, harmonydb.OptionRetry())

if err != nil {
return xerrors.Errorf("failed to cleanup deleted roots: %w", err)
}
if !ok {
return xerrors.Errorf("database delete not committed")
}

log.Infow("cleaned up scheduled removals", "proofSetID", proofSetID, "count", len(removals))
return nil
}
8 changes: 8 additions & 0 deletions tasks/pdp/task_init_pp.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ func (ipp *InitProvingPeriodTask) Do(taskID harmonytask.TaskID, stillOwned func(
return false, xerrors.Errorf("failed to get next challenge window start: %w", err)
}
init_prove_at = init_prove_at.Add(init_prove_at, challengeWindow.Div(challengeWindow, big.NewInt(2))) // Give a buffer of 1/2 challenge window epochs so that we are still within challenge window

// Clean up any scheduled removals before calling nextProvingPeriod
// This prevents losing deletions if we missed a proving window
err = CleanupScheduledRemovals(ctx, ipp.db, ipp.ethClient, proofSetID)
if err != nil {
return false, xerrors.Errorf("failed to cleanup scheduled removals: %w", err)
}

// Instantiate the PDPVerifier contract
pdpContracts := contract.ContractAddresses()
pdpVeriferAddress := pdpContracts.PDPVerifier
Expand Down
7 changes: 7 additions & 0 deletions tasks/pdp/task_next_pp.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func (n *NextProvingPeriodTask) Do(taskID harmonytask.TaskID, stillOwned func()
return false, xerrors.Errorf("failed to get next challenge window start: %w", err)
}

// Clean up any scheduled removals before calling nextProvingPeriod
// This prevents losing deletions if we missed a proving window
err = CleanupScheduledRemovals(ctx, n.db, n.ethClient, proofSetID)
if err != nil {
return false, xerrors.Errorf("failed to cleanup scheduled removals: %w", err)
}

// Instantiate the PDPVerifier contract
pdpContracts := contract.ContractAddresses()
pdpVerifierAddress := pdpContracts.PDPVerifier
Expand Down
77 changes: 18 additions & 59 deletions tasks/pdp/task_prove.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,23 @@ func (p *ProveTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("failed to get task details: %w", err)
}

// Check if the proofset has any roots
var rootCount int
err = p.db.QueryRow(ctx, `
SELECT COUNT(*)
FROM pdp_proofset_roots
WHERE proofset = $1
`, proofSetID).Scan(&rootCount)
if err != nil {
return false, xerrors.Errorf("failed to check root count: %w", err)
}

if rootCount == 0 {
// No roots in proofset, nothing to prove
log.Infow("PDP Prove Task: proofset has no roots, skipping", "proofSetID", proofSetID, "taskID", taskID)
return true, nil
}

pdpContracts := contract.ContractAddresses()
pdpVerifierAddress := pdpContracts.PDPVerifier

Expand Down Expand Up @@ -282,7 +299,7 @@ func (p *ProveTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
}

// Remove the roots previously scheduled for deletion
err = p.cleanupDeletedRoots(ctx, proofSetID, pdpVerifier)
err = CleanupScheduledRemovals(ctx, p.db, p.ethClient, proofSetID)
if err != nil {
return false, xerrors.Errorf("failed to cleanup deleted roots: %w", err)
}
Expand Down Expand Up @@ -654,64 +671,6 @@ func (p *ProveTask) getSenderAddress(ctx context.Context, match common.Address)
return address, nil
}

func (p *ProveTask) cleanupDeletedRoots(ctx context.Context, proofSetID int64, pdpVerifier *contract.PDPVerifier) error {

removals, err := pdpVerifier.GetScheduledRemovals(nil, big.NewInt(proofSetID))
if err != nil {
return xerrors.Errorf("failed to get scheduled removals: %w", err)
}

// Execute cleanup in a transaction
ok, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {

for _, removeID := range removals {
log.Debugw("cleanupDeletedRoots", "removeID", removeID)
// Get the pdp_pieceref ID for the root before deleting
var pdpPieceRefID int64
err := tx.QueryRow(`
SELECT pdp_pieceref
FROM pdp_proofset_roots
WHERE proofset = $1 AND root_id = $2
`, proofSetID, removeID.Int64()).Scan(&pdpPieceRefID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
// Root already deleted, skip
continue
}
return false, xerrors.Errorf("failed to get piece ref for root %d: %w", removeID, err)
}

// Delete the parked piece ref, this will cascade to the pdp piece ref too
_, err = tx.Exec(`
DELETE FROM parked_piece_refs
WHERE ref_id = $1
`, pdpPieceRefID)
if err != nil {
return false, xerrors.Errorf("failed to delete parked piece ref %d: %w", pdpPieceRefID, err)
}

// Delete the root entry
_, err = tx.Exec(`
DELETE FROM pdp_proofset_roots
WHERE proofset = $1 AND root_id = $2
`, proofSetID, removeID)
if err != nil {
return false, xerrors.Errorf("failed to delete root %d: %w", removeID, err)
}
}

return true, nil
}, harmonydb.OptionRetry())

if err != nil {
return xerrors.Errorf("failed to cleanup deleted roots: %w", err)
}
if !ok {
return xerrors.Errorf("database delete not committed")
}

return nil
}

func (p *ProveTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
if len(ids) == 0 {
Expand Down
Loading