Skip to content

Commit 9a5ae78

Browse files
committed
feat(migrate): add batch migration planner, executor, and CLI subcommands
- batch_planner.py: multi-index plan generation with pattern/list support - batch_executor.py: checkpointed batch execution with resume capability - CLI: batch-plan, batch-apply, batch-resume, batch-status subcommands - 32 unit tests for batch migration logic
1 parent cc2d1d3 commit 9a5ae78

6 files changed

Lines changed: 2806 additions & 0 deletions

File tree

redisvl/cli/migrate.py

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import argparse
22
import asyncio
33
import sys
4+
from pathlib import Path
45
from typing import Optional
56

67
from redisvl.cli.utils import add_redis_connection_options, create_redis_url
78
from redisvl.migration import (
89
AsyncMigrationExecutor,
10+
BatchMigrationExecutor,
11+
BatchMigrationPlanner,
912
MigrationExecutor,
1013
MigrationPlanner,
1114
MigrationValidator,
@@ -16,6 +19,7 @@
1619
estimate_disk_space,
1720
list_indexes,
1821
load_migration_plan,
22+
load_yaml,
1923
write_benchmark_report,
2024
write_migration_report,
2125
)
@@ -37,6 +41,10 @@ class Migrate:
3741
"\tapply Execute a reviewed drop/recreate migration plan (use --async for large migrations)",
3842
"\testimate Estimate disk space required for a migration plan (dry-run, no mutations)",
3943
"\tvalidate Validate a completed migration plan against the live index",
44+
"\tbatch-plan Generate a batch migration plan for multiple indexes",
45+
"\tbatch-apply Execute a batch migration plan with checkpointing",
46+
"\tbatch-resume Resume an interrupted batch migration",
47+
"\tbatch-status Show status of an in-progress or completed batch migration",
4048
"\n",
4149
]
4250
)
@@ -495,3 +503,266 @@ def _print_report_summary(
495503
print(f"- {action}")
496504
if benchmark_out:
497505
print(f"Benchmark report written to {benchmark_out}")
506+
507+
def batch_plan(self):
508+
"""Generate a batch migration plan for multiple indexes."""
509+
parser = argparse.ArgumentParser(
510+
usage=(
511+
"rvl migrate batch-plan --schema-patch <patch.yaml> "
512+
"(--pattern <glob> | --indexes <name1,name2> | --indexes-file <file>)"
513+
)
514+
)
515+
parser.add_argument(
516+
"--schema-patch", help="Path to shared schema patch file", required=True
517+
)
518+
parser.add_argument(
519+
"--pattern", help="Glob pattern to match index names (e.g., '*_idx')"
520+
)
521+
parser.add_argument("--indexes", help="Comma-separated list of index names")
522+
parser.add_argument(
523+
"--indexes-file", help="File with index names (one per line)"
524+
)
525+
parser.add_argument(
526+
"--failure-policy",
527+
help="How to handle failures: fail_fast or continue_on_error",
528+
choices=["fail_fast", "continue_on_error"],
529+
default="fail_fast",
530+
)
531+
parser.add_argument(
532+
"--plan-out",
533+
help="Path to write batch_plan.yaml",
534+
default="batch_plan.yaml",
535+
)
536+
parser = add_redis_connection_options(parser)
537+
args = parser.parse_args(sys.argv[3:])
538+
539+
redis_url = create_redis_url(args)
540+
indexes = (
541+
[idx.strip() for idx in args.indexes.split(",") if idx.strip()]
542+
if args.indexes
543+
else None
544+
)
545+
546+
planner = BatchMigrationPlanner()
547+
batch_plan = planner.create_batch_plan(
548+
indexes=indexes,
549+
pattern=args.pattern,
550+
indexes_file=args.indexes_file,
551+
schema_patch_path=args.schema_patch,
552+
redis_url=redis_url,
553+
failure_policy=args.failure_policy,
554+
)
555+
556+
planner.write_batch_plan(batch_plan, args.plan_out)
557+
self._print_batch_plan_summary(args.plan_out, batch_plan)
558+
559+
def batch_apply(self):
560+
"""Execute a batch migration plan with checkpointing."""
561+
parser = argparse.ArgumentParser(
562+
usage=(
563+
"rvl migrate batch-apply --plan <batch_plan.yaml> "
564+
"[--state <batch_state.yaml>] [--report-dir <./reports>]"
565+
)
566+
)
567+
parser.add_argument("--plan", help="Path to batch_plan.yaml", required=True)
568+
parser.add_argument(
569+
"--accept-data-loss",
570+
help="Acknowledge that quantization is lossy and cannot be reverted",
571+
action="store_true",
572+
)
573+
parser.add_argument(
574+
"--state",
575+
help="Path to checkpoint state file",
576+
default="batch_state.yaml",
577+
)
578+
parser.add_argument(
579+
"--report-dir",
580+
help="Directory for per-index migration reports",
581+
default="./reports",
582+
)
583+
parser = add_redis_connection_options(parser)
584+
args = parser.parse_args(sys.argv[3:])
585+
586+
from redisvl.migration.models import BatchPlan
587+
588+
plan_data = load_yaml(args.plan)
589+
batch_plan = BatchPlan.model_validate(plan_data)
590+
591+
if batch_plan.requires_quantization and not args.accept_data_loss:
592+
print(
593+
"""WARNING: This batch migration includes quantization (e.g., float32 -> float16).
594+
Vector data will be modified. Original precision cannot be recovered.
595+
To proceed, add --accept-data-loss flag.
596+
597+
If you need to preserve original vectors, backup your data first:
598+
redis-cli BGSAVE"""
599+
)
600+
exit(1)
601+
602+
redis_url = create_redis_url(args)
603+
executor = BatchMigrationExecutor()
604+
605+
def progress_callback(
606+
index_name: str, position: int, total: int, status: str
607+
) -> None:
608+
print(f"[{position}/{total}] {index_name}: {status}")
609+
610+
report = executor.apply(
611+
batch_plan,
612+
batch_plan_path=args.plan,
613+
state_path=args.state,
614+
report_dir=args.report_dir,
615+
redis_url=redis_url,
616+
progress_callback=progress_callback,
617+
)
618+
619+
self._print_batch_report_summary(report)
620+
621+
def batch_resume(self):
622+
"""Resume an interrupted batch migration."""
623+
parser = argparse.ArgumentParser(
624+
usage=(
625+
"rvl migrate batch-resume --state <batch_state.yaml> "
626+
"[--plan <batch_plan.yaml>] [--retry-failed]"
627+
)
628+
)
629+
parser.add_argument(
630+
"--state", help="Path to checkpoint state file", required=True
631+
)
632+
parser.add_argument(
633+
"--plan", help="Path to batch_plan.yaml (optional, uses state.plan_path)"
634+
)
635+
parser.add_argument(
636+
"--retry-failed",
637+
help="Retry previously failed indexes",
638+
action="store_true",
639+
)
640+
parser.add_argument(
641+
"--report-dir",
642+
help="Directory for per-index migration reports",
643+
default="./reports",
644+
)
645+
parser = add_redis_connection_options(parser)
646+
args = parser.parse_args(sys.argv[3:])
647+
648+
redis_url = create_redis_url(args)
649+
executor = BatchMigrationExecutor()
650+
651+
def progress_callback(
652+
index_name: str, position: int, total: int, status: str
653+
) -> None:
654+
print(f"[{position}/{total}] {index_name}: {status}")
655+
656+
report = executor.resume(
657+
args.state,
658+
batch_plan_path=args.plan,
659+
retry_failed=args.retry_failed,
660+
report_dir=args.report_dir,
661+
redis_url=redis_url,
662+
progress_callback=progress_callback,
663+
)
664+
665+
self._print_batch_report_summary(report)
666+
667+
def batch_status(self):
668+
"""Show status of an in-progress or completed batch migration."""
669+
parser = argparse.ArgumentParser(
670+
usage="rvl migrate batch-status --state <batch_state.yaml>"
671+
)
672+
parser.add_argument(
673+
"--state", help="Path to checkpoint state file", required=True
674+
)
675+
args = parser.parse_args(sys.argv[3:])
676+
677+
state_path = Path(args.state).resolve()
678+
if not state_path.exists():
679+
print(f"State file not found: {args.state}")
680+
exit(1)
681+
682+
from redisvl.migration.models import BatchState
683+
684+
state_data = load_yaml(args.state)
685+
state = BatchState.model_validate(state_data)
686+
687+
print(
688+
f"""Batch ID: {state.batch_id}
689+
Started at: {state.started_at}
690+
Updated at: {state.updated_at}
691+
Current index: {state.current_index or '(none)'}
692+
Remaining: {len(state.remaining)}
693+
Completed: {len(state.completed)}
694+
- Succeeded: {state.success_count}
695+
- Failed: {state.failed_count}
696+
- Skipped: {state.skipped_count}"""
697+
)
698+
699+
if state.completed:
700+
print("\nCompleted indexes:")
701+
for idx in state.completed:
702+
if idx.status == "success":
703+
status_icon = "[OK]"
704+
elif idx.status == "skipped":
705+
status_icon = "[SKIP]"
706+
else:
707+
status_icon = "[FAIL]"
708+
print(f" {status_icon} {idx.name}")
709+
if idx.error:
710+
print(f" Error: {idx.error}")
711+
712+
if state.remaining:
713+
print(f"\nRemaining indexes ({len(state.remaining)}):")
714+
for name in state.remaining[:10]:
715+
print(f" - {name}")
716+
if len(state.remaining) > 10:
717+
print(f" ... and {len(state.remaining) - 10} more")
718+
719+
def _print_batch_plan_summary(self, plan_out: str, batch_plan) -> None:
720+
"""Print summary after generating batch plan."""
721+
import os
722+
723+
abs_path = os.path.abspath(plan_out)
724+
print(
725+
f"""Batch plan written to {abs_path}
726+
Batch ID: {batch_plan.batch_id}
727+
Mode: {batch_plan.mode}
728+
Failure policy: {batch_plan.failure_policy}
729+
Requires quantization: {batch_plan.requires_quantization}
730+
Total indexes: {len(batch_plan.indexes)}
731+
- Applicable: {batch_plan.applicable_count}
732+
- Skipped: {batch_plan.skipped_count}"""
733+
)
734+
735+
if batch_plan.skipped_count > 0:
736+
print("\nSkipped indexes:")
737+
for idx in batch_plan.indexes:
738+
if not idx.applicable:
739+
print(f" - {idx.name}: {idx.skip_reason}")
740+
741+
print(
742+
f"""
743+
Next steps:
744+
Review the plan: cat {plan_out}
745+
Apply the migration: rvl migrate batch-apply --plan {plan_out}"""
746+
)
747+
748+
if batch_plan.requires_quantization:
749+
print(" (add --accept-data-loss for quantization)")
750+
751+
def _print_batch_report_summary(self, report) -> None:
752+
"""Print summary after batch migration completes."""
753+
print(
754+
f"""
755+
Batch migration {report.status}
756+
Batch ID: {report.batch_id}
757+
Duration: {report.summary.total_duration_seconds}s
758+
Total: {report.summary.total_indexes}
759+
- Succeeded: {report.summary.successful}
760+
- Failed: {report.summary.failed}
761+
- Skipped: {report.summary.skipped}"""
762+
)
763+
764+
if report.summary.failed > 0:
765+
print("\nFailed indexes:")
766+
for idx in report.indexes:
767+
if idx.status == "failed":
768+
print(f" - {idx.name}: {idx.error}")

redisvl/migration/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
from redisvl.migration.async_executor import AsyncMigrationExecutor
22
from redisvl.migration.async_planner import AsyncMigrationPlanner
33
from redisvl.migration.async_validation import AsyncMigrationValidator
4+
from redisvl.migration.batch_executor import BatchMigrationExecutor
5+
from redisvl.migration.batch_planner import BatchMigrationPlanner
46
from redisvl.migration.executor import MigrationExecutor
7+
from redisvl.migration.models import BatchPlan, BatchState, SchemaPatch
58
from redisvl.migration.planner import MigrationPlanner
69
from redisvl.migration.validation import MigrationValidator
710
from redisvl.migration.wizard import MigrationWizard
@@ -10,8 +13,13 @@
1013
"AsyncMigrationExecutor",
1114
"AsyncMigrationPlanner",
1215
"AsyncMigrationValidator",
16+
"BatchMigrationExecutor",
17+
"BatchMigrationPlanner",
18+
"BatchPlan",
19+
"BatchState",
1320
"MigrationExecutor",
1421
"MigrationPlanner",
1522
"MigrationValidator",
1623
"MigrationWizard",
24+
"SchemaPatch",
1725
]

0 commit comments

Comments
 (0)