Add VirtualInMemoryArray that keeps small arrays in memory (#336)
* Add VirtualInMemoryArray that keeps small arrays in memory rather than
materializing to disk.

* Fail if array created with `asarray` is greater than 1MB

* Remove `initial_values` in LazyZarrArray as it is no longer used

* Update test for `num_arrays`

* Update comment
tomwhite authored Dec 22, 2023
1 parent 685316d commit 39cf477
Showing 8 changed files with 79 additions and 62 deletions.
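Before the per-file diffs, a minimal sketch of the user-facing behaviour this commit introduces, following the conventions of cubed's test suite (`import cubed.array_api as xp`). The `Spec` values below are illustrative assumptions; the 1 MB threshold is the default `max_nbytes` of the new `VirtualInMemoryArray`.

```python
import numpy as np

import cubed
import cubed.array_api as xp

# Illustrative spec; any work_dir / allowed_mem combination would do.
spec = cubed.Spec(work_dir="tmp", allowed_mem="100MB")

# A small array (well under 1 MB) is wrapped in a VirtualInMemoryArray:
# it stays in memory and is never materialized as a zarr store on disk.
small = xp.asarray(np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), chunks=(2, 2), spec=spec)

# An 8 MB array exceeds the default 1 MB limit and fails fast, pointing
# the user at `from_array` (i.e. load the data from storage instead).
try:
    xp.asarray(np.ones((1000, 1000)), chunks=(100, 100), spec=spec)
except ValueError as err:
    print(err)
```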
13 changes: 7 additions & 6 deletions cubed/array_api/creation_functions.py
@@ -7,9 +7,12 @@
from cubed.backend_array_api import namespace as nxp
from cubed.core import Plan, gensym
from cubed.core.ops import map_direct
-from cubed.core.plan import new_temp_path
-from cubed.storage.virtual import virtual_empty, virtual_full, virtual_offsets
-from cubed.storage.zarr import lazy_from_array
+from cubed.storage.virtual import (
+    virtual_empty,
+    virtual_full,
+    virtual_in_memory,
+    virtual_offsets,
+)
from cubed.utils import to_chunksize
from cubed.vendor.dask.array.core import normalize_chunks

@@ -70,11 +73,9 @@ def asarray(
if dtype is None:
dtype = a.dtype

-    # write to zarr
    chunksize = to_chunksize(normalize_chunks(chunks, shape=a.shape, dtype=dtype))
    name = gensym()
-    zarr_path = new_temp_path(name=name, spec=spec)
-    target = lazy_from_array(a, dtype=dtype, chunks=chunksize, store=zarr_path)
+    target = virtual_in_memory(a, chunks=chunksize)

plan = Plan._new(name, "asarray", target)
return Array(name, target, spec, plan)
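For reference, a small sketch of the chunk handling `asarray` still performs before building the virtual array, assuming the vendored `normalize_chunks` behaves like dask's and that `to_chunksize` collapses regular chunks back to a single chunk shape (as its use above suggests).

```python
from cubed.utils import to_chunksize
from cubed.vendor.dask.array.core import normalize_chunks

# normalize_chunks expands a chunk spec into explicit per-axis chunk tuples...
chunks = normalize_chunks((2, 2), shape=(3, 3), dtype=int)  # ((2, 1), (2, 1))

# ...and to_chunksize reduces them to the regular chunk shape that
# virtual_in_memory (and zarr) expect.
chunksize = to_chunksize(chunks)  # (2, 2)
```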
15 changes: 2 additions & 13 deletions cubed/core/plan.py
@@ -6,7 +6,6 @@

import networkx as nx

-from cubed.backend_array_api import backend_array_to_numpy_array
from cubed.primitive.blockwise import can_fuse_pipelines, fuse
from cubed.runtime.pipeline import visit_nodes
from cubed.runtime.types import CubedPipeline
@@ -364,19 +363,9 @@ def create_zarr_array(lazy_zarr_array, *, config=None):


def create_zarr_arrays(lazy_zarr_arrays, reserved_mem):
-    # projected memory is size of largest initial values, or dtype size if there aren't any
+    # projected memory is size of largest dtype size (for a fill value)
    projected_mem = (
-        max(
-            [
-                # TODO: calculate nbytes from size and dtype itemsize
-                backend_array_to_numpy_array(lza.initial_values).nbytes
-                if lza.initial_values is not None
-                else lza.dtype.itemsize
-                for lza in lazy_zarr_arrays
-            ],
-            default=0,
-        )
-        + reserved_mem
+        max([lza.dtype.itemsize for lza in lazy_zarr_arrays], default=0) + reserved_mem
)
num_tasks = len(lazy_zarr_arrays)

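A hypothetical worked example of the simplified projected-memory rule above: with `initial_values` gone, the only per-array cost when creating the backing zarr arrays is writing a fill value, i.e. one dtype itemsize on top of the reserved memory. The numbers below are illustrative, not taken from the source.

```python
import numpy as np

lazy_dtypes = [np.dtype("float64"), np.dtype("int32")]  # dtypes of the lazy zarr arrays
reserved_mem = 100_000_000  # whatever the runtime reserves, in bytes (assumed value)

# Largest itemsize (8 bytes for float64) plus reserved memory.
projected_mem = max((dt.itemsize for dt in lazy_dtypes), default=0) + reserved_mem
assert projected_mem == 100_000_008
```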
45 changes: 45 additions & 0 deletions cubed/storage/virtual.py
@@ -8,6 +8,7 @@
from cubed.backend_array_api import namespace as nxp
from cubed.backend_array_api import numpy_array_to_backend_array
from cubed.types import T_DType, T_RegularChunks, T_Shape
from cubed.utils import memory_repr


class VirtualEmptyArray:
@@ -97,6 +98,43 @@ def __getitem__(self, key):
)


class VirtualInMemoryArray:
"""A small array that is held in memory but never materialized on disk."""

def __init__(
self,
array: np.ndarray, # TODO: generalise
chunks: T_RegularChunks,
max_nbytes: int = 10**6,
):
if array.nbytes > max_nbytes:
raise ValueError(
f"Size of in memory array is {memory_repr(array.nbytes)} which exceeds maximum of {memory_repr(max_nbytes)}. Consider loading the array from storage using `from_array`."
)
self.array = array
# use an in-memory Zarr array as a template since it normalizes its properties
# and is needed for oindex
template = zarr.empty(
array.shape,
dtype=array.dtype,
chunks=chunks,
store=zarr.storage.MemoryStore(),
)
self.shape = template.shape
self.dtype = template.dtype
self.chunks = template.chunks
self.template = template
if array.size > 0:
template[...] = array

def __getitem__(self, key):
return self.array.__getitem__(key)

@property
def oindex(self):
return self.template.oindex


def _key_to_index_tuple(selection):
if isinstance(selection, slice):
selection = (selection,)
@@ -131,3 +169,10 @@ def virtual_full(

def virtual_offsets(shape: T_Shape) -> VirtualOffsetsArray:
return VirtualOffsetsArray(shape)


def virtual_in_memory(
array: np.ndarray,
chunks: T_RegularChunks,
) -> VirtualInMemoryArray:
return VirtualInMemoryArray(array, chunks)
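A minimal sketch of how the new `VirtualInMemoryArray` behaves when built through the `virtual_in_memory` helper added above: basic indexing is served straight from the wrapped NumPy array, while `oindex` is delegated to the in-memory zarr template.

```python
import numpy as np

from cubed.storage.virtual import virtual_in_memory

a = np.arange(6, dtype="int32").reshape(2, 3)
v = virtual_in_memory(a, chunks=(2, 2))

print(v.shape, v.dtype, v.chunks)  # (2, 3) int32 (2, 2)
print(v[0, :])                     # basic indexing hits the NumPy array directly
print(v.oindex[[0, 1], 0:2])       # orthogonal indexing goes via the zarr template

# Anything over the default max_nbytes of 10**6 bytes is rejected up front.
try:
    virtual_in_memory(np.ones((1000, 1000)), chunks=(100, 100))
except ValueError as err:
    print(err)
```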
16 changes: 1 addition & 15 deletions cubed/storage/zarr.py
@@ -1,9 +1,7 @@
-from typing import Any, Optional, Union
+from typing import Any, Union

import zarr
-from numpy import ndarray

-from cubed.backend_array_api import backend_array_to_numpy_array
from cubed.types import T_DType, T_RegularChunks, T_Shape, T_Store


@@ -21,7 +19,6 @@ def __init__(
dtype: T_DType,
chunks: T_RegularChunks,
store: T_Store,
-        initial_values: Optional[ndarray] = None,
fill_value: Any = None,
**kwargs,
):
@@ -35,7 +32,6 @@
self.chunks = template.chunks

self.store = store
-        self.initial_values = initial_values
self.fill_value = fill_value
self.kwargs = kwargs

@@ -60,8 +56,6 @@ def create(self, mode: str = "w-") -> zarr.Array:
fill_value=self.fill_value,
**self.kwargs,
)
-        if self.initial_values is not None and self.initial_values.size > 0:
-            target[...] = backend_array_to_numpy_array(self.initial_values)
return target

def open(self) -> zarr.Array:
@@ -91,14 +85,6 @@ def lazy_empty(
return LazyZarrArray(shape, dtype, chunks, store, **kwargs)


-def lazy_from_array(
-    array: ndarray, *, dtype: T_DType, chunks: T_RegularChunks, store: T_Store, **kwargs
-) -> LazyZarrArray:
-    return LazyZarrArray(
-        array.shape, dtype, chunks, store, initial_values=array, **kwargs
-    )


def lazy_full(
shape: T_Shape,
fill_value: Any,
16 changes: 1 addition & 15 deletions cubed/tests/storage/test_zarr.py
@@ -2,7 +2,7 @@
import pytest
from numpy.testing import assert_array_equal

-from cubed.storage.zarr import lazy_empty, lazy_from_array, lazy_full
+from cubed.storage.zarr import lazy_empty, lazy_full


def test_lazy_empty(tmp_path):
@@ -18,20 +18,6 @@ def test_lazy_empty(tmp_path):
arr.open()


-def test_lazy_from_array(tmp_path):
-    zarr_path = tmp_path / "lazy.zarr"
-    a = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=int)
-    arr = lazy_from_array(a, dtype=a.dtype, chunks=(2, 2), store=zarr_path)
-
-    assert not zarr_path.exists()
-    with pytest.raises(ValueError):
-        arr.open()
-
-    arr.create()
-    assert zarr_path.exists()
-    assert_array_equal(arr.open()[:], a)


def test_lazy_full(tmp_path):
zarr_path = tmp_path / "lazy.zarr"
arr = lazy_full(
14 changes: 11 additions & 3 deletions cubed/tests/test_core.py
@@ -40,6 +40,15 @@ def modal_executor(request):
return request.param


def test_as_array_fails(spec):
a = np.ones((1000, 1000))
with pytest.raises(
ValueError,
match="Size of in memory array is 8.0 MB which exceeds maximum of 1.0 MB.",
):
xp.asarray(a, chunks=(100, 100), spec=spec)


def test_regular_chunks(spec):
xp.ones((5, 5), chunks=((2, 2, 1), (5,)), spec=spec)
with pytest.raises(ValueError):
@@ -220,9 +229,8 @@ def test_rechunk_same_chunks(spec):
b = a.rechunk((2, 1))
task_counter = TaskCounter()
res = b.compute(callbacks=[task_counter])
-    # no tasks except array creation task should have run since chunks are same
-    num_created_arrays = 1
-    assert task_counter.value == num_created_arrays
+    # no tasks should have run since chunks are same
+    assert task_counter.value == 0

assert_array_equal(res, np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]))

8 changes: 4 additions & 4 deletions cubed/tests/test_executor_features.py
@@ -93,7 +93,7 @@ def test_callbacks(spec, executor):
np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]),
)

-    num_created_arrays = 3
+    num_created_arrays = 1
assert task_counter.value == num_created_arrays + 4


@@ -132,12 +132,12 @@ def test_resume(spec, executor):
c = xp.add(a, b)
d = xp.negative(c)

-    num_created_arrays = 4  # a, b, c, d
+    num_created_arrays = 2  # c, d
assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 8

task_counter = TaskCounter()
c.compute(executor=executor, callbacks=[task_counter], optimize_graph=False)
-    num_created_arrays = 3  # a, b, c
+    num_created_arrays = 1  # c
assert task_counter.value == num_created_arrays + 4

# since c has already been computed, when computing d only 4 tasks are run, instead of 8
@@ -146,7 +146,7 @@
executor=executor, callbacks=[task_counter], optimize_graph=False, resume=True
)
# the create arrays tasks are run again, even though they exist
-    num_created_arrays = 4  # a, b, c, d
+    num_created_arrays = 2  # c, d
assert task_counter.value == num_created_arrays + 4


14 changes: 8 additions & 6 deletions cubed/tests/test_optimization.py
@@ -18,11 +18,13 @@ def test_fusion(spec):
c = xp.astype(b, np.float32)
d = xp.negative(c)

-    num_created_arrays = 4  # a, b, c, d
-    assert d.plan.num_arrays(optimize_graph=False) == num_created_arrays
+    num_arrays = 4  # a, b, c, d
+    num_created_arrays = 3  # b, c, d (a is not created on disk)
+    assert d.plan.num_arrays(optimize_graph=False) == num_arrays
assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 12
-    num_created_arrays = 2  # a, d
-    assert d.plan.num_arrays(optimize_graph=True) == num_created_arrays
+    num_arrays = 2  # a, d
+    num_created_arrays = 1  # d (a is not created on disk)
+    assert d.plan.num_arrays(optimize_graph=True) == num_arrays
assert d.plan.num_tasks(optimize_graph=True) == num_created_arrays + 4

task_counter = TaskCounter()
@@ -41,9 +43,9 @@ def test_fusion_transpose(spec):
c = xp.astype(b, np.float32)
d = c.T

-    num_created_arrays = 4  # a, b, c, d
+    num_created_arrays = 3  # b, c, d
assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 12
-    num_created_arrays = 2  # a, d
+    num_created_arrays = 1  # d
assert d.plan.num_tasks(optimize_graph=True) == num_created_arrays + 4

task_counter = TaskCounter()
