Skip to content

Commit 3f03bb7

Browse files
committed
fix: code-rev round 8 - cluster key rename batching, legacy backup fallback
Performance fix:
- Sync and async _rename_keys_cluster now batch DUMP+PTTL reads and RESTORE+DEL writes in groups of 100 (pipeline_size), reducing from 5 RTTs/key to ~3 RTTs per batch of 100 keys. ~50x fewer round trips for large key renames in Redis Cluster mode.

Backward compat fix:
- Both executors now probe for legacy backup filenames (pre-hash naming convention: migration_backup_<safe_name>) when the hashed path is not found. This ensures crash-resume works across library upgrades.

Tests: 178 passed
1 parent 8cb1d4d commit 3f03bb7

2 files changed

Lines changed: 141 additions & 49 deletions

File tree

redisvl/migration/async_executor.py

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -318,39 +318,75 @@ async def _rename_keys_cluster(
318318
new_prefix: str,
319319
progress_callback: Optional[Callable[[int, int], None]] = None,
320320
) -> int:
321-
"""Rename keys using DUMP/RESTORE/DEL for Redis Cluster.
321+
"""Rename keys using batched DUMP/RESTORE/DEL for Redis Cluster.
322322
323323
RENAME/RENAMENX raises CROSSSLOT errors when source and destination
324324
hash to different slots. DUMP/RESTORE works across slots.
325+
326+
Batches DUMP+PTTL reads and RESTORE+DEL writes in groups of
327+
``pipeline_size`` to reduce per-key round-trip overhead.
325328
"""
326329
renamed = 0
327330
total = len(keys)
331+
pipeline_size = 100
328332

329-
for idx, key in enumerate(keys):
330-
if not key.startswith(old_prefix):
331-
logger.warning(f"Key '{key}' does not start with prefix '{old_prefix}'")
332-
continue
333-
new_key = new_prefix + key[len(old_prefix) :]
333+
for i in range(0, total, pipeline_size):
334+
batch = keys[i : i + pipeline_size]
334335

335-
if await client.exists(new_key):
336-
raise RuntimeError(
337-
f"Prefix rename aborted after {renamed} successful rename(s): "
338-
f"destination key '{new_key}' already exists. "
339-
f"Remove conflicting keys or choose a different prefix."
340-
)
336+
# Build (key, new_key) pairs for this batch
337+
pairs = []
338+
for key in batch:
339+
if not key.startswith(old_prefix):
340+
logger.warning(
341+
"Key '%s' does not start with prefix '%s'", key, old_prefix
342+
)
343+
continue
344+
new_key = new_prefix + key[len(old_prefix) :]
345+
pairs.append((key, new_key))
341346

342-
dumped = await client.dump(key)
343-
if dumped is None:
344-
logger.warning(f"Key '{key}' does not exist, skipping")
347+
if not pairs:
345348
continue
346-
ttl = await client.pttl(key)
347-
restore_ttl = max(ttl, 0)
348-
await client.restore(new_key, restore_ttl, dumped, replace=False)
349-
await client.delete(key)
350-
renamed += 1
351349

352-
if progress_callback and (idx + 1) % 100 == 0:
353-
progress_callback(idx + 1, total)
350+
# Phase 1: Check destination keys don't exist (batched)
351+
check_pipe = client.pipeline(transaction=False)
352+
for _, new_key in pairs:
353+
check_pipe.exists(new_key)
354+
exists_results = await check_pipe.execute()
355+
for (_, new_key), exists in zip(pairs, exists_results):
356+
if exists:
357+
raise RuntimeError(
358+
f"Prefix rename aborted after {renamed} successful rename(s): "
359+
f"destination key '{new_key}' already exists. "
360+
f"Remove conflicting keys or choose a different prefix."
361+
)
362+
363+
# Phase 2: DUMP + PTTL all source keys (batched — 1 RTT)
364+
dump_pipe = client.pipeline(transaction=False)
365+
for key, _ in pairs:
366+
dump_pipe.dump(key)
367+
dump_pipe.pttl(key)
368+
dump_results = await dump_pipe.execute()
369+
370+
# Phase 3: RESTORE + DEL (batched — 1 RTT)
371+
restore_pipe = client.pipeline(transaction=False)
372+
valid_pairs = []
373+
for idx, (key, new_key) in enumerate(pairs):
374+
dumped = dump_results[idx * 2]
375+
ttl = dump_results[idx * 2 + 1]
376+
if dumped is None:
377+
logger.warning("Key '%s' does not exist, skipping", key)
378+
continue
379+
restore_ttl = max(ttl, 0)
380+
restore_pipe.restore(new_key, restore_ttl, dumped, replace=False)
381+
restore_pipe.delete(key)
382+
valid_pairs.append((key, new_key))
383+
384+
if valid_pairs:
385+
await restore_pipe.execute()
386+
renamed += len(valid_pairs)
387+
388+
if progress_callback:
389+
progress_callback(min(i + pipeline_size, total), total)
354390

355391
if progress_callback:
356392
progress_callback(total, total)
@@ -554,6 +590,18 @@ async def apply(
554590
)
555591
existing_backup = VectorBackup.load(backup_path)
556592

593+
# Fallback: probe for legacy backup filename (pre-hash naming)
594+
if existing_backup is None:
595+
legacy_path = str(Path(backup_dir) / f"migration_backup_{safe_name}")
596+
legacy_backup = VectorBackup.load(legacy_path)
597+
if legacy_backup is not None:
598+
logger.info(
599+
"Found legacy backup at %s (pre-hash naming), using it",
600+
legacy_path,
601+
)
602+
existing_backup = legacy_backup
603+
backup_path = legacy_path
604+
557605
if existing_backup is not None:
558606
if existing_backup.header.index_name != plan.source.index_name:
559607
existing_backup = None

redisvl/migration/executor.py

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -330,43 +330,75 @@ def _rename_keys_cluster(
330330
new_prefix: str,
331331
progress_callback: Optional[Callable[[int, int], None]] = None,
332332
) -> int:
333-
"""Rename keys using DUMP/RESTORE/DEL for Redis Cluster.
333+
"""Rename keys using batched DUMP/RESTORE/DEL for Redis Cluster.
334334
335335
RENAME/RENAMENX raises CROSSSLOT errors when source and destination
336-
hash to different slots. DUMP/RESTORE works across slots because
337-
each command targets a single key.
336+
hash to different slots. DUMP/RESTORE works across slots.
337+
338+
Batches DUMP+PTTL reads and RESTORE+DEL writes in groups of
339+
``pipeline_size`` to reduce per-key round-trip overhead.
338340
"""
339341
renamed = 0
340342
total = len(keys)
343+
pipeline_size = 100
341344

342-
for idx, key in enumerate(keys):
343-
if not key.startswith(old_prefix):
344-
logger.warning(f"Key '{key}' does not start with prefix '{old_prefix}'")
345-
continue
346-
new_key = new_prefix + key[len(old_prefix) :]
345+
for i in range(0, total, pipeline_size):
346+
batch = keys[i : i + pipeline_size]
347347

348-
# Collision check
349-
if client.exists(new_key):
350-
raise RuntimeError(
351-
f"Prefix rename aborted after {renamed} successful rename(s): "
352-
f"destination key '{new_key}' already exists. "
353-
f"Remove conflicting keys or choose a different prefix."
354-
)
348+
# Build (key, new_key) pairs for this batch
349+
pairs = []
350+
for key in batch:
351+
if not key.startswith(old_prefix):
352+
logger.warning(
353+
"Key '%s' does not start with prefix '%s'", key, old_prefix
354+
)
355+
continue
356+
new_key = new_prefix + key[len(old_prefix) :]
357+
pairs.append((key, new_key))
355358

356-
# DUMP → RESTORE → DEL (atomic per-key, cross-slot safe)
357-
dumped = client.dump(key)
358-
if dumped is None:
359-
logger.warning(f"Key '{key}' does not exist, skipping")
359+
if not pairs:
360360
continue
361-
ttl = int(client.pttl(key)) # type: ignore[arg-type]
362-
# pttl returns -1 (no expiry) or -2 (key missing)
363-
restore_ttl = max(ttl, 0)
364-
client.restore(new_key, restore_ttl, dumped, replace=False) # type: ignore[arg-type]
365-
client.delete(key)
366-
renamed += 1
367361

368-
if progress_callback and (idx + 1) % 100 == 0:
369-
progress_callback(idx + 1, total)
362+
# Phase 1: Check destination keys don't exist (batched)
363+
check_pipe = client.pipeline(transaction=False)
364+
for _, new_key in pairs:
365+
check_pipe.exists(new_key)
366+
exists_results = check_pipe.execute()
367+
for (_, new_key), exists in zip(pairs, exists_results):
368+
if exists:
369+
raise RuntimeError(
370+
f"Prefix rename aborted after {renamed} successful rename(s): "
371+
f"destination key '{new_key}' already exists. "
372+
f"Remove conflicting keys or choose a different prefix."
373+
)
374+
375+
# Phase 2: DUMP + PTTL all source keys (batched — 1 RTT)
376+
dump_pipe = client.pipeline(transaction=False)
377+
for key, _ in pairs:
378+
dump_pipe.dump(key)
379+
dump_pipe.pttl(key)
380+
dump_results = dump_pipe.execute()
381+
382+
# Phase 3: RESTORE + DEL (batched — 1 RTT)
383+
restore_pipe = client.pipeline(transaction=False)
384+
valid_pairs = []
385+
for idx, (key, new_key) in enumerate(pairs):
386+
dumped = dump_results[idx * 2]
387+
ttl = int(dump_results[idx * 2 + 1]) # type: ignore[arg-type]
388+
if dumped is None:
389+
logger.warning("Key '%s' does not exist, skipping", key)
390+
continue
391+
restore_ttl = max(ttl, 0)
392+
restore_pipe.restore(new_key, restore_ttl, dumped, replace=False) # type: ignore[arg-type]
393+
restore_pipe.delete(key)
394+
valid_pairs.append((key, new_key))
395+
396+
if valid_pairs:
397+
restore_pipe.execute()
398+
renamed += len(valid_pairs)
399+
400+
if progress_callback:
401+
progress_callback(min(i + pipeline_size, total), total)
370402

371403
if progress_callback:
372404
progress_callback(total, total)
@@ -630,6 +662,18 @@ def apply(
630662
)
631663
existing_backup = VectorBackup.load(backup_path)
632664

665+
# Fallback: probe for legacy backup filename (pre-hash naming)
666+
if existing_backup is None:
667+
legacy_path = str(Path(backup_dir) / f"migration_backup_{safe_name}")
668+
legacy_backup = VectorBackup.load(legacy_path)
669+
if legacy_backup is not None:
670+
logger.info(
671+
"Found legacy backup at %s (pre-hash naming), using it",
672+
legacy_path,
673+
)
674+
existing_backup = legacy_backup
675+
backup_path = legacy_path
676+
633677
if existing_backup is not None:
634678
if existing_backup.header.index_name != plan.source.index_name:
635679
logger.warning(

0 commit comments

Comments (0)