Skip to content

Commit 697f2ba

Browse files
committedJun 25, 2024
[internalindexer] add indexer db to state sync

File tree

30 files changed

+572
-83
lines changed

30 files changed

+572
-83
lines changed
 

‎Cargo.lock

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎aptos-node/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ aptos-peer-monitoring-service-server = { workspace = true }
5252
aptos-peer-monitoring-service-types = { workspace = true }
5353
aptos-runtimes = { workspace = true }
5454
aptos-safety-rules = { workspace = true }
55+
aptos-schemadb = { workspace = true }
5556
aptos-state-sync-driver = { workspace = true }
5657
aptos-storage-interface = { workspace = true }
5758
aptos-storage-service-client = { workspace = true }

‎aptos-node/src/lib.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use aptos_config::config::{
2424
merge_node_config, InitialSafetyRulesConfig, NodeConfig, PersistableConfig,
2525
};
2626
use aptos_framework::ReleaseBundle;
27+
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
2728
use aptos_logger::{prelude::*, telemetry_log_writer::TelemetryLog, Level, LoggerFilterUpdater};
2829
use aptos_state_sync_driver::driver_factory::StateSyncRuntimes;
2930
use aptos_types::{chain_id::ChainId, on_chain_config::OnChainJWKConsensusConfig};
@@ -666,6 +667,8 @@ pub fn setup_environment_and_start_node(
666667
db_rw.reader.clone(),
667668
);
668669

670+
let internal_indexer_db = InternalIndexerDBService::get_indexer_db(&node_config);
671+
669672
// Start state sync and get the notification endpoints for mempool and consensus
670673
let (aptos_data_client, state_sync_runtimes, mempool_listener, consensus_notifier) =
671674
state_sync::start_state_sync_and_get_notification_handles(
@@ -674,6 +677,7 @@ pub fn setup_environment_and_start_node(
674677
genesis_waypoint,
675678
event_subscription_service,
676679
db_rw.clone(),
680+
internal_indexer_db.clone(),
677681
)?;
678682

679683
// Start the node inspection service
@@ -691,7 +695,12 @@ pub fn setup_environment_and_start_node(
691695
indexer_runtime,
692696
indexer_grpc_runtime,
693697
internal_indexer_db_runtime,
694-
) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id)?;
698+
) = services::bootstrap_api_and_indexer(
699+
&node_config,
700+
db_rw.clone(),
701+
chain_id,
702+
internal_indexer_db,
703+
)?;
695704

696705
// Create mempool and get the consensus to mempool sender
697706
let (mempool_runtime, consensus_to_mempool_sender) =

‎aptos-node/src/services.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use aptos_peer_monitoring_service_server::{
2727
PeerMonitoringServiceServer,
2828
};
2929
use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage;
30+
use aptos_schemadb::DB;
3031
use aptos_storage_interface::{DbReader, DbReaderWriter};
3132
use aptos_time_service::TimeService;
3233
use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
@@ -44,6 +45,7 @@ pub fn bootstrap_api_and_indexer(
4445
node_config: &NodeConfig,
4546
db_rw: DbReaderWriter,
4647
chain_id: ChainId,
48+
internal_indexer_db: Option<Arc<DB>>,
4749
) -> anyhow::Result<(
4850
Receiver<MempoolClientRequest>,
4951
Option<Runtime>,
@@ -67,7 +69,7 @@ pub fn bootstrap_api_and_indexer(
6769
};
6870

6971
let (db_indexer_runtime, txn_event_reader) =
70-
match bootstrap_internal_indexer_db(node_config, db_rw.clone()) {
72+
match bootstrap_internal_indexer_db(node_config, db_rw.clone(), internal_indexer_db) {
7173
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
7274
None => (None, None),
7375
};

‎aptos-node/src/state_sync.rs

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use aptos_network::application::{
2020
interface::{NetworkClient, NetworkClientInterface, NetworkServiceEvents},
2121
storage::PeersAndMetadata,
2222
};
23+
use aptos_schemadb::DB;
2324
use aptos_state_sync_driver::{
2425
driver_factory::{DriverFactory, StateSyncRuntimes},
2526
metadata_storage::PersistentMetadataStorage,
@@ -131,6 +132,7 @@ pub fn start_state_sync_and_get_notification_handles(
131132
waypoint: Waypoint,
132133
event_subscription_service: EventSubscriptionService,
133134
db_rw: DbReaderWriter,
135+
internal_indexer_db: Option<Arc<DB>>,
134136
) -> anyhow::Result<(
135137
AptosDataClient,
136138
StateSyncRuntimes,
@@ -195,6 +197,7 @@ pub fn start_state_sync_and_get_notification_handles(
195197
aptos_data_client.clone(),
196198
streaming_service_client,
197199
TimeService::real(),
200+
internal_indexer_db,
198201
);
199202

200203
// Create a new state sync runtime handle

‎config/src/config/internal_indexer_db_config.rs

+18-1
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,25 @@
44
use serde::{Deserialize, Serialize};
55

66
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
7+
#[serde(default, deny_unknown_fields)]
78
pub struct InternalIndexerDBConfig {
89
pub enable_transaction: bool,
910
pub enable_event: bool,
11+
pub enable_statekeys: bool,
1012
pub batch_size: usize,
1113
}
1214

1315
impl InternalIndexerDBConfig {
14-
pub fn new(enable_transaction: bool, enable_event: bool, batch_size: usize) -> Self {
16+
pub fn new(
17+
enable_transaction: bool,
18+
enable_event: bool,
19+
enable_statekeys: bool,
20+
batch_size: usize,
21+
) -> Self {
1522
Self {
1623
enable_transaction,
1724
enable_event,
25+
enable_statekeys,
1826
batch_size,
1927
}
2028
}
@@ -27,6 +35,14 @@ impl InternalIndexerDBConfig {
2735
self.enable_event
2836
}
2937

38+
pub fn enable_statekeys(&self) -> bool {
39+
self.enable_statekeys
40+
}
41+
42+
pub fn is_internal_indexer_db_enabled(&self) -> bool {
43+
self.enable_transaction || self.enable_event || self.enable_statekeys
44+
}
45+
3046
pub fn batch_size(&self) -> usize {
3147
self.batch_size
3248
}
@@ -37,6 +53,7 @@ impl Default for InternalIndexerDBConfig {
3753
Self {
3854
enable_transaction: false,
3955
enable_event: false,
56+
enable_statekeys: false,
4057
batch_size: 10_000,
4158
}
4259
}

‎ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ aptos-indexer-grpc-utils = { workspace = true }
2222
aptos-logger = { workspace = true }
2323
aptos-mempool = { workspace = true }
2424
aptos-runtimes = { workspace = true }
25+
aptos-schemadb = { workspace = true }
2526
aptos-storage-interface = { workspace = true }
2627
aptos-types = { workspace = true }
2728
flate2 = { workspace = true }

‎ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs

+27-13
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use aptos_config::config::NodeConfig;
55
use aptos_db_indexer::{db_indexer::DBIndexer, db_ops::open_internal_indexer_db};
66
use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep};
7+
use aptos_schemadb::DB;
78
use aptos_storage_interface::DbReader;
89
use std::sync::Arc;
910

@@ -15,20 +16,13 @@ pub struct InternalIndexerDBService {
1516
}
1617

1718
impl InternalIndexerDBService {
18-
pub fn new(db_reader: Arc<dyn DbReader>, node_config: &NodeConfig) -> Self {
19-
let db_path = node_config
20-
.storage
21-
.get_dir_paths()
22-
.default_root_path()
23-
.join(INTERNAL_INDEXER_DB);
24-
let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config;
25-
let db = Arc::new(
26-
open_internal_indexer_db(db_path, &rocksdb_config)
27-
.expect("Failed to open up indexer db initially"),
28-
);
29-
19+
pub fn new(
20+
db_reader: Arc<dyn DbReader>,
21+
node_config: &NodeConfig,
22+
internal_indexer_db: Arc<DB>,
23+
) -> Self {
3024
let internal_db_indexer = Arc::new(DBIndexer::new(
31-
db,
25+
internal_indexer_db,
3226
db_reader,
3327
&node_config.indexer_db_config,
3428
));
@@ -37,6 +31,26 @@ impl InternalIndexerDBService {
3731
}
3832
}
3933

34+
pub fn get_indexer_db(node_config: &NodeConfig) -> Option<Arc<DB>> {
35+
if !node_config
36+
.indexer_db_config
37+
.is_internal_indexer_db_enabled()
38+
{
39+
return None;
40+
}
41+
let db_path_buf = node_config
42+
.storage
43+
.get_dir_paths()
44+
.default_root_path()
45+
.join(INTERNAL_INDEXER_DB);
46+
let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config;
47+
let db_path = db_path_buf.as_path();
48+
Some(Arc::new(
49+
open_internal_indexer_db(db_path, &rocksdb_config)
50+
.expect("Failed to open up indexer db initially"),
51+
))
52+
}
53+
4054
pub fn get_db_indexer(&self) -> Arc<DBIndexer> {
4155
Arc::clone(&self.db_indexer)
4256
}

‎ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use aptos_api::context::Context;
88
use aptos_config::config::NodeConfig;
99
use aptos_db_indexer::{db_indexer::DBIndexer, db_ops::open_db, db_v2::IndexerAsyncV2};
1010
use aptos_mempool::MempoolClientSender;
11+
use aptos_schemadb::DB;
1112
use aptos_storage_interface::DbReaderWriter;
1213
use aptos_types::chain_id::ChainId;
1314
use std::sync::Arc;
@@ -18,13 +19,15 @@ const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";
1819
pub fn bootstrap_internal_indexer_db(
1920
config: &NodeConfig,
2021
db_rw: DbReaderWriter,
22+
internal_indexer_db: Option<Arc<DB>>,
2123
) -> Option<(Runtime, Arc<DBIndexer>)> {
22-
if !(config.indexer_db_config.enable_event() || config.indexer_db_config.enable_transaction()) {
24+
if !config.indexer_db_config.is_internal_indexer_db_enabled() || internal_indexer_db.is_none() {
2325
return None;
2426
}
2527
let runtime = aptos_runtimes::spawn_named_runtime("index-db".to_string(), None);
2628
// Set up db config and open up the db initially to read metadata
27-
let mut indexer_service = InternalIndexerDBService::new(db_rw.reader, config);
29+
let mut indexer_service =
30+
InternalIndexerDBService::new(db_rw.reader, config, internal_indexer_db.unwrap());
2831
let db_indexer = indexer_service.get_db_indexer();
2932
// Spawn task for db indexer
3033
runtime.spawn(async move {

‎execution/executor/tests/internal_indexer_test.rs

+23-5
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@ use aptos_sdk::{
1616
use aptos_storage_interface::DbReader;
1717
use aptos_temppath::TempPath;
1818
use aptos_types::{
19+
account_address::AccountAddress,
1920
account_config::aptos_test_root_address,
2021
block_metadata::BlockMetadata,
2122
chain_id::ChainId,
23+
state_store::state_key::prefix::StateKeyPrefix,
2224
test_helpers::transaction_test_helpers::TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
2325
transaction::{
24-
signature_verified_transaction::into_signature_verified_block, Transaction,
25-
Transaction::UserTransaction, WriteSetPayload,
26+
signature_verified_transaction::into_signature_verified_block,
27+
Transaction::{self, UserTransaction},
28+
WriteSetPayload,
2629
},
2730
};
2831
use rand::SeedableRng;
@@ -143,8 +146,8 @@ fn test_db_indexer_data() {
143146
);
144147
let db_indexer = DBIndexer::new(
145148
db.clone(),
146-
aptos_db,
147-
&InternalIndexerDBConfig::new(true, true, 2),
149+
aptos_db.clone(),
150+
&InternalIndexerDBConfig::new(true, true, true, 2),
148151
);
149152
// assert the data matches the expected data
150153
let mut version = db_indexer.get_persisted_version().unwrap();
@@ -168,5 +171,20 @@ fn test_db_indexer_data() {
168171

169172
let x = db_indexer.get_event_by_key_iter().unwrap();
170173
let res: Vec<_> = x.collect();
171-
assert!(res.len() == 14);
174+
assert_eq!(res.len(), 27);
175+
176+
let core_kv_iter = db_indexer
177+
.get_prefixed_state_value_iterator(&StateKeyPrefix::from(core_account.address()), None, 12)
178+
.unwrap();
179+
let core_kv_res: Vec<_> = core_kv_iter.collect();
180+
assert_eq!(core_kv_res.len(), 5);
181+
let address_one_kv_iter = db_indexer
182+
.get_prefixed_state_value_iterator(
183+
&StateKeyPrefix::from(AccountAddress::from_hex_literal("0x1").unwrap()),
184+
None,
185+
12,
186+
)
187+
.unwrap();
188+
let address_one_kv_res: Vec<_> = address_one_kv_iter.collect();
189+
assert_eq!(address_one_kv_res.len(), 152);
172190
}

‎state-sync/state-sync-driver/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ aptos-consensus-notifications = { workspace = true }
1919
aptos-crypto = { workspace = true }
2020
aptos-data-client = { workspace = true }
2121
aptos-data-streaming-service = { workspace = true }
22+
aptos-db-indexer-schemas = { workspace = true }
2223
aptos-event-notifications = { workspace = true }
2324
aptos-executor-types = { workspace = true }
2425
aptos-infallible = { workspace = true }

‎state-sync/state-sync-driver/src/bootstrapper.rs

+7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use aptos_data_streaming_service::{
2121
streaming_client::{DataStreamingClient, NotificationAndFeedback, NotificationFeedback},
2222
};
2323
use aptos_logger::{prelude::*, sample::SampleRate};
24+
use aptos_schemadb::DB;
2425
use aptos_storage_interface::DbReader;
2526
use aptos_types::{
2627
epoch_change::Verifier,
@@ -323,6 +324,9 @@ pub struct Bootstrapper<MetadataStorage, StorageSyncer, StreamingClient> {
323324

324325
// The epoch states verified by this node (held in memory)
325326
verified_epoch_states: VerifiedEpochStates,
327+
328+
// The internal indexer db used to store state value index
329+
internal_indexer_db: Option<Arc<DB>>,
326330
}
327331

328332
impl<
@@ -338,6 +342,7 @@ impl<
338342
streaming_client: StreamingClient,
339343
storage: Arc<dyn DbReader>,
340344
storage_synchronizer: StorageSyncer,
345+
internal_indexer_db: Option<Arc<DB>>,
341346
) -> Self {
342347
// Load the latest epoch state from storage
343348
let latest_epoch_state = utils::fetch_latest_epoch_state(storage.clone())
@@ -357,6 +362,7 @@ impl<
357362
storage,
358363
storage_synchronizer,
359364
verified_epoch_states,
365+
internal_indexer_db,
360366
}
361367
}
362368

@@ -994,6 +1000,7 @@ impl<
9941000
epoch_change_proofs,
9951001
ledger_info_to_sync,
9961002
transaction_output_to_sync.clone(),
1003+
self.internal_indexer_db.clone(),
9971004
)?;
9981005
self.state_value_syncer.initialized_state_snapshot_receiver = true;
9991006
}

‎state-sync/state-sync-driver/src/driver.rs

+4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use aptos_event_notifications::EventSubscriptionService;
3232
use aptos_infallible::Mutex;
3333
use aptos_logger::prelude::*;
3434
use aptos_mempool_notifications::MempoolNotificationSender;
35+
use aptos_schemadb::DB;
3536
use aptos_storage_interface::DbReader;
3637
use aptos_storage_service_notifications::StorageServiceNotificationSender;
3738
use aptos_time_service::{TimeService, TimeServiceTrait};
@@ -151,6 +152,7 @@ impl<
151152
StreamingClient,
152153
>
153154
{
155+
#[allow(clippy::too_many_arguments)]
154156
pub fn new(
155157
client_notification_listener: ClientNotificationListener,
156158
commit_notification_listener: CommitNotificationListener,
@@ -168,6 +170,7 @@ impl<
168170
streaming_client: StreamingClient,
169171
storage: Arc<dyn DbReader>,
170172
time_service: TimeService,
173+
internal_indexer_db: Option<Arc<DB>>,
171174
) -> Self {
172175
let output_fallback_handler =
173176
OutputFallbackHandler::new(driver_configuration.clone(), time_service.clone());
@@ -178,6 +181,7 @@ impl<
178181
streaming_client.clone(),
179182
storage.clone(),
180183
storage_synchronizer.clone(),
184+
internal_indexer_db,
181185
);
182186
let continuous_syncer = ContinuousSyncer::new(
183187
driver_configuration.clone(),

‎state-sync/state-sync-driver/src/driver_factory.rs

+5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use aptos_event_notifications::{EventNotificationSender, EventSubscriptionServic
2020
use aptos_executor_types::ChunkExecutorTrait;
2121
use aptos_infallible::Mutex;
2222
use aptos_mempool_notifications::MempoolNotificationSender;
23+
use aptos_schemadb::DB;
2324
use aptos_storage_interface::DbReaderWriter;
2425
use aptos_storage_service_notifications::StorageServiceNotificationSender;
2526
use aptos_time_service::TimeService;
@@ -58,6 +59,7 @@ impl DriverFactory {
5859
aptos_data_client: AptosDataClient,
5960
streaming_service_client: StreamingServiceClient,
6061
time_service: TimeService,
62+
internal_indexer_db: Option<Arc<DB>>,
6163
) -> Self {
6264
let (driver_factory, _) = Self::create_and_spawn_driver_internal(
6365
create_runtime,
@@ -73,6 +75,7 @@ impl DriverFactory {
7375
aptos_data_client,
7476
streaming_service_client,
7577
time_service,
78+
internal_indexer_db,
7679
);
7780
driver_factory
7881
}
@@ -99,6 +102,7 @@ impl DriverFactory {
99102
aptos_data_client: AptosDataClient,
100103
streaming_service_client: StreamingServiceClient,
101104
time_service: TimeService,
105+
internal_indexer_db: Option<Arc<DB>>,
102106
) -> (Self, UnboundedSender<CommitNotification>) {
103107
// Notify subscribers of the initial on-chain config values
104108
match storage.reader.get_latest_state_checkpoint_version() {
@@ -179,6 +183,7 @@ impl DriverFactory {
179183
streaming_service_client,
180184
storage.reader,
181185
time_service,
186+
internal_indexer_db,
182187
);
183188

184189
// Spawn the driver

‎state-sync/state-sync-driver/src/storage_synchronizer.rs

+37-4
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ use crate::{
1515
};
1616
use aptos_config::config::StateSyncDriverConfig;
1717
use aptos_data_streaming_service::data_notification::NotificationId;
18+
use aptos_db_indexer_schemas::schema::state_keys::StateKeysSchema;
1819
use aptos_event_notifications::EventSubscriptionService;
1920
use aptos_executor_types::{ChunkCommitNotification, ChunkExecutorTrait};
2021
use aptos_infallible::Mutex;
2122
use aptos_logger::prelude::*;
2223
use aptos_mempool_notifications::MempoolNotificationSender;
2324
use aptos_metrics_core::HistogramTimer;
24-
use aptos_storage_interface::{DbReader, DbReaderWriter, StateSnapshotReceiver};
25+
use aptos_schemadb::{SchemaBatch, DB};
26+
use aptos_storage_interface::{AptosDbError, DbReader, DbReaderWriter, StateSnapshotReceiver};
2527
use aptos_storage_service_notifications::StorageServiceNotificationSender;
2628
use aptos_types::{
2729
ledger_info::LedgerInfoWithSignatures,
@@ -86,6 +88,7 @@ pub trait StorageSynchronizerInterface {
8688
epoch_change_proofs: Vec<LedgerInfoWithSignatures>,
8789
target_ledger_info: LedgerInfoWithSignatures,
8890
target_output_with_proof: TransactionOutputListWithProof,
91+
internal_indexer_db: Option<Arc<DB>>,
8992
) -> Result<JoinHandle<()>, Error>;
9093

9194
/// Returns true iff there is storage data that is still waiting
@@ -374,6 +377,7 @@ impl<
374377
epoch_change_proofs: Vec<LedgerInfoWithSignatures>,
375378
target_ledger_info: LedgerInfoWithSignatures,
376379
target_output_with_proof: TransactionOutputListWithProof,
380+
internal_indexer_db: Option<Arc<DB>>,
377381
) -> Result<JoinHandle<()>, Error> {
378382
// Create a channel to notify the state snapshot receiver when data chunks are ready
379383
let max_pending_data_chunks = self.driver_config.max_pending_data_chunks as usize;
@@ -393,6 +397,7 @@ impl<
393397
target_ledger_info,
394398
target_output_with_proof,
395399
self.runtime.clone(),
400+
internal_indexer_db,
396401
);
397402
self.state_snapshot_notifier = Some(state_snapshot_notifier);
398403

@@ -791,6 +796,20 @@ fn spawn_commit_post_processor<
791796
spawn(runtime, commit_post_processor)
792797
}
793798

799+
fn write_kv_to_indexer_db(
800+
internal_indexer_db: &Option<Arc<DB>>,
801+
kvs: &Vec<(StateKey, StateValue)>,
802+
) -> aptos_storage_interface::Result<()> {
803+
// add state value to internal indexer
804+
if let Some(indexer_db) = internal_indexer_db.as_ref() {
805+
let batch = SchemaBatch::new();
806+
for (state_key, _) in kvs {
807+
batch.put::<StateKeysSchema>(state_key, &())?;
808+
}
809+
indexer_db.write_schemas(batch)?;
810+
}
811+
Ok(())
812+
}
794813
/// Spawns a dedicated receiver that commits state values from a state snapshot
795814
fn spawn_state_snapshot_receiver<
796815
ChunkExecutor: ChunkExecutorTrait + 'static,
@@ -807,6 +826,7 @@ fn spawn_state_snapshot_receiver<
807826
target_ledger_info: LedgerInfoWithSignatures,
808827
target_output_with_proof: TransactionOutputListWithProof,
809828
runtime: Option<Handle>,
829+
internal_indexer_db: Option<Arc<DB>>,
810830
) -> JoinHandle<()> {
811831
// Create a state snapshot receiver
812832
let receiver = async move {
@@ -841,14 +861,17 @@ fn spawn_state_snapshot_receiver<
841861
let all_states_synced = states_with_proof.is_last_chunk();
842862
let last_committed_state_index = states_with_proof.last_index;
843863
let num_state_values = states_with_proof.raw_values.len();
864+
let indexer_results: Result<(), AptosDbError> =
865+
write_kv_to_indexer_db(&internal_indexer_db, &states_with_proof.raw_values);
866+
844867
let result = state_snapshot_receiver.add_chunk(
845868
states_with_proof.raw_values,
846869
states_with_proof.proof.clone(),
847870
);
848871

849872
// Handle the commit result
850-
match result {
851-
Ok(()) => {
873+
match (result, indexer_results) {
874+
(Ok(()), Ok(())) => {
852875
// Update the logs and metrics
853876
info!(
854877
LogSchema::new(LogEntry::StorageSynchronizer).message(&format!(
@@ -919,7 +942,7 @@ fn spawn_state_snapshot_receiver<
919942
decrement_pending_data_chunks(pending_data_chunks.clone());
920943
return; // There's nothing left to do!
921944
},
922-
Err(error) => {
945+
(Err(error), _) => {
923946
let error =
924947
format!("Failed to commit state value chunk! Error: {:?}", error);
925948
send_storage_synchronizer_error(
@@ -929,6 +952,16 @@ fn spawn_state_snapshot_receiver<
929952
)
930953
.await;
931954
},
955+
(_, Err(error)) => {
956+
let error =
957+
format!("Failed to commit state value chunk to internal indexer! Error: {:?}", error);
958+
send_storage_synchronizer_error(
959+
error_notification_sender.clone(),
960+
notification_id,
961+
error,
962+
)
963+
.await;
964+
},
932965
}
933966
},
934967
storage_data_chunk => {

‎state-sync/state-sync-driver/src/tests/bootstrapper.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1628,6 +1628,7 @@ fn create_bootstrapper(
16281628
mock_streaming_client,
16291629
Arc::new(mock_database_reader),
16301630
mock_storage_synchronizer,
1631+
None,
16311632
);
16321633

16331634
(bootstrapper, output_fallback_handler)
@@ -1683,6 +1684,7 @@ fn create_bootstrapper_with_storage(
16831684
mock_streaming_client,
16841685
Arc::new(mock_database_reader),
16851686
mock_storage_synchronizer,
1687+
None,
16861688
)
16871689
}
16881690

‎state-sync/state-sync-driver/src/tests/driver.rs

+1
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,7 @@ async fn create_driver_for_tests(
406406
aptos_data_client,
407407
streaming_service_client,
408408
time_service.clone(),
409+
None,
409410
);
410411

411412
// The driver will notify reconfiguration subscribers of the initial configs.

‎state-sync/state-sync-driver/src/tests/driver_factory.rs

+1
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ fn test_new_initialized_configs() {
9999
aptos_data_client,
100100
streaming_service_client,
101101
TimeService::mock(),
102+
None,
102103
);
103104

104105
// Verify the initial configs were notified

‎state-sync/state-sync-driver/src/tests/mocks.rs

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use aptos_data_streaming_service::{
1515
streaming_client::{DataStreamingClient, Epoch, NotificationAndFeedback},
1616
};
1717
use aptos_executor_types::{ChunkCommitNotification, ChunkExecutorTrait};
18+
use aptos_schemadb::DB;
1819
use aptos_storage_interface::{
1920
cached_state_view::ShardedStateCache, state_delta::StateDelta, DbReader, DbReaderWriter,
2021
DbWriter, ExecutedTrees, Order, Result, StateSnapshotReceiver,
@@ -473,6 +474,7 @@ mock! {
473474
epoch_change_proofs: Vec<LedgerInfoWithSignatures>,
474475
target_ledger_info: LedgerInfoWithSignatures,
475476
target_output_with_proof: TransactionOutputListWithProof,
477+
internal_indexer_db: Option<Arc<DB>>,
476478
) -> AnyhowResult<JoinHandle<()>, crate::error::Error>;
477479

478480
fn pending_storage_data(&self) -> bool;

‎state-sync/state-sync-driver/src/tests/storage_synchronizer.rs

+5
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,7 @@ async fn test_initialize_state_synchronizer_missing_info() {
641641
vec![create_epoch_ending_ledger_info()],
642642
create_epoch_ending_ledger_info(),
643643
output_list_with_proof,
644+
None,
644645
)
645646
.unwrap();
646647

@@ -673,6 +674,7 @@ async fn test_initialize_state_synchronizer_receiver_error() {
673674
vec![create_epoch_ending_ledger_info()],
674675
create_epoch_ending_ledger_info(),
675676
create_output_list_with_proof(),
677+
None,
676678
)
677679
.unwrap();
678680

@@ -744,6 +746,7 @@ async fn test_save_states_completion() {
744746
epoch_change_proofs.to_vec(),
745747
target_ledger_info,
746748
output_list_with_proof.clone(),
749+
None,
747750
)
748751
.unwrap();
749752

@@ -803,6 +806,7 @@ async fn test_save_states_dropped_error_listener() {
803806
vec![create_epoch_ending_ledger_info()],
804807
create_epoch_ending_ledger_info(),
805808
create_output_list_with_proof(),
809+
None,
806810
)
807811
.unwrap();
808812

@@ -845,6 +849,7 @@ async fn test_save_states_invalid_chunk() {
845849
vec![create_epoch_ending_ledger_info()],
846850
create_epoch_ending_ledger_info(),
847851
create_output_list_with_proof(),
852+
None,
848853
)
849854
.unwrap();
850855

‎storage/indexer/src/db_indexer.rs

+84-30
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
// Copyright (c) Aptos Foundation
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use crate::utils::PrefixedStateValueIterator;
45
use aptos_config::config::internal_indexer_db_config::InternalIndexerDBConfig;
56
use aptos_db_indexer_schemas::{
67
metadata::{MetadataKey, MetadataValue},
78
schema::{
89
event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema,
9-
indexer_metadata::InternalIndexerMetadataSchema,
10+
indexer_metadata::InternalIndexerMetadataSchema, state_keys::StateKeysSchema,
1011
transaction_by_account::TransactionByAccountSchema,
1112
},
1213
utils::{
@@ -23,7 +24,12 @@ use aptos_types::{
2324
contract_event::{ContractEvent, EventWithVersion},
2425
event::EventKey,
2526
indexer::indexer_db_reader::Order,
27+
state_store::{
28+
state_key::{prefix::StateKeyPrefix, StateKey},
29+
state_value::StateValue,
30+
},
2631
transaction::{AccountTransactionsWithProof, Transaction, Version},
32+
write_set::{TransactionWrite, WriteSet},
2733
};
2834
use std::{
2935
cmp::min,
@@ -120,31 +126,43 @@ impl DBIndexer {
120126
self.config.enable_transaction
121127
}
122128

129+
pub fn statekeys_enabled(&self) -> bool {
130+
self.config.enable_statekeys
131+
}
132+
123133
fn get_main_db_iter(
124134
&self,
125135
start_version: Version,
126136
num_transactions: u64,
127-
) -> Result<impl Iterator<Item = Result<(Transaction, Vec<ContractEvent>)>> + '_> {
137+
) -> Result<impl Iterator<Item = Result<(Transaction, Vec<ContractEvent>, WriteSet)>> + '_>
138+
{
128139
let txn_iter = self
129140
.main_db_reader
130141
.get_transaction_iterator(start_version, num_transactions)?;
131142
let event_vec_iter = self
132143
.main_db_reader
133144
.get_events_iterator(start_version, num_transactions)?;
134-
let zipped = txn_iter
135-
.zip(event_vec_iter)
136-
.map(|(txn_res, event_vec_res)| {
145+
let writeset_iter = self
146+
.main_db_reader
147+
.get_write_set_iterator(start_version, num_transactions)?;
148+
let zipped = txn_iter.zip(event_vec_iter).zip(writeset_iter).map(
149+
|((txn_res, event_vec_res), writeset_res)| {
137150
let txn = txn_res?;
138151
let event_vec = event_vec_res?;
139-
Ok((txn, event_vec))
140-
});
152+
let writeset = writeset_res?;
153+
Ok((txn, event_vec, writeset))
154+
},
155+
);
141156
Ok(zipped)
142157
}
143158

144159
fn get_num_of_transactions(&self, version: Version) -> Result<u64> {
145160
let highest_version = self.main_db_reader.get_synced_version()?;
146161
// we want to include the last transaction since the iterator interface will is right exclusive.
147-
let num_of_transaction = min(self.config.batch_size as u64, highest_version - version) + 1;
162+
let num_of_transaction = min(
163+
(self.config.batch_size + 1) as u64,
164+
highest_version - version + 1,
165+
);
148166
Ok(num_of_transaction)
149167
}
150168

@@ -155,34 +173,45 @@ impl DBIndexer {
155173
let mut db_iter = self.get_main_db_iter(version, num_transactions)?;
156174
let batch = SchemaBatch::new();
157175
db_iter.try_for_each(|res| {
158-
let (txn, events) = res?;
176+
let (txn, events, writeset) = res?;
159177
if let Some(txn) = txn.try_as_signed_user_txn() {
160178
if self.config.enable_transaction {
161179
batch.put::<TransactionByAccountSchema>(
162180
&(txn.sender(), txn.sequence_number()),
163181
&version,
164182
)?;
165183
}
184+
}
166185

167-
if self.config.enable_event {
168-
events.iter().enumerate().for_each(|(idx, event)| {
169-
if let ContractEvent::V1(v1) = event {
170-
batch
171-
.put::<EventByKeySchema>(
172-
&(*v1.key(), v1.sequence_number()),
173-
&(version, idx as u64),
174-
)
175-
.expect("Failed to put events by key to a batch");
176-
batch
177-
.put::<EventByVersionSchema>(
178-
&(*v1.key(), version, v1.sequence_number()),
179-
&(idx as u64),
180-
)
181-
.expect("Failed to put events by version to a batch");
182-
}
183-
});
184-
}
186+
if self.config.enable_event {
187+
events.iter().enumerate().for_each(|(idx, event)| {
188+
if let ContractEvent::V1(v1) = event {
189+
batch
190+
.put::<EventByKeySchema>(
191+
&(*v1.key(), v1.sequence_number()),
192+
&(version, idx as u64),
193+
)
194+
.expect("Failed to put events by key to a batch");
195+
batch
196+
.put::<EventByVersionSchema>(
197+
&(*v1.key(), version, v1.sequence_number()),
198+
&(idx as u64),
199+
)
200+
.expect("Failed to put events by version to a batch");
201+
}
202+
});
203+
}
204+
205+
if self.config.enable_statekeys {
206+
writeset.iter().for_each(|(state_key, write_op)| {
207+
if write_op.is_creation() {
208+
batch
209+
.put::<StateKeysSchema>(state_key, &())
210+
.expect("Failed to put state keys to a batch");
211+
}
212+
});
185213
}
214+
186215
version += 1;
187216
Ok::<(), AptosDbError>(())
188217
})?;
@@ -270,6 +299,16 @@ impl DBIndexer {
270299
Ok(result)
271300
}
272301

302+
#[cfg(any(test, feature = "fuzzing"))]
303+
pub fn get_state_keys(&self, prefix: &StateKeyPrefix) -> Result<Vec<StateKey>> {
304+
let mut iter = self.db.iter::<StateKeysSchema>()?;
305+
iter.seek_to_first();
306+
Ok(iter
307+
.map(|res| res.unwrap().0)
308+
.filter(|k| prefix.is_prefix(k).unwrap())
309+
.collect())
310+
}
311+
273312
#[cfg(any(test, feature = "fuzzing"))]
274313
pub fn get_event_by_key_iter(
275314
&self,
@@ -289,7 +328,7 @@ impl DBIndexer {
289328
order: Order,
290329
limit: u64,
291330
ledger_version: Version,
292-
) -> anyhow::Result<Vec<EventWithVersion>> {
331+
) -> Result<Vec<EventWithVersion>> {
293332
self.get_events_by_event_key(event_key, start, order, limit, ledger_version)
294333
}
295334

@@ -300,7 +339,7 @@ impl DBIndexer {
300339
order: Order,
301340
limit: u64,
302341
ledger_version: Version,
303-
) -> anyhow::Result<Vec<EventWithVersion>> {
342+
) -> Result<Vec<EventWithVersion>> {
304343
error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?;
305344
let get_latest = order == Order::Descending && start_seq_num == u64::max_value();
306345

@@ -367,7 +406,7 @@ impl DBIndexer {
367406
limit: u64,
368407
include_events: bool,
369408
ledger_version: Version,
370-
) -> anyhow::Result<AccountTransactionsWithProof> {
409+
) -> Result<AccountTransactionsWithProof> {
371410
error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?;
372411

373412
let txns_with_proofs = self
@@ -384,4 +423,19 @@ impl DBIndexer {
384423

385424
Ok(AccountTransactionsWithProof::new(txns_with_proofs))
386425
}
426+
427+
pub fn get_prefixed_state_value_iterator(
428+
&self,
429+
key_prefix: &StateKeyPrefix,
430+
cursor: Option<&StateKey>,
431+
version: Version,
432+
) -> Result<impl Iterator<Item = anyhow::Result<(StateKey, StateValue)>> + '_> {
433+
PrefixedStateValueIterator::new(
434+
self.main_db_reader.clone(),
435+
self.db.as_ref(),
436+
key_prefix.clone(),
437+
cursor.cloned(),
438+
version,
439+
)
440+
}
387441
}

‎storage/indexer/src/indexer_reader.rs

+51-19
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use crate::{db_indexer::DBIndexer, db_v2::IndexerAsyncV2};
5-
use anyhow::{bail, Result};
5+
use anyhow::anyhow;
66
use aptos_types::{
77
account_address::AccountAddress,
88
contract_event::EventWithVersion,
99
event::EventKey,
1010
indexer::indexer_db_reader::{IndexerReader, Order},
11-
state_store::table::{TableHandle, TableInfo},
11+
state_store::{
12+
state_key::{prefix::StateKeyPrefix, StateKey},
13+
state_value::StateValue,
14+
table::{TableHandle, TableInfo},
15+
},
1216
transaction::{AccountTransactionsWithProof, Version},
1317
};
1418
use std::sync::Arc;
@@ -35,11 +39,11 @@ impl IndexerReaders {
3539
}
3640

3741
impl IndexerReader for IndexerReaders {
38-
fn get_table_info(&self, handle: TableHandle) -> Result<Option<TableInfo>> {
42+
fn get_table_info(&self, handle: TableHandle) -> anyhow::Result<Option<TableInfo>> {
3943
if let Some(table_info_reader) = &self.table_info_reader {
4044
return Ok(table_info_reader.get_table_info_with_retry(handle)?);
4145
}
42-
bail!("Table info reader is not available")
46+
anyhow::bail!("Table info reader is not available")
4347
}
4448

4549
fn get_events(
@@ -49,21 +53,21 @@ impl IndexerReader for IndexerReaders {
4953
order: Order,
5054
limit: u64,
5155
ledger_version: Version,
52-
) -> Result<Vec<EventWithVersion>> {
56+
) -> anyhow::Result<Vec<EventWithVersion>> {
5357
if let Some(db_indexer_reader) = &self.db_indexer_reader {
5458
if db_indexer_reader.event_enabled() {
55-
return db_indexer_reader.get_events(
59+
return Ok(db_indexer_reader.get_events(
5660
event_key,
5761
start,
5862
order,
5963
limit,
6064
ledger_version,
61-
);
65+
)?);
6266
} else {
63-
bail!("Event index is not enabled")
67+
anyhow::bail!("Event index is not enabled")
6468
}
6569
}
66-
bail!("DB Indexer reader is not available")
70+
anyhow::bail!("DB Indexer reader is not available")
6771
}
6872

6973
fn get_events_by_event_key(
@@ -73,21 +77,21 @@ impl IndexerReader for IndexerReaders {
7377
order: Order,
7478
limit: u64,
7579
ledger_version: Version,
76-
) -> Result<Vec<EventWithVersion>> {
80+
) -> anyhow::Result<Vec<EventWithVersion>> {
7781
if let Some(db_indexer_reader) = &self.db_indexer_reader {
7882
if db_indexer_reader.event_enabled() {
79-
return db_indexer_reader.get_events_by_event_key(
83+
return Ok(db_indexer_reader.get_events_by_event_key(
8084
event_key,
8185
start_seq_num,
8286
order,
8387
limit,
8488
ledger_version,
85-
);
89+
)?);
8690
} else {
87-
bail!("Event index is not enabled")
91+
anyhow::bail!("Event index is not enabled")
8892
}
8993
}
90-
bail!("DB indexer reader is not available")
94+
anyhow::bail!("DB indexer reader is not available")
9195
}
9296

9397
fn get_account_transactions(
@@ -97,20 +101,48 @@ impl IndexerReader for IndexerReaders {
97101
limit: u64,
98102
include_events: bool,
99103
ledger_version: Version,
100-
) -> Result<AccountTransactionsWithProof> {
104+
) -> anyhow::Result<AccountTransactionsWithProof> {
101105
if let Some(db_indexer_reader) = &self.db_indexer_reader {
102106
if db_indexer_reader.transaction_enabled() {
103-
return db_indexer_reader.get_account_transactions(
107+
return Ok(db_indexer_reader.get_account_transactions(
104108
address,
105109
start_seq_num,
106110
limit,
107111
include_events,
108112
ledger_version,
109-
);
113+
)?);
110114
} else {
111-
bail!("Transaction by account index is not enabled")
115+
anyhow::bail!("Transaction by account index is not enabled")
112116
}
113117
}
114-
bail!("DB indexer reader is not available")
118+
anyhow::bail!("DB indexer reader is not available")
119+
}
120+
121+
fn get_prefixed_state_value_iterator(
122+
&self,
123+
key_prefix: &StateKeyPrefix,
124+
cursor: Option<&StateKey>,
125+
version: Version,
126+
) -> anyhow::Result<Box<dyn Iterator<Item = anyhow::Result<(StateKey, StateValue)>> + '_>> {
127+
if let Some(db_indexer_reader) = &self.db_indexer_reader {
128+
if db_indexer_reader.statekeys_enabled() {
129+
return Ok(Box::new(
130+
db_indexer_reader
131+
.get_prefixed_state_value_iterator(key_prefix, cursor, version)
132+
.map_err(|err| {
133+
anyhow!(format!(
134+
"failed to get prefixed state value iterator {}",
135+
err
136+
))
137+
})?,
138+
)
139+
as Box<
140+
dyn Iterator<Item = anyhow::Result<(StateKey, StateValue)>>,
141+
>);
142+
} else {
143+
anyhow::bail!("StateKeys index is not enabled")
144+
}
145+
}
146+
anyhow::bail!("DB indexer reader is not available")
115147
}
116148
}

‎storage/indexer/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod db_indexer;
77
pub mod db_ops;
88
pub mod db_v2;
99
pub mod indexer_reader;
10+
mod utils;
1011

1112
use crate::db::INDEX_DB_NAME;
1213
use aptos_config::config::RocksdbConfig;

‎storage/indexer/src/utils.rs

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (c) Aptos Foundation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use aptos_db_indexer_schemas::schema::state_keys::StateKeysSchema;
5+
use aptos_schemadb::{iterator::SchemaIterator, ReadOptions, DB};
6+
use aptos_storage_interface::{DbReader, Result};
7+
use aptos_types::{
8+
state_store::{
9+
state_key::{prefix::StateKeyPrefix, StateKey},
10+
state_value::StateValue,
11+
},
12+
transaction::Version,
13+
};
14+
use std::sync::Arc;
15+
16+
pub struct PrefixedStateValueIterator<'a> {
17+
state_keys_iter: SchemaIterator<'a, StateKeysSchema>,
18+
main_db: Arc<dyn DbReader>,
19+
key_prefix: StateKeyPrefix,
20+
desired_version: Version, // state values before the version
21+
is_finished: bool,
22+
}
23+
24+
impl<'a> PrefixedStateValueIterator<'a> {
25+
pub fn new(
26+
main_db_reader: Arc<dyn DbReader>,
27+
indexer_db: &'a DB,
28+
key_prefix: StateKeyPrefix,
29+
first_key: Option<StateKey>,
30+
desired_version: Version,
31+
) -> Result<Self> {
32+
let mut read_opt = ReadOptions::default();
33+
read_opt.set_total_order_seek(true);
34+
let mut state_keys_iter = indexer_db.iter_with_opts::<StateKeysSchema>(read_opt)?;
35+
if let Some(first_key) = first_key {
36+
state_keys_iter.seek(&first_key)?;
37+
} else {
38+
state_keys_iter.seek(&&key_prefix)?;
39+
};
40+
Ok(Self {
41+
state_keys_iter,
42+
main_db: main_db_reader,
43+
key_prefix,
44+
desired_version,
45+
is_finished: false,
46+
})
47+
}
48+
49+
pub fn next_impl(&mut self) -> anyhow::Result<Option<(StateKey, StateValue)>> {
50+
let iter = &mut self.state_keys_iter;
51+
if self.is_finished {
52+
return Ok(None);
53+
}
54+
while let Some((state_key, _)) = iter.next().transpose()? {
55+
if !self.key_prefix.is_prefix(&state_key)? {
56+
self.is_finished = true;
57+
return Ok(None);
58+
}
59+
60+
match self
61+
.main_db
62+
.get_state_value_by_version(&state_key, self.desired_version)?
63+
{
64+
Some(state_value) => {
65+
return Ok(Some((state_key, state_value)));
66+
},
67+
None => {
68+
// state key doesn't have value before the desired version, continue to next state key
69+
continue;
70+
},
71+
}
72+
}
73+
Ok(None)
74+
}
75+
}
76+
77+
impl<'a> Iterator for PrefixedStateValueIterator<'a> {
78+
type Item = anyhow::Result<(StateKey, StateValue)>;
79+
80+
fn next(&mut self) -> Option<Self::Item> {
81+
self.next_impl().transpose()
82+
}
83+
}

‎storage/indexer_schemas/src/schema/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
pub mod event_by_key;
1010
pub mod event_by_version;
1111
pub mod indexer_metadata;
12+
pub mod state_keys;
1213
pub mod table_info;
1314
pub mod transaction_by_account;
1415
use aptos_schemadb::ColumnFamilyName;
@@ -20,6 +21,7 @@ pub const TABLE_INFO_CF_NAME: ColumnFamilyName = "table_info";
2021
pub const EVENT_BY_KEY_CF_NAME: ColumnFamilyName = "event_by_key";
2122
pub const EVENT_BY_VERSION_CF_NAME: ColumnFamilyName = "event_by_version";
2223
pub const TRANSACTION_BY_ACCOUNT_CF_NAME: ColumnFamilyName = "transaction_by_account";
24+
pub const STATE_KEYS_CF_NAME: ColumnFamilyName = "state_keys";
2325

2426
pub fn column_families() -> Vec<ColumnFamilyName> {
2527
vec![
@@ -36,5 +38,6 @@ pub fn internal_indexer_column_families() -> Vec<ColumnFamilyName> {
3638
EVENT_BY_KEY_CF_NAME,
3739
EVENT_BY_VERSION_CF_NAME,
3840
TRANSACTION_BY_ACCOUNT_CF_NAME,
41+
STATE_KEYS_CF_NAME,
3942
]
4043
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright © Aptos Foundation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::{schema::STATE_KEYS_CF_NAME, utils::ensure_slice_len_eq};
5+
use anyhow::Result;
6+
use aptos_schemadb::{
7+
define_pub_schema,
8+
schema::{KeyCodec, SeekKeyCodec, ValueCodec},
9+
};
10+
use aptos_types::state_store::state_key::{prefix::StateKeyPrefix, StateKey};
11+
12+
define_pub_schema!(StateKeysSchema, StateKey, (), STATE_KEYS_CF_NAME);
13+
14+
impl KeyCodec<StateKeysSchema> for StateKey {
15+
fn encode_key(&self) -> Result<Vec<u8>> {
16+
Ok(self.encoded().to_vec())
17+
}
18+
19+
fn decode_key(data: &[u8]) -> Result<Self> {
20+
let state_key: StateKey = StateKey::decode(data)?;
21+
Ok(state_key)
22+
}
23+
}
24+
25+
impl ValueCodec<StateKeysSchema> for () {
26+
fn encode_value(&self) -> Result<Vec<u8>> {
27+
Ok(Vec::new())
28+
}
29+
30+
fn decode_value(data: &[u8]) -> Result<Self> {
31+
ensure_slice_len_eq(data, 0)?;
32+
Ok(())
33+
}
34+
}
35+
36+
impl SeekKeyCodec<StateKeysSchema> for &StateKeyPrefix {
37+
fn encode_seek_key(&self) -> Result<Vec<u8>> {
38+
self.encode()
39+
}
40+
}
41+
42+
#[cfg(test)]
43+
mod test;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright © Aptos Foundation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use super::*;
5+
use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding};
6+
use proptest::prelude::*;
7+
8+
proptest! {
9+
#[test]
10+
fn test_encode_decode(
11+
state_key in any::<StateKey>(),
12+
) {
13+
assert_encode_decode::<StateKeysSchema>(&state_key, &());
14+
}
15+
}
16+
17+
test_no_panic_decoding!(StateKeysSchema);

‎testsuite/smoke-test/Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ aptos-config = { workspace = true }
2121
aptos-consensus = { workspace = true }
2222
aptos-crypto = { workspace = true }
2323
aptos-db = { workspace = true }
24+
aptos-db-indexer-schemas = { workspace = true }
2425
aptos-debugger = { workspace = true }
2526
aptos-dkg = { workspace = true }
2627
aptos-faucet-core = { workspace = true }
@@ -30,12 +31,14 @@ aptos-gas-algebra = { workspace = true }
3031
aptos-gas-schedule = { workspace = true, features = ["testing"] }
3132
aptos-global-constants = { workspace = true }
3233
aptos-indexer = { workspace = true }
34+
aptos-indexer-grpc-table-info = { workspace = true }
3335
aptos-inspection-service = { workspace = true }
3436
aptos-keygen = { workspace = true }
3537
aptos-move-debugger = { workspace = true }
3638
aptos-release-builder = { workspace = true }
3739
aptos-rest-client = { workspace = true }
3840
aptos-rosetta = { workspace = true }
41+
aptos-schemadb = { workspace = true }
3942
aptos-sdk = { workspace = true }
4043
aptos-storage-interface = { workspace = true }
4144
aptos-temppath = { workspace = true }
@@ -76,3 +79,5 @@ rand = { workspace = true }
7679
regex = { workspace = true }
7780
reqwest = { workspace = true }
7881
serde_yaml = { workspace = true }
82+
tempfile = { workspace = true }
83+

‎testsuite/smoke-test/src/fullnode.rs

+110-6
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,31 @@
22
// Parts of the project are originally copyright © Meta Platforms, Inc.
33
// SPDX-License-Identifier: Apache-2.0
44

5-
use crate::{smoke_test_environment::new_local_swarm_with_aptos, utils::MAX_HEALTHY_WAIT_SECS};
5+
use crate::{
6+
smoke_test_environment::{new_local_swarm_with_aptos, SwarmBuilder},
7+
state_sync_utils::create_fullnode,
8+
utils::{create_test_accounts, execute_transactions, MAX_HEALTHY_WAIT_SECS},
9+
};
610
use anyhow::bail;
711
use aptos_cached_packages::aptos_stdlib;
8-
use aptos_config::config::{NodeConfig, OverrideNodeConfig};
9-
use aptos_forge::{NodeExt, Result, Swarm};
12+
use aptos_config::config::{BootstrappingMode, NodeConfig, OverrideNodeConfig};
13+
use aptos_db::AptosDB;
14+
use aptos_db_indexer_schemas::schema::state_keys::StateKeysSchema;
15+
use aptos_forge::{NodeExt, Result, Swarm, SwarmExt};
16+
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
1017
use aptos_rest_client::Client as RestClient;
11-
use aptos_types::account_address::AccountAddress;
12-
use std::time::{Duration, Instant};
13-
18+
use aptos_schemadb::DB;
19+
use aptos_storage_interface::DbReader;
20+
use aptos_types::{
21+
account_address::AccountAddress,
22+
state_store::state_key::{prefix::StateKeyPrefix, StateKey},
23+
transaction::Version,
24+
};
25+
use std::{
26+
collections::HashSet,
27+
sync::Arc,
28+
time::{Duration, Instant},
29+
};
1430
#[tokio::test]
1531
async fn test_indexer() {
1632
let mut swarm = new_local_swarm_with_aptos(1).await;
@@ -86,3 +102,91 @@ async fn wait_for_account(client: &RestClient, address: AccountAddress) -> Resul
86102
}
87103
bail!("wait for account(address={}) timeout", address,)
88104
}
105+
106+
fn enable_internal_indexer(node_config: &mut NodeConfig) {
107+
node_config.indexer_db_config.enable_event = true;
108+
node_config.indexer_db_config.enable_transaction = true;
109+
node_config.indexer_db_config.enable_statekeys = true;
110+
}
111+
112+
#[tokio::test]
113+
async fn test_internal_indexer_with_fast_sync() {
114+
// Create a swarm with 2 validators
115+
let mut swarm = SwarmBuilder::new_local(2)
116+
.with_aptos()
117+
.with_init_config(Arc::new(move |_, config, _| {
118+
config.state_sync.state_sync_driver.bootstrapping_mode =
119+
BootstrappingMode::DownloadLatestStates;
120+
}))
121+
.build()
122+
.await;
123+
124+
let validator_peer_id = swarm.validators().next().unwrap().peer_id();
125+
let validator_client = swarm.validator(validator_peer_id).unwrap().rest_client();
126+
let (mut account_0, account_1) = create_test_accounts(&mut swarm).await;
127+
128+
execute_transactions(
129+
&mut swarm,
130+
&validator_client,
131+
&mut account_0,
132+
&account_1,
133+
true,
134+
)
135+
.await;
136+
137+
let ledger_info = validator_client.get_ledger_information().await.unwrap();
138+
println!("ledger_info: {:?}", ledger_info);
139+
let mut vfn_config = NodeConfig::get_default_vfn_config();
140+
vfn_config.state_sync.state_sync_driver.bootstrapping_mode =
141+
BootstrappingMode::DownloadLatestStates;
142+
enable_internal_indexer(&mut vfn_config);
143+
144+
let peer_id = create_fullnode(vfn_config.clone(), &mut swarm).await;
145+
swarm
146+
.wait_for_all_nodes_to_catchup(Duration::from_secs(60))
147+
.await
148+
.unwrap();
149+
let node = swarm.full_node_mut(peer_id).unwrap();
150+
let node_config = node.config().to_owned();
151+
node.stop().await.unwrap();
152+
check_indexer_db(&node_config);
153+
}
154+
155+
fn check_indexer_db(vfn_config: &NodeConfig) {
156+
let aptos_db_dir = vfn_config
157+
.storage
158+
.get_dir_paths()
159+
.default_root_path()
160+
.to_owned();
161+
let path = aptos_db_dir.as_path();
162+
let aptos_db = AptosDB::new_for_test(path);
163+
164+
let internal_indexer_db = InternalIndexerDBService::get_indexer_db(vfn_config).unwrap();
165+
let prefix = StateKeyPrefix::from(AccountAddress::from_hex_literal("0x1").unwrap());
166+
let main_db_iter = aptos_db
167+
.get_prefixed_state_value_iterator(&prefix, None, Version::MAX)
168+
.unwrap();
169+
let main_db_keys: HashSet<StateKey> = main_db_iter.map(|iter| iter.unwrap().0).collect();
170+
let indexer_keys: HashSet<StateKey> =
171+
get_indexer_db_content::<StateKeysSchema, StateKey>(internal_indexer_db.clone());
172+
println!(
173+
"Total state keys: {}, {}",
174+
main_db_keys.len(),
175+
indexer_keys.len()
176+
);
177+
assert!(!main_db_keys.is_empty());
178+
// 0x1 statekeys are synced and is subset of indexer statekeys
179+
assert!(main_db_keys.is_subset(&indexer_keys));
180+
}
181+
182+
fn get_indexer_db_content<T, U>(internal_indexer_db: Arc<DB>) -> HashSet<U>
183+
where
184+
T: aptos_schemadb::schema::Schema,
185+
U: aptos_schemadb::schema::KeyCodec<T> + std::cmp::Ord + std::fmt::Debug,
186+
std::collections::HashSet<U>:
187+
std::iter::FromIterator<<T as aptos_schemadb::schema::Schema>::Key>,
188+
{
189+
let mut indexer_db_iter = internal_indexer_db.iter::<T>().unwrap();
190+
indexer_db_iter.seek_to_first();
191+
indexer_db_iter.map(|iter| iter.unwrap().0).collect()
192+
}

‎types/src/indexer/indexer_db_reader.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ use crate::{
55
account_address::AccountAddress,
66
contract_event::EventWithVersion,
77
event::EventKey,
8-
state_store::table::{TableHandle, TableInfo},
8+
state_store::{
9+
state_key::{prefix::StateKeyPrefix, StateKey},
10+
state_value::StateValue,
11+
table::{TableHandle, TableInfo},
12+
},
913
transaction::{AccountTransactionsWithProof, Version},
1014
};
1115
use anyhow::Result;
@@ -45,4 +49,11 @@ pub trait IndexerReader: Send + Sync {
4549
include_events: bool,
4650
ledger_version: Version,
4751
) -> Result<AccountTransactionsWithProof>;
52+
53+
fn get_prefixed_state_value_iterator(
54+
&self,
55+
key_prefix: &StateKeyPrefix,
56+
cursor: Option<&StateKey>,
57+
version: Version,
58+
) -> Result<Box<dyn Iterator<Item = Result<(StateKey, StateValue)>> + '_>>;
4859
}

0 commit comments

Comments
 (0)
Please sign in to comment.