From 25757868d69eb81bf9ea34ef9c941dc6754534e3 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 13 Sep 2023 14:41:19 +0200 Subject: [PATCH 1/3] Preliminary version of Blosc2 module --- .flake8 | 1 + .gitmodules | 3 + MANIFEST.in | 1 + c-blosc2 | 1 + numcodecs/blosc.pyx | 2 +- numcodecs/blosc2.pyx | 604 +++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 7 files changed, 612 insertions(+), 1 deletion(-) create mode 160000 c-blosc2 create mode 100644 numcodecs/blosc2.pyx diff --git a/.flake8 b/.flake8 index e6b17013..69c0726c 100644 --- a/.flake8 +++ b/.flake8 @@ -7,6 +7,7 @@ exclude = adhoc, build, c-blosc, + c-blosc2, dist, docs, fixture, diff --git a/.gitmodules b/.gitmodules index 04c71aa1..d155cf8d 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "c-blosc"] path = c-blosc url = https://github.com/Blosc/c-blosc.git +[submodule "c-blosc2"] + path = c-blosc2 + url = https://github.com/Blosc/c-blosc2.git diff --git a/MANIFEST.in b/MANIFEST.in index 41513ecb..a8e0849c 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ recursive-include c-blosc * +recursive-include c-blosc2 * recursive-include numcodecs *.pyx recursive-include numcodecs *.pxd include numcodecs/tests/package_with_entrypoint-0.1.dist-info/entry_points.txt diff --git a/c-blosc2 b/c-blosc2 new file mode 160000 index 00000000..34b273ee --- /dev/null +++ b/c-blosc2 @@ -0,0 +1 @@ +Subproject commit 34b273ee60289aaddc1dfc77e790b01263ae3005 diff --git a/numcodecs/blosc.pyx b/numcodecs/blosc.pyx index 45e5ea10..ab21e56f 100644 --- a/numcodecs/blosc.pyx +++ b/numcodecs/blosc.pyx @@ -173,7 +173,7 @@ def cbuffer_complib(source): def cbuffer_metainfo(source): """Return some meta-information about the compressed buffer in `source`, including - the typesize, whether the shuffle or bit-shuffle filters were used, and the + the typesize, whether the shuffle or bit-shuffle filters were used, and whether the buffer was memcpyed. Returns diff --git a/numcodecs/blosc2.pyx b/numcodecs/blosc2.pyx new file mode 100644 index 00000000..de9e6167 --- /dev/null +++ b/numcodecs/blosc2.pyx @@ -0,0 +1,604 @@ +# cython: embedsignature=True +# cython: profile=False +# cython: linetrace=False +# cython: binding=False +# cython: language_level=3 +import threading +import multiprocessing +import os + + +from cpython.buffer cimport PyBUF_ANY_CONTIGUOUS, PyBUF_WRITEABLE +from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING +from libcpp cimport bool as c_bool + + +from .compat_ext cimport Buffer +from .compat_ext import Buffer +from .compat import ensure_contiguous_ndarray +from .abc import Codec + + +cdef extern from "": + ctypedef signed char int8_t + ctypedef signed short int16_t + ctypedef signed int int32_t + ctypedef signed long int64_t + ctypedef unsigned char uint8_t + ctypedef unsigned short uint16_t + ctypedef unsigned int uint32_t + ctypedef unsigned long long uint64_t + + +cdef extern from "blosc2.h": + cdef enum: + BLOSC_MAX_OVERHEAD, + BLOSC_VERSION_STRING, + BLOSC_VERSION_DATE, + BLOSC_NOSHUFFLE, + BLOSC_SHUFFLE, + BLOSC_BITSHUFFLE, + BLOSC_MAX_BUFFERSIZE, + BLOSC_MAX_THREADS, + BLOSC_MAX_TYPESIZE, + BLOSC_DOSHUFFLE, + BLOSC_DOBITSHUFFLE, + BLOSC_MEMCPYED, + BLOSC2_MAX_FILTERS, + + + ctypedef struct blosc2_context: + pass + + ctypedef struct blosc2_prefilter_params: + void* user_data + const uint8_t* input + uint8_t* output + int32_t output_size + int32_t output_typesize + int32_t output_offset + int64_t nchunk + int32_t nblock + int32_t tid + uint8_t* ttmp + size_t ttmp_nbytes + blosc2_context* ctx + + ctypedef struct blosc2_postfilter_params: + void *user_data + const uint8_t *input + uint8_t *output + int32_t size + int32_t typesize + int32_t offset + int64_t nchunk + int32_t nblock + int32_t tid + uint8_t * ttmp + size_t ttmp_nbytes + blosc2_context *ctx + + ctypedef int(*blosc2_prefilter_fn)(blosc2_prefilter_params* params) + + ctypedef int(*blosc2_postfilter_fn)(blosc2_postfilter_params *params) + + ctypedef struct blosc2_cparams: + uint8_t compcode + uint8_t compcode_meta + uint8_t clevel + int use_dict + int32_t typesize + int16_t nthreads + int32_t blocksize + int32_t splitmode + void *schunk + uint8_t filters[BLOSC2_MAX_FILTERS] + uint8_t filters_meta[BLOSC2_MAX_FILTERS] + blosc2_prefilter_fn prefilter + blosc2_prefilter_params* preparams + int tuner_id + void* tuner_params + c_bool instr_codec + void* codec_params + void* filter_params[BLOSC2_MAX_FILTERS] + + cdef const blosc2_cparams BLOSC2_CPARAMS_DEFAULTS + + ctypedef struct blosc2_dparams: + int16_t nthreads + void* schunk + blosc2_postfilter_fn postfilter + blosc2_postfilter_params *postparams + + cdef const blosc2_dparams BLOSC2_DPARAMS_DEFAULTS + + void blosc2_init() + void blosc2_destroy() + int blosc2_get_nthreads() + int blosc2_set_nthreads(int nthreads) + int blosc1_set_compressor(const char *compname) + void blosc1_set_blocksize(size_t blocksize) + char* blosc2_list_compressors() + int blosc2_compress(int clevel, int doshuffle, int32_t typesize, + const void *src, int32_t srcsize, void *dest, + int32_t destsize) nogil + int blosc2_decompress(const void *src, int32_t srcsize, + void *dest, int32_t destsize) nogil + int blosc2_getitem(const void *src, int32_t srcsize, int start, int nitems, + void *dest, int32_t destsize) + int blosc2_compname_to_compcode(const char *compname) + int blosc2_compress_ctx(blosc2_context *context, const void *src, int32_t srcsize, + void *dest, int32_t destsize) nogil + int blosc2_decompress_ctx(blosc2_context *context, const void *src, + int32_t srcsize, void *dest, int32_t destsize) nogil + int blosc2_cbuffer_sizes(const void* cbuffer, int32_t* nbytes, + int32_t* cbytes, int32_t* blocksize) + const char* blosc2_cbuffer_complib(const void* cbuffer) + void blosc1_cbuffer_metainfo(const void *cbuffer, size_t *typesize, int *flags) + + +MAX_OVERHEAD = BLOSC_MAX_OVERHEAD +MAX_BUFFERSIZE = BLOSC_MAX_BUFFERSIZE +MAX_THREADS = BLOSC_MAX_THREADS +MAX_TYPESIZE = BLOSC_MAX_TYPESIZE +VERSION_STRING = BLOSC_VERSION_STRING +VERSION_DATE = BLOSC_VERSION_DATE +VERSION_STRING = VERSION_STRING.decode() +VERSION_DATE = VERSION_DATE.decode() +__version__ = VERSION_STRING +NOSHUFFLE = BLOSC_NOSHUFFLE +SHUFFLE = BLOSC_SHUFFLE +BITSHUFFLE = BLOSC_BITSHUFFLE +# automatic shuffle +AUTOSHUFFLE = -1 +# automatic block size - let blosc decide +AUTOBLOCKS = 0 + +# synchronization +try: + mutex = multiprocessing.Lock() +except OSError: + mutex = None +except ImportError: + mutex = None + +# store ID of process that first loads the module, so we can detect a fork later +_importer_pid = os.getpid() + + +def init(): + """Initialize the Blosc2 library environment.""" + blosc2_init() + + +def destroy(): + """Destroy the Blosc2 library environment.""" + blosc2_destroy() + + +def compname_to_compcode(cname): + """Return the compressor code associated with the compressor name. If the compressor + name is not recognized, or there is not support for it in this build, -1 is returned + instead.""" + if isinstance(cname, str): + cname = cname.encode('ascii') + return blosc2_compname_to_compcode(cname) + + +def list_compressors(): + """Get a list of compressors supported in the current build.""" + s = blosc2_list_compressors() + s = s.decode('ascii') + return s.split(',') + + +def get_nthreads(): + """Get the number of threads that Blosc2 uses internally for compression and + decompression.""" + return blosc2_get_nthreads() + + +def set_nthreads(int nthreads): + """Set the number of threads that Blosc2 uses internally for compression and + decompression.""" + return blosc2_set_nthreads(nthreads) + + +def cbuffer_sizes(source): + """Return information about a compressed buffer, namely the number of uncompressed + bytes (`nbytes`) and compressed (`cbytes`). It also returns the `blocksize` (which + is used internally for doing the compression by blocks). + + Returns + ------- + nbytes : int + cbytes : int + blocksize : int + + """ + cdef: + Buffer buffer + size_t nbytes, cbytes, blocksize + + # obtain buffer + buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) + + # determine buffer size + blosc2_cbuffer_sizes(buffer.ptr, &nbytes, &cbytes, &blocksize) + + # release buffers + buffer.release() + + return nbytes, cbytes, blocksize + + +def cbuffer_complib(source): + """Return the name of the compression library used to compress `source`.""" + cdef: + Buffer buffer + + # obtain buffer + buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) + + # determine buffer size + complib = blosc2_cbuffer_complib(buffer.ptr) + + # release buffers + buffer.release() + + complib = complib.decode('ascii') + + return complib + + +def cbuffer_metainfo(source): + """Return some meta-information about the compressed buffer in `source`, including + the typesize, whether the shuffle or bit-shuffle filters were used, and + whether the buffer was memcpyed. + + Returns + ------- + typesize + shuffle + memcpyed + + """ + cdef: + Buffer buffer + size_t typesize + int flags + + # obtain buffer + buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) + + # determine buffer size + blosc1_cbuffer_metainfo(buffer.ptr, &typesize, &flags) + + # release buffers + buffer.release() + + # decompose flags + if flags & BLOSC_DOSHUFFLE: + shuffle = SHUFFLE + elif flags & BLOSC_DOBITSHUFFLE: + shuffle = BITSHUFFLE + else: + shuffle = NOSHUFFLE + memcpyed = flags & BLOSC_MEMCPYED + + return typesize, shuffle, memcpyed + + +def err_bad_cname(cname): + raise ValueError('bad compressor or compressor not supported: %r; expected one of ' + '%s' % (cname, list_compressors())) + + +def compress(source, char* cname, int clevel, int shuffle=SHUFFLE, + int blocksize=AUTOBLOCKS): + """Compress data. + + Parameters + ---------- + source : bytes-like + Data to be compressed. Can be any object supporting the buffer + protocol. + cname : bytes + Name of compression library to use. + clevel : int + Compression level. + shuffle : int + Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If AUTOSHUFFLE, + bit-shuffle will be used for buffers with itemsize 1, and byte-shuffle will + be used otherwise. The default is `SHUFFLE`. + blocksize : int + The requested size of the compressed blocks. If 0, an automatic blocksize will + be used. + + Returns + ------- + dest : bytes + Compressed data. + + """ + + cdef: + char *source_ptr + char *dest_ptr + Buffer source_buffer + size_t nbytes, itemsize + int cbytes + bytes dest + + # check valid cname early + cname_str = cname.decode('ascii') + if cname_str not in list_compressors(): + err_bad_cname(cname_str) + + # setup source buffer + source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) + source_ptr = source_buffer.ptr + nbytes = source_buffer.nbytes + itemsize = source_buffer.itemsize + + # determine shuffle + if shuffle == AUTOSHUFFLE: + if itemsize == 1: + shuffle = BITSHUFFLE + else: + shuffle = SHUFFLE + elif shuffle not in [NOSHUFFLE, SHUFFLE, BITSHUFFLE]: + raise ValueError('invalid shuffle argument; expected -1, 0, 1 or 2, found %r' % + shuffle) + + try: + + # setup destination + dest = PyBytes_FromStringAndSize(NULL, nbytes + BLOSC_MAX_OVERHEAD) + dest_ptr = PyBytes_AS_STRING(dest) + + # perform compression + with nogil: + # cbytes = blosc2_compress_ctx(clevel, shuffle, itemsize, nbytes, source_ptr, + # dest_ptr, nbytes + BLOSC_MAX_OVERHEAD, + # cname, blocksize, 1) + cdef blosc2_context *ctx = BLOSC2_CPARAMS_DEFAULTS + ctx.clevel = clevel + ctx.shuffle = shuffle + ctx.itemsize = itemsize + ctx.blocksize = blocksize + ctx.compcode = compname_to_compcode(cname) + ctx.nthreads = blosc2_get_nthreads() + cbytes = blosc2_compress_ctx(ctx, source_ptr, nbytes, + dest_ptr, nbytes + BLOSC_MAX_OVERHEAD) + + finally: + + # release buffers + source_buffer.release() + + # check compression was successful + if cbytes <= 0: + raise RuntimeError('error during blosc compression: %d' % cbytes) + + # resize after compression + dest = dest[:cbytes] + + return dest + + +def decompress(source, dest=None): + """Decompress data. + + Parameters + ---------- + source : bytes-like + Compressed data, including blosc header. Can be any object supporting the buffer + protocol. + dest : array-like, optional + Object to decompress into. + + Returns + ------- + dest : bytes + Object containing decompressed data. + + """ + cdef: + int ret + char *source_ptr + char *dest_ptr + Buffer source_buffer + Buffer dest_buffer = None + size_t nbytes, cbytes, blocksize + + # setup source buffer + source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) + source_ptr = source_buffer.ptr + + # determine buffer size + blosc2_cbuffer_sizes(source_ptr, &nbytes, &cbytes, &blocksize) + + # setup destination buffer + if dest is None: + # allocate memory + dest = PyBytes_FromStringAndSize(NULL, nbytes) + dest_ptr = PyBytes_AS_STRING(dest) + dest_nbytes = nbytes + else: + arr = ensure_contiguous_ndarray(dest) + dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE) + dest_ptr = dest_buffer.ptr + dest_nbytes = dest_buffer.nbytes + + try: + + # guard condition + if dest_nbytes < nbytes: + raise ValueError('destination buffer too small; expected at least %s, ' + 'got %s' % (nbytes, dest_nbytes)) + + # perform decompression + with nogil: + cdef blosc2_context *ctx = BLOSC2_DPARAMS_DEFAULTS + ctx.nthreads = blosc2_get_nthreads() + ret = blosc2_decompress_ctx(ctx, source_ptr, nbytes, + dest_ptr, nbytes + BLOSC_MAX_OVERHEAD) + + finally: + + # release buffers + source_buffer.release() + if dest_buffer is not None: + dest_buffer.release() + + # handle errors + if ret <= 0: + raise RuntimeError('error during blosc decompression: %d' % ret) + + return dest + + +def decompress_partial(source, start, nitems, dest=None): + """**Experimental** + Decompress data of only a part of a buffer. + + Parameters + ---------- + source : bytes-like + Compressed data, including blosc header. Can be any object supporting the buffer + protocol. + start: int, + Offset in item where we want to start decoding + nitems: int + Number of items we want to decode + dest : array-like, optional + Object to decompress into. + + + Returns + ------- + dest : bytes + Object containing decompressed data. + + """ + cdef: + int ret + int encoding_size + int nitems_bytes + int start_bytes + char *source_ptr + char *dest_ptr + Buffer source_buffer + Buffer dest_buffer = None + + # setup source buffer + source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) + source_ptr = source_buffer.ptr + nbytes = source_buffer.nbytes + + # get encoding size from source buffer header + encoding_size = source[3] + + # convert variables to handle type and encoding sizes + nitems_bytes = nitems * encoding_size + start_bytes = (start * encoding_size) + + # setup destination buffer + if dest is None: + dest = PyBytes_FromStringAndSize(NULL, nitems_bytes) + dest_ptr = PyBytes_AS_STRING(dest) + dest_nbytes = nitems_bytes + else: + arr = ensure_contiguous_ndarray(dest) + dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE) + dest_ptr = dest_buffer.ptr + dest_nbytes = dest_buffer.nbytes + + # try decompression + try: + if dest_nbytes < nitems_bytes: + raise ValueError('destination buffer too small; expected at least %s, ' + 'got %s' % (nitems_bytes, dest_nbytes)) + ret = blosc2_getitem(source_ptr, nbytes, start, nitems, dest_ptr, dest_nbytes) + finally: + source_buffer.release() + if dest_buffer is not None: + dest_buffer.release() + + # ret refers to the number of bytes returned from blosc_getitem. + if ret <= 0: + raise RuntimeError('error during blosc partial decompression: %d', ret) + + return dest + + +# set the value of this variable to True or False to override the +# default adaptive behaviour +use_threads = None + +_shuffle_repr = ['AUTOSHUFFLE', 'NOSHUFFLE', 'SHUFFLE', 'BITSHUFFLE'] + + +class Blosc2(Codec): + """Codec providing compression using the Blosc2 meta-compressor. + + Parameters + ---------- + cname : string, optional + A string naming one of the compression algorithms available within blosc, e.g., + 'zstd', 'blosclz', 'lz4', 'lz4hc', 'zlib' or 'snappy'. + clevel : integer, optional + An integer between 0 and 9 specifying the compression level. + shuffle : integer, optional + Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If AUTOSHUFFLE, + bit-shuffle will be used for buffers with itemsize 1, and byte-shuffle will + be used otherwise. The default is `SHUFFLE`. + blocksize : int + The requested size of the compressed blocks. If 0 (default), an automatic + blocksize will be used. + + See Also + -------- + numcodecs.zstd.Zstd, numcodecs.lz4.LZ4 + + """ + + codec_id = 'blosc' + NOSHUFFLE = NOSHUFFLE + SHUFFLE = SHUFFLE + BITSHUFFLE = BITSHUFFLE + AUTOSHUFFLE = AUTOSHUFFLE + max_buffer_size = 2**31 - 1 + + def __init__(self, cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=AUTOBLOCKS): + self.cname = cname + if isinstance(cname, str): + self._cname_bytes = cname.encode('ascii') + else: + self._cname_bytes = cname + self.clevel = clevel + self.shuffle = shuffle + self.blocksize = blocksize + + def encode(self, buf): + buf = ensure_contiguous_ndarray(buf, self.max_buffer_size) + return compress(buf, self._cname_bytes, self.clevel, self.shuffle, self.blocksize) + + def decode(self, buf, out=None): + buf = ensure_contiguous_ndarray(buf, self.max_buffer_size) + return decompress(buf, out) + + def decode_partial(self, buf, int start, int nitems, out=None): + '''**Experimental**''' + buf = ensure_contiguous_ndarray(buf, self.max_buffer_size) + return decompress_partial(buf, start, nitems, dest=out) + + def __repr__(self): + r = '%s(cname=%r, clevel=%r, shuffle=%s, blocksize=%s)' % \ + (type(self).__name__, + self.cname, + self.clevel, + _shuffle_repr[self.shuffle + 1], + self.blocksize) + return r diff --git a/pyproject.toml b/pyproject.toml index 147f9b54..ca96edd5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,6 +108,7 @@ norecursedirs = [ "adhoc", "build", "c-blosc", + "c-blosc2", "docs", "fixture", "notebooks", From eca2cae6f4c765ec26b5057167e52bdc824786ef Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 13 Sep 2023 15:07:46 +0200 Subject: [PATCH 2/3] Some blosc -> blosc2 translation --- numcodecs/blosc2.pyx | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/numcodecs/blosc2.pyx b/numcodecs/blosc2.pyx index de9e6167..2a4030ce 100644 --- a/numcodecs/blosc2.pyx +++ b/numcodecs/blosc2.pyx @@ -3,7 +3,6 @@ # cython: linetrace=False # cython: binding=False # cython: language_level=3 -import threading import multiprocessing import os @@ -151,7 +150,7 @@ SHUFFLE = BLOSC_SHUFFLE BITSHUFFLE = BLOSC_BITSHUFFLE # automatic shuffle AUTOSHUFFLE = -1 -# automatic block size - let blosc decide +# automatic block size - let blosc2 decide AUTOBLOCKS = 0 # synchronization @@ -373,13 +372,12 @@ def compress(source, char* cname, int clevel, int shuffle=SHUFFLE, dest_ptr, nbytes + BLOSC_MAX_OVERHEAD) finally: - # release buffers source_buffer.release() # check compression was successful if cbytes <= 0: - raise RuntimeError('error during blosc compression: %d' % cbytes) + raise RuntimeError('error during blosc2 compression: %d' % cbytes) # resize after compression dest = dest[:cbytes] @@ -393,7 +391,7 @@ def decompress(source, dest=None): Parameters ---------- source : bytes-like - Compressed data, including blosc header. Can be any object supporting the buffer + Compressed data, including blosc2 header. Can be any object supporting the buffer protocol. dest : array-like, optional Object to decompress into. @@ -454,7 +452,7 @@ def decompress(source, dest=None): # handle errors if ret <= 0: - raise RuntimeError('error during blosc decompression: %d' % ret) + raise RuntimeError('error during blosc2 decompression: %d' % ret) return dest @@ -466,7 +464,7 @@ def decompress_partial(source, start, nitems, dest=None): Parameters ---------- source : bytes-like - Compressed data, including blosc header. Can be any object supporting the buffer + Compressed data, including blosc2 header. Can be any object supporting the buffer protocol. start: int, Offset in item where we want to start decoding @@ -526,9 +524,9 @@ def decompress_partial(source, start, nitems, dest=None): if dest_buffer is not None: dest_buffer.release() - # ret refers to the number of bytes returned from blosc_getitem. + # ret refers to the number of bytes returned from blosc2_getitem. if ret <= 0: - raise RuntimeError('error during blosc partial decompression: %d', ret) + raise RuntimeError('error during blosc2 partial decompression: %d', ret) return dest @@ -546,8 +544,8 @@ class Blosc2(Codec): Parameters ---------- cname : string, optional - A string naming one of the compression algorithms available within blosc, e.g., - 'zstd', 'blosclz', 'lz4', 'lz4hc', 'zlib' or 'snappy'. + A string naming one of the compression algorithms available within blosc2, e.g., + 'zstd', 'blosclz', 'lz4', 'lz4hc' or 'zlib'. clevel : integer, optional An integer between 0 and 9 specifying the compression level. shuffle : integer, optional @@ -560,11 +558,11 @@ class Blosc2(Codec): See Also -------- - numcodecs.zstd.Zstd, numcodecs.lz4.LZ4 + numcodecs.zstd.Zstd, numcodecs.lz4.LZ4, numcodecs.blosc.Blosc """ - codec_id = 'blosc' + codec_id = 'blosc2' NOSHUFFLE = NOSHUFFLE SHUFFLE = SHUFFLE BITSHUFFLE = BITSHUFFLE From ad84f7eab9882452021a3386daf0a32620d1d02e Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Wed, 13 Sep 2023 15:20:19 +0200 Subject: [PATCH 3/3] Using contexts properly --- numcodecs/blosc2.pyx | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/numcodecs/blosc2.pyx b/numcodecs/blosc2.pyx index 2a4030ce..99136a21 100644 --- a/numcodecs/blosc2.pyx +++ b/numcodecs/blosc2.pyx @@ -113,6 +113,9 @@ cdef extern from "blosc2.h": void blosc2_init() void blosc2_destroy() + blosc2_context* blosc2_create_cctx(blosc2_cparams cparams) + blosc2_context * blosc2_create_dctx(blosc2_dparams dparams) + void blosc2_free_ctx(blosc2_context * context) int blosc2_get_nthreads() int blosc2_set_nthreads(int nthreads) int blosc1_set_compressor(const char *compname) @@ -351,27 +354,24 @@ def compress(source, char* cname, int clevel, int shuffle=SHUFFLE, shuffle) try: - # setup destination dest = PyBytes_FromStringAndSize(NULL, nbytes + BLOSC_MAX_OVERHEAD) dest_ptr = PyBytes_AS_STRING(dest) # perform compression with nogil: - # cbytes = blosc2_compress_ctx(clevel, shuffle, itemsize, nbytes, source_ptr, - # dest_ptr, nbytes + BLOSC_MAX_OVERHEAD, - # cname, blocksize, 1) - cdef blosc2_context *ctx = BLOSC2_CPARAMS_DEFAULTS - ctx.clevel = clevel - ctx.shuffle = shuffle - ctx.itemsize = itemsize - ctx.blocksize = blocksize - ctx.compcode = compname_to_compcode(cname) - ctx.nthreads = blosc2_get_nthreads() + cdef blosc2_cparams *cparams = BLOSC2_CPARAMS_DEFAULTS + cparams.compcode = compname_to_compcode(cname) + cparams.clevel = clevel + cparams.shuffle = shuffle + cparams.typesize = itemsize + cparams.blocksize = blocksize + cparams.nthreads = blosc2_get_nthreads() + cdef blosc2_context ctx = blosc2_create_cctx(cparams) cbytes = blosc2_compress_ctx(ctx, source_ptr, nbytes, dest_ptr, nbytes + BLOSC_MAX_OVERHEAD) - finally: + blosc2_free_ctx(ctx) # release buffers source_buffer.release() @@ -438,13 +438,13 @@ def decompress(source, dest=None): # perform decompression with nogil: - cdef blosc2_context *ctx = BLOSC2_DPARAMS_DEFAULTS - ctx.nthreads = blosc2_get_nthreads() + cdef blosc2_cparams *dparams = BLOSC2_DPARAMS_DEFAULTS + dparams.nthreads = blosc2_get_nthreads() + cdef blosc2_context ctx = blosc2_create_dctx(dparams) ret = blosc2_decompress_ctx(ctx, source_ptr, nbytes, dest_ptr, nbytes + BLOSC_MAX_OVERHEAD) - finally: - + blosc2_free_ctx(ctx) # release buffers source_buffer.release() if dest_buffer is not None: