Skip to content

Commit fb5cfea

Browse files
committed
inner method for read_batches_consolidated
1 parent b5bc656 commit fb5cfea

File tree

2 files changed

+65
-63
lines changed

2 files changed

+65
-63
lines changed

src/persist-client/src/lib.rs

Lines changed: 23 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::Arc;
2121

2222
use differential_dataflow::difference::Semigroup;
2323
use differential_dataflow::lattice::Lattice;
24+
use itertools::Itertools;
2425
use mz_build_info::{BuildInfo, build_info};
2526
use mz_dyncfg::ConfigSet;
2627
use mz_ore::{instrument, soft_assert_or_log};
@@ -32,22 +33,20 @@ use semver::Version;
3233
use timely::progress::{Antichain, Timestamp};
3334

3435
use crate::async_runtime::IsolatedRuntime;
35-
use crate::batch::{BATCH_DELETE_ENABLED, BLOB_TARGET_SIZE, Batch, BatchBuilder, ProtoBatch};
36+
use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
3637
use crate::cache::{PersistClientCache, StateCache};
37-
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, PersistConfig};
38+
use crate::cfg::PersistConfig;
3839
use crate::critical::{CriticalReaderId, SinceHandle};
3940
use crate::error::InvalidUsage;
40-
use crate::fetch::{BatchFetcher, BatchFetcherConfig, FetchBatchFilter, Lease};
41+
use crate::fetch::{BatchFetcher, BatchFetcherConfig, Lease};
4142
use crate::internal::compact::Compactor;
4243
use crate::internal::encoding::parse_id;
4344
use crate::internal::gc::GarbageCollector;
4445
use crate::internal::machine::{Machine, retry_external};
4546
use crate::internal::state_versions::StateVersions;
46-
use crate::iter::{Consolidator, StructuredSort};
4747
use crate::metrics::Metrics;
4848
use crate::read::{
49-
Cursor, CursorConsolidator, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle,
50-
Since,
49+
Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
5150
};
5251
use crate::rpc::PubSubSender;
5352
use crate::schema::CaESchema;
@@ -643,57 +642,31 @@ impl PersistClient {
643642
should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
644643
) -> Result<Cursor<K, V, T, D>, Since<T>>
645644
where
646-
K: Debug + Codec,
647-
V: Debug + Codec,
645+
K: Debug + Codec + Ord,
646+
V: Debug + Codec + Ord,
648647
T: Timestamp + Lattice + Codec64 + Sync,
649648
D: Semigroup + Ord + Codec64 + Send + Sync,
650649
{
651-
let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
652-
let filter = FetchBatchFilter::Snapshot {
653-
as_of: as_of.clone(),
654-
};
655-
650+
// WIP!
651+
let lease = Lease::new(SeqNo(0));
656652
let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");
657653

658-
let consolidator = {
659-
let mut consolidator = Consolidator::new(
660-
context,
661-
shard_id,
662-
StructuredSort::new(read_schemas.clone()),
663-
Arc::clone(&self.blob),
664-
Arc::clone(&self.metrics),
665-
Arc::clone(&shard_metrics),
666-
self.metrics.read.snapshot.clone(),
667-
filter,
668-
COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
669-
);
670-
for batch in batches {
671-
for (meta, run) in batch.batch.runs() {
672-
consolidator.enqueue_run(
673-
&batch.batch.desc,
674-
meta,
675-
run.into_iter()
676-
.filter(|p| should_fetch_part(p.stats()))
677-
.cloned(),
678-
);
679-
}
680-
}
681-
CursorConsolidator::Structured {
682-
consolidator,
683-
// This default may end up consolidating more records than previously
684-
// for cases like fast-path peeks, where only the first few entries are used.
685-
// If this is a noticeable performance impact, thread the max-len in from the caller.
686-
max_len: self.cfg.compaction_yield_after_n_updates,
687-
max_bytes: BLOB_TARGET_SIZE.get(&self.cfg).max(1),
688-
}
689-
};
654+
let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();
690655

691-
Ok(Cursor {
692-
consolidator,
693-
// WIP: What to do about this?
694-
_lease: Lease::new(SeqNo(0)),
656+
ReadHandle::read_batches_consolidated(
657+
&self.cfg,
658+
Arc::clone(&self.metrics),
659+
shard_metrics,
660+
self.metrics.read.snapshot.clone(),
661+
Arc::clone(&self.blob),
662+
lease,
663+
shard_id,
664+
as_of,
695665
read_schemas,
696-
})
666+
&hollow_batches,
667+
should_fetch_part,
668+
)
669+
.await
697670
}
698671

699672
/// Returns the requested schema, if known at the current state.

src/persist-client/src/read.rs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
4343
use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
4444
use crate::internal::encoding::Schemas;
4545
use crate::internal::machine::{ExpireFn, Machine};
46-
use crate::internal::metrics::Metrics;
46+
use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
4747
use crate::internal::state::{BatchPart, HollowBatch};
4848
use crate::internal::watch::StateWatch;
4949
use crate::iter::{Consolidator, StructuredSort};
@@ -985,24 +985,53 @@ where
985985
should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
986986
) -> Result<Cursor<K, V, T, D>, Since<T>> {
987987
let batches = self.machine.snapshot(&as_of).await?;
988+
let lease = self.lease_seqno();
989+
990+
Self::read_batches_consolidated(
991+
&self.cfg,
992+
Arc::clone(&self.metrics),
993+
Arc::clone(&self.machine.applier.shard_metrics),
994+
self.metrics.read.snapshot.clone(),
995+
Arc::clone(&self.blob),
996+
lease,
997+
self.shard_id(),
998+
as_of,
999+
self.read_schemas.clone(),
1000+
&batches,
1001+
should_fetch_part,
1002+
)
1003+
.await
1004+
}
9881005

989-
let context = format!("{}[as_of={:?}]", self.shard_id(), as_of.elements());
1006+
pub(crate) fn read_batches_consolidated(
1007+
persist_cfg: &PersistConfig,
1008+
metrics: Arc<Metrics>,
1009+
shard_metrics: Arc<ShardMetrics>,
1010+
read_metrics: ReadMetrics,
1011+
blob: Arc<dyn Blob>,
1012+
lease: Lease,
1013+
shard_id: ShardId,
1014+
as_of: Antichain<T>,
1015+
schemas: Schemas<K, V>,
1016+
batches: &[HollowBatch<T>],
1017+
should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
1018+
) -> Result<Cursor<K, V, T, D>, Since<T>> {
1019+
let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
9901020
let filter = FetchBatchFilter::Snapshot {
9911021
as_of: as_of.clone(),
9921022
};
993-
let lease = self.lease_seqno();
9941023

9951024
let consolidator = {
9961025
let mut consolidator = Consolidator::new(
9971026
context,
998-
self.shard_id(),
999-
StructuredSort::new(self.read_schemas.clone()),
1000-
Arc::clone(&self.blob),
1001-
Arc::clone(&self.metrics),
1002-
Arc::clone(&self.machine.applier.shard_metrics),
1003-
self.metrics.read.snapshot.clone(),
1027+
shard_id,
1028+
StructuredSort::new(schemas.clone()),
1029+
blob,
1030+
metrics,
1031+
shard_metrics,
1032+
read_metrics,
10041033
filter,
1005-
COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
1034+
COMPACTION_MEMORY_BOUND_BYTES.get(persist_cfg),
10061035
);
10071036
for batch in batches {
10081037
for (meta, run) in batch.runs() {
@@ -1020,15 +1049,15 @@ where
10201049
// This default may end up consolidating more records than previously
10211050
// for cases like fast-path peeks, where only the first few entries are used.
10221051
// If this is a noticeable performance impact, thread the max-len in from the caller.
1023-
max_len: self.cfg.compaction_yield_after_n_updates,
1024-
max_bytes: BLOB_TARGET_SIZE.get(&self.cfg).max(1),
1052+
max_len: persist_cfg.compaction_yield_after_n_updates,
1053+
max_bytes: BLOB_TARGET_SIZE.get(persist_cfg).max(1),
10251054
}
10261055
};
10271056

10281057
Ok(Cursor {
10291058
consolidator,
10301059
_lease: lease,
1031-
read_schemas: self.read_schemas.clone(),
1060+
read_schemas: schemas,
10321061
})
10331062
}
10341063

0 commit comments

Comments
 (0)