Commit 8cb1d4d

fix: code-rev round 7 - resume skips key renames, backup filename collisions
Critical fix:
- Sync and async executor resume paths now perform key prefix renames after quantize (renames happen after the index drop in the normal path, so they may not have completed before a crash).

Backup naming:
- Add a sha256[:8] hash of index_name to backup filenames to prevent collisions between distinct names that sanitize identically (e.g., "a/b" and "a:b" both become "a_b" but have different hashes).
- Applied to the single-worker, multi-worker, and cleanup paths.

Accepted as-is:
- --resume backward compat: intentional, with a clear error message.
- Rollback non-recursive: the flat layout matches how backups are written.

Tests: 178 passed
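To make the collision concrete, here is a minimal sketch of the naming scheme this commit adopts (make_backup_path is a hypothetical helper for illustration, not a function in the diff):

import hashlib
from pathlib import Path

def make_backup_path(backup_dir: str, index_name: str) -> str:
    # Replace separators and colons that are unsafe in filenames.
    safe_name = index_name.replace("/", "_").replace("\\", "_").replace(":", "_")
    # A short hash of the original name disambiguates sanitize collisions.
    name_hash = hashlib.sha256(index_name.encode()).hexdigest()[:8]
    return str(Path(backup_dir) / f"migration_backup_{safe_name}_{name_hash}")

# "a/b" and "a:b" both sanitize to "a_b", but their hashes differ:
print(make_backup_path("/tmp", "a/b"))  # /tmp/migration_backup_a_b_<hash of "a/b">
print(make_backup_path("/tmp", "a:b"))  # /tmp/migration_backup_a_b_<hash of "a:b">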
1 parent ab801ad commit 8cb1d4d

3 files changed

Lines changed: 84 additions & 7 deletions

File tree

redisvl/migration/async_executor.py
redisvl/migration/executor.py
redisvl/migration/quantize.py

redisvl/migration/async_executor.py

Lines changed: 37 additions & 2 deletions
@@ -1,6 +1,7 @@
 from __future__ import annotations

 import asyncio
+import hashlib
 import time
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, AsyncGenerator, Callable, Dict, List, Optional
@@ -547,7 +548,10 @@ async def apply(
                 .replace("\\", "_")
                 .replace(":", "_")
             )
-            backup_path = str(Path(backup_dir) / f"migration_backup_{safe_name}")
+            name_hash = hashlib.sha256(plan.source.index_name.encode()).hexdigest()[:8]
+            backup_path = str(
+                Path(backup_dir) / f"migration_backup_{safe_name}_{name_hash}"
+            )
             existing_backup = VectorBackup.load(backup_path)

             if existing_backup is not None:
@@ -692,6 +696,36 @@ def _notify(step: str, detail: Optional[str] = None) -> None:
                 "quantize",
                 f"done ({docs_quantized:,} docs in {quantize_duration}s)",
             )
+
+            # Key prefix renames may not have happened before the crash
+            # (they run after index drop in the normal path). Re-apply
+            # idempotently.
+            if has_prefix_change:
+                resume_keys = []
+                for batch_keys, _ in existing_backup.iter_batches():
+                    resume_keys.extend(batch_keys)
+                if resume_keys:
+                    old_prefix = plan.source.keyspace.prefixes[0]
+                    new_prefix = rename_ops.change_prefix
+                    assert new_prefix is not None
+                    _notify("key_rename", "Renaming keys (resume)...")
+                    key_rename_started = time.perf_counter()
+                    renamed_count = await self._rename_keys(
+                        client,
+                        resume_keys,
+                        old_prefix,
+                        new_prefix,
+                        progress_callback=lambda done, total: _notify(
+                            "key_rename", f"{done:,}/{total:,} keys"
+                        ),
+                    )
+                    key_rename_duration = round(
+                        time.perf_counter() - key_rename_started, 3
+                    )
+                    _notify(
+                        "key_rename",
+                        f"done ({renamed_count:,} keys in {key_rename_duration}s)",
+                    )
         else:
             # Normal (non-resume) path
             if needs_enumeration:
@@ -1057,7 +1091,8 @@ def _cleanup_backup_files(self, backup_dir: str, index_name: str) -> None:
         that happen to share the same prefix.
         """
         safe_name = index_name.replace("/", "_").replace("\\", "_").replace(":", "_")
-        base_prefix = f"migration_backup_{safe_name}"
+        name_hash = hashlib.sha256(index_name.encode()).hexdigest()[:8]
+        base_prefix = f"migration_backup_{safe_name}_{name_hash}"
         known_suffixes = (".header", ".data")
         backup_dir_path = Path(backup_dir)

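The resume fix above relies on the rename pass being safe to re-run; the sync executor below uses the same pattern. A minimal sketch of idempotent prefix renaming with redis-py's asyncio client (illustrative only; the diff calls the executor's own _rename_keys helper, whose body is not part of this commit):

import redis.asyncio as redis
from redis.exceptions import ResponseError

async def rename_keys_idempotent(
    client: redis.Redis, keys: list[str], old_prefix: str, new_prefix: str
) -> int:
    """Safe to re-run after a crash: keys renamed earlier are skipped."""
    renamed = 0
    for key in keys:
        if not key.startswith(old_prefix):
            continue  # out of scope, or already carries the new prefix
        new_key = new_prefix + key[len(old_prefix):]
        if new_key == key:
            continue  # old and new prefixes are identical: nothing to do
        try:
            await client.rename(key, new_key)
            renamed += 1
        except ResponseError:
            pass  # "no such key": this key was renamed before the crash
    return renamed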
redisvl/migration/executor.py

Lines changed: 42 additions & 3 deletions
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import hashlib
 import time
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, List, Optional
@@ -615,13 +616,18 @@ def apply(
         backup_path: Optional[str] = None

         if backup_dir:
-            # Sanitize index name for filesystem
+            # Sanitize index name for filesystem with hash suffix to avoid
+            # collisions between distinct names that sanitize identically
+            # (e.g., "a/b" and "a:b" both become "a_b").
             safe_name = (
                 plan.source.index_name.replace("/", "_")
                 .replace("\\", "_")
                 .replace(":", "_")
             )
-            backup_path = str(Path(backup_dir) / f"migration_backup_{safe_name}")
+            name_hash = hashlib.sha256(plan.source.index_name.encode()).hexdigest()[:8]
+            backup_path = str(
+                Path(backup_dir) / f"migration_backup_{safe_name}_{name_hash}"
+            )
             existing_backup = VectorBackup.load(backup_path)

             if existing_backup is not None:
@@ -793,6 +799,38 @@ def _notify(step: str, detail: Optional[str] = None) -> None:
                 "quantize",
                 f"done ({docs_quantized:,} docs in {quantize_duration}s)",
             )
+
+            # Key prefix renames may not have happened before the crash
+            # (they run after index drop in the normal path). Re-apply
+            # idempotently — RENAME is a no-op if old == new or key
+            # was already renamed.
+            if has_prefix_change:
+                # Collect keys from backup to know what to rename
+                resume_keys = []
+                for batch_keys, _ in existing_backup.iter_batches():
+                    resume_keys.extend(batch_keys)
+                if resume_keys:
+                    old_prefix = plan.source.keyspace.prefixes[0]
+                    new_prefix = rename_ops.change_prefix
+                    assert new_prefix is not None
+                    _notify("key_rename", "Renaming keys (resume)...")
+                    key_rename_started = time.perf_counter()
+                    renamed_count = self._rename_keys(
+                        client,
+                        resume_keys,
+                        old_prefix,
+                        new_prefix,
+                        progress_callback=lambda done, total: _notify(
+                            "key_rename", f"{done:,}/{total:,} keys"
+                        ),
+                    )
+                    key_rename_duration = round(
+                        time.perf_counter() - key_rename_started, 3
+                    )
+                    _notify(
+                        "key_rename",
+                        f"done ({renamed_count:,} keys in {key_rename_duration}s)",
+                    )
         else:
             # Normal (non-resume) path
             # STEP 1: Enumerate keys BEFORE any modifications
@@ -1147,7 +1185,8 @@ def _cleanup_backup_files(self, backup_dir: str, index_name: str) -> None:
         that happen to share the same prefix.
         """
         safe_name = index_name.replace("/", "_").replace("\\", "_").replace(":", "_")
-        base_prefix = f"migration_backup_{safe_name}"
+        name_hash = hashlib.sha256(index_name.encode()).hexdigest()[:8]
+        base_prefix = f"migration_backup_{safe_name}_{name_hash}"
         # Exact suffixes written by VectorBackup
         known_suffixes = (".header", ".data")
         backup_dir_path = Path(backup_dir)

redisvl/migration/quantize.py

Lines changed: 5 additions & 2 deletions
@@ -7,6 +7,7 @@
 using ThreadPoolExecutor (sync) or asyncio.gather (async).
 """

+import hashlib
 import logging
 import math
 from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -264,8 +265,9 @@ def multi_worker_quantize(

     # Generate backup paths per worker
     safe_name = index_name.replace("/", "_").replace("\\", "_").replace(":", "_")
+    name_hash = hashlib.sha256(index_name.encode()).hexdigest()[:8]
     worker_backup_paths = [
-        str(Path(backup_dir) / f"migration_backup_{safe_name}_worker{i}")
+        str(Path(backup_dir) / f"migration_backup_{safe_name}_{name_hash}_worker{i}")
         for i in range(actual_workers)
     ]

@@ -441,8 +443,9 @@ async def async_multi_worker_quantize(
     )

     safe_name = index_name.replace("/", "_").replace("\\", "_").replace(":", "_")
+    name_hash = hashlib.sha256(index_name.encode()).hexdigest()[:8]
     worker_backup_paths = [
-        str(Path(backup_dir) / f"migration_backup_{safe_name}_worker{i}")
+        str(Path(backup_dir) / f"migration_backup_{safe_name}_{name_hash}_worker{i}")
         for i in range(actual_workers)
     ]

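Because the per-worker files share the hashed base prefix, cleanup can select exactly this index's artifacts with simple string checks. A rough sketch under the flat layout described above (backup_files_for is hypothetical; the real logic lives in _cleanup_backup_files):

from pathlib import Path

def backup_files_for(backup_dir: str, base_prefix: str):
    # base_prefix is "migration_backup_<safe_name>_<name_hash>"; workers
    # append "_worker<i>", and VectorBackup writes .header/.data files.
    known_suffixes = (".header", ".data")
    for path in Path(backup_dir).iterdir():
        name = path.name
        if not name.startswith(base_prefix):
            continue
        rest = name[len(base_prefix):]
        if rest in known_suffixes or (
            rest.startswith("_worker") and rest.endswith(known_suffixes)
        ):
            yield path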