1313 normalize_storage_path
1414from zarr .storage import array_meta_key , attrs_key , listdir , getsize
1515from zarr .meta import decode_array_metadata , encode_array_metadata
16- from zarr .attrs import Attributes , SynchronizedAttributes
16+ from zarr .attrs import Attributes
1717from zarr .errors import ReadOnlyError
1818from zarr .compat import reduce
1919
@@ -32,6 +32,8 @@ class Array(object):
3232 chunk_store : MutableMapping, optional
3333 Separate storage for chunks. If not provided, `store` will be used
3434 for storage of both chunks and metadata.
35+ synchronizer : object, optional
36+ Array synchronizer.
3537
3638 Attributes
3739 ----------
@@ -47,6 +49,7 @@ class Array(object):
4749 compression_opts
4850 fill_value
4951 order
52+ synchronizer
5053 attrs
5154 size
5255 itemsize
@@ -64,7 +67,8 @@ class Array(object):
6467
6568 """ # flake8: noqa
6669
67- def __init__ (self , store , path = None , readonly = False , chunk_store = None ):
70+ def __init__ (self , store , path = None , readonly = False , chunk_store = None ,
71+ synchronizer = None ):
6872 # N.B., expect at this point store is fully initialized with all
6973 # configuration metadata fully specified and normalized
7074
@@ -79,6 +83,7 @@ def __init__(self, store, path=None, readonly=False, chunk_store=None):
7983 self ._chunk_store = store
8084 else :
8185 self ._chunk_store = chunk_store
86+ self ._synchronizer = synchronizer
8287
8388 # initialize metadata
8489 try :
@@ -101,9 +106,10 @@ def __init__(self, store, path=None, readonly=False, chunk_store=None):
101106
102107 # initialize attributes
103108 akey = self ._key_prefix + attrs_key
104- self ._attrs = Attributes (store , key = akey , readonly = readonly )
109+ self ._attrs = Attributes (store , key = akey , readonly = readonly ,
110+ synchronizer = synchronizer )
105111
106- def flush_metadata (self ):
112+ def _flush_metadata (self ):
107113 meta = dict (shape = self ._shape , chunks = self ._chunks , dtype = self ._dtype ,
108114 compression = self ._compression ,
109115 compression_opts = self ._compression_opts ,
@@ -183,6 +189,11 @@ def order(self):
183189 chunks of the array."""
184190 return self ._order
185191
192+ @property
193+ def synchronizer (self ):
194+ """TODO doc me"""
195+ return self ._synchronizer
196+
186197 @property
187198 def attrs (self ):
188199 """A MutableMapping containing user-defined attributes. Note that
@@ -563,6 +574,20 @@ def _chunk_setitem(self, cidx, key, value):
563574
564575 """
565576
577+ # synchronization
578+ if self ._synchronizer is None :
579+ self ._chunk_setitem_nosync (cidx , key , value )
580+ else :
581+ # synchronize on the chunk
582+ ckey = self ._chunk_key (cidx )
583+ with self ._synchronizer [ckey ]:
584+ self ._chunk_setitem_nosync (cidx , key , value )
585+
586+ def _chunk_setitem_nosync (self , cidx , key , value ):
587+
588+ # obtain key for chunk storage
589+ ckey = self ._chunk_key (cidx )
590+
566591 if is_total_slice (key , self ._chunks ):
567592
568593 # optimisation: we are completely replacing the chunk, so no need
@@ -589,7 +614,6 @@ def _chunk_setitem(self, cidx, key, value):
589614 try :
590615
591616 # obtain compressed data for chunk
592- ckey = self ._chunk_key (cidx )
593617 cdata = self ._chunk_store [ckey ]
594618
595619 except KeyError :
@@ -614,7 +638,6 @@ def _chunk_setitem(self, cidx, key, value):
614638 cdata = self ._compressor .compress (chunk )
615639
616640 # store
617- ckey = self ._chunk_key (cidx )
618641 self ._chunk_store [ckey ] = cdata
619642
620643 def _chunk_key (self , cidx ):
@@ -644,14 +667,34 @@ def __repr__(self):
644667 r += '\n chunk_store: %s.%s' % \
645668 (type (self ._chunk_store ).__module__ ,
646669 type (self ._chunk_store ).__name__ )
670+ if self ._synchronizer is not None :
671+ r += ('\n synchronizer: %s.%s' %
672+ (type (self ._synchronizer ).__module__ ,
673+ type (self ._synchronizer ).__name__ ))
647674 return r
648675
649676 def __getstate__ (self ):
650- return self ._store , self ._path , self ._readonly , self ._chunk_store
677+ return self ._store , self ._path , self ._readonly , self ._chunk_store , \
678+ self ._synchronizer
651679
652680 def __setstate__ (self , state ):
653681 self .__init__ (* state )
654682
683+ def _write_op (self , f , * args , ** kwargs ):
684+
685+ # guard condition
686+ if self ._readonly :
687+ raise ReadOnlyError ('array is read-only' )
688+
689+ # synchronization
690+ if self ._synchronizer is None :
691+ return f (* args , ** kwargs )
692+ else :
693+ # synchronize on the array
694+ mkey = self ._key_prefix + array_meta_key
695+ with self ._synchronizer [mkey ]:
696+ return f (* args , ** kwargs )
697+
655698 def resize (self , * args ):
656699 """Change the shape of the array by growing or shrinking one or more
657700 dimensions.
@@ -687,16 +730,9 @@ def resize(self, *args):
687730
688731 """ # flake8: noqa
689732
690- # guard conditions
691- if self ._readonly :
692- raise ReadOnlyError ('array is read-only' )
693-
694- self ._resize (* args )
733+ return self ._write_op (self ._resize_nosync , * args )
695734
696- def _resize (self , * args ):
697-
698- # N.B., private implementation to avoid need for re-entrant lock on
699- # SynchronizedArray.append().
735+ def _resize_nosync (self , * args ):
700736
701737 # normalize new shape argument
702738 old_shape = self ._shape
@@ -718,7 +754,7 @@ def _resize(self, *args):
718754
719755 # update metadata
720756 self ._shape = new_shape
721- self .flush_metadata ()
757+ self ._flush_metadata ()
722758
723759 def append (self , data , axis = 0 ):
724760 """Append `data` to `axis`.
@@ -760,10 +796,9 @@ def append(self, data, axis=0):
760796 store: builtins.dict
761797
762798 """
799+ return self ._write_op (self ._append_nosync , data , axis = axis )
763800
764- # guard conditions
765- if self ._readonly :
766- raise ReadOnlyError ('array is read-only' )
801+ def _append_nosync (self , data , axis = 0 ):
767802
768803 # ensure data is array-like
769804 if not hasattr (data , 'shape' ) or not hasattr (data , 'dtype' ):
@@ -787,7 +822,7 @@ def append(self, data, axis=0):
787822 )
788823
789824 # resize
790- self ._resize (new_shape )
825+ self ._resize_nosync (new_shape )
791826
792827 # store data
793828 # noinspection PyTypeChecker
@@ -796,88 +831,3 @@ def append(self, data, axis=0):
796831 for i in range (len (self ._shape ))
797832 )
798833 self [append_selection ] = data
799-
800-
801- class SynchronizedArray (Array ):
802- """Instantiate a synchronized array.
803-
804- Parameters
805- ----------
806- store : MutableMapping
807- Array store, already initialized.
808- synchronizer : object
809- Array synchronizer.
810- readonly : bool, optional
811- True if array should be protected against modification.
812- path : string, optional
813- Path under which array is stored.
814- chunk_store : MutableMapping, optional
815- Separate storage for chunks. If not provided, `store` will be used
816- for storage of both chunks and metadata.
817-
818- Examples
819- --------
820- >>> import zarr
821- >>> store = dict()
822- >>> zarr.init_array(store, shape=1000, chunks=100)
823- >>> synchronizer = zarr.ThreadSynchronizer()
824- >>> z = zarr.SynchronizedArray(store, synchronizer)
825- >>> z
826- zarr.core.SynchronizedArray((1000,), float64, chunks=(100,), order=C)
827- compression: blosc; compression_opts: {'clevel': 5, 'cname': 'lz4', 'shuffle': 1}
828- nbytes: 7.8K; nbytes_stored: 285; ratio: 28.1; initialized: 0/10
829- store: builtins.dict; synchronizer: zarr.sync.ThreadSynchronizer
830-
831- Notes
832- -----
833- TODO review
834-
835- Only writing data to the array via the __setitem__() method and
836- modification of user attributes are synchronized. Neither append() nor
837- resize() are synchronized.
838-
839- Writing to the array is synchronized at the chunk level. I.e.,
840- the array supports concurrent write operations via the __setitem__()
841- method, but these will only exclude each other if they both require
842- modification of the same chunk.
843-
844- """ # flake8: noqa
845-
846- def __init__ (self , store , synchronizer , readonly = False , path = None ,
847- chunk_store = None ):
848- super (SynchronizedArray , self ).__init__ (store , readonly = readonly ,
849- path = path ,
850- chunk_store = chunk_store )
851- self ._synchronizer = synchronizer
852- akey = self ._key_prefix + attrs_key
853- self ._attrs = SynchronizedAttributes (store , synchronizer , key = akey ,
854- readonly = readonly )
855-
856- def __repr__ (self ):
857- r = super (SynchronizedArray , self ).__repr__ ()
858- r += ('\n synchronizer: %s.%s' %
859- (type (self ._synchronizer ).__module__ ,
860- type (self ._synchronizer ).__name__ ))
861- return r
862-
863- def __getstate__ (self ):
864- return self ._store , self ._synchronizer , self ._readonly , self ._path , \
865- self ._chunk_store
866-
867- def __setstate__ (self , state ):
868- self .__init__ (* state )
869-
870- def _chunk_setitem (self , cidx , key , value ):
871- ckey = self ._chunk_key (cidx )
872- with self ._synchronizer [ckey ]:
873- super (SynchronizedArray , self )._chunk_setitem (cidx , key , value )
874-
875- def resize (self , * args ):
876- mkey = self ._key_prefix + array_meta_key
877- with self ._synchronizer [mkey ]:
878- super (SynchronizedArray , self ).resize (* args )
879-
880- def append (self , * args , ** kwargs ):
881- mkey = self ._key_prefix + array_meta_key
882- with self ._synchronizer [mkey ]:
883- super (SynchronizedArray , self ).append (* args , ** kwargs )
0 commit comments