From 6d4de2528dbae430fa1151e3df77e6e728fdf60b Mon Sep 17 00:00:00 2001 From: Xun Li Date: Mon, 25 Nov 2024 13:25:17 -0800 Subject: [PATCH] [indexer-alt] Add obj_info pipeline --- .../2024-11-25-211949_obj_info/down.sql | 1 + .../2024-11-25-211949_obj_info/up.sql | 67 +++++++++++ crates/sui-indexer-alt/src/handlers/mod.rs | 1 + .../sui-indexer-alt/src/handlers/obj_info.rs | 112 ++++++++++++++++++ crates/sui-indexer-alt/src/lib.rs | 5 +- crates/sui-indexer-alt/src/models/objects.rs | 16 ++- crates/sui-indexer-alt/src/schema.rs | 14 +++ .../sui-types/src/full_checkpoint_content.rs | 30 +++-- 8 files changed, 235 insertions(+), 11 deletions(-) create mode 100644 crates/sui-indexer-alt/migrations/2024-11-25-211949_obj_info/down.sql create mode 100644 crates/sui-indexer-alt/migrations/2024-11-25-211949_obj_info/up.sql create mode 100644 crates/sui-indexer-alt/src/handlers/obj_info.rs diff --git a/crates/sui-indexer-alt/migrations/2024-11-25-211949_obj_info/down.sql b/crates/sui-indexer-alt/migrations/2024-11-25-211949_obj_info/down.sql new file mode 100644 index 0000000000000..ad46a56966c90 --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-11-25-211949_obj_info/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS obj_info; diff --git a/crates/sui-indexer-alt/migrations/2024-11-25-211949_obj_info/up.sql b/crates/sui-indexer-alt/migrations/2024-11-25-211949_obj_info/up.sql new file mode 100644 index 0000000000000..67ff1e097e19b --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-11-25-211949_obj_info/up.sql @@ -0,0 +1,67 @@ +-- A table that keeps track of all the updates to object type and owner information. +-- In particular, whenever an object's presence or ownership changes, we insert a +-- new row into this table. Each row should have a unique (object_id, cp_sequence_number) +-- pair. +-- When implementing consistency queries, we will use this table to find all +-- object IDs that match the given filters bounded by the cursor checkpoint. +-- These object IDs can then be used to look up the latest version of the objects +-- bounded by the given checkpoint in the object_versions table. +CREATE TABLE IF NOT EXISTS obj_info +( + object_id BYTEA NOT NULL, + cp_sequence_number BIGINT NOT NULL, + -- An enum describing the object's ownership model: + -- + -- Immutable = 0, + -- Address-owned = 1, + -- Object-owned (dynamic field) = 2, + -- Shared = 3. + -- + -- Note that there is a distinction between an object that is owned by + -- another object (kind 2), which relates to dynamic fields, and an object + -- that is owned by another object's address (kind 1), which relates to + -- transfer-to-object. + owner_kind SMALLINT, + -- The address for address-owned objects, and the parent object for + -- object-owned objects. + owner_id BYTEA, + -- The following fields relate to the object's type. These only apply to + -- Move Objects. For Move Packages they will all be NULL. + -- + -- The type's package ID. + package BYTEA, + -- The type's module name. + module TEXT, + -- The type's name. + name TEXT, + -- The type's type parameters, as a BCS-encoded array of TypeTags. + instantiation BYTEA, + PRIMARY KEY (object_id, cp_sequence_number) +); + +CREATE INDEX IF NOT EXISTS obj_info_owner +ON obj_info (owner_kind, owner_id, cp_sequence_number, object_id); + +CREATE INDEX IF NOT EXISTS obj_info_pkg +ON obj_info (package, cp_sequence_number, object_id); + +CREATE INDEX IF NOT EXISTS obj_info_mod +ON obj_info (package, module, cp_sequence_number, object_id); + +CREATE INDEX IF NOT EXISTS obj_info_name +ON obj_info (package, module, name, cp_sequence_number, object_id); + +CREATE INDEX IF NOT EXISTS obj_info_inst +ON obj_info (package, module, name, instantiation, cp_sequence_number, object_id); + +CREATE INDEX IF NOT EXISTS obj_info_owner_pkg +ON obj_info (owner_kind, owner_id, package, cp_sequence_number, object_id); + +CREATE INDEX IF NOT EXISTS obj_info_owner_mod +ON obj_info (owner_kind, owner_id, package, module, cp_sequence_number, object_id); + +CREATE INDEX IF NOT EXISTS obj_info_owner_name +ON obj_info (owner_kind, owner_id, package, module, name, cp_sequence_number, object_id); + +CREATE INDEX IF NOT EXISTS obj_info_owner_inst +ON obj_info (owner_kind, owner_id, package, module, name, instantiation, cp_sequence_number, object_id); diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs index 038ae73102d86..9346a087250cc 100644 --- a/crates/sui-indexer-alt/src/handlers/mod.rs +++ b/crates/sui-indexer-alt/src/handlers/mod.rs @@ -10,6 +10,7 @@ pub mod kv_feature_flags; pub mod kv_objects; pub mod kv_protocol_configs; pub mod kv_transactions; +pub mod obj_info; pub mod obj_versions; pub mod sum_coin_balances; pub mod sum_displays; diff --git a/crates/sui-indexer-alt/src/handlers/obj_info.rs b/crates/sui-indexer-alt/src/handlers/obj_info.rs new file mode 100644 index 0000000000000..4b673788cdaee --- /dev/null +++ b/crates/sui-indexer-alt/src/handlers/obj_info.rs @@ -0,0 +1,112 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::BTreeMap, sync::Arc}; + +use anyhow::{anyhow, Result}; +use diesel_async::RunQueryDsl; +use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData, object::Owner}; + +use crate::{ + db, + models::objects::{StoredObjInfo, StoredOwnerKind}, + pipeline::{concurrent::Handler, Processor}, + schema::obj_info, +}; + +pub struct ObjInfo; + +impl Processor for ObjInfo { + const NAME: &'static str = "obj_info"; + type Value = StoredObjInfo; + + fn process(&self, checkpoint: &Arc) -> Result> { + let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number as i64; + let checkpoint_input_objects = checkpoint.checkpoint_input_objects(); + let latest_live_output_objects = checkpoint + .latest_live_output_objects() + .into_iter() + .map(|o| (o.id(), o)) + .collect::>(); + let mut values: BTreeMap = BTreeMap::new(); + for object_id in checkpoint_input_objects.keys() { + if !latest_live_output_objects.contains_key(object_id) { + // If an input object is not in the latest live output objects, it must have been deleted + // or wrapped in this checkpoint. We keep an entry for it in the table. + // This is necessary when we query objects and iterating over them, so that we don't + // include the object in the result if it was deleted. + values.insert( + *object_id, + StoredObjInfo { + object_id: object_id.to_vec(), + cp_sequence_number, + owner_kind: None, + owner_id: None, + package: None, + module: None, + name: None, + instantiation: None, + }, + ); + } + } + for (object_id, object) in latest_live_output_objects.iter() { + // If an object is newly created/unwrapped in this checkpoint, or if the owner changed, + // we need to insert an entry for it in the table. + let should_insert = match checkpoint_input_objects.get(object_id) { + Some(input_object) => input_object.owner() != object.owner(), + None => true, + }; + if should_insert { + let type_ = object.type_(); + values.insert( + *object_id, + StoredObjInfo { + object_id: object_id.to_vec(), + cp_sequence_number, + owner_kind: Some(match object.owner() { + Owner::AddressOwner(_) => StoredOwnerKind::Address, + Owner::ObjectOwner(_) => StoredOwnerKind::Object, + Owner::Shared { .. } => StoredOwnerKind::Shared, + Owner::Immutable => StoredOwnerKind::Immutable, + Owner::ConsensusV2 { .. } => todo!(), + }), + + owner_id: match object.owner() { + Owner::AddressOwner(a) => Some(a.to_vec()), + Owner::ObjectOwner(o) => Some(o.to_vec()), + Owner::Shared { .. } | Owner::Immutable { .. } => None, + Owner::ConsensusV2 { .. } => todo!(), + }, + + package: type_.map(|t| t.address().to_vec()), + module: type_.map(|t| t.module().to_string()), + name: type_.map(|t| t.name().to_string()), + instantiation: type_ + .map(|t| bcs::to_bytes(&t.type_params())) + .transpose() + .map_err(|e| { + anyhow!( + "Failed to serialize type parameters for {}: {e}", + object.id().to_canonical_display(/* with_prefix */ true), + ) + })?, + }, + ); + } + } + + Ok(values.into_values().collect()) + } +} + +#[async_trait::async_trait] +impl Handler for ObjInfo { + async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { + Ok(diesel::insert_into(obj_info::table) + .values(values) + .on_conflict_do_nothing() + .execute(conn) + .await?) + } +} diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index 124197a2eae02..f2d367b0ad167 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -11,8 +11,8 @@ use handlers::{ ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints, kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags, kv_objects::KvObjects, kv_protocol_configs::KvProtocolConfigs, kv_transactions::KvTransactions, - obj_versions::ObjVersions, sum_coin_balances::SumCoinBalances, sum_displays::SumDisplays, - sum_obj_types::SumObjTypes, sum_packages::SumPackages, + obj_info::ObjInfo, obj_versions::ObjVersions, sum_coin_balances::SumCoinBalances, + sum_displays::SumDisplays, sum_obj_types::SumObjTypes, sum_packages::SumPackages, tx_affected_addresses::TxAffectedAddress, tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges, tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes, @@ -435,6 +435,7 @@ pub async fn start_indexer( indexer.concurrent_pipeline(KvEpochStarts, None).await?; indexer.concurrent_pipeline(KvObjects, None).await?; indexer.concurrent_pipeline(KvTransactions, None).await?; + indexer.concurrent_pipeline(ObjInfo, None).await?; indexer.concurrent_pipeline(ObjVersions, None).await?; indexer.concurrent_pipeline(TxAffectedAddress, None).await?; indexer.concurrent_pipeline(TxAffectedObjects, None).await?; diff --git a/crates/sui-indexer-alt/src/models/objects.rs b/crates/sui-indexer-alt/src/models/objects.rs index 606fb367cbfa5..d719e774ae689 100644 --- a/crates/sui-indexer-alt/src/models/objects.rs +++ b/crates/sui-indexer-alt/src/models/objects.rs @@ -9,7 +9,8 @@ use sui_field_count::FieldCount; use sui_types::base_types::ObjectID; use crate::schema::{ - kv_objects, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances, wal_obj_types, + kv_objects, obj_info, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances, + wal_obj_types, }; #[derive(Insertable, Debug, Clone, FieldCount)] @@ -126,3 +127,16 @@ where }) } } + +#[derive(Insertable, Debug, Clone, FieldCount)] +#[diesel(table_name = obj_info, primary_key(object_id, cp_sequence_number))] +pub struct StoredObjInfo { + pub object_id: Vec, + pub cp_sequence_number: i64, + pub owner_kind: Option, + pub owner_id: Option>, + pub package: Option>, + pub module: Option, + pub name: Option, + pub instantiation: Option>, +} diff --git a/crates/sui-indexer-alt/src/schema.rs b/crates/sui-indexer-alt/src/schema.rs index e4fd6db117fb1..e1872ad90194c 100644 --- a/crates/sui-indexer-alt/src/schema.rs +++ b/crates/sui-indexer-alt/src/schema.rs @@ -103,6 +103,19 @@ diesel::table! { } } +diesel::table! { + obj_info (object_id, cp_sequence_number) { + object_id -> Bytea, + cp_sequence_number -> Int8, + owner_kind -> Nullable, + owner_id -> Nullable, + package -> Nullable, + module -> Nullable, + name -> Nullable, + instantiation -> Nullable, + } +} + diesel::table! { obj_versions (object_id, object_version) { object_id -> Bytea, @@ -250,6 +263,7 @@ diesel::allow_tables_to_appear_in_same_query!( kv_objects, kv_protocol_configs, kv_transactions, + obj_info, obj_versions, sum_coin_balances, sum_displays, diff --git a/crates/sui-types/src/full_checkpoint_content.rs b/crates/sui-types/src/full_checkpoint_content.rs index 2c8eef47f237e..41ca2d2f946ab 100644 --- a/crates/sui-types/src/full_checkpoint_content.rs +++ b/crates/sui-types/src/full_checkpoint_content.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; -use crate::base_types::ObjectRef; +use crate::base_types::{ObjectID, ObjectRef}; use crate::effects::{ IDOperation, ObjectIn, ObjectOut, TransactionEffects, TransactionEffectsAPI, TransactionEvents, }; @@ -11,6 +11,7 @@ use crate::messages_checkpoint::{CertifiedCheckpointSummary, CheckpointContents} use crate::object::Object; use crate::storage::BackingPackageStore; use crate::transaction::Transaction; +use im::HashSet; use itertools::Either; use serde::{Deserialize, Serialize}; use tap::Pipe; @@ -51,11 +52,24 @@ impl CheckpointData { eventually_removed_object_refs.into_values().collect() } - pub fn input_objects(&self) -> Vec<&Object> { - self.transactions - .iter() - .flat_map(|tx| &tx.input_objects) - .collect() + /// Returns all objects that are used as input to the transactions in the checkpoint, + /// and already exist prior to the checkpoint. + pub fn checkpoint_input_objects(&self) -> BTreeMap { + let mut output_objects_seen = HashSet::new(); + let mut checkpoint_input_objects = BTreeMap::new(); + for tx in self.transactions.iter() { + for obj in tx.input_objects.iter() { + let id = obj.id(); + if output_objects_seen.contains(&id) || checkpoint_input_objects.contains_key(&id) { + continue; + } + checkpoint_input_objects.insert(id, obj); + } + for obj in tx.output_objects.iter() { + output_objects_seen.insert(obj.id()); + } + } + checkpoint_input_objects } pub fn all_objects(&self) -> Vec<&Object> { @@ -73,7 +87,7 @@ pub struct CheckpointTransaction { pub transaction: Transaction, /// The effects produced by executing this transaction pub effects: TransactionEffects, - /// The events, if any, emitted by this transaciton during execution + /// The events, if any, emitted by this transactions during execution pub events: Option, /// The state of all inputs to this transaction as they were prior to execution. pub input_objects: Vec, @@ -87,7 +101,7 @@ impl CheckpointTransaction { // Iterator over id and versions for all deleted or wrapped objects match &self.effects { TransactionEffects::V1(v1) => Either::Left( - // Effects v1 has delted and wrapped objects versions as the "new" version, not the + // Effects v1 has deleted and wrapped objects versions as the "new" version, not the // old one that was actually removed. So we need to take these and then look them // up in the `modified_at_versions`. // No need to chain unwrapped_then_deleted because these objects must have been wrapped