Skip to content

Commit 8b9ea99

Browse files
committed
fix: nkode round 3 — rollback index filter, --resume validation, worker guards, overcount
- Rollback CLI: add --index filter to scope restore to a specific index
- Rollback CLI: remove unused --batch-size flag
- --resume: fail fast if value looks like a checkpoint file (old semantics)
- --workers > 1: enforce --backup-dir at CLI level
- Direct quantize overcount: count len(converted) not len(batch_keys) (sync+async)
1 parent f775476 commit 8b9ea99

3 files changed

Lines changed: 54 additions & 14 deletions

File tree

redisvl/cli/migrate.py

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,15 @@ def apply(self):
283283
if args.legacy_resume is not None:
284284
import warnings
285285

286+
# Fail fast if the value looks like a checkpoint file (old semantics)
287+
if os.path.isfile(args.legacy_resume) or args.legacy_resume.endswith(
288+
(".yaml", ".yml")
289+
):
290+
parser.error(
291+
"--resume semantics have changed: it now expects a backup "
292+
"directory, not a checkpoint file. Use --backup-dir <dir> instead."
293+
)
294+
286295
warnings.warn(
287296
"--resume is deprecated and will be removed in a future version. "
288297
"Use --backup-dir instead: the backup directory replaces "
@@ -291,9 +300,12 @@ def apply(self):
291300
stacklevel=1,
292301
)
293302
if args.backup_dir is None:
294-
# Treat --resume value as a backup directory
295303
args.backup_dir = args.legacy_resume
296304

305+
# Validate --workers > 1 requires --backup-dir
306+
if args.num_workers > 1 and args.backup_dir is None:
307+
parser.error("--workers > 1 requires --backup-dir")
308+
297309
redis_url = create_redis_url(args)
298310
plan = load_migration_plan(args.plan)
299311

@@ -362,7 +374,7 @@ def rollback(self):
362374
parser = argparse.ArgumentParser(
363375
usage=(
364376
"rvl migrate rollback --backup-dir <dir> "
365-
"[--batch-size N] [--url <redis_url>]"
377+
"[--index <name>] [--url <redis_url>]"
366378
)
367379
)
368380
parser.add_argument(
@@ -372,11 +384,10 @@ def rollback(self):
372384
required=True,
373385
)
374386
parser.add_argument(
375-
"--batch-size",
376-
dest="batch_size",
377-
type=int,
378-
help="Keys per pipeline batch when restoring (default 500)",
379-
default=500,
387+
"--index",
388+
dest="index_name",
389+
help="Only restore backups for this index name (filters by backup header)",
390+
default=None,
380391
)
381392
parser = add_redis_connection_options(parser)
382393
args = parser.parse_args(sys.argv[3:])
@@ -401,14 +412,43 @@ def rollback(self):
401412
# Derive backup base paths (strip .header suffix)
402413
backup_paths = [str(h.with_suffix("")) for h in header_files]
403414

415+
# Load and filter backups
416+
backups_to_restore = []
417+
for bp in backup_paths:
418+
backup = VectorBackup.load(bp)
419+
if backup is None:
420+
print(f" Skipping {bp}: could not load backup")
421+
continue
422+
if args.index_name and backup.header.index_name != args.index_name:
423+
print(
424+
f" Skipping {os.path.basename(bp)}: "
425+
f"index '{backup.header.index_name}' != '{args.index_name}'"
426+
)
427+
continue
428+
backups_to_restore.append((bp, backup))
429+
430+
if not backups_to_restore:
431+
print("Error: no matching backup files found")
432+
sys.exit(1)
433+
434+
# Warn if multiple distinct indexes detected without --index filter
435+
distinct_indexes = {b.header.index_name for _, b in backups_to_restore}
436+
if len(distinct_indexes) > 1 and not args.index_name:
437+
print(
438+
f"Warning: found backups for {len(distinct_indexes)} distinct indexes: "
439+
f"{', '.join(sorted(distinct_indexes))}. "
440+
f"Use --index to filter, or press Enter to continue."
441+
)
442+
try:
443+
input()
444+
except (EOFError, KeyboardInterrupt):
445+
print("\nAborted.")
446+
sys.exit(1)
447+
404448
client = RedisConnectionFactory.get_redis_connection(redis_url=redis_url)
405449
total_restored = 0
406450
try:
407-
for bp in backup_paths:
408-
backup = VectorBackup.load(bp)
409-
if backup is None:
410-
print(f" Skipping {bp}: could not load backup")
411-
continue
451+
for bp, backup in backups_to_restore:
412452
print(
413453
f"Restoring from: {os.path.basename(bp)} "
414454
f"(index={backup.header.index_name}, "

redisvl/migration/async_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -905,7 +905,7 @@ def _notify(step: str, detail: Optional[str] = None) -> None:
905905
for fn, data in fields.items():
906906
wpipe.hset(key, fn, data)
907907
await wpipe.execute()
908-
docs_quantized += len(batch_keys)
908+
docs_quantized += len(converted) if converted else 0
909909
if progress_callback:
910910
_notify(
911911
"quantize",

redisvl/migration/executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -997,7 +997,7 @@ def _notify(step: str, detail: Optional[str] = None) -> None:
997997
converted = convert_vectors(originals, effective_changes)
998998
if converted:
999999
pipeline_write_vectors(client, converted)
1000-
docs_quantized += len(batch_keys)
1000+
docs_quantized += len(converted) if converted else 0
10011001
if progress_callback:
10021002
_notify(
10031003
"quantize",

0 commit comments

Comments
 (0)