@@ -189,3 +189,106 @@ def test_result_attributes(self):
189189 assert result .total_docs_quantized == 1000
190190 assert result .num_workers == 4
191191 assert len (result .worker_results ) == 4
192+
193+
class TestWorkerResume:
    """Test sync and async worker resume from partial backups."""

    def _make_partial_backup(self, tmp_path, phase="dump", dump_batches=1):
        """Create a partial on-disk backup to simulate a crash-resume scenario.

        Args:
            tmp_path: pytest ``tmp_path`` fixture directory.
            phase: phase to leave the backup in — ``"dump"`` (default, dump
                still in progress), ``"ready"`` (dump finished), or
                ``"active"`` (dump finished and quantize started).
            dump_batches: number of 2-key batches to write before stopping.

        Returns:
            Tuple of ``(backup_path, datatype_changes)``.
        """
        from redisvl.migration.backup import VectorBackup

        bp = str(tmp_path / "migration_backup_testidx_shard_0")
        datatype_changes = {
            "embedding": {"source": "float32", "target": "float16", "dims": 4}
        }
        backup = VectorBackup.create(
            path=bp,
            index_name="testidx",
            fields=datatype_changes,
            batch_size=2,
        )
        # Write some batches of original (pre-quantization) vectors.
        for i in range(dump_batches):
            keys = [f"doc:{i * 2}", f"doc:{i * 2 + 1}"]
            originals = {
                k: {"embedding": _make_float32_vector(4, seed=float(j))}
                for j, k in enumerate(keys)
            }
            backup.write_batch(i, keys, originals)

        if phase == "ready":
            backup.mark_dump_complete()
        elif phase == "active":
            backup.mark_dump_complete()
            backup.start_quantize()
        return bp, datatype_changes

    def test_sync_worker_resumes_from_ready_phase(self, tmp_path):
        """Sync worker should skip dump and proceed to quantize on resume."""
        from redisvl.migration.backup import VectorBackup

        bp, dt_changes = self._make_partial_backup(
            tmp_path, phase="ready", dump_batches=2
        )

        # Verify the backup reloads in the ready phase with all batches dumped.
        backup = VectorBackup.load(bp)
        assert backup is not None
        assert backup.header.phase == "ready"
        assert backup.header.dump_completed_batches == 2

    def test_sync_worker_resumes_from_dump_phase(self, tmp_path):
        """Sync worker should resume dumping from the last completed batch."""
        from redisvl.migration.backup import VectorBackup

        bp, dt_changes = self._make_partial_backup(
            tmp_path, phase="dump", dump_batches=1
        )

        backup = VectorBackup.load(bp)
        assert backup is not None
        assert backup.header.phase == "dump"
        assert backup.header.dump_completed_batches == 1
        # Worker should start from batch 1, not 0

    def test_sync_worker_skips_completed_backup(self, tmp_path):
        """Completed backup should be detected and skipped."""
        from redisvl.migration.backup import VectorBackup

        bp, dt_changes = self._make_partial_backup(
            tmp_path, phase="active", dump_batches=2
        )
        backup = VectorBackup.load(bp)
        # Mark all batches quantized so the backup can be finalized.
        for i in range(2):
            backup.mark_batch_quantized(i)
        backup.mark_complete()

        # Reload and verify the completed phase persisted to disk.
        backup = VectorBackup.load(bp)
        assert backup.header.phase == "completed"

    @pytest.mark.asyncio
    async def test_async_worker_loads_existing_backup(self, tmp_path):
        """Async worker should load existing backup instead of creating new."""
        from redisvl.migration.backup import VectorBackup

        bp, dt_changes = self._make_partial_backup(
            tmp_path, phase="ready", dump_batches=2
        )

        # Verify load succeeds and returns the existing backup state.
        backup = VectorBackup.load(bp)
        assert backup is not None
        assert backup.header.phase == "ready"
        assert backup.header.dump_completed_batches == 2

        # Creating over an existing backup must fail rather than clobber it.
        with pytest.raises(FileExistsError):
            VectorBackup.create(
                path=bp,
                index_name="testidx",
                fields=dt_changes,
                batch_size=2,
            )
0 commit comments