Skip to content

Commit e1add9c

Browse files
committed
feat(migrate): add rename operations to planner
- Add FieldRename and RenameOperations models - Add _extract_rename_operations to detect index/prefix/field renames - Update classify_diff to support rename detection - Update tests for prefix change (now supported, not blocked)
1 parent 9fe44d9 commit e1add9c

6 files changed

Lines changed: 156 additions & 29 deletions

File tree

redisvl/migration/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
BatchPlan,
1414
BatchReport,
1515
BatchState,
16+
FieldRename,
1617
MigrationPlan,
1718
MigrationReport,
19+
RenameOperations,
1820
SchemaPatch,
1921
)
2022
from redisvl.migration.planner import MigrationPlanner
@@ -29,6 +31,8 @@
2931
"MigrationReport",
3032
"MigrationValidator",
3133
"MigrationWizard",
34+
"FieldRename",
35+
"RenameOperations",
3236
"SchemaPatch",
3337
# Batch
3438
"BatchMigrationExecutor",

redisvl/migration/async_planner.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,28 @@ async def create_plan_from_patch(
9292
merged_target_schema = self._sync_planner.merge_patch(
9393
source_schema, schema_patch
9494
)
95+
96+
# Extract rename operations first
97+
rename_operations, rename_warnings = (
98+
self._sync_planner._extract_rename_operations(source_schema, schema_patch)
99+
)
100+
101+
# Classify diff with awareness of rename operations
95102
diff_classification = self._sync_planner.classify_diff(
96-
source_schema, schema_patch, merged_target_schema
103+
source_schema, schema_patch, merged_target_schema, rename_operations
97104
)
98105

106+
# Build warnings list
107+
warnings = ["Index downtime is required"]
108+
warnings.extend(rename_warnings)
109+
99110
return MigrationPlan(
100111
source=snapshot,
101112
requested_changes=schema_patch.model_dump(exclude_none=True),
102113
merged_target_schema=merged_target_schema.to_dict(),
103114
diff_classification=diff_classification,
104-
warnings=["Index downtime is required"],
115+
rename_operations=rename_operations,
116+
warnings=warnings,
105117
)
106118

107119
async def snapshot_source(

redisvl/migration/models.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,18 @@ def merge_options_into_attrs(self) -> "FieldUpdate":
2424
return self
2525

2626

27+
class FieldRename(BaseModel):
28+
"""Field rename specification for schema patch inputs."""
29+
30+
old_name: str
31+
new_name: str
32+
33+
2734
class SchemaPatchChanges(BaseModel):
2835
add_fields: List[Dict[str, Any]] = Field(default_factory=list)
2936
remove_fields: List[str] = Field(default_factory=list)
3037
update_fields: List[FieldUpdate] = Field(default_factory=list)
38+
rename_fields: List[FieldRename] = Field(default_factory=list)
3139
index: Dict[str, Any] = Field(default_factory=dict)
3240

3341

@@ -60,13 +68,26 @@ class ValidationPolicy(BaseModel):
6068
require_schema_match: bool = True
6169

6270

71+
class RenameOperations(BaseModel):
72+
"""Tracks which rename operations are required for a migration."""
73+
74+
rename_index: Optional[str] = None # New index name if renaming
75+
change_prefix: Optional[str] = None # New prefix if changing
76+
rename_fields: List[FieldRename] = Field(default_factory=list)
77+
78+
@property
79+
def has_operations(self) -> bool:
80+
return bool(self.rename_index or self.change_prefix or self.rename_fields)
81+
82+
6383
class MigrationPlan(BaseModel):
6484
version: int = 1
6585
mode: str = "drop_recreate"
6686
source: SourceSnapshot
6787
requested_changes: Dict[str, Any]
6888
merged_target_schema: Dict[str, Any]
6989
diff_classification: DiffClassification
90+
rename_operations: RenameOperations = Field(default_factory=RenameOperations)
7091
warnings: List[str] = Field(default_factory=list)
7192
validation: ValidationPolicy = Field(default_factory=ValidationPolicy)
7293

@@ -90,6 +111,8 @@ class MigrationTimings(BaseModel):
90111
total_migration_duration_seconds: Optional[float] = None
91112
drop_duration_seconds: Optional[float] = None
92113
quantize_duration_seconds: Optional[float] = None
114+
field_rename_duration_seconds: Optional[float] = None
115+
key_rename_duration_seconds: Optional[float] = None
93116
recreate_duration_seconds: Optional[float] = None
94117
initial_indexing_duration_seconds: Optional[float] = None
95118
validation_duration_seconds: Optional[float] = None

redisvl/migration/planner.py

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22

33
from copy import deepcopy
44
from pathlib import Path
5-
from typing import Any, Dict, List, Optional
5+
from typing import Any, Dict, List, Optional, Tuple
66

77
import yaml
88

99
from redisvl.index import SearchIndex
1010
from redisvl.migration.models import (
1111
DiffClassification,
12+
FieldRename,
1213
KeyspaceSnapshot,
1314
MigrationPlan,
15+
RenameOperations,
1416
SchemaPatch,
1517
SourceSnapshot,
1618
)
@@ -94,16 +96,28 @@ def create_plan_from_patch(
9496
)
9597
source_schema = IndexSchema.from_dict(snapshot.schema_snapshot)
9698
merged_target_schema = self.merge_patch(source_schema, schema_patch)
99+
100+
# Extract rename operations first
101+
rename_operations, rename_warnings = self._extract_rename_operations(
102+
source_schema, schema_patch
103+
)
104+
105+
# Classify diff with awareness of rename operations
97106
diff_classification = self.classify_diff(
98-
source_schema, schema_patch, merged_target_schema
107+
source_schema, schema_patch, merged_target_schema, rename_operations
99108
)
100109

110+
# Build warnings list
111+
warnings = ["Index downtime is required"]
112+
warnings.extend(rename_warnings)
113+
101114
return MigrationPlan(
102115
source=snapshot,
103116
requested_changes=schema_patch.model_dump(exclude_none=True),
104117
merged_target_schema=merged_target_schema.to_dict(),
105118
diff_classification=diff_classification,
106-
warnings=["Index downtime is required"],
119+
rename_operations=rename_operations,
120+
warnings=warnings,
107121
)
108122

109123
def snapshot_source(
@@ -223,29 +237,100 @@ def merge_patch(
223237
schema_dict["index"].update(changes.index)
224238
return IndexSchema.from_dict(schema_dict)
225239

240+
def _extract_rename_operations(
241+
self,
242+
source_schema: IndexSchema,
243+
schema_patch: SchemaPatch,
244+
) -> Tuple[RenameOperations, List[str]]:
245+
"""Extract rename operations from the patch and generate warnings.
246+
247+
Returns:
248+
Tuple of (RenameOperations, warnings list)
249+
"""
250+
source_dict = source_schema.to_dict()
251+
changes = schema_patch.changes
252+
warnings: List[str] = []
253+
254+
# Index rename
255+
rename_index: Optional[str] = None
256+
if "name" in changes.index:
257+
new_name = changes.index["name"]
258+
old_name = source_dict["index"].get("name")
259+
if new_name != old_name:
260+
rename_index = new_name
261+
warnings.append(
262+
f"Index rename: '{old_name}' -> '{new_name}' (index-only change, no document migration needed)"
263+
)
264+
265+
# Prefix change
266+
change_prefix: Optional[str] = None
267+
if "prefix" in changes.index:
268+
new_prefix = changes.index["prefix"]
269+
old_prefix = source_dict["index"].get("prefix")
270+
if new_prefix != old_prefix:
271+
change_prefix = new_prefix
272+
warnings.append(
273+
f"Prefix change: '{old_prefix}' -> '{new_prefix}' "
274+
"(requires RENAME for all keys, may be slow for large datasets)"
275+
)
276+
277+
# Field renames from explicit rename_fields
278+
rename_fields: List[FieldRename] = list(changes.rename_fields)
279+
for field_rename in rename_fields:
280+
warnings.append(
281+
f"Field rename: '{field_rename.old_name}' -> '{field_rename.new_name}' "
282+
"(requires read/write for all documents, may be slow for large datasets)"
283+
)
284+
285+
return (
286+
RenameOperations(
287+
rename_index=rename_index,
288+
change_prefix=change_prefix,
289+
rename_fields=rename_fields,
290+
),
291+
warnings,
292+
)
293+
226294
def classify_diff(
227295
self,
228296
source_schema: IndexSchema,
229297
schema_patch: SchemaPatch,
230298
merged_target_schema: IndexSchema,
299+
rename_operations: Optional[RenameOperations] = None,
231300
) -> DiffClassification:
232301
blocked_reasons: List[str] = []
233302
changes = schema_patch.changes
234303
source_dict = source_schema.to_dict()
235304
target_dict = merged_target_schema.to_dict()
236305

306+
# Check which rename operations are being handled
307+
has_index_rename = rename_operations and rename_operations.rename_index
308+
has_prefix_change = rename_operations and rename_operations.change_prefix
309+
has_field_renames = (
310+
rename_operations and len(rename_operations.rename_fields) > 0
311+
)
312+
renamed_field_names = set()
313+
if has_field_renames and rename_operations:
314+
renamed_field_names = {
315+
fr.old_name for fr in rename_operations.rename_fields
316+
}
317+
237318
for index_key, target_value in changes.index.items():
238319
source_value = source_dict["index"].get(index_key)
239320
if source_value == target_value:
240321
continue
241322
if index_key == "name":
242-
blocked_reasons.append(
243-
"Changing the index name requires document migration (not yet supported)."
244-
)
323+
# Index rename is now supported - skip blocking if we have rename_operations
324+
if not has_index_rename:
325+
blocked_reasons.append(
326+
"Changing the index name requires document migration (not yet supported)."
327+
)
245328
elif index_key == "prefix":
246-
blocked_reasons.append(
247-
"Changing index prefixes requires document migration (not yet supported)."
248-
)
329+
# Prefix change is now supported
330+
if not has_prefix_change:
331+
blocked_reasons.append(
332+
"Changing index prefixes requires document migration (not yet supported)."
333+
)
249334
elif index_key == "key_separator":
250335
blocked_reasons.append(
251336
"Changing the key separator requires document migration (not yet supported)."
@@ -291,9 +376,11 @@ def classify_diff(
291376
)
292377
blocked_reasons.extend(vector_blocked)
293378

294-
blocked_reasons.extend(
295-
self._detect_possible_field_renames(source_fields, target_fields)
296-
)
379+
# Detect possible field renames only if not explicitly provided
380+
if not has_field_renames:
381+
blocked_reasons.extend(
382+
self._detect_possible_field_renames(source_fields, target_fields)
383+
)
297384

298385
return DiffClassification(
299386
supported=len(blocked_reasons) == 0,

tests/unit/test_async_migration_planner.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,8 @@ async def mock_from_existing(*args, **kwargs):
276276

277277

278278
@pytest.mark.asyncio
279-
async def test_async_planner_prefix_change_blocked(monkeypatch, tmp_path):
280-
"""Prefix change is blocked: documents are at wrong keys."""
279+
async def test_async_planner_prefix_change_is_supported(monkeypatch, tmp_path):
280+
"""Prefix change is supported: executor will rename keys."""
281281
source_schema = _make_source_schema()
282282
dummy_index = AsyncDummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"])
283283

@@ -312,8 +312,8 @@ async def mock_from_existing(*args, **kwargs):
312312
target_schema_path=str(target_schema_path),
313313
)
314314

315-
assert plan.diff_classification.supported is False
316-
assert any(
317-
"prefix" in reason.lower()
318-
for reason in plan.diff_classification.blocked_reasons
319-
)
315+
# Prefix change is now supported
316+
assert plan.diff_classification.supported is True
317+
assert plan.rename_operations.change_prefix == "docs_v2"
318+
# Should have a warning about key renaming
319+
assert any("prefix" in w.lower() for w in plan.warnings)

tests/unit/test_migration_planner.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,8 @@ def test_target_schema_vector_algorithm_change_is_allowed(monkeypatch, tmp_path)
278278
# =============================================================================
279279

280280

281-
def test_target_schema_prefix_change_is_blocked(monkeypatch, tmp_path):
282-
"""Prefix change is blocked: documents are at wrong keys."""
281+
def test_target_schema_prefix_change_is_supported(monkeypatch, tmp_path):
282+
"""Prefix change is now supported via key rename operations."""
283283
source_schema = _make_source_schema()
284284
dummy_index = DummyIndex(source_schema, {"num_docs": 2}, [b"docs:1"])
285285
monkeypatch.setattr(
@@ -310,11 +310,12 @@ def test_target_schema_prefix_change_is_blocked(monkeypatch, tmp_path):
310310
target_schema_path=str(target_schema_path),
311311
)
312312

313-
assert plan.diff_classification.supported is False
314-
assert any(
315-
"prefix" in reason.lower() and "iterative_shadow" in reason
316-
for reason in plan.diff_classification.blocked_reasons
317-
)
313+
# Prefix change is now supported
314+
assert plan.diff_classification.supported is True
315+
# Verify rename operation is populated
316+
assert plan.rename_operations.change_prefix == "docs_v2"
317+
# Verify warning is present
318+
assert any("Prefix change" in w for w in plan.warnings)
318319

319320

320321
def test_key_separator_change_is_blocked(monkeypatch, tmp_path):
@@ -466,7 +467,7 @@ def test_vector_dimension_change_is_blocked(monkeypatch, tmp_path):
466467

467468
assert plan.diff_classification.supported is False
468469
assert any(
469-
"dims" in reason and "iterative_shadow" in reason
470+
"dims" in reason and "document migration" in reason
470471
for reason in plan.diff_classification.blocked_reasons
471472
)
472473

0 commit comments

Comments
 (0)