Commit cc42b77

feat(migrate): add async migration support

Add async/await execution for index migrations, enabling non-blocking operation for large quantization jobs and async application integration.

New functionality:
- CLI: --async flag for rvl migrate apply
- Python API: AsyncMigrationPlanner, AsyncMigrationExecutor, AsyncMigrationValidator
- Batched quantization with pipelined HSET operations
- Non-blocking readiness polling with asyncio.sleep()

What becomes async:
- SCAN operations (yields between batches of 500 keys)
- Pipelined HSET writes (100-1000 ops per batch)
- Index readiness polling (asyncio.sleep vs time.sleep)

What stays sync:
- CLI prompts (user interaction)
- YAML file I/O (local filesystem)

Documentation:
- Sync vs async execution guidance in concepts/index-migrations.md
- Async usage examples in how_to_guides/migrate-indexes.md

Tests:
- 4 unit tests for AsyncMigrationPlanner
- 4 unit tests for AsyncMigrationExecutor
- 1 integration test for full async flow
1 parent 8d17a0f commit cc42b77

11 files changed

Lines changed: 1651 additions & 12 deletions

docs/concepts/index-migrations.md

Lines changed: 103 additions & 0 deletions
@@ -139,6 +139,109 @@ With `drop_recreate`, your index is unavailable between the drop and when re-ind

The duration depends on document count, field count, and vector dimensions. For large indexes, consider running migrations during low-traffic periods.

## Sync vs async execution

The migrator provides both synchronous and asynchronous execution modes.

### What becomes async and what stays sync

The migration workflow has distinct phases. Here is what each mode affects:

| Phase | Sync mode | Async mode | Notes |
|-------|-----------|------------|-------|
| **Plan generation** | `MigrationPlanner.create_plan()` | `AsyncMigrationPlanner.create_plan()` | Reads index metadata from Redis |
| **Schema snapshot** | Sync Redis calls | Async Redis calls | Single `FT.INFO` command |
| **Drop index** | `index.delete()` | `await index.delete()` | Single `FT.DROPINDEX` command |
| **Quantization** | Blocking SCAN + pipelined HSET | Awaited SCAN + pipelined HSET | See "Why async helps with quantization" |
| **Create index** | `index.create()` | `await index.create()` | Single `FT.CREATE` command |
| **Readiness polling** | `time.sleep()` loop | `asyncio.sleep()` loop | Polls `FT.INFO` until indexed |
| **Validation** | Sync Redis calls | Async Redis calls | Schema and doc count checks |
| **CLI interaction** | Always sync | Always sync | User prompts, file I/O |
| **YAML read/write** | Always sync | Always sync | Local filesystem only |
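
The readiness-polling row can be illustrated with a small sketch. This is not the library's `async_wait_for_index_ready` implementation: `wait_until_indexed`, `make_fake_probe`, and the idea of a probe that returns a fraction between 0 and 1 are assumptions made for illustration (a real probe would presumably read the indexing progress out of `FT.INFO`).

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_until_indexed(
    get_percent_indexed: Callable[[], Awaitable[float]],  # hypothetical probe
    poll_interval: float = 0.5,
    timeout: float = 60.0,
) -> float:
    """Poll until the probe reports 1.0 (fully indexed) or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        percent = await get_percent_indexed()
        if percent >= 1.0:
            return percent
        if time.monotonic() >= deadline:
            raise TimeoutError(f"index still at {percent:.0%} after {timeout}s")
        # asyncio.sleep suspends only this coroutine; time.sleep here would
        # block every other task running on the same event loop.
        await asyncio.sleep(poll_interval)


def make_fake_probe(steps):
    """Stand-in for a Redis call: returns the next canned progress value."""
    values = iter(steps)

    async def probe() -> float:
        return next(values)

    return probe


result = asyncio.run(
    wait_until_indexed(make_fake_probe([0.0, 0.4, 1.0]), poll_interval=0.01)
)
```

The only difference from a sync polling loop is the `await asyncio.sleep(...)` call, which is why this phase costs the same wall-clock time in both modes.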

### When to use sync (default)

Sync execution is simpler and sufficient for most migrations:

- Small to medium indexes (under 100K documents)
- Index-only changes (algorithm, distance metric, field options)
- Interactive CLI usage where blocking is acceptable

For migrations without quantization, the Redis operations are fast single commands. Sync mode adds no meaningful overhead.

### When to use async

Async execution (`--async` flag) provides benefits in specific scenarios:

**Large quantization jobs (1M+ vectors)**

Converting float32 to float16 requires reading every vector, converting it, and writing it back. The async executor:

- Uses `SCAN` with `COUNT 500` to iterate keys without blocking Redis (per [Redis SCAN docs](https://redis.io/docs/latest/commands/scan/), each SCAN call is O(1); a complete iteration is O(N))
- Pipelines `HSET` operations in batches (100-1000 operations per pipeline is a typical working range)
- Yields to the event loop between batches so other tasks can proceed
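
The conversion step itself can be sketched with the standard library alone. This is a minimal illustration, not the executor's code: it assumes vectors are stored as little-endian float32 blobs, and the helper names `float32_to_float16_bytes` and `chunked` are hypothetical.

```python
import struct


def float32_to_float16_bytes(raw: bytes) -> bytes:
    """Re-encode a little-endian float32 blob as little-endian float16."""
    if len(raw) % 4:
        raise ValueError("float32 blob length must be a multiple of 4 bytes")
    count = len(raw) // 4
    values = struct.unpack(f"<{count}f", raw)
    # 'e' is the IEEE 754 half-precision format; values outside the
    # float16 range are rejected by struct rather than silently clipped.
    return struct.pack(f"<{count}e", *values)


def chunked(items, size):
    """Split a list into consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


original = struct.pack("<4f", 0.5, -1.25, 3.0, 0.1)
converted = float32_to_float16_bytes(original)
decoded = struct.unpack("<4e", converted)

# 1200 pending writes grouped into pipeline-sized batches of 500.
batches = list(chunked(list(range(1200)), 500))
```

Each vector shrinks to half its size, and values that are not exactly representable in float16 (such as 0.1) come back slightly rounded, which is the precision trade-off quantization accepts.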

**Large keyspaces (40M+ keys)**

When your Redis instance has many keys, `SCAN` iteration can take minutes. Async mode yields to the event loop between batches, so the rest of the application stays responsive while the scan runs.
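
A cursor-driven scan loop of this kind might look like the sketch below. It runs against an in-memory stand-in rather than Redis: `make_fake_scan` and `iter_key_batches` are hypothetical names, and the fake treats the cursor as a list offset. With a real server, `COUNT` is only a hint, so batch sizes vary and a batch can be empty before the cursor returns to 0.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, Tuple

ScanFn = Callable[[int, int], Awaitable[Tuple[int, List[str]]]]


async def iter_key_batches(scan: ScanFn, count: int = 500) -> AsyncIterator[List[str]]:
    """Yield key batches until the scan function returns cursor 0."""
    cursor = 0
    while True:
        # Every awaited scan call is a point where other tasks may run.
        cursor, keys = await scan(cursor, count)
        if keys:
            yield keys
        if cursor == 0:
            break


def make_fake_scan(all_keys: List[str]) -> ScanFn:
    """In-memory stand-in for SCAN: the cursor is an offset into a list."""

    async def scan(cursor: int, count: int) -> Tuple[int, List[str]]:
        await asyncio.sleep(0)  # simulate an I/O suspension point
        end = cursor + count
        next_cursor = end if end < len(all_keys) else 0
        return next_cursor, all_keys[cursor:end]

    return scan


async def collect(n: int) -> List[List[str]]:
    scan = make_fake_scan([f"doc:{i}" for i in range(n)])
    return [batch async for batch in iter_key_batches(scan, count=500)]


batches = asyncio.run(collect(1200))
```

Writing the loop as an async generator keeps the batching logic separate from whatever is done with each batch.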

**Async application integration**

If your application uses asyncio, you can integrate migration directly:

```python
import asyncio
from redisvl.migration import AsyncMigrationPlanner, AsyncMigrationExecutor

async def migrate():
    planner = AsyncMigrationPlanner()
    plan = await planner.create_plan("myindex", redis_url="redis://localhost:6379")

    executor = AsyncMigrationExecutor()
    report = await executor.apply(plan, redis_url="redis://localhost:6379")

asyncio.run(migrate())
```

### Why async helps with quantization

The key difference is in the vector re-encoding loop:

**Sync quantization:**
```
for each batch of 500 keys:
    SCAN (blocks) -> get keys
    for each key:
        HGET field (blocks)
        convert array
        pipeline.HSET(field, new_bytes)
    pipeline.execute() (blocks)
```

**Async quantization:**
```
for each batch of 500 keys:
    await SCAN -> get keys (yields)
    for each key:
        await HGET field (yields)
        convert array
        pipeline.HSET(field, new_bytes)
    await pipeline.execute() (yields)
```

Each `await` is a yield point where other coroutines can run. For millions of vectors, this prevents your application from freezing.
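
The effect of those yield points can be seen in a toy program that needs no Redis at all. Both coroutines below are stand-ins invented for illustration: `migrate_batches` plays the quantization loop and `heartbeat` plays unrelated application work sharing the same event loop.

```python
import asyncio


async def migrate_batches(num_batches: int, log: list) -> None:
    """Stand-in for the quantization loop: one awaited call per batch."""
    for i in range(num_batches):
        await asyncio.sleep(0)  # placeholder for `await pipeline.execute()`
        log.append(f"batch {i}")


async def heartbeat(ticks: int, log: list) -> None:
    """Stand-in for other application work, such as serving requests."""
    for i in range(ticks):
        await asyncio.sleep(0)
        log.append(f"tick {i}")


async def main() -> list:
    log: list = []
    # Both coroutines share one event loop and interleave at each await.
    await asyncio.gather(migrate_batches(3, log), heartbeat(3, log))
    return log


log = asyncio.run(main())
```

The heartbeat entries appear in the log before the last batch finishes. If the placeholder were a blocking call, every batch would complete before the first tick.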

### What async does NOT improve

Async execution does not reduce:

- **Total migration time**: Same work, different scheduling
- **Redis server load**: Same commands execute on the server
- **Downtime window**: Index remains unavailable during rebuild
- **Network round trips**: Same number of Redis calls

The benefit is application responsiveness, not faster migration.

## Learn more

- [Migration guide](../user_guide/how_to_guides/migrate-indexes.md): Step-by-step instructions

docs/user_guide/how_to_guides/migrate-indexes.md

Lines changed: 78 additions & 0 deletions
@@ -302,6 +302,41 @@ What `apply` does:
6. validates the result
7. writes report artifacts

### Async execution for large migrations

For large migrations (especially those involving vector quantization), use the `--async` flag:

```bash
rvl migrate apply \
  --plan migration_plan.yaml \
  --allow-downtime \
  --async \
  --url redis://localhost:6379
```

**What becomes async:**

- Keyspace SCAN during quantization (yields between batches of 500 keys)
- Vector read/write operations (pipelined HGET/HSET)
- Index readiness polling (uses `asyncio.sleep()` instead of a blocking `time.sleep()`)
- Validation checks

**What stays sync:**

- CLI prompts and user interaction
- YAML file reading/writing
- Progress display

**When to use async:**

- Quantizing millions of vectors (float32 to float16)
- Redis instance has 40M+ keys
- Integrating into an async application

For most migrations (index-only changes, small datasets), sync mode is sufficient and simpler.

See {doc}`/concepts/index-migrations` for detailed async vs sync guidance.

## Step 5: Validate the Result

Validation happens automatically during `apply`, but you can run it separately:
@@ -358,6 +393,7 @@ rvl migrate validate \
- `--index` : Index name to migrate
- `--plan` / `--plan-out` : Path to migration plan
- `--allow-downtime` : Acknowledge index unavailability (required for apply)
- `--async` : Use async executor for large migrations (apply only)
- `--report-out` : Path for validation report
- `--benchmark-out` : Path for performance metrics
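
One detail of the `--async` flag is worth a short illustration: `async` is a reserved word in Python, so the parsed value cannot be read as `args.async`. The CLI change in this commit stores it under `dest="use_async"` instead. The parser below is a trimmed-down sketch that keeps only three of the options, not the full `rvl migrate apply` parser.

```python
import argparse

parser = argparse.ArgumentParser(prog="rvl migrate apply")
parser.add_argument("--plan", required=True)
parser.add_argument("--allow-downtime", action="store_true")
# Without dest, argparse would store the flag under the attribute name
# "async", and `args.async` is a syntax error because async is a keyword.
parser.add_argument("--async", dest="use_async", action="store_true")

sync_args = parser.parse_args(
    ["--plan", "migration_plan.yaml", "--allow-downtime"]
)
async_args = parser.parse_args(
    ["--plan", "migration_plan.yaml", "--allow-downtime", "--async"]
)
```

Because the flag uses `store_true`, omitting it leaves `use_async` as `False`, which keeps sync execution the default.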

@@ -389,6 +425,48 @@ If `apply` fails mid-migration:

The underlying documents are never deleted by `drop_recreate`.

## Python API

For programmatic migrations, use the migration classes directly:

### Sync API

```python
from redisvl.migration import MigrationPlanner, MigrationExecutor

planner = MigrationPlanner()
plan = planner.create_plan(
    "myindex",
    redis_url="redis://localhost:6379",
    schema_patch_path="schema_patch.yaml",
)

executor = MigrationExecutor()
report = executor.apply(plan, redis_url="redis://localhost:6379")
print(f"Migration result: {report.result}")
```

### Async API

```python
import asyncio
from redisvl.migration import AsyncMigrationPlanner, AsyncMigrationExecutor

async def migrate():
    planner = AsyncMigrationPlanner()
    plan = await planner.create_plan(
        "myindex",
        redis_url="redis://localhost:6379",
        schema_patch_path="schema_patch.yaml",
    )

    executor = AsyncMigrationExecutor()
    report = await executor.apply(plan, redis_url="redis://localhost:6379")
    print(f"Migration result: {report.result}")

asyncio.run(migrate())
```

## Learn more

- {doc}`/concepts/index-migrations`: How migrations work and which changes are supported

redisvl/cli/migrate.py

Lines changed: 67 additions & 12 deletions
@@ -1,10 +1,17 @@
 import argparse
+import asyncio
 import sys
 from argparse import Namespace
 from typing import Optional

 from redisvl.cli.utils import add_redis_connection_options, create_redis_url
-from redisvl.migration import MigrationExecutor, MigrationPlanner, MigrationValidator
+from redisvl.migration import (
+    AsyncMigrationExecutor,
+    AsyncMigrationValidator,
+    MigrationExecutor,
+    MigrationPlanner,
+    MigrationValidator,
+)
 from redisvl.migration.utils import (
     list_indexes,
     load_migration_plan,
@@ -26,7 +33,7 @@ class Migrate:
         "\tlist List all available indexes",
         "\tplan Generate a migration plan for a document-preserving drop/recreate migration",
         "\twizard Interactively build a migration plan and schema patch",
-        "\tapply Execute a reviewed drop/recreate migration plan",
+        "\tapply Execute a reviewed drop/recreate migration plan (use --async for large migrations)",
         "\tvalidate Validate a completed migration plan against the live index",
         "\n",
     ]
@@ -194,7 +201,7 @@ def apply(self):
         parser = argparse.ArgumentParser(
             usage=(
                 "rvl migrate apply --plan <migration_plan.yaml> --allow-downtime "
-                "[--report-out <migration_report.yaml>]"
+                "[--async] [--report-out <migration_report.yaml>]"
             )
         )
         parser.add_argument("--plan", help="Path to migration_plan.yaml", required=True)
@@ -203,6 +210,12 @@ def apply(self):
             help="Explicitly acknowledge downtime for drop_recreate",
             action="store_true",
         )
+        parser.add_argument(
+            "--async",
+            dest="use_async",
+            help="Use async executor (recommended for large migrations with quantization)",
+            action="store_true",
+        )
         parser.add_argument(
             "--report-out",
             help="Path to write migration_report.yaml",
@@ -228,6 +241,21 @@ def apply(self):

         redis_url = create_redis_url(args)
         plan = load_migration_plan(args.plan)
+
+        if args.use_async:
+            report = asyncio.run(
+                self._apply_async(plan, redis_url, args.query_check_file)
+            )
+        else:
+            report = self._apply_sync(plan, redis_url, args.query_check_file)
+
+        write_migration_report(report, args.report_out)
+        if args.benchmark_out:
+            write_benchmark_report(report, args.benchmark_out)
+        self._print_report_summary(args.report_out, report, args.benchmark_out)
+
+    def _apply_sync(self, plan, redis_url: str, query_check_file: Optional[str]):
+        """Execute migration synchronously."""
         executor = MigrationExecutor()

         print(f"\nApplying migration to '{plan.source.index_name}'...")
@@ -241,7 +269,6 @@ def progress_callback(step: str, detail: str) -> None:
                 "validate": "[5/5] Validate",
             }
             label = step_labels.get(step, step)
-            # Use carriage return to update in place for progress
             if detail and not detail.startswith("done"):
                 print(f" {label}: {detail} ", end="\r", flush=True)
             else:
@@ -250,27 +277,55 @@ def progress_callback(step: str, detail: str) -> None:
         report = executor.apply(
             plan,
             redis_url=redis_url,
-            query_check_file=args.query_check_file,
+            query_check_file=query_check_file,
             progress_callback=progress_callback,
         )

-        # Print completion summary
+        self._print_apply_result(report)
+        return report
+
+    async def _apply_async(self, plan, redis_url: str, query_check_file: Optional[str]):
+        """Execute migration asynchronously (non-blocking for large quantization jobs)."""
+        executor = AsyncMigrationExecutor()
+
+        print(f"\nApplying migration to '{plan.source.index_name}' (async mode)...")
+
+        def progress_callback(step: str, detail: str) -> None:
+            step_labels = {
+                "drop": "[1/5] Drop index",
+                "quantize": "[2/5] Quantize vectors",
+                "create": "[3/5] Create index",
+                "index": "[4/5] Re-indexing",
+                "validate": "[5/5] Validate",
+            }
+            label = step_labels.get(step, step)
+            if detail and not detail.startswith("done"):
+                print(f" {label}: {detail} ", end="\r", flush=True)
+            else:
+                print(f" {label}: {detail} ")
+
+        report = await executor.apply(
+            plan,
+            redis_url=redis_url,
+            query_check_file=query_check_file,
+            progress_callback=progress_callback,
+        )
+
+        self._print_apply_result(report)
+        return report
+
+    def _print_apply_result(self, report) -> None:
+        """Print the result summary after migration apply."""
         if report.result == "succeeded":
             total_time = report.timings.total_migration_duration_seconds or 0
             downtime = report.timings.downtime_duration_seconds or 0
             print(f"\nMigration completed in {total_time}s (downtime: {downtime}s)")
         else:
             print(f"\nMigration {report.result}")
-            # Show errors immediately for visibility
             if report.validation.errors:
                 for error in report.validation.errors:
                     print(f" ERROR: {error}")

-        write_migration_report(report, args.report_out)
-        if args.benchmark_out:
-            write_benchmark_report(report, args.benchmark_out)
-        self._print_report_summary(args.report_out, report, args.benchmark_out)
-
     def validate(self):
         parser = argparse.ArgumentParser(
             usage=(

redisvl/migration/__init__.py

Lines changed: 17 additions & 0 deletions
@@ -1,15 +1,32 @@
+from redisvl.migration.async_executor import AsyncMigrationExecutor
+from redisvl.migration.async_planner import AsyncMigrationPlanner
+from redisvl.migration.async_utils import (
+    async_current_source_matches_snapshot,
+    async_list_indexes,
+    async_wait_for_index_ready,
+)
+from redisvl.migration.async_validation import AsyncMigrationValidator
 from redisvl.migration.executor import MigrationExecutor
 from redisvl.migration.models import MigrationPlan, MigrationReport, SchemaPatch
 from redisvl.migration.planner import MigrationPlanner
 from redisvl.migration.validation import MigrationValidator
 from redisvl.migration.wizard import MigrationWizard

 __all__ = [
+    # Sync
     "MigrationExecutor",
     "MigrationPlan",
     "MigrationPlanner",
     "MigrationReport",
     "MigrationValidator",
     "MigrationWizard",
     "SchemaPatch",
+    # Async
+    "AsyncMigrationExecutor",
+    "AsyncMigrationPlanner",
+    "AsyncMigrationValidator",
+    # Async utilities
+    "async_current_source_matches_snapshot",
+    "async_list_indexes",
+    "async_wait_for_index_ready",
 ]
