Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sharded 4] migrate statevalueindex #13773

Merged
merged 1 commit into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aptos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ aptos-peer-monitoring-service-server = { workspace = true }
aptos-peer-monitoring-service-types = { workspace = true }
aptos-runtimes = { workspace = true }
aptos-safety-rules = { workspace = true }
aptos-schemadb = { workspace = true }
aptos-state-sync-driver = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-storage-service-client = { workspace = true }
Expand Down
11 changes: 10 additions & 1 deletion aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use aptos_config::config::{
merge_node_config, InitialSafetyRulesConfig, NodeConfig, PersistableConfig,
};
use aptos_framework::ReleaseBundle;
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
use aptos_logger::{prelude::*, telemetry_log_writer::TelemetryLog, Level, LoggerFilterUpdater};
use aptos_state_sync_driver::driver_factory::StateSyncRuntimes;
use aptos_types::{chain_id::ChainId, on_chain_config::OnChainJWKConsensusConfig};
Expand Down Expand Up @@ -666,6 +667,8 @@ pub fn setup_environment_and_start_node(
db_rw.reader.clone(),
);

let internal_indexer_db = InternalIndexerDBService::get_indexer_db(&node_config);

// Start state sync and get the notification endpoints for mempool and consensus
let (aptos_data_client, state_sync_runtimes, mempool_listener, consensus_notifier) =
state_sync::start_state_sync_and_get_notification_handles(
Expand All @@ -674,6 +677,7 @@ pub fn setup_environment_and_start_node(
genesis_waypoint,
event_subscription_service,
db_rw.clone(),
internal_indexer_db.clone(),
)?;

// Start the node inspection service
Expand All @@ -691,7 +695,12 @@ pub fn setup_environment_and_start_node(
indexer_runtime,
indexer_grpc_runtime,
internal_indexer_db_runtime,
) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id)?;
) = services::bootstrap_api_and_indexer(
&node_config,
db_rw.clone(),
chain_id,
internal_indexer_db,
)?;

// Create mempool and get the consensus to mempool sender
let (mempool_runtime, consensus_to_mempool_sender) =
Expand Down
4 changes: 3 additions & 1 deletion aptos-node/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use aptos_peer_monitoring_service_server::{
PeerMonitoringServiceServer,
};
use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage;
use aptos_schemadb::DB;
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_time_service::TimeService;
use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
Expand All @@ -44,6 +45,7 @@ pub fn bootstrap_api_and_indexer(
node_config: &NodeConfig,
db_rw: DbReaderWriter,
chain_id: ChainId,
internal_indexer_db: Option<Arc<DB>>,
) -> anyhow::Result<(
Receiver<MempoolClientRequest>,
Option<Runtime>,
Expand All @@ -67,7 +69,7 @@ pub fn bootstrap_api_and_indexer(
};

let (db_indexer_runtime, txn_event_reader) =
match bootstrap_internal_indexer_db(node_config, db_rw.clone()) {
match bootstrap_internal_indexer_db(node_config, db_rw.clone(), internal_indexer_db) {
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
None => (None, None),
};
Expand Down
3 changes: 3 additions & 0 deletions aptos-node/src/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use aptos_network::application::{
interface::{NetworkClient, NetworkClientInterface, NetworkServiceEvents},
storage::PeersAndMetadata,
};
use aptos_schemadb::DB;
use aptos_state_sync_driver::{
driver_factory::{DriverFactory, StateSyncRuntimes},
metadata_storage::PersistentMetadataStorage,
Expand Down Expand Up @@ -131,6 +132,7 @@ pub fn start_state_sync_and_get_notification_handles(
waypoint: Waypoint,
event_subscription_service: EventSubscriptionService,
db_rw: DbReaderWriter,
internal_indexer_db: Option<Arc<DB>>,
) -> anyhow::Result<(
AptosDataClient,
StateSyncRuntimes,
Expand Down Expand Up @@ -195,6 +197,7 @@ pub fn start_state_sync_and_get_notification_handles(
aptos_data_client.clone(),
streaming_service_client,
TimeService::real(),
internal_indexer_db,
);

// Create a new state sync runtime handle
Expand Down
19 changes: 18 additions & 1 deletion config/src/config/internal_indexer_db_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@
use serde::{Deserialize, Serialize};

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct InternalIndexerDBConfig {
pub enable_transaction: bool,
pub enable_event: bool,
pub enable_statekeys: bool,
pub batch_size: usize,
}

impl InternalIndexerDBConfig {
pub fn new(enable_transaction: bool, enable_event: bool, batch_size: usize) -> Self {
pub fn new(
enable_transaction: bool,
enable_event: bool,
enable_statekeys: bool,
batch_size: usize,
) -> Self {
Self {
enable_transaction,
enable_event,
enable_statekeys,
batch_size,
}
}
Expand All @@ -27,6 +35,14 @@ impl InternalIndexerDBConfig {
self.enable_event
}

pub fn enable_statekeys(&self) -> bool {
self.enable_statekeys
}

pub fn is_internal_indexer_db_enabled(&self) -> bool {
self.enable_transaction || self.enable_event || self.enable_statekeys
}

pub fn batch_size(&self) -> usize {
self.batch_size
}
Expand All @@ -37,6 +53,7 @@ impl Default for InternalIndexerDBConfig {
Self {
enable_transaction: false,
enable_event: false,
enable_statekeys: false,
batch_size: 10_000,
}
}
Expand Down
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ aptos-indexer-grpc-utils = { workspace = true }
aptos-logger = { workspace = true }
aptos-mempool = { workspace = true }
aptos-runtimes = { workspace = true }
aptos-schemadb = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-types = { workspace = true }
flate2 = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use aptos_config::config::NodeConfig;
use aptos_db_indexer::{db_indexer::DBIndexer, db_ops::open_internal_indexer_db};
use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep};
use aptos_schemadb::DB;
use aptos_storage_interface::DbReader;
use std::sync::Arc;

Expand All @@ -15,20 +16,13 @@ pub struct InternalIndexerDBService {
}

impl InternalIndexerDBService {
pub fn new(db_reader: Arc<dyn DbReader>, node_config: &NodeConfig) -> Self {
let db_path = node_config
.storage
.get_dir_paths()
.default_root_path()
.join(INTERNAL_INDEXER_DB);
let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config;
let db = Arc::new(
open_internal_indexer_db(db_path, &rocksdb_config)
.expect("Failed to open up indexer db initially"),
);

pub fn new(
db_reader: Arc<dyn DbReader>,
node_config: &NodeConfig,
internal_indexer_db: Arc<DB>,
) -> Self {
let internal_db_indexer = Arc::new(DBIndexer::new(
db,
internal_indexer_db,
db_reader,
&node_config.indexer_db_config,
));
Expand All @@ -37,6 +31,26 @@ impl InternalIndexerDBService {
}
}

pub fn get_indexer_db(node_config: &NodeConfig) -> Option<Arc<DB>> {
if !node_config
.indexer_db_config
.is_internal_indexer_db_enabled()
{
return None;
}
let db_path_buf = node_config
.storage
.get_dir_paths()
.default_root_path()
.join(INTERNAL_INDEXER_DB);
let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config;
let db_path = db_path_buf.as_path();
Some(Arc::new(
open_internal_indexer_db(db_path, &rocksdb_config)
.expect("Failed to open up indexer db initially"),
))
}

pub fn get_db_indexer(&self) -> Arc<DBIndexer> {
Arc::clone(&self.db_indexer)
}
Expand Down
7 changes: 5 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use aptos_api::context::Context;
use aptos_config::config::NodeConfig;
use aptos_db_indexer::{db_indexer::DBIndexer, db_ops::open_db, db_v2::IndexerAsyncV2};
use aptos_mempool::MempoolClientSender;
use aptos_schemadb::DB;
use aptos_storage_interface::DbReaderWriter;
use aptos_types::chain_id::ChainId;
use std::sync::Arc;
Expand All @@ -18,13 +19,15 @@ const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";
pub fn bootstrap_internal_indexer_db(
config: &NodeConfig,
db_rw: DbReaderWriter,
internal_indexer_db: Option<Arc<DB>>,
) -> Option<(Runtime, Arc<DBIndexer>)> {
if !(config.indexer_db_config.enable_event() || config.indexer_db_config.enable_transaction()) {
if !config.indexer_db_config.is_internal_indexer_db_enabled() || internal_indexer_db.is_none() {
return None;
}
let runtime = aptos_runtimes::spawn_named_runtime("index-db".to_string(), None);
// Set up db config and open up the db initially to read metadata
let mut indexer_service = InternalIndexerDBService::new(db_rw.reader, config);
let mut indexer_service =
InternalIndexerDBService::new(db_rw.reader, config, internal_indexer_db.unwrap());
let db_indexer = indexer_service.get_db_indexer();
// Spawn task for db indexer
runtime.spawn(async move {
Expand Down
28 changes: 23 additions & 5 deletions execution/executor/tests/internal_indexer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ use aptos_sdk::{
use aptos_storage_interface::DbReader;
use aptos_temppath::TempPath;
use aptos_types::{
account_address::AccountAddress,
account_config::aptos_test_root_address,
block_metadata::BlockMetadata,
chain_id::ChainId,
state_store::state_key::prefix::StateKeyPrefix,
test_helpers::transaction_test_helpers::TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
transaction::{
signature_verified_transaction::into_signature_verified_block, Transaction,
Transaction::UserTransaction, WriteSetPayload,
signature_verified_transaction::into_signature_verified_block,
Transaction::{self, UserTransaction},
WriteSetPayload,
},
};
use rand::SeedableRng;
Expand Down Expand Up @@ -143,8 +146,8 @@ fn test_db_indexer_data() {
);
let db_indexer = DBIndexer::new(
db.clone(),
aptos_db,
&InternalIndexerDBConfig::new(true, true, 2),
aptos_db.clone(),
&InternalIndexerDBConfig::new(true, true, true, 2),
);
// assert the data matches the expected data
let mut version = db_indexer.get_persisted_version().unwrap();
Expand All @@ -168,5 +171,20 @@ fn test_db_indexer_data() {

let x = db_indexer.get_event_by_key_iter().unwrap();
let res: Vec<_> = x.collect();
assert!(res.len() == 14);
assert_eq!(res.len(), 27);

let core_kv_iter = db_indexer
.get_prefixed_state_value_iterator(&StateKeyPrefix::from(core_account.address()), None, 12)
.unwrap();
let core_kv_res: Vec<_> = core_kv_iter.collect();
assert_eq!(core_kv_res.len(), 5);
let address_one_kv_iter = db_indexer
.get_prefixed_state_value_iterator(
&StateKeyPrefix::from(AccountAddress::from_hex_literal("0x1").unwrap()),
None,
12,
)
.unwrap();
let address_one_kv_res: Vec<_> = address_one_kv_iter.collect();
assert_eq!(address_one_kv_res.len(), 152);
}
1 change: 1 addition & 0 deletions state-sync/state-sync-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ aptos-consensus-notifications = { workspace = true }
aptos-crypto = { workspace = true }
aptos-data-client = { workspace = true }
aptos-data-streaming-service = { workspace = true }
aptos-db-indexer-schemas = { workspace = true }
aptos-event-notifications = { workspace = true }
aptos-executor-types = { workspace = true }
aptos-infallible = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions state-sync/state-sync-driver/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use aptos_data_streaming_service::{
streaming_client::{DataStreamingClient, NotificationAndFeedback, NotificationFeedback},
};
use aptos_logger::{prelude::*, sample::SampleRate};
use aptos_schemadb::DB;
use aptos_storage_interface::DbReader;
use aptos_types::{
epoch_change::Verifier,
Expand Down Expand Up @@ -323,6 +324,9 @@ pub struct Bootstrapper<MetadataStorage, StorageSyncer, StreamingClient> {

// The epoch states verified by this node (held in memory)
verified_epoch_states: VerifiedEpochStates,

// The internal indexer db used to store state value index
internal_indexer_db: Option<Arc<DB>>,
}

impl<
Expand All @@ -338,6 +342,7 @@ impl<
streaming_client: StreamingClient,
storage: Arc<dyn DbReader>,
storage_synchronizer: StorageSyncer,
internal_indexer_db: Option<Arc<DB>>,
) -> Self {
// Load the latest epoch state from storage
let latest_epoch_state = utils::fetch_latest_epoch_state(storage.clone())
Expand All @@ -357,6 +362,7 @@ impl<
storage,
storage_synchronizer,
verified_epoch_states,
internal_indexer_db,
}
}

Expand Down Expand Up @@ -994,6 +1000,7 @@ impl<
epoch_change_proofs,
ledger_info_to_sync,
transaction_output_to_sync.clone(),
self.internal_indexer_db.clone(),
)?;
self.state_value_syncer.initialized_state_snapshot_receiver = true;
}
Expand Down
Loading
Loading