1313 normalize_storage_path
1414from zarr.storage import array_meta_key, attrs_key, listdir, getsize
1515from zarr.meta import decode_array_metadata, encode_array_metadata
16- from zarr.attrs import Attributes
16+ from zarr.attrs import Attributes, SynchronizedAttributes
1717from zarr.errors import ReadOnlyError
1818from zarr.compat import reduce
1919
@@ -514,7 +514,7 @@ def _chunk_getitem(self, cidx, item, dest):
514514 try:
515515
516516 # obtain compressed data for chunk
517- ckey = self._ckey (cidx)
517+ ckey = self._chunk_key (cidx)
518518 cdata = self._chunk_store[ckey]
519519
520520 except KeyError:
@@ -589,7 +589,7 @@ def _chunk_setitem(self, cidx, key, value):
589589 try:
590590
591591 # obtain compressed data for chunk
592- ckey = self._ckey (cidx)
592+ ckey = self._chunk_key (cidx)
593593 cdata = self._chunk_store[ckey]
594594
595595 except KeyError:
@@ -614,10 +614,10 @@ def _chunk_setitem(self, cidx, key, value):
614614 cdata = self._compressor.compress(chunk)
615615
616616 # store
617- ckey = self._ckey (cidx)
617+ ckey = self._chunk_key (cidx)
618618 self._chunk_store[ckey] = cdata
619619
620- def _ckey (self, cidx):
620+ def _chunk_key (self, cidx):
621621 return self._key_prefix + '.'.join(map(str, cidx))
622622
623623 def __repr__(self):
@@ -691,6 +691,13 @@ def resize(self, *args):
691691 if self._readonly:
692692 raise ReadOnlyError('array is read-only')
693693
694+ self._resize(*args)
695+
696+ def _resize(self, *args):
697+
698+ # N.B., private implementation to avoid need for re-entrant lock on
699+ # SynchronizedArray.append().
700+
694701 # normalize new shape argument
695702 old_shape = self._shape
696703 new_shape = normalize_resize_args(old_shape, *args)
@@ -780,7 +787,7 @@ def append(self, data, axis=0):
780787 )
781788
782789 # resize
783- self.resize (new_shape)
790+ self._resize (new_shape)
784791
785792 # store data
786793 # noinspection PyTypeChecker
@@ -789,3 +796,88 @@ def append(self, data, axis=0):
789796 for i in range(len(self._shape))
790797 )
791798 self[append_selection] = data
799+
800+
class SynchronizedArray(Array):
    """Instantiate a synchronized array.

    Parameters
    ----------
    store : MutableMapping
        Array store, already initialized.
    synchronizer : object
        Array synchronizer. Indexing it with a key (``synchronizer[key]``)
        must return a context manager that guards that key.
    readonly : bool, optional
        True if array should be protected against modification.
    path : string, optional
        Path under which array is stored.
    chunk_store : MutableMapping, optional
        Separate storage for chunks. If not provided, `store` will be used
        for storage of both chunks and metadata.

    Examples
    --------
    >>> import zarr
    >>> store = dict()
    >>> zarr.init_array(store, shape=1000, chunks=100)
    >>> synchronizer = zarr.ThreadSynchronizer()
    >>> z = zarr.SynchronizedArray(store, synchronizer)
    >>> z
    zarr.sync.SynchronizedArray((1000,), float64, chunks=(100,), order=C)
      compression: blosc; compression_opts: {'clevel': 5, 'cname': 'lz4', 'shuffle': 1}
      nbytes: 7.8K; nbytes_stored: 285; ratio: 28.1; initialized: 0/10
      store: builtins.dict; synchronizer: zarr.sync.ThreadSynchronizer

    Notes
    -----
    TODO review

    Writing to the array is synchronized at the chunk level. I.e., each
    chunk write is performed while holding the synchronizer's lock for that
    chunk's key, so concurrent write operations will only exclude each
    other if they both require modification of the same chunk.

    resize() and append() are each performed while holding the
    synchronizer's lock for the array metadata key, so they exclude each
    other. They are guarded by the metadata key only; chunk-level locks are
    taken separately for any chunk writes.

    User attributes are accessed through a SynchronizedAttributes instance
    constructed with the same synchronizer.

    """  # flake8: noqa

    def __init__(self, store, synchronizer, readonly=False, path=None,
                 chunk_store=None):
        super(SynchronizedArray, self).__init__(store, readonly=readonly,
                                                path=path,
                                                chunk_store=chunk_store)
        self.synchronizer = synchronizer
        # Replace the attributes object set up by the base class with one
        # that uses the same synchronizer.
        akey = self._key_prefix + attrs_key
        self._attrs = SynchronizedAttributes(store, synchronizer, key=akey,
                                             readonly=readonly)

    def __repr__(self):
        """Return the base class repr with the synchronizer type appended."""
        r = super(SynchronizedArray, self).__repr__()
        r += ('; synchronizer: %s.%s' %
              (type(self.synchronizer).__module__,
               type(self.synchronizer).__name__))
        return r

    def __getstate__(self):
        """Return the constructor arguments, in positional order."""
        return (self._store, self.synchronizer, self._readonly, self._path,
                self._chunk_store)

    def __setstate__(self, state):
        """Re-initialize from the tuple produced by __getstate__()."""
        self.__init__(*state)

    def _chunk_setitem(self, cidx, key, value):
        """Write into a single chunk while holding that chunk's lock."""
        ckey = self._chunk_key(cidx)
        with self.synchronizer[ckey]:
            super(SynchronizedArray, self)._chunk_setitem(cidx, key, value)

    def resize(self, *args):
        """Resize the array while holding the array metadata lock."""
        mkey = self._key_prefix + array_meta_key
        with self.synchronizer[mkey]:
            super(SynchronizedArray, self).resize(*args)

    def append(self, *args, **kwargs):
        """Append to the array while holding the array metadata lock.

        The base class append() resizes via the private _resize() rather
        than the public resize(), so the metadata lock acquired here is not
        acquired a second time and need not be re-entrant.
        """
        mkey = self._key_prefix + array_meta_key
        with self.synchronizer[mkey]:
            super(SynchronizedArray, self).append(*args, **kwargs)
0 commit comments