Skip to content

Commit cc2d1d3

Browse files
committed
feat(migrate): [3/6] async executor, planner, validator with --async CLI flag
Adds non-blocking async migration support: - async_executor.py: AsyncMigrationExecutor with async apply, BGSAVE, quantization - async_planner.py: AsyncMigrationPlanner with async create_plan - async_validation.py: AsyncMigrationValidator with async validate - async_utils.py: async Redis helpers - cli/migrate.py: adds --async flag to 'apply' subcommand - Unit tests for async executor and planner
1 parent d634016 commit cc2d1d3

9 files changed

Lines changed: 3335 additions & 5 deletions

File tree

redisvl/cli/migrate.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import argparse
2+
import asyncio
23
import sys
34
from typing import Optional
45

56
from redisvl.cli.utils import add_redis_connection_options, create_redis_url
67
from redisvl.migration import (
8+
AsyncMigrationExecutor,
79
MigrationExecutor,
810
MigrationPlanner,
911
MigrationValidator,
@@ -32,7 +34,7 @@ class Migrate:
3234
"\tlist List all available indexes",
3335
"\twizard Interactively build a migration plan and schema patch",
3436
"\tplan Generate a migration plan for a document-preserving drop/recreate migration",
35-
"\tapply Execute a reviewed drop/recreate migration plan",
37+
"\tapply Execute a reviewed drop/recreate migration plan (use --async for large migrations)",
3638
"\testimate Estimate disk space required for a migration plan (dry-run, no mutations)",
3739
"\tvalidate Validate a completed migration plan against the live index",
3840
"\n",
@@ -199,11 +201,17 @@ def apply(self):
199201
parser = argparse.ArgumentParser(
200202
usage=(
201203
"rvl migrate apply --plan <migration_plan.yaml> "
202-
"[--resume <checkpoint.yaml>] "
204+
"[--async] [--resume <checkpoint.yaml>] "
203205
"[--report-out <migration_report.yaml>]"
204206
)
205207
)
206208
parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True)
209+
parser.add_argument(
210+
"--async",
211+
dest="use_async",
212+
help="Use async executor (recommended for large migrations with quantization)",
213+
action="store_true",
214+
)
207215
parser.add_argument(
208216
"--resume",
209217
dest="checkpoint_path",
@@ -246,9 +254,16 @@ def apply(self):
246254
if disk_estimate.has_quantization:
247255
print(f"\n{disk_estimate.summary()}\n")
248256

249-
report = self._apply_sync(
250-
plan, redis_url, args.query_check_file, args.checkpoint_path
251-
)
257+
if args.use_async:
258+
report = asyncio.run(
259+
self._apply_async(
260+
plan, redis_url, args.query_check_file, args.checkpoint_path
261+
)
262+
)
263+
else:
264+
report = self._apply_sync(
265+
plan, redis_url, args.query_check_file, args.checkpoint_path
266+
)
252267

253268
write_migration_report(report, args.report_out)
254269
if args.benchmark_out:
@@ -319,6 +334,29 @@ def _apply_sync(
319334
self._print_apply_result(report)
320335
return report
321336

337+
async def _apply_async(
338+
self,
339+
plan,
340+
redis_url: str,
341+
query_check_file: Optional[str],
342+
checkpoint_path: Optional[str] = None,
343+
):
344+
"""Execute migration asynchronously (non-blocking for large quantization jobs)."""
345+
executor = AsyncMigrationExecutor()
346+
347+
print(f"\nApplying migration to '{plan.source.index_name}' (async mode)...")
348+
349+
report = await executor.apply(
350+
plan,
351+
redis_url=redis_url,
352+
query_check_file=query_check_file,
353+
progress_callback=self._make_progress_callback(),
354+
checkpoint_path=checkpoint_path,
355+
)
356+
357+
self._print_apply_result(report)
358+
return report
359+
322360
def _print_apply_result(self, report) -> None:
323361
"""Print the result summary after migration apply."""
324362
if report.result == "succeeded":

redisvl/migration/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1+
from redisvl.migration.async_executor import AsyncMigrationExecutor
2+
from redisvl.migration.async_planner import AsyncMigrationPlanner
3+
from redisvl.migration.async_validation import AsyncMigrationValidator
14
from redisvl.migration.executor import MigrationExecutor
25
from redisvl.migration.planner import MigrationPlanner
36
from redisvl.migration.validation import MigrationValidator
47
from redisvl.migration.wizard import MigrationWizard
58

69
__all__ = [
10+
"AsyncMigrationExecutor",
11+
"AsyncMigrationPlanner",
12+
"AsyncMigrationValidator",
713
"MigrationExecutor",
814
"MigrationPlanner",
915
"MigrationValidator",

0 commit comments

Comments
 (0)