Speed up appending changes to a batch #6025

Open · wants to merge 2 commits into master

123 changes: 123 additions & 0 deletions graph/examples/append_row.rs
@@ -0,0 +1,123 @@
use std::{collections::HashSet, sync::Arc, time::Instant};

use anyhow::anyhow;
use clap::Parser;
use graph::{
    components::store::write::{EntityModification, RowGroupForPerfTest as RowGroup},
    data::{
        store::{Id, Value},
        subgraph::DeploymentHash,
        value::Word,
    },
    schema::{EntityType, InputSchema},
};
use lazy_static::lazy_static;
use rand::{rng, Rng};

#[derive(Parser)]
#[clap(
    name = "append_row",
    about = "Measure time it takes to append rows to a row group"
)]
struct Opt {
    /// Number of repetitions of the test
    #[clap(short, long, default_value = "5")]
    niter: usize,
    /// Number of rows
    #[clap(short, long, default_value = "10000")]
    rows: usize,
    /// Number of blocks
    #[clap(short, long, default_value = "300")]
    blocks: usize,
    /// Number of ids
    #[clap(short, long, default_value = "500")]
    ids: usize,
}

// A very fake schema that allows us to get the entity types we need
const GQL: &str = r#"
type Thing @entity { id: ID!, count: Int! }
type RowGroup @entity { id: ID! }
type Entry @entity { id: ID! }
"#;
lazy_static! {
    static ref DEPLOYMENT: DeploymentHash = DeploymentHash::new("batchAppend").unwrap();
    static ref SCHEMA: InputSchema = InputSchema::parse_latest(GQL, DEPLOYMENT.clone()).unwrap();
    static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
    static ref ROW_GROUP_TYPE: EntityType = SCHEMA.entity_type("RowGroup").unwrap();
    static ref ENTRY_TYPE: EntityType = SCHEMA.entity_type("Entry").unwrap();
}

pub fn main() -> anyhow::Result<()> {
    let opt = Opt::parse();
    let next_block = opt.blocks as f64 / opt.rows as f64;
    for _ in 0..opt.niter {
        let ids = (0..opt.ids)
            .map(|n| Id::String(Word::from(format!("00{n}010203040506"))))
            .collect::<Vec<_>>();
        let mut existing: HashSet<Id> = HashSet::new();
        let mut mods = Vec::new();
        let mut block = 0;
        let mut block_pos = Vec::new();
        for _ in 0..opt.rows {
            if rng().random_bool(next_block) {
                block += 1;
                block_pos.clear();
            }

            let mut attempt = 0;
            let pos = loop {
                if attempt > 20 {
                    return Err(anyhow!(
                        "Failed to find a position in 20 attempts. Increase `ids`"
                    ));
                }
                attempt += 1;
                let pos = rng().random_range(0..opt.ids);
                if block_pos.contains(&pos) {
                    continue;
                }
                block_pos.push(pos);
                break pos;
            };
            let id = &ids[pos];
            let data = vec![
                (Word::from("id"), Value::String(id.to_string())),
                (Word::from("count"), Value::Int(block as i32)),
            ];
            let data = Arc::new(SCHEMA.make_entity(data).unwrap());
            let md = if existing.contains(id) {
                EntityModification::Overwrite {
                    key: THING_TYPE.key(id.clone()),
                    data,
                    block,
                    end: None,
                }
            } else {
                existing.insert(id.clone());
                EntityModification::Insert {
                    key: THING_TYPE.key(id.clone()),
                    data,
                    block,
                    end: None,
                }
            };
            mods.push(md);
        }
        let mut group = RowGroup::new(THING_TYPE.clone(), false);

        let start = Instant::now();
        for md in mods {
            group.append_row(md).unwrap();
        }
        let elapsed = start.elapsed();
        println!(
            "Adding {} rows with {} ids across {} blocks took {:?}",
            opt.rows,
            existing.len(),
            block,
            elapsed
        );
    }
    Ok(())
}
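
Assuming the file is picked up as a normal Cargo example (anything under `graph/examples/` is), a run looks something like `cargo run -p graph --release --example append_row -- --rows 100000 --ids 1000`; the exact invocation is an assumption, not part of the diff. The flags map one-to-one onto the `Opt` struct above, and each of the `--niter` repetitions prints one timing line.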
95 changes: 86 additions & 9 deletions graph/src/components/store/write.rs
@@ -1,5 +1,8 @@
 //! Data structures and helpers for writing subgraph changes to the store
-use std::{collections::HashSet, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use crate::{
     blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime},
@@ -8,6 +11,7 @@ use crate::{
     data::{store::Id, subgraph::schema::SubgraphError},
     data_source::CausalityRegion,
     derive::CacheWeight,
+    env::ENV_VARS,
     internal_error,
     util::cache_weight::CacheWeight,
 };
@@ -154,9 +158,10 @@ impl EntityModification {
     }
 
     pub fn creates_entity(&self) -> bool {
+        use EntityModification::*;
         match self {
-            EntityModification::Insert { .. } => true,
-            EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => false,
+            Insert { .. } => true,
+            Overwrite { .. } | Remove { .. } => false,
         }
     }
 
@@ -310,6 +315,10 @@ pub struct RowGroup {
     rows: Vec<EntityModification>,
 
     immutable: bool,
+
+    /// Map the `key.entity_id` of all entries in `rows` to the index with
+    /// the most recent entry for that id to speed up lookups
+    last_mod: HashMap<Id, usize>,
 }
 
 impl RowGroup {
@@ -318,6 +327,7 @@ impl RowGroup {
             entity_type,
             rows: Vec::new(),
             immutable,
+            last_mod: HashMap::new(),
         }
     }
 
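The heart of the change, reduced to a minimal self-contained sketch (the names and row representation below are illustrative stand-ins, not the PR's types): alongside the append-only row vector, keep a map from entity id to the index of that id's newest row, maintain it on every push, and answer "what is the most recent row for this id?" with one hash lookup instead of a backwards scan. Without the map, every append does an O(n) `rfind`, making a batch of n rows O(n²) overall.

use std::collections::HashMap;

struct Rows {
    rows: Vec<(String, u32)>,          // stand-in for Vec<EntityModification>: (entity id, block)
    last_mod: HashMap<String, usize>,  // entity id -> index of the newest row for that id
}

impl Rows {
    fn push_row(&mut self, id: String, block: u32) {
        // The new row will live at index rows.len(), so record that first
        self.last_mod.insert(id.clone(), self.rows.len());
        self.rows.push((id, block));
    }

    fn prev_row(&self, id: &str) -> Option<&(String, u32)> {
        // O(1), replacing self.rows.iter().rfind(|(rid, _)| rid == id)
        self.last_mod.get(id).map(|&idx| &self.rows[idx])
    }
}

fn main() {
    let mut rows = Rows { rows: Vec::new(), last_mod: HashMap::new() };
    rows.push_row("a".to_string(), 1);
    rows.push_row("a".to_string(), 2);
    rows.push_row("b".to_string(), 2);
    assert_eq!(rows.prev_row("a"), Some(&("a".to_string(), 2)));
}

This is what `push_row` and `prev_row_mut` below do for real, gated behind `GRAPH_STORE_WRITE_BATCH_MEMOIZE`.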
@@ -374,6 +384,21 @@ impl RowGroup {
     }
 
     pub fn last_op(&self, key: &EntityKey, at: BlockNumber) -> Option<EntityOp<'_>> {
+        if ENV_VARS.store.write_batch_memoize {
+            let idx = *self.last_mod.get(&key.entity_id)?;
+            if let Some(op) = self.rows.get(idx).and_then(|emod| {
+                if emod.block() <= at {
+                    Some(emod.as_entity_op(at))
+                } else {
+                    None
+                }
+            }) {
+                return Some(op);
+            }
+        }
+        // We are looking for a change at a block `at` that lies before the
+        // change we remember in `last_mod`, and therefore have to scan
+        // through all changes
         self.rows
             .iter()
             // We are scanning backwards, i.e., in descending order of
@@ -383,7 +408,14 @@
             .map(|emod| emod.as_entity_op(at))
     }
 
+    /// Return an iterator over all changes that are effective at `at`. That
+    /// makes it possible to construct the state that the deployment will
+    /// have once all changes for block `at` have been written.
     pub fn effective_ops(&self, at: BlockNumber) -> impl Iterator<Item = EntityOp<'_>> {
+        // We don't use `self.last_mod` here because we need to return
+        // operations for all entities that have pending changes at block
+        // `at`, and there is no guarantee that the change memoized in
+        // `self.last_mod` is visible at `at`: it might come after `at`
        let mut seen = HashSet::new();
         self.rows
             .iter()
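A concrete case for why `last_op` keeps the scan as a fallback: if an entity was last changed at block 120, `last_mod` points at that row, so a lookup with `at = 100` cannot be served from the memo (the memoized change is not yet visible at block 100) and falls through to the backwards scan. In terms of the stand-in sketch above, the logic is roughly:

fn last_op(rows: &[(String, u32)], last_mod: &HashMap<String, usize>, id: &str, at: u32) -> Option<usize> {
    // Fast path: the newest row for `id`, if it is already visible at `at`
    if let Some(&idx) = last_mod.get(id) {
        if rows[idx].1 <= at {
            return Some(idx);
        }
    }
    // Slow path: scan backwards for the newest row at or before `at`
    rows.iter().rposition(|(rid, block)| rid == id && *block <= at)
}

`effective_ops` skips the memo entirely for the same reason: it must yield every entity with a change visible at `at`, not just those whose newest change happens to be visible.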
@@ -400,7 +432,12 @@
 
     /// Find the most recent entry for `id`
     fn prev_row_mut(&mut self, id: &Id) -> Option<&mut EntityModification> {
-        self.rows.iter_mut().rfind(|emod| emod.id() == id)
+        if ENV_VARS.store.write_batch_memoize {
+            let idx = *self.last_mod.get(id)?;
+            self.rows.get_mut(idx)
+        } else {
+            self.rows.iter_mut().rfind(|emod| emod.id() == id)
+        }
     }
 
     /// Append `row` to `self.rows` by combining it with a previously
@@ -433,6 +470,14 @@ impl RowGroup {
                 ));
             }
 
+            if row.id() != prev_row.id() {
+                return Err(internal_error!(
+                    "last_mod map is corrupted: got id {} looking up id {}",
+                    prev_row.id(),
+                    row.id()
+                ));
+            }
+
             // The heart of the matter: depending on what `row` is, clamp
             // `prev_row` and either ignore `row` since it is not needed, or
             // turn it into an `Insert`, which also does not require
@@ -460,25 +505,31 @@
                     Insert { .. },
                 ) => {
                     // prev_row was deleted
-                    self.rows.push(row);
+                    self.push_row(row);
                 }
                 (
                     Insert { end: None, .. } | Overwrite { end: None, .. },
                     Overwrite { block, .. },
                 ) => {
                     prev_row.clamp(*block)?;
-                    self.rows.push(row.as_insert(&self.entity_type)?);
+                    let row = row.as_insert(&self.entity_type)?;
+                    self.push_row(row);
                 }
                 (Insert { end: None, .. } | Overwrite { end: None, .. }, Remove { block, .. }) => {
                     prev_row.clamp(*block)?;
                 }
             }
         } else {
-            self.rows.push(row);
+            self.push_row(row);
         }
         Ok(())
     }
 
+    fn push_row(&mut self, row: EntityModification) {
+        self.last_mod.insert(row.id().clone(), self.rows.len());
+        self.rows.push(row);
+    }
+
     fn append(&mut self, group: RowGroup) -> Result<(), StoreError> {
         if self.entity_type != group.entity_type {
             return Err(internal_error!(
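With this change, every site in `append_row` that used to call `self.rows.push(row)` goes through `push_row`, which records the row's index before pushing; since the new row lands at index `self.rows.len()`, the map always names the newest row for each id. In terms of the earlier stand-in sketch, the invariant `push_row` maintains can be stated as a check like this (illustrative only, not in the PR):

fn last_mod_is_consistent(rows: &[(String, u32)], last_mod: &HashMap<String, usize>) -> bool {
    // Every row's id is in the map, and the map points at the newest
    // (highest-index) row carrying that id
    rows.iter().enumerate().all(|(idx, (id, _))| {
        matches!(last_mod.get(id), Some(&newest) if newest >= idx && rows[newest].0 == *id)
    })
}

The `last_mod map is corrupted` error above is a cheap runtime guard on the same invariant.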
@@ -501,6 +552,22 @@
     }
 }
 
+pub struct RowGroupForPerfTest(RowGroup);
+
+impl RowGroupForPerfTest {
+    pub fn new(entity_type: EntityType, immutable: bool) -> Self {
+        Self(RowGroup::new(entity_type, immutable))
+    }
+
+    pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> {
+        self.0.push(emod, block)
+    }
+
+    pub fn append_row(&mut self, row: EntityModification) -> Result<(), StoreError> {
+        self.0.append_row(row)
+    }
+}
+
 struct ClampsByBlockIterator<'a> {
     position: usize,
     rows: &'a [EntityModification],
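`RowGroupForPerfTest` is presumably a thin escape hatch so the `append_row` example can drive `RowGroup::push` and `RowGroup::append_row` from outside the crate without making those methods public API; it forwards straight to the wrapped `RowGroup`.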
@@ -643,7 +710,7 @@ pub struct Batch {
     pub first_block: BlockNumber,
     /// The firehose cursor corresponding to `block_ptr`
     pub firehose_cursor: FirehoseCursor,
-    mods: RowGroups,
+    pub mods: RowGroups,
     /// New data sources
     pub data_sources: DataSources,
     pub deterministic_errors: Vec<SubgraphError>,
@@ -908,6 +975,7 @@ impl<'a> Iterator for WriteChunkIter<'a> {
 
 #[cfg(test)]
 mod test {
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     use crate::{
@@ -931,18 +999,27 @@
 
         assert_eq!(values.len(), blocks.len());
 
-        let rows = values
+        let rows: Vec<_> = values
             .iter()
             .zip(blocks.iter())
             .map(|(value, block)| EntityModification::Remove {
                 key: ROW_GROUP_TYPE.key(Id::String(Word::from(value.to_string()))),
                 block: *block,
             })
             .collect();
+        let last_mod = rows
+            .iter()
+            .enumerate()
+            .fold(HashMap::new(), |mut map, (idx, emod)| {
+                map.insert(emod.id().clone(), idx);
+                map
+            });
+
         let group = RowGroup {
             entity_type: ENTRY_TYPE.clone(),
             rows,
             immutable: false,
+            last_mod,
         };
         let act = group
             .clamps_by_block()
8 changes: 8 additions & 0 deletions graph/src/env/store.rs
@@ -129,6 +129,11 @@ pub struct EnvVarsStore {
     /// is 10_000 which corresponds to 10MB. Setting this to 0 disables
     /// write batching.
     pub write_batch_size: usize,
+    /// Whether to memoize the last operation for each entity in a write
+    /// batch to speed up adding more entities. Set by
+    /// `GRAPH_STORE_WRITE_BATCH_MEMOIZE`. The default is `true`.
+    /// Remove after 2025-07-01 if there have been no issues with it.
+    pub write_batch_memoize: bool,
     /// Whether to create GIN indexes for array attributes. Set by
     /// `GRAPH_STORE_CREATE_GIN_INDEXES`. The default is `false`
     pub create_gin_indexes: bool,
@@ -184,6 +189,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
             connection_min_idle: x.connection_min_idle,
             connection_idle_timeout: Duration::from_secs(x.connection_idle_timeout_in_secs),
             write_queue_size: x.write_queue_size,
+            write_batch_memoize: x.write_batch_memoize,
             batch_target_duration: Duration::from_secs(x.batch_target_duration_in_secs),
             batch_timeout: x.batch_timeout_in_secs.map(Duration::from_secs),
             batch_workers: x.batch_workers,
@@ -277,6 +283,8 @@ pub struct InnerStore {
     write_batch_duration_in_secs: u64,
     #[envconfig(from = "GRAPH_STORE_WRITE_BATCH_SIZE", default = "10000")]
     write_batch_size: usize,
+    #[envconfig(from = "GRAPH_STORE_WRITE_BATCH_MEMOIZE", default = "true")]
+    write_batch_memoize: bool,
     #[envconfig(from = "GRAPH_STORE_CREATE_GIN_INDEXES", default = "false")]
     create_gin_indexes: bool,
     #[envconfig(from = "GRAPH_STORE_USE_BRIN_FOR_ALL_QUERY_TYPES", default = "false")]