Using zarr v3 in a multiprocessing context fails with JSONDecodeError #2729
We haven't explicitly tested zarr-python 3 with multiprocessing, but I don't see any reason why there should be any particular problems. That being said, I don't really understand the architecture of your program. From the error, it looks like the reader is trying to access a metadata document that hasn't been fully written yet. To avoid these kinds of issues, I would create your zarr hierarchy in synchronous code as much as possible (because writing some JSON documents doesn't benefit from multiprocessing anyway).
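Something like the following is what I mean -- a rough sketch, with made-up file and array names, where the hierarchy is fully created before any worker process starts:

```python
import numpy as np
import zarr
from multiprocessing import Process

def writer(path):
    # Workers only read / write chunk data; all of the JSON metadata
    # documents already exist on disk at this point.
    root = zarr.open_group(path, mode="r+")
    root["data"][:] = np.arange(8)

if __name__ == "__main__":
    # Create the whole hierarchy synchronously, before spawning anything.
    root = zarr.open_group("example.zarr", mode="w")
    root.create_array("data", shape=(8,), chunks=(4,), dtype="float64")

    p = Process(target=writer, args=("example.zarr",))
    p.start()
    p.join()
```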
Hi,
My expectation is that the zarr directories and files are created and ready to be opened by the reader class once the writer has created them and set the event. Am I mistaken here? Furthermore, I'm not writing or using any JSON files myself. These are zarr's internal metadata files that cannot be read.
I am talking about JSON because zarr arrays and groups use JSON for their metadata documents. Each time you create a zarr array or group, you are writing a JSON document to storage; each time you open a zarr array / group, you are reading a JSON document from storage. Your error message ended with a JSONDecodeError.
I can trigger the same error by attempting to decode an empty bytestring as JSON:

```
>>> json.loads(b'')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/bennettd/.pyenv/versions/3.11.9/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bennettd/.pyenv/versions/3.11.9/lib/python3.11/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bennettd/.pyenv/versions/3.11.9/lib/python3.11/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
```

So the question is: how is a zarr metadata document (which should be valid JSON) getting read from disk as an empty bytestring? I don't really know.
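One way to check this on your end: in zarr v3, each group and array stores its metadata in a `zarr.json` document inside the store, so you can inspect the raw bytes the reader would see. The path below assumes the `measurements.zarr` / `data` example from this thread:

```python
import json
import pathlib

# For a local v3 store, group metadata lives at <store>/zarr.json and
# array metadata at <store>/<array_name>/zarr.json.
meta = pathlib.Path("measurements.zarr") / "data" / "zarr.json"
raw = meta.read_bytes()
print(len(raw))         # 0 bytes here would explain the error above
print(json.loads(raw))  # raises the same JSONDecodeError on empty bytes
```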
I'm not sure what you mean by this statement -- are you referring to zarr the format, or this Python library? The file format itself provides relatively limited concurrency guarantees -- in short, it is the responsibility of users / applications to structure their programs to prevent race conditions / data corruption. zarr-python 2.x supported synchronization via file-based locking, which might be helpful for your use case, but we have not added these features to zarr-python 3 yet. For your specific example, could you refactor it so that you create all the arrays and groups first, and then run your separate writer / reader processes, which write / read the already-created arrays? This might make the logic easier to follow.
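For reference, the file-based locking in zarr-python 2.x looked roughly like this (again, this API does not exist in zarr-python 3 yet; the example follows the 2.x docs):

```python
import zarr  # zarr-python 2.x only

# ProcessSynchronizer uses file locks in the given directory to
# coordinate concurrent chunk writes across processes.
synchronizer = zarr.ProcessSynchronizer("example.sync")
z = zarr.open_array(
    "example.zarr",
    mode="w",
    shape=(100,),
    chunks=(10,),
    dtype="f8",
    synchronizer=synchronizer,
)
```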
Okay, great! Just wanted to make sure we're talking about the same thing.
I tried this, but after a few iterations the same error appears:

```python
import os
import sys
import time
import zarr
import numpy as np
import logging
from multiprocessing import Process, Event


class ZarrReader(Process):
    def __init__(self, event, fname, dsetname, timeout=2.0):
        super().__init__()
        self._event = event
        self._fname = fname
        self._dsetname = dsetname
        self._timeout = timeout

    def run(self):
        self.log = logging.getLogger('reader')
        print("Reader: Waiting for initial event")
        assert self._event.wait(self._timeout)
        self._event.clear()

        print(f"Reader: Opening file {self._fname}")
        root = zarr.open_group(self._fname, mode='r')
        dset = root[self._dsetname]
        text_dset = root['text_data']

        # monitor and read loop
        while self._event.wait(self._timeout):
            self._event.clear()
            print("Reader: Event received")
            dset = root[self._dsetname]
            text_dset = root['text_data']
            shape = dset.shape
            print("Reader: Read dset shape: %s" % str(shape))
            print(f"Reader: Text dataset shape: {text_dset.shape}")
            for i in range(text_dset.shape[0]):
                print(text_dset[i])


class ZarrWriter(Process):
    def __init__(self, event, fname, dsetname):
        super().__init__()
        self._event = event
        self._fname = fname
        self._dsetname = dsetname

    def run(self):
        self.log = logging.getLogger('writer')
        print(f"Writer: Creating file {self._fname}")
        root = zarr.open_group(self._fname, mode='r+')
        arr = np.array([1, 2, 3, 4])
        dset = root.create_array(self._dsetname, shape=(4,), chunks=(2,), dtype=np.float64, fill_value=np.nan)
        dset[:] = arr
        text_dset = root.create_array('text_data', shape=(1,), chunks=(3,), dtype=str)
        text_arr = np.array(["Sample text 0"])
        text_dset[:] = text_arr
        print("Writer: Sending initial event")
        self._event.set()
        print("Writer: Waiting for the reader-opened-file event")

        # Write loop: grow the array by len(arr) each iteration and
        # append the new data at the tail.
        for i in range(1, 6):
            new_shape = ((i + 1) * len(arr),)
            print("Writer: Resizing dset shape: %s" % str(new_shape))
            dset.resize(new_shape)
            print("Writer: Writing data")
            dset[i * len(arr):] = arr
            text_dset.resize((text_dset.shape[0] + 1,))
            new_text_arr = np.array([f"Sample text {i}" * i])
            text_dset[-1:] = new_text_arr
            #dset.write_direct( arr, np.s_[:], np.s_[i*len(arr):] )
            print("Writer: Sending event")
            self._event.set()


if __name__ == "__main__":
    logging.basicConfig(format='%(levelname)10s %(asctime)s %(name)10s %(message)s', level=logging.INFO)

    fname = 'measurements.zarr'
    dsetname = 'data'
    if len(sys.argv) > 1:
        fname = sys.argv[1]
    if len(sys.argv) > 2:
        dsetname = sys.argv[2]

    if os.path.exists(fname):
        import shutil
        shutil.rmtree(fname)
    root = zarr.group(fname)

    event = Event()
    reader = ZarrReader(event, fname, dsetname)
    writer = ZarrWriter(event, fname, dsetname)

    logging.info("Starting reader")
    reader.start()
    logging.info("Starting writer")
    writer.start()

    logging.info("Waiting for writer to finish")
    writer.join()
    logging.info("Waiting for reader to finish")
    reader.join()
```
I'm actually not sure in which direction I should direct this report; I guess this library foremost.
But there isn't any tutorial or guidance on how to implement that with v3. Can you get my example code to work, or do you have an example which shows how to correctly use zarr v3 with a writer and a reader process?
You are resizing the arrays in your writer process. This requires writing new array metadata to disk; if the reader read the array metadata before the resize, then the reader will have an invalid copy of that metadata. That being said, I'm not sure what exactly is causing the error you see. For what it's worth, I could not replicate it on my machine -- I saw a different error -- but the errors went away when I removed the resizing operation from the writer process. If your application requires resizing the array from a writer process while reading from separate processes, then I would recommend synchronizing around the resizing operation: as soon as the array is resized, any reader with a reference to that array has invalid metadata that should be discarded. This should be feasible, but it would be much simpler not to resize the array while attempting to read from it.
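As a rough sketch of that kind of synchronization -- the lock and the re-open step are my own suggestion, not zarr API -- you could share a lock between the processes, do all resizes under it, and have the reader re-open the group (discarding stale handles) whenever it reads:

```python
import numpy as np
import zarr
from multiprocessing import Lock, Process

def writer(path, lock, steps=5):
    root = zarr.open_group(path, mode="r+")
    dset = root["data"]
    for i in range(1, steps + 1):
        with lock:  # no reader observes half-written metadata
            old = dset.shape[0]
            dset.resize((old + 4,))
            dset[old:] = np.full(4, i, dtype="float64")

def reader(path, lock, steps=5):
    for _ in range(steps):
        with lock:
            # Re-open every time: any handle obtained before a resize
            # holds stale metadata and must be discarded.
            root = zarr.open_group(path, mode="r")
            print("shape:", root["data"].shape)

if __name__ == "__main__":
    root = zarr.open_group("locked.zarr", mode="w")
    root.create_array("data", shape=(4,), chunks=(4,), dtype="float64")
    lock = Lock()
    procs = [Process(target=writer, args=("locked.zarr", lock)),
             Process(target=reader, args=("locked.zarr", lock))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```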
To this point: we should be clearer in the docs about what degree of concurrency is supported.
So basically: if you are writing to different chunks of the same array from multiple processes, that's going to work. If you modify array metadata, or modify the chunks of an array, while it is being read from another process, then this is not likely to work without some explicit synchronization between processes.
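To illustrate the supported pattern, here is a sketch (hypothetical store and array names) where each process writes only to its own chunk-aligned region of a pre-created array, and no metadata changes after setup:

```python
import numpy as np
import zarr
from multiprocessing import Process

CHUNK = 100

def write_region(path, i):
    # Each process writes one chunk-aligned slice, so no two processes
    # ever touch the same chunk, and no metadata is modified here.
    root = zarr.open_group(path, mode="r+")
    root["data"][i * CHUNK:(i + 1) * CHUNK] = np.full(CHUNK, i, dtype="float64")

if __name__ == "__main__":
    root = zarr.open_group("parallel.zarr", mode="w")
    root.create_array("data", shape=(4 * CHUNK,), chunks=(CHUNK,), dtype="float64")
    procs = [Process(target=write_region, args=("parallel.zarr", i)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```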
Zarr version
3.0.1
Numcodecs version
0.15.0
Python Version
3.12.2
Operating System
Windows 11 22H2
Installation
using pip into virtual environment
Description
Hi,
I discovered zarr a few days ago, just after v3 was published, and I'm trying to use it in a multiprocessing context where one process writes numeric as well as variable-length string data into a persistent file, from which a reader process reads the newly arrived data.
The aim is to exchange data and store it persistently at the same time.
I tried to build a minimal working example (see steps to reproduce), but more often than not, reading from the zarr files fails with the following exception:

```
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
```

Is this a bug in v3, is it not ready yet for multiprocessing, or am I making a mistake here?
Sadly, the v3 docs don't really describe how to use zarr in a multiprocessing context, so it's possible I'm missing something.
Steps to reproduce
See the reader / writer script posted in the comments above.
Additional output
No response