Icechunk store #633
base: main
New file: cubed/icechunk.py
from typing import TYPE_CHECKING, Any, List, Sequence, Union

import zarr
from icechunk import IcechunkStore

from cubed import compute
from cubed.core.array import CoreArray
from cubed.core.ops import blockwise
from cubed.runtime.types import Callback

if TYPE_CHECKING:
    from cubed.array_api.array_object import Array


def store_icechunk(
    store: IcechunkStore,
    *,
    sources: Union["Array", Sequence["Array"]],
    targets: List[zarr.Array],
    executor=None,
    **kwargs: Any,
) -> None:
    # normalize a single source/target pair to lists
    if isinstance(sources, CoreArray):
        sources = [sources]
        targets = [targets]

    if any(not isinstance(s, CoreArray) for s in sources):
        raise ValueError("All sources must be cubed array objects")

    if len(sources) != len(targets):
        raise ValueError(
            f"Different number of sources ({len(sources)}) and targets ({len(targets)})"
        )

    # write each source to its target using an identity blockwise operation;
    # each task returns the Icechunk stores it wrote to
    arrays = []
    for source, target in zip(sources, targets):
        identity = lambda a: a
        ind = tuple(range(source.ndim))
        array = blockwise(
            identity,
            ind,
            source,
            ind,
            dtype=source.dtype,
            align_arrays=False,
            target_store=target,
            return_writes_stores=True,
        )
        arrays.append(array)

    # use a callback to merge icechunk stores
    store_callback = IcechunkStoreCallback()
    # add to other callbacks the user may have set
    callbacks = kwargs.pop("callbacks", [])
    callbacks = [store_callback] + list(callbacks)

    compute(
        *arrays,
        executor=executor,
        _return_in_memory_array=False,
        callbacks=callbacks,
        **kwargs,
    )

    # merge back into the store passed into this function
    merged_store = store_callback.store
    store.merge(merged_store.change_set_bytes())


class IcechunkStoreCallback(Callback):
    def on_compute_start(self, event):
        self.store = None

    def on_task_end(self, event):
        result = event.result
        if result is None:
            return
        for store in result:
            if self.store is None:
                self.store = store
            else:
                self.store.merge(store.change_set_bytes())
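For context, a minimal usage sketch of store_icechunk, put together from the same Icechunk and Zarr calls used in the tests further down; the storage path, array name, and commit message are illustrative.

import icechunk
import zarr

import cubed.array_api as xp
from cubed.icechunk import store_icechunk

# create a writable Icechunk store on the local filesystem (path is illustrative)
store = icechunk.IcechunkStore.create(
    storage=icechunk.StorageConfig.filesystem("/tmp/example-icechunk"),
    read_only=False,
)

# a small cubed array to write out
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2))

# create the target Zarr array in the store, write the cubed array into it,
# then commit the resulting changeset
with store.preserve_read_only():
    group = zarr.group(store=store, overwrite=True)
    target = group.create_array(
        "a", shape=a.shape, chunk_shape=a.chunksize, dtype=a.dtype
    )
    store_icechunk(store, sources=a, targets=target)
store.commit("write array a")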
Review comments:

- @paraseba shall we commit to …
- @tomwhite It looks like callbacks are "accumulated" on every worker. Is that right?
- @dcherian My only concern with it is the implementation of …
- Right, we can fix that, but note that dask and presumably cubed are accumulating on remote workers already, so there is already some parallelism in how it is used. Moreover, it'd be nice not to have …
- No, they are being accumulated on the client, so there is no parallelism. I don't know what Icechunk is doing here, but would it be possible to merge a batch in one go rather than one at a time? Could that be more efficient?
- We'll have to build some Rust API; we'll get to this eventually.
- Ah, ok. In that case, how about calling … EDIT: or is the …
- Yes, that's basically it: Cubed also separates the data paths for array manipulations (the contents of the blocks) from the metadata operations (block IDs and, for Icechunk, the changesets). So I think merging in batches would be more feasible.
- +1
- :( I was afraid so.
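To make the "merge a batch at the end" idea above concrete, here is a rough sketch of a callback variant that defers merging until the computation finishes. It assumes cubed's Callback also exposes an on_compute_end hook (as its progress-bar callbacks do); it still merges change sets pairwise on the client, so a genuinely batched one-shot merge would need the Rust-side Icechunk API mentioned in the thread.

from cubed.runtime.types import Callback


class BatchedIcechunkStoreCallback(Callback):
    # Sketch only: collect change sets as tasks finish and merge them in one
    # pass when the computation ends, instead of merging on every task.
    def on_compute_start(self, event):
        self.store = None
        self.change_sets = []

    def on_task_end(self, event):
        result = event.result
        if result is None:
            return
        for store in result:
            if self.store is None:
                self.store = store
            else:
                self.change_sets.append(store.change_set_bytes())

    def on_compute_end(self, event):
        # still a sequential client-side loop; a true batch merge would need
        # new Icechunk API support
        for change_set in self.change_sets:
            self.store.merge(change_set)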
New test file:
from typing import Iterable

import icechunk
import numpy as np
import pytest
import zarr
from numpy.testing import assert_array_equal

import cubed
import cubed.array_api as xp
import cubed.random
from cubed.icechunk import store_icechunk
from cubed.tests.utils import MAIN_EXECUTORS


@pytest.fixture(
    scope="module",
    params=MAIN_EXECUTORS,
    ids=[executor.name for executor in MAIN_EXECUTORS],
)
def executor(request):
    return request.param


def create_icechunk(a, tmp_path, /, *, dtype=None, chunks=None):
    # from dask.asarray
    if not isinstance(getattr(a, "shape", None), Iterable):
        # ensure blocks are arrays
        a = np.asarray(a, dtype=dtype)
    if dtype is None:
        dtype = a.dtype

    store = icechunk.IcechunkStore.create(
        storage=icechunk.StorageConfig.filesystem(tmp_path / "icechunk"),
        config=icechunk.StoreConfig(inline_chunk_threshold_bytes=1),
        read_only=False,
    )

    group = zarr.group(store=store, overwrite=True)
    arr = group.create_array("a", shape=a.shape, chunk_shape=chunks, dtype=dtype)

    arr[...] = a

    store.commit("commit 1")


def test_from_zarr_icechunk(tmp_path, executor):
    create_icechunk(
        [[1, 2, 3], [4, 5, 6], [7, 8, 9]],
        tmp_path,
        chunks=(2, 2),
    )

    store = icechunk.IcechunkStore.open_existing(
        storage=icechunk.StorageConfig.filesystem(tmp_path / "icechunk"),
    )

    a = cubed.from_zarr(store, path="a")
    assert_array_equal(
        a.compute(executor=executor), np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
    )


def test_store_icechunk(tmp_path, executor):
    a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2))

    store = icechunk.IcechunkStore.create(
        storage=icechunk.StorageConfig.filesystem(tmp_path / "icechunk"),
        config=icechunk.StoreConfig(inline_chunk_threshold_bytes=1),
        read_only=False,
    )
    with store.preserve_read_only():
        group = zarr.group(store=store, overwrite=True)
        target = group.create_array(
            "a", shape=a.shape, chunk_shape=a.chunksize, dtype=a.dtype
        )
        store_icechunk(store, sources=a, targets=target, executor=executor)
    store.commit("commit 1")

    # reopen store and check contents of array
    store = icechunk.IcechunkStore.open_existing(
        storage=icechunk.StorageConfig.filesystem(tmp_path / "icechunk"),
    )
    group = zarr.open_group(store=store, mode="r")
    assert_array_equal(
        cubed.from_array(group["a"])[:], np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
    )
Review comment:

- Could definitely address this later, but regions is quite an important kwarg.
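For reference, dask.array.store exposes this as a regions keyword (one region per source/target pair). A hypothetical equivalent call here might look like the following; store_icechunk does not accept regions in this PR, so the parameter and the surrounding names are purely illustrative.

# Hypothetical: write a 3x3 cubed array into the top-left corner of a larger,
# already-created Icechunk-backed Zarr array. The regions kwarg does not exist
# in this PR; this only sketches the calling convention being asked for.
big = group.create_array("big", shape=(10, 10), chunk_shape=(2, 2), dtype=a.dtype)
store_icechunk(
    store,
    sources=[a],
    targets=[big],
    regions=[(slice(0, 3), slice(0, 3))],  # one region tuple per source/target pair
    executor=executor,
)
store.commit("write region of big")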