From 16c7ca6f48beb2f1e884ed53860753e3fd831974 Mon Sep 17 00:00:00 2001 From: shiyasmohd Date: Sat, 28 Dec 2024 23:52:12 +0530 Subject: [PATCH] node: fix graphman rewind to start block --- node/src/manager/commands/rewind.rs | 24 +++++++------ store/postgres/src/deployment.rs | 53 +++++++++++++++++++++-------- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/node/src/manager/commands/rewind.rs b/node/src/manager/commands/rewind.rs index 51d432dfd49..fde8f86a8b9 100644 --- a/node/src/manager/commands/rewind.rs +++ b/node/src/manager/commands/rewind.rs @@ -9,7 +9,6 @@ use graph::anyhow::bail; use graph::components::store::{BlockStore as _, ChainStore as _, DeploymentLocator}; use graph::env::ENV_VARS; use graph::prelude::{anyhow, BlockNumber, BlockPtr}; -use graph_store_postgres::command_support::catalog::{self as store_catalog}; use graph_store_postgres::{BlockStore, NotificationSender}; use graph_store_postgres::{ConnectionPool, Store}; @@ -78,8 +77,6 @@ pub async fn run( if !start_block && (block_hash.is_none() || block_number.is_none()) { bail!("--block-hash and --block-number must be specified when --start-block is not set"); } - let pconn = primary.get()?; - let mut conn = store_catalog::Connection::new(pconn); let subgraph_store = store.subgraph_store(); let block_store = store.block_store(); @@ -126,14 +123,19 @@ pub async fn run( println!("Checking if its safe to rewind deployments"); for (_, locator) in &locators { - let site = conn - .locate_site(locator.clone())? - .ok_or_else(|| anyhow!("failed to locate site for {locator}"))?; - let deployment_store = subgraph_store.for_site(&site)?; - let deployment_details = deployment_store.deployment_details_for_id(locator)?; - let block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0); - - if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold() { + let deployment_details = subgraph_store.load_deployment_by_id(locator.clone().into())?; + let mut block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0); + + if start_block { + block_number_to = match deployment_details.start_block { + Some(block) => block.number, + None => 0, + }; + } + + if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold() + && !start_block + { bail!( "The block number {} is not safe to rewind to for deployment {}. The earliest block number of this deployment is {}. You can only safely rewind to block number {}", block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0), diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 4b3da58469d..6e5818ad7d6 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -581,22 +581,45 @@ pub fn revert_block_ptr( ) -> Result<(), StoreError> { use deployment as d; use head as h; + use subgraph_manifest as m; - // Intention is to revert to a block lower than the reorg threshold, on the other - // hand the earliest we can possibly go is genesys block, so go to genesys even - // if it's within the reorg threshold. - let earliest_block = i32::max(ptr.number - ENV_VARS.reorg_threshold(), 0); - let affected_rows = update( - d::table - .filter(d::id.eq(site.id)) - .filter(d::earliest_block_number.le(earliest_block)), - ) - .set(( - d::reorg_count.eq(d::reorg_count + 1), - d::current_reorg_depth.eq(d::current_reorg_depth + 1), - d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")), - )) - .execute(conn)?; + // Check if ptr is equal to the start block of the deployment + let (start_block_hash, start_block_number): (Option>, Option) = m::table + .filter(m::id.eq(site.id)) + .select((m::start_block_hash, m::start_block_number)) + .first(conn)?; + + let is_start_block = match (start_block_hash, start_block_number) { + (Some(hash), Some(number)) => ptr.number == number && ptr.hash_slice() == hash.as_slice(), + _ => false, + }; + + let affected_rows = if is_start_block { + update(d::table.filter(d::id.eq(site.id))) + .set(( + d::reorg_count.eq(d::reorg_count + 1), + d::current_reorg_depth.eq(d::current_reorg_depth + 1), + d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")), + d::earliest_block_number.eq(ptr.number), + )) + .execute(conn)? + } else { + // Intention is to revert to a block lower than the reorg threshold, on the other + // hand the earliest we can possibly go is genesis block, so go to genesis even + // if it's within the reorg threshold. + let earliest_block = i32::max(ptr.number - ENV_VARS.reorg_threshold(), 0); + update( + d::table + .filter(d::id.eq(site.id)) + .filter(d::earliest_block_number.le(earliest_block)), + ) + .set(( + d::reorg_count.eq(d::reorg_count + 1), + d::current_reorg_depth.eq(d::current_reorg_depth + 1), + d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")), + )) + .execute(conn)? + }; update(h::table.filter(h::id.eq(site.id))) .set((