diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
index a54766ad9a679..ee83f3014292b 100644
--- a/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
+++ b/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
@@ -31,8 +31,11 @@ struct Pending<H: Handler> {
 impl<H: Handler> Pending<H> {
     /// Whether there are values left to commit from this indexed checkpoint.
     fn is_empty(&self) -> bool {
-        debug_assert!(self.watermark.batch_rows == 0);
-        self.values.is_empty()
+        let empty = self.values.is_empty();
+        if empty {
+            debug_assert!(self.watermark.batch_rows == 0);
+        }
+        empty
     }
 
     /// Adds data from this indexed checkpoint to the `batch`, honoring the handler's bounds on
@@ -132,7 +135,7 @@ pub(super) fn collector<H: Handler + 'static>(
                         pipeline = H::NAME,
                         elapsed_ms = elapsed * 1000.0,
                         rows = batch.len(),
-                        pending = pending_rows,
+                        pending_rows = pending_rows,
                         "Gathered batch",
                     );
 
@@ -179,3 +182,166 @@ pub(super) fn collector<H: Handler + 'static>(
         }
     })
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::{db, pipeline::Processor};
+
+    use super::*;
+    use prometheus::Registry;
+    use std::time::Duration;
+    use sui_types::full_checkpoint_content::CheckpointData;
+    use tokio::sync::mpsc;
+
+    struct TestHandler;
+    impl Processor for TestHandler {
+        type Value = u64;
+        const NAME: &'static str = "test_handler";
+        const FANOUT: usize = 1;
+
+        fn process(&self, _checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
+            Ok(vec![])
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl Handler for TestHandler {
+        const MIN_EAGER_ROWS: usize = 3;
+        const MAX_CHUNK_ROWS: usize = 4;
+        const MAX_PENDING_ROWS: usize = 10;
+
+        async fn commit(
+            _values: &[Self::Value],
+            _conn: &mut db::Connection<'_>,
+        ) -> anyhow::Result<usize> {
+            tokio::time::sleep(Duration::from_millis(1000)).await;
+            Ok(0)
+        }
+    }
+
+    #[tokio::test]
+    async fn test_collector_batches_data() {
+        let (processor_tx, processor_rx) = mpsc::channel(10);
+        let (collector_tx, mut collector_rx) = mpsc::channel(10);
+        let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));
+        let cancel = CancellationToken::new();
+
+        let _collector = collector::<TestHandler>(
+            PipelineConfig::default(),
+            processor_rx,
+            collector_tx,
+            metrics,
+            cancel.clone(),
+        );
+
+        // Send test data
+        let test_data = vec![
+            Indexed::new(0, 1, 10, 1000, vec![1, 2]),
+            Indexed::new(0, 2, 20, 2000, vec![3, 4]),
+            Indexed::new(0, 3, 30, 3000, vec![5, 6]),
+        ];
+
+        for data in test_data {
+            processor_tx.send(data).await.unwrap();
+        }
+
+        let batch1 = collector_rx.recv().await.unwrap();
+        assert_eq!(batch1.len(), 4);
+
+        let batch2 = collector_rx.recv().await.unwrap();
+        assert_eq!(batch2.len(), 2);
+
+        let batch3 = collector_rx.recv().await.unwrap();
+        assert_eq!(batch3.len(), 0);
+
+        cancel.cancel();
+    }
+
+    #[tokio::test]
+    async fn test_collector_shutdown() {
+        let (processor_tx, processor_rx) = mpsc::channel(10);
+        let (collector_tx, mut collector_rx) = mpsc::channel(10);
+        let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));
+        let cancel = CancellationToken::new();
+
+        let _collector = collector::<TestHandler>(
+            PipelineConfig::default(),
+            processor_rx,
+            collector_tx,
+            metrics,
+            cancel.clone(),
+        );
+
+        processor_tx
+            .send(Indexed::new(0, 1, 10, 1000, vec![1, 2]))
+            .await
+            .unwrap();
+
+        let batch = collector_rx.recv().await.unwrap();
+        assert_eq!(batch.len(), 2);
+
+        // Drop processor sender to simulate shutdown
+        drop(processor_tx);
+
+        // After a short delay, collector should shut down
+        tokio::time::sleep(Duration::from_millis(200)).await;
+        assert!(collector_rx.try_recv().is_err());
+
+        cancel.cancel();
+    }
+
+    #[tokio::test]
+    async fn test_collector_respects_max_pending() {
+        let processor_channel_size = 5; // unit is checkpoint
+        let collector_channel_size = 10; // unit is batch, aka rows / MAX_CHUNK_ROWS
+        let (processor_tx, processor_rx) = mpsc::channel(processor_channel_size);
+        let (collector_tx, _collector_rx) = mpsc::channel(collector_channel_size);
+
+        let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));
+
+        let cancel = CancellationToken::new();
+
+        let _collector = collector::<TestHandler>(
+            PipelineConfig::default(),
+            processor_rx,
+            collector_tx,
+            metrics.clone(),
+            cancel.clone(),
+        );
+
+        // Send more data than MAX_PENDING_ROWS plus the collector channel buffer can absorb
+        let data = Indexed::new(
+            0,
+            1,
+            10,
+            1000,
+            vec![
+                1;
+                TestHandler::MAX_PENDING_ROWS
+                    + TestHandler::MAX_CHUNK_ROWS * collector_channel_size
+            ],
+        );
+        processor_tx.send(data).await.unwrap();
+
+        // Now fill up the processor channel with the minimum data needed to trigger send blocking
+        for _ in 0..processor_channel_size {
+            let more_data = Indexed::new(0, 2, 11, 1000, vec![1]);
+            processor_tx.send(more_data).await.unwrap();
+        }
+
+        // Now sending even more data should block.
+        let even_more_data = Indexed::new(0, 3, 12, 1000, vec![1]);
+
+        let send_result = tokio::time::timeout(
+            Duration::from_millis(2000),
+            processor_tx.send(even_more_data),
+        )
+        .await;
+        assert!(
+            send_result.is_err(),
+            "Send should timeout due to MAX_PENDING_ROWS limit"
+        );
+
+        cancel.cancel();
+    }
+}
diff --git a/crates/sui-indexer-alt/src/pipeline/mod.rs b/crates/sui-indexer-alt/src/pipeline/mod.rs
index 0909d43fe2f58..0597327efe400 100644
--- a/crates/sui-indexer-alt/src/pipeline/mod.rs
+++ b/crates/sui-indexer-alt/src/pipeline/mod.rs
@@ -57,6 +57,17 @@ pub struct PipelineConfig {
     pub skip_watermark: bool,
 }
 
+impl Default for PipelineConfig {
+    fn default() -> Self {
+        Self {
+            write_concurrency: 5,
+            collect_interval: Duration::from_millis(500),
+            watermark_interval: Duration::from_millis(500),
+            skip_watermark: false,
+        }
+    }
+}
+
 /// Processed values associated with a single checkpoint. This is an internal type used to
 /// communicate between the processor and the collector parts of the pipeline.
 struct Indexed<P: Processor> {
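Aside (not part of the diff): `test_collector_respects_max_pending` works because a bounded tokio mpsc channel parks `send` once every buffer slot is taken, so upstream backpressure shows up as a send that never completes, which the test detects by racing it against a timeout. Below is a minimal standalone sketch of that pattern, assuming tokio's `full` feature set; the channel size and timeout are illustrative values, not the pipeline's.

```rust
// Sketch of the backpressure assertion pattern used by the test above:
// fill a bounded channel, then show that the next `send` stays pending.
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Deliberately tiny buffer so it fills immediately.
    let (tx, mut rx) = mpsc::channel::<u64>(2);

    // These sends complete right away: the buffer has room.
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();

    // The buffer is full and nothing is draining it, so this send cannot
    // complete; the timeout fires instead -- the same assertion shape as
    // `test_collector_respects_max_pending`.
    let blocked = tokio::time::timeout(Duration::from_millis(100), tx.send(3)).await;
    assert!(blocked.is_err(), "send should still be pending on a full channel");

    // Draining one slot unblocks the sender again.
    assert_eq!(rx.recv().await, Some(1));
    tx.send(3).await.unwrap();
}
```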