Skip to content

Commit 04903fb

Browse files
committed
fix: nkode round 4 — rollback safety, cleanup precision, CI-safe rollback
Findings addressed:
- Rollback gates on backup phase: refuses incomplete (phase='dump') backups unless --force is passed
- Rollback requires --index or --yes when multiple indexes detected (no interactive input() prompt that blocks CI)
- Backup cleanup uses exact .header/.data extensions and boundary check (prevents deleting unrelated files with shared prefix)
- Async/sync direct quantize counts len(converted) not len(batch_keys)

Tests added:
- test_rollback_skips_incomplete_backup_phase
- test_rollback_index_filter
- test_rollback_multi_index_requires_flag
- test_cleanup_only_removes_known_extensions
- test_cleanup_does_not_match_similar_prefix
1 parent 8b9ea99 commit 04903fb

4 files changed

Lines changed: 212 additions & 25 deletions

File tree

redisvl/cli/migrate.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -369,12 +369,15 @@ def estimate(self):
369369
disk_estimate = estimate_disk_space(plan, aof_enabled=args.aof_enabled)
370370
print(disk_estimate.summary())
371371

372+
# Phases that indicate a safe/complete backup for rollback
373+
_SAFE_ROLLBACK_PHASES = frozenset({"ready", "active", "completed"})
374+
372375
def rollback(self):
373376
"""Restore original vectors from a backup directory (undo quantization)."""
374377
parser = argparse.ArgumentParser(
375378
usage=(
376379
"rvl migrate rollback --backup-dir <dir> "
377-
"[--index <name>] [--url <redis_url>]"
380+
"[--index <name>] [--yes] [--force] [--url <redis_url>]"
378381
)
379382
)
380383
parser.add_argument(
@@ -389,6 +392,21 @@ def rollback(self):
389392
help="Only restore backups for this index name (filters by backup header)",
390393
default=None,
391394
)
395+
parser.add_argument(
396+
"--yes",
397+
"-y",
398+
dest="yes",
399+
action="store_true",
400+
help="Skip confirmation prompt for multi-index rollback",
401+
default=False,
402+
)
403+
parser.add_argument(
404+
"--force",
405+
dest="force",
406+
action="store_true",
407+
help="Proceed even if backup phase indicates incomplete dump",
408+
default=False,
409+
)
392410
parser = add_redis_connection_options(parser)
393411
args = parser.parse_args(sys.argv[3:])
394412

@@ -412,7 +430,7 @@ def rollback(self):
412430
# Derive backup base paths (strip .header suffix)
413431
backup_paths = [str(h.with_suffix("")) for h in header_files]
414432

415-
# Load and filter backups
433+
# Load, filter, and validate backups
416434
backups_to_restore = []
417435
for bp in backup_paths:
418436
backup = VectorBackup.load(bp)
@@ -425,25 +443,36 @@ def rollback(self):
425443
f"index '{backup.header.index_name}' != '{args.index_name}'"
426444
)
427445
continue
446+
# Gate on backup phase — refuse incomplete backups unless --force
447+
if backup.header.phase not in self._SAFE_ROLLBACK_PHASES:
448+
if args.force:
449+
print(
450+
f" Warning: {os.path.basename(bp)} has phase "
451+
f"'{backup.header.phase}' (incomplete dump) — "
452+
f"proceeding due to --force"
453+
)
454+
else:
455+
print(
456+
f" Skipping {os.path.basename(bp)}: backup phase "
457+
f"'{backup.header.phase}' indicates incomplete dump. "
458+
f"Use --force to restore from partial backups."
459+
)
460+
continue
428461
backups_to_restore.append((bp, backup))
429462

430463
if not backups_to_restore:
431464
print("Error: no matching backup files found")
432465
sys.exit(1)
433466

434-
# Warn if multiple distinct indexes detected without --index filter
467+
# Require --index or --yes when multiple distinct indexes detected
435468
distinct_indexes = {b.header.index_name for _, b in backups_to_restore}
436-
if len(distinct_indexes) > 1 and not args.index_name:
469+
if len(distinct_indexes) > 1 and not args.index_name and not args.yes:
437470
print(
438-
f"Warning: found backups for {len(distinct_indexes)} distinct indexes: "
471+
f"Error: found backups for {len(distinct_indexes)} distinct indexes: "
439472
f"{', '.join(sorted(distinct_indexes))}. "
440-
f"Use --index to filter, or press Enter to continue."
473+
f"Use --index to filter or --yes to restore all."
441474
)
442-
try:
443-
input()
444-
except (EOFError, KeyboardInterrupt):
445-
print("\nAborted.")
446-
sys.exit(1)
475+
sys.exit(1)
447476

448477
client = RedisConnectionFactory.get_redis_connection(redis_url=redis_url)
449478
total_restored = 0

redisvl/migration/async_executor.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,17 +1036,33 @@ def _index_progress(indexed: int, total: int, pct: float) -> None:
10361036
return report
10371037

10381038
def _cleanup_backup_files(self, backup_dir: str, index_name: str) -> None:
1039-
"""Remove backup files after successful migration."""
1040-
import glob
1039+
"""Remove backup files after successful migration.
10411040
1041+
Only removes files with the exact extensions produced by VectorBackup
1042+
(.header and .data), avoiding accidental deletion of unrelated files
1043+
that happen to share the same prefix.
1044+
"""
10421045
safe_name = index_name.replace("/", "_").replace("\\", "_").replace(":", "_")
1043-
pattern = str(Path(backup_dir) / f"migration_backup_{safe_name}*")
1044-
for f in glob.glob(pattern):
1046+
base_prefix = f"migration_backup_{safe_name}"
1047+
known_suffixes = (".header", ".data")
1048+
backup_dir_path = Path(backup_dir)
1049+
1050+
for entry in backup_dir_path.iterdir():
1051+
if not entry.is_file():
1052+
continue
1053+
name = entry.name
1054+
if not name.startswith(base_prefix):
1055+
continue
1056+
if not any(name.endswith(s) for s in known_suffixes):
1057+
continue
1058+
remainder = name[len(base_prefix) :]
1059+
if remainder and remainder[0] not in (".", "_"):
1060+
continue
10451061
try:
1046-
Path(f).unlink()
1047-
logger.debug("Removed backup file: %s", f)
1062+
entry.unlink()
1063+
logger.debug("Removed backup file: %s", entry)
10481064
except OSError as e:
1049-
logger.warning("Failed to remove backup file %s: %s", f, e)
1065+
logger.warning("Failed to remove backup file %s: %s", entry, e)
10501066

10511067
# ------------------------------------------------------------------
10521068
# Two-phase quantization: dump originals → convert from backup

redisvl/migration/executor.py

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,17 +1126,40 @@ def _index_progress(indexed: int, total: int, pct: float) -> None:
11261126
return report
11271127

11281128
def _cleanup_backup_files(self, backup_dir: str, index_name: str) -> None:
1129-
"""Remove backup files after successful migration."""
1130-
import glob
1129+
"""Remove backup files after successful migration.
11311130
1131+
Only removes files with the exact extensions produced by VectorBackup
1132+
(.header and .data), avoiding accidental deletion of unrelated files
1133+
that happen to share the same prefix.
1134+
"""
11321135
safe_name = index_name.replace("/", "_").replace("\\", "_").replace(":", "_")
1133-
pattern = str(Path(backup_dir) / f"migration_backup_{safe_name}*")
1134-
for f in glob.glob(pattern):
1136+
base_prefix = f"migration_backup_{safe_name}"
1137+
# Exact suffixes written by VectorBackup
1138+
known_suffixes = (".header", ".data")
1139+
backup_dir_path = Path(backup_dir)
1140+
1141+
for entry in backup_dir_path.iterdir():
1142+
if not entry.is_file():
1143+
continue
1144+
name = entry.name
1145+
# Match: base_prefix exactly, or base_prefix + shard suffix
1146+
# e.g., migration_backup_myidx.header
1147+
# migration_backup_myidx_shard_0.header
1148+
if not name.startswith(base_prefix):
1149+
continue
1150+
# Check that the file ends with a known extension
1151+
if not any(name.endswith(s) for s in known_suffixes):
1152+
continue
1153+
# Verify the character after the prefix is either a dot or underscore
1154+
# (prevents matching migration_backup_myidx2.header)
1155+
remainder = name[len(base_prefix) :]
1156+
if remainder and remainder[0] not in (".", "_"):
1157+
continue
11351158
try:
1136-
Path(f).unlink()
1137-
logger.debug("Removed backup file: %s", f)
1159+
entry.unlink()
1160+
logger.debug("Removed backup file: %s", entry)
11381161
except OSError as e:
1139-
logger.warning("Failed to remove backup file %s: %s", f, e)
1162+
logger.warning("Failed to remove backup file %s: %s", entry, e)
11401163

11411164
# ------------------------------------------------------------------
11421165
# Two-phase quantization: dump originals → convert from backup

tests/unit/test_vector_backup.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,3 +428,122 @@ def test_rollback_unloadable_backup_returns_none(self, tmp_path):
428428
bp = str(tmp_path / "bad_backup")
429429
result = VectorBackup.load(bp)
430430
assert result is None
431+
432+
def test_rollback_skips_incomplete_backup_phase(self, tmp_path):
433+
"""Backups in 'dump' phase should be skipped without --force."""
434+
from redisvl.migration.backup import VectorBackup
435+
436+
bp = str(tmp_path / "migration_backup_partial")
437+
backup = VectorBackup.create(
438+
path=bp,
439+
index_name="partial_idx",
440+
fields={"embedding": {"source": "float32", "target": "float16", "dims": 4}},
441+
batch_size=1,
442+
)
443+
# Write one batch but don't mark dump complete — phase stays "dump"
444+
backup.write_batch(0, ["doc:0"], {"doc:0": {"embedding": b"\x00" * 16}})
445+
# Phase is "dump" — not in safe rollback phases
446+
assert backup.header.phase == "dump"
447+
safe_phases = frozenset({"ready", "active", "completed"})
448+
assert backup.header.phase not in safe_phases
449+
450+
def test_rollback_index_filter(self, tmp_path):
451+
"""--index filter should match only backups for the specified index."""
452+
self._create_backup_with_data(tmp_path, name="idx_a")
453+
self._create_backup_with_data(tmp_path, name="idx_b")
454+
455+
from pathlib import Path
456+
457+
from redisvl.migration.backup import VectorBackup
458+
459+
header_files = sorted(Path(tmp_path).glob("*.header"))
460+
assert len(header_files) == 2
461+
462+
# Filter for idx_a only
463+
backup_paths = [str(h.with_suffix("")) for h in header_files]
464+
filtered = []
465+
for bp in backup_paths:
466+
backup = VectorBackup.load(bp)
467+
if backup and backup.header.index_name == "idx_a":
468+
filtered.append(bp)
469+
assert len(filtered) == 1
470+
assert "idx_a" in filtered[0]
471+
472+
def test_rollback_multi_index_requires_flag(self, tmp_path):
473+
"""Multiple distinct indexes should require --index or --yes."""
474+
self._create_backup_with_data(tmp_path, name="idx_a")
475+
self._create_backup_with_data(tmp_path, name="idx_b")
476+
477+
from pathlib import Path
478+
479+
from redisvl.migration.backup import VectorBackup
480+
481+
header_files = sorted(Path(tmp_path).glob("*.header"))
482+
backup_paths = [str(h.with_suffix("")) for h in header_files]
483+
backups = []
484+
for bp in backup_paths:
485+
backup = VectorBackup.load(bp)
486+
if backup:
487+
backups.append(backup)
488+
distinct = {b.header.index_name for b in backups}
489+
assert len(distinct) > 1 # Multi-index — should require --index or --yes
490+
491+
492+
class TestBackupCleanup:
493+
"""Tests for tightened backup file cleanup."""
494+
495+
def test_cleanup_only_removes_known_extensions(self, tmp_path):
496+
"""Cleanup should only remove .header and .data files."""
497+
# Create files with various extensions
498+
(tmp_path / "migration_backup_test.header").touch()
499+
(tmp_path / "migration_backup_test.data").touch()
500+
(tmp_path / "migration_backup_test.log").touch() # should NOT be deleted
501+
(tmp_path / "migration_backup_test_shard_0.header").touch()
502+
(tmp_path / "migration_backup_test_shard_0.data").touch()
503+
(tmp_path / "unrelated_file.txt").touch() # should NOT be deleted
504+
505+
# Simulate the cleanup logic
506+
base_prefix = "migration_backup_test"
507+
known_suffixes = (".header", ".data")
508+
deleted = []
509+
for entry in tmp_path.iterdir():
510+
if not entry.is_file():
511+
continue
512+
name = entry.name
513+
if not name.startswith(base_prefix):
514+
continue
515+
if not any(name.endswith(s) for s in known_suffixes):
516+
continue
517+
remainder = name[len(base_prefix) :]
518+
if remainder and remainder[0] not in (".", "_"):
519+
continue
520+
deleted.append(name)
521+
522+
assert "migration_backup_test.header" in deleted
523+
assert "migration_backup_test.data" in deleted
524+
assert "migration_backup_test_shard_0.header" in deleted
525+
assert "migration_backup_test_shard_0.data" in deleted
526+
assert "migration_backup_test.log" not in deleted
527+
assert "unrelated_file.txt" not in deleted
528+
529+
def test_cleanup_does_not_match_similar_prefix(self, tmp_path):
530+
"""migration_backup_foo should not match migration_backup_foobar."""
531+
(tmp_path / "migration_backup_foo.header").touch()
532+
(tmp_path / "migration_backup_foobar.header").touch()
533+
534+
base_prefix = "migration_backup_foo"
535+
known_suffixes = (".header", ".data")
536+
deleted = []
537+
for entry in tmp_path.iterdir():
538+
name = entry.name
539+
if not name.startswith(base_prefix):
540+
continue
541+
if not any(name.endswith(s) for s in known_suffixes):
542+
continue
543+
remainder = name[len(base_prefix) :]
544+
if remainder and remainder[0] not in (".", "_"):
545+
continue
546+
deleted.append(name)
547+
548+
assert "migration_backup_foo.header" in deleted
549+
assert "migration_backup_foobar.header" not in deleted

0 commit comments

Comments
 (0)