persist: expose API for writing/reading "free-standing" batches #32513
Conversation
src/persist-client/src/lib.rs (outdated)

```diff
@@ -121,6 +133,9 @@ pub const BUILD_INFO: BuildInfo = build_info!();
 // Re-export for convenience.
 pub use mz_persist_types::{PersistLocation, ShardId};
 
+pub use crate::internal::encoding::Schemas;
+pub use crate::internal::state::HollowBatch;
```
You might object to these being exposed. We only need `HollowBatch` because we use it to sniff out the size/num_rows from stashed batches without turning them into a `Batch` and/or reading them.
I don't have immediate strong feelings here, but it'll be interesting to see the usage code in the next round!
The usage is in https://github.com/MaterializeInc/materialize/pull/32335/commits, it's the last commit. Specifically, the `Schemas` are used here:

src/adapter/src/coord/peek.rs, line 793 (at 2d08009):

```rust
let read_schemas: Schemas<SourceData, ()> = Schemas {
```

and src/compute/src/compute_state.rs, line 1708 (at 12778b0):

```rust
let write_schemas: Schemas<SourceData, ()> = Schemas {
```
I managed to change the code so that it doesn't need `HollowBatch` exposed, by writing down the encoded batch sizes in the response struct itself. Though that requires exposing an `encoded_size_bytes` on `Batch`, which seems better?
`Batch::encoded_size_bytes` seems fine, yeah!
```rust
/// enough that we can reasonably chunk them up: O(KB) is definitely fine,
/// O(MB) come talk to us.
#[instrument(level = "debug", fields(shard = %shard_id))]
pub async fn batch_builder<K, V, T, D>(
```
This is basically duplicated code. I could add a `WriteHandle::batch_builder_inner` that doesn't take a `self` but all the arguments it needs. And then both `WriteHandle` and the code here could call that one. Same for `read_batches_consolidated` below.

I didn't do this for now because I felt that the method signature was almost the largest part of these methods. But happy to change that!
The new APIs make sense to me! It would be nice to have a single `WriteHandle::batch_builder_inner` method (and for the other too) to prevent the implementations from drifting, but I'll leave that final call up to you.
Yeah, let's share the impl if it's not too costly. I'm optimistic we can simplify the batch builder machinery here, and it'll be nice if we can centralize that in one place instead of two.
```rust
/// well, this consolidates as it goes. However, note that only the
/// serialized data is consolidated: the deserialized data will only be
/// consolidated if your K/V codecs are one-to-one.
// WIP: Do we want to let callers inject sth like MFP here?
```
It looks like you've already done this via `should_fetch_part`! Seems fine!
```rust
/// consolidated if your K/V codecs are one-to-one.
// WIP: Do we want to let callers inject sth like MFP here?
// WIP: This doesn't need async right now, but still might want it in the
// API to have the option in the future?
```
I don't have strong feelings about this!
src/persist-client/src/lib.rs (outdated)

```rust
Ok(Cursor {
    consolidator,
    // WIP: What to do about this?
    _lease: Lease::new(SeqNo(0)),
```
The job of the lease is to make sure that the parts that we're scanning through in the cursor don't get deleted out from under us.

I think the API as you currently have it is risky: right now nothing in this API ensures that the provided batches will outlive the cursor. (And if those batches are dropped, parts will be deleted and bad things will happen.) One option would be for the cursor to take ownership of the batches... we'd make the `Cursor::_lease` field have some more generic shape like `Box<dyn Any>`, and stuff the owned batches in there. Or perhaps the caller could provide a lease, and take responsibility for all that stuff.

(Does this method really need to be in `persist-client` at all? I feel like code like this has mostly lived in clients so far?)
Also agreed! I played around with this for quite a while but didn't find a satisfying solution yet. The basic problem when working with persist batches is that nothing can really prevent the caller from dropping them if they're "malicious". Even when passing in owned `Batches`, there's nothing stopping the caller from turning the same `ProtoBatch` into a `Batch` again and then deleting it, for example.

For reference, here's the code that uses the API:

src/adapter/src/coord/peek.rs, line 888 (at 2d08009):

```rust
for batch in batches {
```

The code does the right thing, but yes, nothing enforces that the batches aren't deleted before we finish reading them.

Also, another difficulty is that dropping batches requires async. So when I pass in owned batches, all I could do is add an async `clean_up_lease` method on `Cursor`, with a trait `DroppableLease` that has an async `release()` method, where that's a no-op for the current `Lease`. But I'd add a new struct that keeps the owned batches and can delete them when they're "released". This is a very rough sketch, so sorry if it's not clear. It would add quite a bit of complexity, though, imo.

Or, some more spitballing: I could add a generic parameter to `Cursor` for the lease type, and an `into_lease()` method that allows me to get back the lease. In the case of this method here, the lease type would be a `Vec<Batch>`, so the code that reads them could exchange the cursor for the original vec of batches and then clean them up. What do you think about that one?
> (Does this method really need to be in persist-client at all? I feel like code like this has mostly lived in clients so far?)

It's in there because it uses private types like `Consolidator` and `CursorConsolidator`. The alternative of making these pub seemed worse. Also, if I want to put the impl of `read_batches_consolidated` in an `_inner` method in `ReadHandle`, as Parker and you suggested above, it would also still have to be here on `PersistClient`. What do you think now?
> The code does the right thing, but yes, nothing enforces that the batches aren't deleted before we finish reading them.

Yeah, I think that looks about right. (Batches can be deleted while they're being read even there, I think, because the reads happen in separate tasks and we sometimes exit early from the consolidation.) But you can imagine adding an error exit from the loop and things getting weirder...

> Or, some more spitballing. [...] What do you think about that one?

Seems fine! I think my original proposal could also be salvaged by wrapping `batches` up in an `Arc`, passing a clone of the arc as the lease, and then unwrapping the arc at the end to drop the batches. But maybe that's all too much work to avoid the extra type parameter.

> What do you think now?

Convinced!
But you still think we should do something, right? I'll try both the `Arc` approach and the extra type parameter next Monday.
```rust
/// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
/// to append it to the given shard or to read it via
/// [PersistClient::read_batches_consolidated]
```
This is a somewhat risky API, since it would be pretty easy for us to decode the same `ProtoBatch` twice and thus have two batches that think they have unique ownership. Maybe that's necessary, but it feels like it at least warrants a warning in the name or docstring...
This mirrors src/persist-client/src/write.rs, line 703 (at abd4987):

```rust
pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
```

I'll add a warning in the docstring.
Yeah, that's fair! The danger of moving code... everything old is under scrutiny again.
Which is good, imo! 😅
Add API for writing and reading batches without creating a `WriteHandle`/`ReadHandle`. Writing batches still requires a `ShardId`, which is used for namespacing in blob storage.

Work towards https://github.com/MaterializeInc/database-issues/issues/9180, where we want to use persist batches/blob to stash peek results when sending them back to `environmentd`.