diff --git a/graph/examples/append_row.rs b/graph/examples/append_row.rs new file mode 100644 index 00000000000..59f6fc3a5f2 --- /dev/null +++ b/graph/examples/append_row.rs @@ -0,0 +1,123 @@ +use std::{collections::HashSet, sync::Arc, time::Instant}; + +use anyhow::anyhow; +use clap::Parser; +use graph::{ + components::store::write::{EntityModification, RowGroupForPerfTest as RowGroup}, + data::{ + store::{Id, Value}, + subgraph::DeploymentHash, + value::Word, + }, + schema::{EntityType, InputSchema}, +}; +use lazy_static::lazy_static; +use rand::{rng, Rng}; + +#[derive(Parser)] +#[clap( + name = "append_row", + about = "Measure time it takes to append rows to a row group" +)] +struct Opt { + /// Number of repetitions of the test + #[clap(short, long, default_value = "5")] + niter: usize, + /// Number of rows + #[clap(short, long, default_value = "10000")] + rows: usize, + /// Number of blocks + #[clap(short, long, default_value = "300")] + blocks: usize, + /// Number of ids + #[clap(short, long, default_value = "500")] + ids: usize, +} + +// A very fake schema that allows us to get the entity types we need +const GQL: &str = r#" + type Thing @entity { id: ID!, count: Int! } + type RowGroup @entity { id: ID! } + type Entry @entity { id: ID! } + "#; +lazy_static! 
{ + static ref DEPLOYMENT: DeploymentHash = DeploymentHash::new("batchAppend").unwrap(); + static ref SCHEMA: InputSchema = InputSchema::parse_latest(GQL, DEPLOYMENT.clone()).unwrap(); + static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap(); + static ref ROW_GROUP_TYPE: EntityType = SCHEMA.entity_type("RowGroup").unwrap(); + static ref ENTRY_TYPE: EntityType = SCHEMA.entity_type("Entry").unwrap(); +} + +pub fn main() -> anyhow::Result<()> { + let opt = Opt::parse(); + let next_block = opt.blocks as f64 / opt.rows as f64; + for _ in 0..opt.niter { + let ids = (0..opt.ids) + .map(|n| Id::String(Word::from(format!("00{n}010203040506")))) + .collect::<Vec<_>>(); + let mut existing: HashSet<Id> = HashSet::new(); + let mut mods = Vec::new(); + let mut block = 0; + let mut block_pos = Vec::new(); + for _ in 0..opt.rows { + if rng().random_bool(next_block) { + block += 1; + block_pos.clear(); + } + + let mut attempt = 0; + let pos = loop { + if attempt > 20 { + return Err(anyhow!( + "Failed to find a position in 20 attempts. 
Increase `ids`" + )); + } + attempt += 1; + let pos = rng().random_range(0..opt.ids); + if block_pos.contains(&pos) { + continue; + } + block_pos.push(pos); + break pos; + }; + let id = &ids[pos]; + let data = vec![ + (Word::from("id"), Value::String(id.to_string())), + (Word::from("count"), Value::Int(block as i32)), + ]; + let data = Arc::new(SCHEMA.make_entity(data).unwrap()); + let md = if existing.contains(id) { + EntityModification::Overwrite { + key: THING_TYPE.key(id.clone()), + data, + block, + end: None, + } + } else { + existing.insert(id.clone()); + EntityModification::Insert { + key: THING_TYPE.key(id.clone()), + data, + block, + end: None, + } + }; + mods.push(md); + } + let mut group = RowGroup::new(THING_TYPE.clone(), false); + + let start = Instant::now(); + for md in mods { + group.append_row(md).unwrap(); + } + let elapsed = start.elapsed(); + println!( + "Adding {} rows with {} ids across {} blocks took {:?}", + opt.rows, + existing.len(), + block, + elapsed + ); + } + Ok(()) +} diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index 2c470fd32be..76c71ce5e39 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -1,5 +1,8 @@ //! Data structures and helpers for writing subgraph changes to the store -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use crate::{ blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime}, @@ -8,6 +11,7 @@ use crate::{ data::{store::Id, subgraph::schema::SubgraphError}, data_source::CausalityRegion, derive::CacheWeight, + env::ENV_VARS, internal_error, util::cache_weight::CacheWeight, }; @@ -154,9 +158,10 @@ impl EntityModification { } pub fn creates_entity(&self) -> bool { + use EntityModification::*; match self { - EntityModification::Insert { .. } => true, - EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => false, + Insert { .. 
} => true, + Overwrite { .. } | Remove { .. } => false, } } @@ -310,6 +315,10 @@ pub struct RowGroup { rows: Vec<EntityModification>, immutable: bool, + + /// Map the `key.entity_id` of all entries in `rows` to the index with + /// the most recent entry for that id to speed up lookups + last_mod: HashMap<Id, usize>, } impl RowGroup { @@ -318,6 +327,7 @@ impl RowGroup { entity_type, rows: Vec::new(), immutable, + last_mod: HashMap::new(), } } @@ -374,6 +384,21 @@ impl RowGroup { } pub fn last_op(&self, key: &EntityKey, at: BlockNumber) -> Option<EntityOp<'_>> { + if ENV_VARS.store.write_batch_memoize { + let idx = *self.last_mod.get(&key.entity_id)?; + if let Some(op) = self.rows.get(idx).and_then(|emod| { + if emod.block() <= at { + Some(emod.as_entity_op(at)) + } else { + None + } + }) { + return Some(op); + } + } + // We are looking for the change at a block `at` that is before the + // change we remember in `last_mod`, and therefore have to scan + // through all changes self.rows .iter() // We are scanning backwards, i.e., in descendng order of @@ -383,7 +408,14 @@ impl RowGroup { .map(|emod| emod.as_entity_op(at)) } + /// Return an iterator over all changes that are effective at `at`. That + /// makes it possible to construct the state that the deployment will + /// have once all changes for block `at` have been written. 
pub fn effective_ops(&self, at: BlockNumber) -> impl Iterator<Item = EntityOp<'_>> { + // We don't use `self.last_mod` here, because we need to return + // operations for all entities that have pending changes at block + // `at`, and there is no guarantee that `self.last_mod` is visible + // at `at` since the change in `self.last_mod` might come after `at` let mut seen = HashSet::new(); self.rows .iter() @@ -400,7 +432,12 @@ /// Find the most recent entry for `id` fn prev_row_mut(&mut self, id: &Id) -> Option<&mut EntityModification> { - self.rows.iter_mut().rfind(|emod| emod.id() == id) + if ENV_VARS.store.write_batch_memoize { + let idx = *self.last_mod.get(id)?; + self.rows.get_mut(idx) + } else { + self.rows.iter_mut().rfind(|emod| emod.id() == id) + } } /// Append `row` to `self.rows` by combining it with a previously @@ -433,6 +470,14 @@ )); } + if row.id() != prev_row.id() { + return Err(internal_error!( + "last_mod map is corrupted: got id {} looking up id {}", + prev_row.id(), + row.id() + )); + } + // The heart of the matter: depending on what `row` is, clamp // `prev_row` and either ignore `row` since it is not needed, or // turn it into an `Insert`, which also does not require @@ -460,25 +505,31 @@ Insert { .. }, ) => { // prev_row was deleted - self.rows.push(row); + self.push_row(row); } ( Insert { end: None, .. } | Overwrite { end: None, .. }, Overwrite { block, .. }, ) => { prev_row.clamp(*block)?; - self.rows.push(row.as_insert(&self.entity_type)?); + let row = row.as_insert(&self.entity_type)?; + self.push_row(row); } (Insert { end: None, .. } | Overwrite { end: None, .. }, Remove { block, .. 
}) => { + prev_row.clamp(*block)?; + } + } + } else { - self.rows.push(row); + self.push_row(row); + } Ok(()) + } + + fn push_row(&mut self, row: EntityModification) { + self.last_mod.insert(row.id().clone(), self.rows.len()); + self.rows.push(row); + } + fn append(&mut self, group: RowGroup) -> Result<(), StoreError> { if self.entity_type != group.entity_type { return Err(internal_error!( @@ -501,6 +552,22 @@ } } +pub struct RowGroupForPerfTest(RowGroup); + +impl RowGroupForPerfTest { + pub fn new(entity_type: EntityType, immutable: bool) -> Self { + Self(RowGroup::new(entity_type, immutable)) + } + + pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> { + self.0.push(emod, block) + } + + pub fn append_row(&mut self, row: EntityModification) -> Result<(), StoreError> { + self.0.append_row(row) + } +} + struct ClampsByBlockIterator<'a> { position: usize, rows: &'a [EntityModification], @@ -643,7 +710,7 @@ pub struct Batch { pub first_block: BlockNumber, /// The firehose cursor corresponding to `block_ptr` pub firehose_cursor: FirehoseCursor, - mods: RowGroups, + pub mods: RowGroups, /// New data sources pub data_sources: DataSources, pub deterministic_errors: Vec<SubgraphError>, @@ -908,6 +975,7 @@ impl<'a> Iterator for WriteChunkIter<'a> { #[cfg(test)] mod test { + use std::collections::HashMap; use std::sync::Arc; use crate::{ @@ -931,7 +999,7 @@ mod test { assert_eq!(values.len(), blocks.len()); - let rows = values + let rows: Vec<_> = values .iter() .zip(blocks.iter()) .map(|(value, block)| EntityModification::Remove { @@ -939,10 +1007,19 @@ mod test { block: *block, }) .collect(); + let last_mod = rows + .iter() + .enumerate() + .fold(HashMap::new(), |mut map, (idx, emod)| { + map.insert(emod.id().clone(), idx); + map + }); + let group = RowGroup { entity_type: ENTRY_TYPE.clone(), rows, immutable: false, + last_mod, }; let act = group .clamps_by_block() diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index 
7d52d514f1b..e267b28d8ce 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -129,6 +129,11 @@ pub struct EnvVarsStore { /// is 10_000 which corresponds to 10MB. Setting this to 0 disables /// write batching. pub write_batch_size: usize, + /// Whether to memoize the last operation for each entity in a write + /// batch to speed up adding more entities. Set by + /// `GRAPH_STORE_WRITE_BATCH_MEMOIZE`. The default is `true`. + /// Remove after 2025-07-01 if there have been no issues with it. + pub write_batch_memoize: bool, /// Whether to create GIN indexes for array attributes. Set by + /// `GRAPH_STORE_CREATE_GIN_INDEXES`. The default is `false` pub create_gin_indexes: bool, @@ -184,6 +189,7 @@ impl TryFrom<InnerStore> for EnvVarsStore { connection_min_idle: x.connection_min_idle, connection_idle_timeout: Duration::from_secs(x.connection_idle_timeout_in_secs), write_queue_size: x.write_queue_size, + write_batch_memoize: x.write_batch_memoize, batch_target_duration: Duration::from_secs(x.batch_target_duration_in_secs), batch_timeout: x.batch_timeout_in_secs.map(Duration::from_secs), batch_workers: x.batch_workers, @@ -277,6 +283,8 @@ pub struct InnerStore { write_batch_duration_in_secs: u64, #[envconfig(from = "GRAPH_STORE_WRITE_BATCH_SIZE", default = "10000")] write_batch_size: usize, + #[envconfig(from = "GRAPH_STORE_WRITE_BATCH_MEMOIZE", default = "true")] + write_batch_memoize: bool, #[envconfig(from = "GRAPH_STORE_CREATE_GIN_INDEXES", default = "false")] create_gin_indexes: bool, #[envconfig(from = "GRAPH_STORE_USE_BRIN_FOR_ALL_QUERY_TYPES", default = "false")]