Skip to content

Commit

Permalink
Add API parameter to cap thread usage (#46)
Browse files Browse the repository at this point in the history
* Update C API to add max_threads to settings struct.

* Update C++ tests to use ZarrStreamSettings::max_threads.

* Update Python API to reflect `max_threads` parameter.

* Set `max_threads` to 0 to use all available threads.

* Update bindings.

* Update tests.

* Update examples.

* Revert "Update bindings."

This reverts commit 9752bb6.

* Respond to PR comments.
  • Loading branch information
aliddell authored Jan 21, 2025
1 parent 9cda7ae commit 53f58f3
Show file tree
Hide file tree
Showing 23 changed files with 58 additions and 8 deletions.
3 changes: 2 additions & 1 deletion examples/zarrv2-compressed-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ int main() {
.s3_settings = &s3,
.compression_settings = &compression,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, c, y, x)
Expand Down
3 changes: 2 additions & 1 deletion examples/zarrv2-raw-filesystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ int main() {
.s3_settings = NULL,
.compression_settings = NULL,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-compressed-filesystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ main()
.compression_settings = &compression,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-compressed-multiscale-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ main()
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.multiscale = true,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, z, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-compressed-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ main()
.compression_settings = &compression,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-raw-filesystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ main()
.compression_settings = NULL,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, y, x)
Expand Down
1 change: 1 addition & 0 deletions examples/zarrv3-raw-multiscale-filesystem.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ main()
.multiscale = true,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up 5D array (t, c, z, y, x)
Expand Down
3 changes: 2 additions & 1 deletion examples/zarrv3-raw-s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ int main() {
.s3_settings = &s3,
.compression_settings = NULL, // No compression
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

// Set up dimensions (t, z, y, x)
Expand Down
1 change: 1 addition & 0 deletions include/acquire.zarr.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C"
bool multiscale; /**< Whether to stream to multiple levels of detail. */
ZarrDataType data_type; /**< The pixel data type of the dataset. */
ZarrVersion version; /**< The version of the Zarr format to use. 2 or 3. */
unsigned int max_threads; /**< The maximum number of threads to use in the stream. Set to 0 to use the supported number of concurrent threads. */
} ZarrStreamSettings;

typedef struct ZarrStream_s ZarrStream;
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ build-backend = "setuptools.build_meta"

[project]
name = "acquire-zarr"
version = "0.0.5"
description = "Python bindings for acquire-zarr"
version = "0.1.0"
description = "Performant streaming to Zarr storage, on filesystem or cloud"
authors = [
{name = "Alan Liddell", email = "[email protected]"}
]
Expand Down
14 changes: 13 additions & 1 deletion python/acquire-zarr-py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ class PyZarrStreamSettings
ZarrVersion version() const { return version_; }
void set_version(ZarrVersion version) { version_ = version; }

unsigned int max_threads() const { return max_threads_; }
void set_max_threads(unsigned int max_threads)
{
max_threads_ = max_threads;
}

private:
std::string store_path_;
std::optional<std::string> custom_metadata_{ std::nullopt };
Expand All @@ -284,6 +290,7 @@ class PyZarrStreamSettings
bool multiscale_ = false;
ZarrDataType data_type_{ ZarrDataType_uint8 };
ZarrVersion version_{ ZarrVersion_2 };
unsigned int max_threads_{ std::thread::hardware_concurrency() };
};

class PyZarrStream
Expand All @@ -304,6 +311,7 @@ class PyZarrStream
.multiscale = settings.multiscale(),
.data_type = settings.data_type(),
.version = settings.version(),
.max_threads = settings.max_threads(),
};

store_path_ = settings.store_path();
Expand Down Expand Up @@ -632,6 +640,7 @@ PYBIND11_MODULE(acquire_zarr, m)
std::string(data_type_to_str(self.data_type())) +
", version=ZarrVersion." +
std::string(self.version() == ZarrVersion_2 ? "V2" : "V3") +
", max_threads=" + std::to_string(self.max_threads()) +
")";
return repr;
})
Expand Down Expand Up @@ -692,7 +701,10 @@ PYBIND11_MODULE(acquire_zarr, m)
&PyZarrStreamSettings::set_data_type)
.def_property("version",
&PyZarrStreamSettings::version,
&PyZarrStreamSettings::set_version);
&PyZarrStreamSettings::set_version)
.def_property("max_threads",
&PyZarrStreamSettings::max_threads,
&PyZarrStreamSettings::set_max_threads);

py::class_<PyZarrStream>(m, "ZarrStream")
.def(py::init<PyZarrStreamSettings>())
Expand Down
1 change: 1 addition & 0 deletions python/acquire_zarr.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ class StreamSettings:
s3: Optional[S3Settings]
store_path: str
version: ZarrVersion
max_threads: int

def __init__(self, **kwargs) -> None:
...
Expand Down
7 changes: 7 additions & 0 deletions python/tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,10 @@ def test_set_version(settings):
settings.version = acquire_zarr.ZarrVersion.V3

assert settings.version == acquire_zarr.ZarrVersion.V3


def test_set_max_threads(settings):
assert settings.max_threads > 0 # depends on your system, but will be nonzero

settings.max_threads = 4
assert settings.max_threads == 4
16 changes: 14 additions & 2 deletions src/streaming/zarr.stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,21 @@ ZarrStream::ZarrStream_s(struct ZarrStreamSettings_s* settings)
commit_settings_(settings);

// spin up thread pool
unsigned int max_threads = settings->max_threads;
const auto hardware_concurrency = std::thread::hardware_concurrency();

if (max_threads == 0) {
if (hardware_concurrency > 0) {
LOG_DEBUG("Using ", hardware_concurrency, " threads");
max_threads = hardware_concurrency;
} else {
LOG_WARNING(
"Unable to determine hardware concurrency, using 1 thread");
max_threads = 1;
}
}
thread_pool_ = std::make_shared<zarr::ThreadPool>(
std::thread::hardware_concurrency(),
[this](const std::string& err) { this->set_error_(err); });
max_threads, [this](const std::string& err) { this->set_error_(err); });

// allocate a frame buffer
frame_buffer_.resize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ setup()
.s3_settings = nullptr,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

ZarrCompressionSettings compression_settings = {
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v2-compressed-to-s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ setup()
.store_path = TEST,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

ZarrS3Settings s3_settings{
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v2-raw-to-filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ setup()
.compression_settings = nullptr,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5));
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v2-raw-to-s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ setup()
.compression_settings = nullptr,
.data_type = ZarrDataType_int32,
.version = ZarrVersion_2,
.max_threads = 0, // use all available threads
};

ZarrS3Settings s3_settings{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ setup()
.s3_settings = nullptr,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

ZarrCompressionSettings compression_settings = {
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v3-compressed-to-s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ setup()
.store_path = TEST,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

ZarrS3Settings s3_settings{
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v3-raw-to-filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ setup()
.compression_settings = nullptr,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5));
Expand Down
1 change: 1 addition & 0 deletions tests/integration/stream-zarr-v3-raw-to-s3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ setup()
.compression_settings = nullptr,
.data_type = ZarrDataType_uint16,
.version = ZarrVersion_3,
.max_threads = 0, // use all available threads
};

ZarrS3Settings s3_settings{
Expand Down
1 change: 1 addition & 0 deletions tests/unit-tests/create-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ main()
ZarrStreamSettings settings;
memset(&settings, 0, sizeof(settings));
settings.version = ZarrVersion_2;
settings.max_threads = std::thread::hardware_concurrency();

try {
// try to create a stream with no store path
Expand Down

0 comments on commit 53f58f3

Please sign in to comment.