Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions node/src/manager/commands/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use graph::anyhow::bail;
use graph::components::store::{BlockStore as _, ChainStore as _, DeploymentLocator};
use graph::env::ENV_VARS;
use graph::prelude::{anyhow, BlockNumber, BlockPtr};
use graph_store_postgres::command_support::catalog::{self as store_catalog};
use graph_store_postgres::{BlockStore, NotificationSender};
use graph_store_postgres::{ConnectionPool, Store};

Expand Down Expand Up @@ -78,8 +77,6 @@ pub async fn run(
if !start_block && (block_hash.is_none() || block_number.is_none()) {
bail!("--block-hash and --block-number must be specified when --start-block is not set");
}
let pconn = primary.get()?;
let mut conn = store_catalog::Connection::new(pconn);

let subgraph_store = store.subgraph_store();
let block_store = store.block_store();
Expand Down Expand Up @@ -126,14 +123,19 @@ pub async fn run(

println!("Checking if its safe to rewind deployments");
for (_, locator) in &locators {
let site = conn
.locate_site(locator.clone())?
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;
let deployment_store = subgraph_store.for_site(&site)?;
let deployment_details = deployment_store.deployment_details_for_id(locator)?;
let block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0);

if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold() {
let deployment_details = subgraph_store.load_deployment_by_id(locator.clone().into())?;
let mut block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0);

if start_block {
block_number_to = match deployment_details.start_block {
Some(block) => block.number,
None => 0,
};
}

if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold()
&& !start_block
{
bail!(
"The block number {} is not safe to rewind to for deployment {}. The earliest block number of this deployment is {}. You can only safely rewind to block number {}",
block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0),
Expand Down
53 changes: 38 additions & 15 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,22 +581,45 @@ pub fn revert_block_ptr(
) -> Result<(), StoreError> {
use deployment as d;
use head as h;
use subgraph_manifest as m;

// Intention is to revert to a block lower than the reorg threshold, on the other
// hand the earliest we can possibly go is genesys block, so go to genesys even
// if it's within the reorg threshold.
let earliest_block = i32::max(ptr.number - ENV_VARS.reorg_threshold(), 0);
let affected_rows = update(
d::table
.filter(d::id.eq(site.id))
.filter(d::earliest_block_number.le(earliest_block)),
)
.set((
d::reorg_count.eq(d::reorg_count + 1),
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
))
.execute(conn)?;
// Check if ptr is equal to the start block of the deployment
let (start_block_hash, start_block_number): (Option<Vec<u8>>, Option<BlockNumber>) = m::table
.filter(m::id.eq(site.id))
.select((m::start_block_hash, m::start_block_number))
.first(conn)?;

let is_start_block = match (start_block_hash, start_block_number) {
(Some(hash), Some(number)) => ptr.number == number && ptr.hash_slice() == hash.as_slice(),
_ => false,
};

let affected_rows = if is_start_block {
update(d::table.filter(d::id.eq(site.id)))
.set((
d::reorg_count.eq(d::reorg_count + 1),
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
d::earliest_block_number.eq(ptr.number),
))
.execute(conn)?
} else {
// Intention is to revert to a block lower than the reorg threshold, on the other
// hand the earliest we can possibly go is genesis block, so go to genesis even
// if it's within the reorg threshold.
let earliest_block = i32::max(ptr.number - ENV_VARS.reorg_threshold(), 0);
update(
d::table
.filter(d::id.eq(site.id))
.filter(d::earliest_block_number.le(earliest_block)),
)
.set((
d::reorg_count.eq(d::reorg_count + 1),
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
))
.execute(conn)?
};

update(h::table.filter(h::id.eq(site.id)))
.set((
Expand Down