Skip to content

Commit c08c727

Browse files
committed
feat(migrate): implement rename execution in sync/async executors
- Add _rename_keys for prefix changes via the RENAME command
- Add _rename_field_in_hash and _rename_field_in_json for field renames
- Execute renames before drop/recreate for safe enumeration
- Support both HASH and JSON storage types
1 parent e1add9c commit c08c727

2 files changed

Lines changed: 458 additions & 13 deletions

File tree

redisvl/migration/async_executor.py

Lines changed: 213 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,120 @@ async def _enumerate_with_scan(
177177
if cursor == 0:
178178
break
179179

180+
async def _rename_keys(
    self,
    client: AsyncRedisClient,
    keys: List[str],
    old_prefix: str,
    new_prefix: str,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> int:
    """Async version: Rename keys from old prefix to new prefix.

    Renames are pipelined (non-transactional) in batches of 100. Keys
    that do not start with ``old_prefix`` are skipped with a warning.

    Args:
        client: Async Redis client used to issue RENAME commands.
        keys: Keys to rename. Assumed to be ``str`` (a client without
            ``decode_responses`` would yield ``bytes`` and fail the
            prefix check — TODO confirm upstream enumeration decodes).
        old_prefix: Prefix each key is expected to carry.
        new_prefix: Replacement prefix.
        progress_callback: Optional ``(done, total)`` hook invoked
            after each batch.

    Returns:
        Number of keys successfully renamed.
    """
    renamed = 0
    total = len(keys)
    pipeline_size = 100

    for i in range(0, total, pipeline_size):
        batch = keys[i : i + pipeline_size]
        pipe = client.pipeline(transaction=False)

        for key in batch:
            # Guard clause: only rewrite keys that carry the old prefix.
            if not key.startswith(old_prefix):
                logger.warning(
                    f"Key '{key}' does not start with prefix '{old_prefix}'"
                )
                continue
            new_key = new_prefix + key[len(old_prefix) :]
            pipe.rename(key, new_key)

        try:
            # raise_on_error=False: with transaction=False the commands
            # already ran server-side, so one failed RENAME (e.g. a key
            # deleted mid-migration) must not discard the success count
            # for the rest of the batch by raising out of execute().
            results = await pipe.execute(raise_on_error=False)
            renamed += sum(1 for r in results if r is True or r == "OK")
        except Exception as e:
            logger.warning(f"Error in rename batch: {e}")

        if progress_callback:
            progress_callback(min(i + pipeline_size, total), total)

    return renamed
217+
218+
async def _rename_field_in_hash(
    self,
    client: AsyncRedisClient,
    keys: List[str],
    old_name: str,
    new_name: str,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> int:
    """Async version: Rename a field in hash documents.

    For each key, copies the value from ``old_name`` to ``new_name``
    (HSET) and then removes ``old_name`` (HDEL), pipelined in batches
    of 100. Keys missing the old field are left untouched.

    Args:
        client: Async Redis client.
        keys: Hash keys to update.
        old_name: Field to rename.
        new_name: New field name.
        progress_callback: Optional ``(done, total)`` hook invoked
            after each batch.

    Returns:
        Number of documents in which the field was renamed.
    """
    # Renaming a field onto itself would HSET then HDEL the same field,
    # destroying the value — treat it as a no-op instead.
    if old_name == new_name:
        return 0

    renamed = 0
    total = len(keys)
    pipeline_size = 100

    for i in range(0, total, pipeline_size):
        batch = keys[i : i + pipeline_size]

        # Pass 1: read the old field for every key in the batch.
        pipe = client.pipeline(transaction=False)
        for key in batch:
            pipe.hget(key, old_name)
        values = await pipe.execute()

        # Pass 2: write the new field and delete the old one.
        pipe = client.pipeline(transaction=False)
        for key, value in zip(batch, values):
            if value is not None:
                pipe.hset(key, new_name, value)
                pipe.hdel(key, old_name)

        try:
            results = await pipe.execute()
            # Results alternate [hset, hdel, ...]. Count HDEL (odd
            # indices): HSET returns 0 when new_name already existed,
            # which would undercount, while HDEL returns 1 whenever the
            # old field was actually removed.
            renamed += sum(1 for j, r in enumerate(results) if j % 2 == 1 and r)
        except Exception as e:
            logger.warning(f"Error in field rename batch: {e}")

        if progress_callback:
            progress_callback(min(i + pipeline_size, total), total)

    return renamed
255+
256+
async def _rename_field_in_json(
    self,
    client: AsyncRedisClient,
    keys: List[str],
    old_path: str,
    new_path: str,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> int:
    """Async version: Rename a field in JSON documents.

    For each key, copies the value at ``old_path`` to ``new_path``
    (JSON.SET) and then removes ``old_path`` (JSON.DEL), pipelined in
    batches of 100. Assumes ``$``-style JSONPath arguments (the form
    the caller builds as ``f"$.{{name}}"``).

    Args:
        client: Async Redis client with RedisJSON support.
        keys: JSON document keys to update.
        old_path: JSONPath of the field to rename (e.g. ``$.old``).
        new_path: JSONPath of the new field (e.g. ``$.new``).
        progress_callback: Optional ``(done, total)`` hook invoked
            after each batch.

    Returns:
        Number of documents in which the field was renamed.
    """
    # Renaming a path onto itself would set then delete the same path,
    # destroying the value — treat it as a no-op instead.
    if old_path == new_path:
        return 0

    renamed = 0
    total = len(keys)
    pipeline_size = 100

    for i in range(0, total, pipeline_size):
        batch = keys[i : i + pipeline_size]

        # Pass 1: read the old path for every key in the batch.
        pipe = client.pipeline(transaction=False)
        for key in batch:
            pipe.json().get(key, old_path)
        values = await pipe.execute()

        # Pass 2: write the new path and delete the old one.
        pipe = client.pipeline(transaction=False)
        for key, value in zip(batch, values):
            # JSON.GET with a '$'-style path returns a *list* of matches
            # ([] when the path is absent, None when the key is missing).
            # Unwrap the single match so we store the value itself, not
            # a one-element array, and skip empty matches.
            if isinstance(value, list):
                value = value[0] if value else None
            if value is not None:
                pipe.json().set(key, new_path, value)
                pipe.json().delete(key, old_path)

        try:
            results = await pipe.execute()
            # Results alternate [set, delete, ...]. Count JSON.DEL (odd
            # indices), which returns the number of paths removed and so
            # reflects renames that actually completed.
            renamed += sum(1 for j, r in enumerate(results) if j % 2 == 1 and r)
        except Exception as e:
            logger.warning(f"Error in JSON field rename batch: {e}")

        if progress_callback:
            progress_callback(min(i + pipeline_size, total), total)

    return renamed
293+
180294
async def apply(
181295
self,
182296
plan: MigrationPlan,
@@ -244,6 +358,8 @@ async def apply(
244358
enumerate_duration = 0.0
245359
drop_duration = 0.0
246360
quantize_duration = 0.0
361+
field_rename_duration = 0.0
362+
key_rename_duration = 0.0
247363
recreate_duration = 0.0
248364
indexing_duration = 0.0
249365
target_info: Dict[str, Any] = {}
@@ -254,18 +370,26 @@ async def apply(
254370
plan.source.schema_snapshot, plan.merged_target_schema
255371
)
256372

373+
# Check for rename operations
374+
rename_ops = plan.rename_operations
375+
has_prefix_change = bool(rename_ops.change_prefix)
376+
has_field_renames = bool(rename_ops.rename_fields)
377+
needs_enumeration = datatype_changes or has_prefix_change or has_field_renames
378+
257379
def _notify(step: str, detail: Optional[str] = None) -> None:
258380
if progress_callback:
259381
progress_callback(step, detail)
260382

261383
try:
262-
# STEP 1: Enumerate keys BEFORE dropping index (if quantization needed)
263-
if datatype_changes:
384+
client = source_index._redis_client
385+
if client is None:
386+
raise ValueError("Failed to get Redis client from source index")
387+
storage_type = plan.source.keyspace.storage_type
388+
389+
# STEP 1: Enumerate keys BEFORE any modifications
390+
if needs_enumeration:
264391
_notify("enumerate", "Enumerating indexed documents...")
265392
enumerate_started = time.perf_counter()
266-
client = source_index._redis_client
267-
if client is None:
268-
raise ValueError("Failed to get Redis client from source index")
269393
keys_to_process = [
270394
key
271395
async for key in self._enumerate_indexed_keys(
@@ -278,17 +402,85 @@ def _notify(step: str, detail: Optional[str] = None) -> None:
278402
f"found {len(keys_to_process):,} documents ({enumerate_duration}s)",
279403
)
280404

281-
# STEP 2: Drop the index
405+
# STEP 2: Field renames (before dropping index)
406+
if has_field_renames and keys_to_process:
407+
_notify("field_rename", "Renaming fields in documents...")
408+
field_rename_started = time.perf_counter()
409+
for field_rename in rename_ops.rename_fields:
410+
if storage_type == "json":
411+
old_path = f"$.{field_rename.old_name}"
412+
new_path = f"$.{field_rename.new_name}"
413+
await self._rename_field_in_json(
414+
client,
415+
keys_to_process,
416+
old_path,
417+
new_path,
418+
progress_callback=lambda done, total: _notify(
419+
"field_rename",
420+
f"{field_rename.old_name} -> {field_rename.new_name}: {done:,}/{total:,}",
421+
),
422+
)
423+
else:
424+
await self._rename_field_in_hash(
425+
client,
426+
keys_to_process,
427+
field_rename.old_name,
428+
field_rename.new_name,
429+
progress_callback=lambda done, total: _notify(
430+
"field_rename",
431+
f"{field_rename.old_name} -> {field_rename.new_name}: {done:,}/{total:,}",
432+
),
433+
)
434+
field_rename_duration = round(
435+
time.perf_counter() - field_rename_started, 3
436+
)
437+
_notify("field_rename", f"done ({field_rename_duration}s)")
438+
439+
# STEP 3: Drop the index
282440
_notify("drop", "Dropping index definition...")
283441
drop_started = time.perf_counter()
284442
await source_index.delete(drop=False)
285443
drop_duration = round(time.perf_counter() - drop_started, 3)
286444
_notify("drop", f"done ({drop_duration}s)")
287445

288-
# STEP 3: Re-encode vectors using pre-enumerated keys
446+
# STEP 4: Key renames (after drop, before recreate)
447+
if has_prefix_change and keys_to_process:
448+
_notify("key_rename", "Renaming keys...")
449+
key_rename_started = time.perf_counter()
450+
old_prefix = plan.source.keyspace.prefixes[0]
451+
new_prefix = rename_ops.change_prefix
452+
assert new_prefix is not None
453+
renamed_count = await self._rename_keys(
454+
client,
455+
keys_to_process,
456+
old_prefix,
457+
new_prefix,
458+
progress_callback=lambda done, total: _notify(
459+
"key_rename", f"{done:,}/{total:,} keys"
460+
),
461+
)
462+
key_rename_duration = round(time.perf_counter() - key_rename_started, 3)
463+
_notify(
464+
"key_rename",
465+
f"done ({renamed_count:,} keys in {key_rename_duration}s)",
466+
)
467+
468+
# STEP 5: Re-encode vectors using pre-enumerated keys
289469
if datatype_changes and keys_to_process:
290470
_notify("quantize", "Re-encoding vectors...")
291471
quantize_started = time.perf_counter()
472+
# If we renamed keys, update keys_to_process to new names
473+
if has_prefix_change and rename_ops.change_prefix:
474+
old_prefix = plan.source.keyspace.prefixes[0]
475+
new_prefix = rename_ops.change_prefix
476+
keys_to_process = [
477+
(
478+
new_prefix + k[len(old_prefix) :]
479+
if k.startswith(old_prefix)
480+
else k
481+
)
482+
for k in keys_to_process
483+
]
292484
docs_quantized = await self._async_quantize_vectors(
293485
source_index,
294486
datatype_changes,
@@ -341,11 +533,19 @@ def _index_progress(indexed: int, total: int, pct: float) -> None:
341533
quantize_duration_seconds=(
342534
quantize_duration if quantize_duration else None
343535
),
536+
field_rename_duration_seconds=(
537+
field_rename_duration if field_rename_duration else None
538+
),
539+
key_rename_duration_seconds=(
540+
key_rename_duration if key_rename_duration else None
541+
),
344542
recreate_duration_seconds=recreate_duration,
345543
initial_indexing_duration_seconds=indexing_duration,
346544
validation_duration_seconds=validation_duration,
347545
downtime_duration_seconds=round(
348546
drop_duration
547+
+ field_rename_duration
548+
+ key_rename_duration
349549
+ quantize_duration
350550
+ recreate_duration
351551
+ indexing_duration,
@@ -368,17 +568,23 @@ def _index_progress(indexed: int, total: int, pct: float) -> None:
368568
total_migration_duration_seconds=total_duration,
369569
drop_duration_seconds=drop_duration or None,
370570
quantize_duration_seconds=quantize_duration or None,
571+
field_rename_duration_seconds=field_rename_duration or None,
572+
key_rename_duration_seconds=key_rename_duration or None,
371573
recreate_duration_seconds=recreate_duration or None,
372574
initial_indexing_duration_seconds=indexing_duration or None,
373575
downtime_duration_seconds=(
374576
round(
375577
drop_duration
578+
+ field_rename_duration
579+
+ key_rename_duration
376580
+ quantize_duration
377581
+ recreate_duration
378582
+ indexing_duration,
379583
3,
380584
)
381585
if drop_duration
586+
or field_rename_duration
587+
or key_rename_duration
382588
or quantize_duration
383589
or recreate_duration
384590
or indexing_duration

0 commit comments

Comments
 (0)