Skip to content

Commit 8c6efad

Browse files
committed
feat(migrate): [0/6] migration design foundation with models, planner, validator, and utilities
Adds the core data structures and planning engine for the Index Migrator:
- models.py: Pydantic models for MigrationPlan, DiffClassification, ValidationResult, MigrationReport
- planner.py: MigrationPlanner with schema introspection, diffing, and change classification
- validation.py: MigrationValidator for post-migration checks
- utils.py: shared helpers for YAML I/O, disk estimation, index listing, timestamps
- connection.py: HNSW parameter extraction for schema introspection
- 15 unit tests for planner logic
1 parent 522ba9b commit 8c6efad

7 files changed

Lines changed: 2670 additions & 0 deletions

File tree

redisvl/migration/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Public entry points for the migration package: plan construction and
# post-migration validation.
from redisvl.migration.planner import MigrationPlanner
from redisvl.migration.validation import MigrationValidator

__all__ = [
    "MigrationPlanner",
    "MigrationValidator",
]

redisvl/migration/models.py

Lines changed: 380 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,380 @@
1+
from __future__ import annotations
2+
3+
from typing import Any, Dict, List, Optional
4+
5+
from pydantic import BaseModel, Field, model_validator
6+
7+
8+
class FieldUpdate(BaseModel):
    """Partial field update for schema patch inputs."""

    name: str
    type: Optional[str] = None
    path: Optional[str] = None
    attrs: Dict[str, Any] = Field(default_factory=dict)
    options: Dict[str, Any] = Field(default_factory=dict)

    @model_validator(mode="after")
    def merge_options_into_attrs(self) -> "FieldUpdate":
        # Fold any ``options`` entries into ``attrs`` (options win on key
        # collisions) and clear ``options`` so downstream code only has to
        # consult a single mapping.
        if self.options:
            self.attrs = {**self.attrs, **self.options}
            self.options = {}
        return self
25+
26+
27+
class FieldRename(BaseModel):
    """Field rename specification for schema patch inputs."""

    old_name: str  # current field name
    new_name: str  # desired field name after migration
32+
33+
34+
class SchemaPatchChanges(BaseModel):
    """Collection of schema edits carried by a SchemaPatch."""

    add_fields: List[Dict[str, Any]] = Field(default_factory=list)  # full field definitions to add
    remove_fields: List[str] = Field(default_factory=list)  # names of fields to drop
    update_fields: List[FieldUpdate] = Field(default_factory=list)  # partial updates to existing fields
    rename_fields: List[FieldRename] = Field(default_factory=list)  # old -> new name mappings
    index: Dict[str, Any] = Field(default_factory=dict)  # index-level setting overrides
40+
41+
42+
class SchemaPatch(BaseModel):
    """Versioned container for a set of requested schema changes."""

    version: int = 1  # patch format version
    changes: SchemaPatchChanges = Field(default_factory=SchemaPatchChanges)
45+
46+
47+
class KeyspaceSnapshot(BaseModel):
    """Snapshot of how an index's keys are laid out in Redis."""

    storage_type: str  # presumably "hash" or "json" -- TODO confirm exact values
    prefixes: List[str]  # key prefixes the index covers
    key_separator: str  # separator between prefix and document id
    key_sample: List[str] = Field(default_factory=list)  # sample of existing keys
52+
53+
54+
class SourceSnapshot(BaseModel):
    """Captured state of the source index at plan time."""

    index_name: str
    schema_snapshot: Dict[str, Any]  # raw schema as introspected from Redis
    stats_snapshot: Dict[str, Any]  # index statistics snapshot -- shape not visible here
    keyspace: KeyspaceSnapshot
59+
60+
61+
class DiffClassification(BaseModel):
    """Whether a requested schema diff can be migrated automatically."""

    supported: bool
    blocked_reasons: List[str] = Field(default_factory=list)  # reasons the migration is blocked, if any
64+
65+
66+
class ValidationPolicy(BaseModel):
    """Which post-migration checks are required to pass."""

    require_doc_count_match: bool = True
    require_schema_match: bool = True
69+
70+
71+
class RenameOperations(BaseModel):
    """Tracks which rename operations are required for a migration."""

    # New index name, if the index itself is being renamed.
    rename_index: Optional[str] = None
    # New key prefix, if the prefix is changing.
    change_prefix: Optional[str] = None
    # Field-level old -> new renames.
    rename_fields: List[FieldRename] = Field(default_factory=list)

    @property
    def has_operations(self) -> bool:
        """True when at least one rename or prefix change is requested."""
        requested = (self.rename_index, self.change_prefix)
        return any(item is not None for item in requested) or bool(self.rename_fields)
85+
86+
87+
class MigrationPlan(BaseModel):
    """Complete, reviewable plan for migrating a single index."""

    version: int = 1  # plan format version
    mode: str = "drop_recreate"  # migration strategy; only "drop_recreate" is visible here
    source: SourceSnapshot  # state of the source index at plan time
    requested_changes: Dict[str, Any]  # the raw requested change set
    merged_target_schema: Dict[str, Any]  # source schema with requested changes applied
    diff_classification: DiffClassification  # whether the diff is auto-migratable
    rename_operations: RenameOperations = Field(default_factory=RenameOperations)
    warnings: List[str] = Field(default_factory=list)
    validation: ValidationPolicy = Field(default_factory=ValidationPolicy)
97+
98+
99+
class QueryCheckResult(BaseModel):
    """Outcome of a single post-migration query check."""

    name: str
    passed: bool
    details: Optional[str] = None  # extra context, e.g. a failure explanation
103+
104+
105+
class MigrationValidation(BaseModel):
    """Aggregated results of post-migration validation checks."""

    schema_match: bool = False
    doc_count_match: bool = False
    key_sample_exists: bool = False
    indexing_failures_delta: int = 0  # presumably target minus source failure count -- TODO confirm
    query_checks: List[QueryCheckResult] = Field(default_factory=list)
    errors: List[str] = Field(default_factory=list)
112+
113+
114+
class MigrationTimings(BaseModel):
    """Wall-clock durations, in seconds, for each migration phase.

    Every field defaults to None, meaning the phase was not run or not
    measured.
    """

    total_migration_duration_seconds: Optional[float] = None
    drop_duration_seconds: Optional[float] = None
    quantize_duration_seconds: Optional[float] = None
    field_rename_duration_seconds: Optional[float] = None
    key_rename_duration_seconds: Optional[float] = None
    recreate_duration_seconds: Optional[float] = None
    initial_indexing_duration_seconds: Optional[float] = None
    validation_duration_seconds: Optional[float] = None
    downtime_duration_seconds: Optional[float] = None
124+
125+
126+
class MigrationBenchmarkSummary(BaseModel):
    """Throughput and index-size metrics comparing source and target."""

    documents_indexed_per_second: Optional[float] = None
    source_index_size_mb: Optional[float] = None
    target_index_size_mb: Optional[float] = None
    index_size_delta_mb: Optional[float] = None  # target minus source -- TODO confirm sign convention
131+
132+
133+
class MigrationReport(BaseModel):
    """Final report for a single-index migration run."""

    version: int = 1  # report format version
    mode: str = "drop_recreate"
    source_index: str
    target_index: str
    result: str  # overall outcome label; allowed values not visible here
    started_at: str  # timestamp string; format not enforced by this model
    finished_at: str
    timings: MigrationTimings = Field(default_factory=MigrationTimings)
    validation: MigrationValidation = Field(default_factory=MigrationValidation)
    benchmark_summary: MigrationBenchmarkSummary = Field(
        default_factory=MigrationBenchmarkSummary
    )
    # Forward reference: DiskSpaceEstimate is declared later in this module.
    # Safe because `from __future__ import annotations` defers evaluation.
    disk_space_estimate: Optional["DiskSpaceEstimate"] = None
    warnings: List[str] = Field(default_factory=list)
    manual_actions: List[str] = Field(default_factory=list)  # follow-up steps for the operator
149+
150+
151+
# -----------------------------------------------------------------------------
# Disk Space Estimation
# -----------------------------------------------------------------------------

# Bytes per element for each vector datatype used in size estimates.
DTYPE_BYTES: Dict[str, int] = {
    "float64": 8,
    "float32": 4,
    "float16": 2,
    "bfloat16": 2,
    "int8": 1,
    "uint8": 1,
}

# AOF protocol overhead per HSET command (RESP framing).
AOF_HSET_OVERHEAD_BYTES = 114
# JSON.SET has slightly larger RESP framing than HSET.
AOF_JSON_SET_OVERHEAD_BYTES = 140
# Assumed RDB compression ratio for pseudo-random vector data, which
# compresses poorly (i.e. only ~5% savings).
RDB_COMPRESSION_RATIO = 0.95
171+
172+
173+
class VectorFieldEstimate(BaseModel):
    """Per-field disk space breakdown for a single vector field."""

    field_name: str
    dims: int  # vector dimensionality
    source_dtype: str  # dtype name before migration, e.g. "float32"
    target_dtype: str  # dtype name after migration
    source_bytes_per_doc: int  # per-document vector payload before migration
    target_bytes_per_doc: int  # per-document vector payload after migration
183+
184+
class DiskSpaceEstimate(BaseModel):
    """Pre-migration estimate of disk and memory costs.

    Produced by estimate_disk_space() as a pure calculation from the migration
    plan. No Redis mutations are performed.
    """

    # Index metadata
    index_name: str
    doc_count: int
    storage_type: str = "hash"

    # Per-field breakdowns
    vector_fields: List[VectorFieldEstimate] = Field(default_factory=list)

    # Aggregate vector data sizes
    total_source_vector_bytes: int = 0
    total_target_vector_bytes: int = 0

    # RDB snapshot cost (BGSAVE before migration)
    rdb_snapshot_disk_bytes: int = 0
    rdb_cow_memory_if_concurrent_bytes: int = 0

    # AOF growth cost (only if aof_enabled is True)
    aof_enabled: bool = False
    aof_growth_bytes: int = 0

    # Totals
    total_new_disk_bytes: int = 0
    memory_savings_after_bytes: int = 0  # positive = reduction, negative = increase

    @property
    def has_quantization(self) -> bool:
        # Presence of per-field estimates is used as the signal that the
        # migration changes vector storage -- TODO confirm producer only
        # populates vector_fields for quantizing migrations.
        return len(self.vector_fields) > 0

    def summary(self) -> str:
        """Human-readable summary for CLI output."""
        # No vector_fields means nothing to estimate: short-circuit.
        if not self.has_quantization:
            return "No vector quantization in this migration. No additional disk space required."

        lines = [
            "Pre-migration disk space estimate:",
            f" Index: {self.index_name} ({self.doc_count:,} documents)",
        ]
        # One line per quantized vector field, e.g. "float32 -> int8".
        for vf in self.vector_fields:
            lines.append(
                f" Vector field '{vf.field_name}': {vf.dims} dims, "
                f"{vf.source_dtype} -> {vf.target_dtype}"
            )

        lines.append("")
        lines.append(
            f" RDB snapshot (BGSAVE): ~{_format_bytes(self.rdb_snapshot_disk_bytes)}"
        )
        # AOF growth is only meaningful when the caller says AOF is on.
        if self.aof_enabled:
            lines.append(
                f" AOF growth (appendonly=yes): ~{_format_bytes(self.aof_growth_bytes)}"
            )
        else:
            lines.append(
                " AOF growth: not estimated (pass aof_enabled=True if AOF is on)"
            )
        lines.append(
            f" Total new disk required: ~{_format_bytes(self.total_new_disk_bytes)}"
        )
        lines.append("")
        # Report the magnitude plus a direction word; the sign convention is
        # positive = reduction (see memory_savings_after_bytes above).
        lines.append(
            f" Post-migration memory delta: ~{_format_bytes(abs(self.memory_savings_after_bytes))} "
            f"({'reduction' if self.memory_savings_after_bytes >= 0 else 'increase'}, "
            f"{abs(self._savings_pct())}%)"
        )
        return "\n".join(lines)

    def _savings_pct(self) -> int:
        # Percentage of the source vector footprint saved, rounded to an
        # integer; guarded against a zero-byte source to avoid ZeroDivisionError.
        if self.total_source_vector_bytes == 0:
            return 0
        return round(
            100 * self.memory_savings_after_bytes / self.total_source_vector_bytes
        )
263+
264+
265+
def _format_bytes(n: int) -> str:
266+
"""Format byte count as human-readable string."""
267+
if n >= 1_073_741_824:
268+
return f"{n / 1_073_741_824:.2f} GB"
269+
if n >= 1_048_576:
270+
return f"{n / 1_048_576:.1f} MB"
271+
if n >= 1024:
272+
return f"{n / 1024:.1f} KB"
273+
return f"{n} bytes"
274+
275+
276+
# -----------------------------------------------------------------------------
277+
# Batch Migration Models
278+
# -----------------------------------------------------------------------------
279+
280+
281+
class BatchIndexEntry(BaseModel):
    """Entry for a single index in a batch migration plan."""

    name: str
    applicable: bool = True  # False when the shared patch does not apply to this index
    skip_reason: Optional[str] = None  # human-readable reason when applicable is False
287+
288+
289+
class BatchPlan(BaseModel):
    """Plan for migrating multiple indexes with a shared patch."""

    version: int = 1
    batch_id: str
    mode: str = "drop_recreate"
    failure_policy: str = "fail_fast"  # or "continue_on_error"
    requires_quantization: bool = False
    shared_patch: SchemaPatch
    indexes: List[BatchIndexEntry] = Field(default_factory=list)
    created_at: str

    @property
    def applicable_count(self) -> int:
        """Number of indexes the shared patch will be applied to."""
        return len([entry for entry in self.indexes if entry.applicable])

    @property
    def skipped_count(self) -> int:
        """Number of indexes excluded from this batch."""
        # Every entry is either applicable or skipped, so derive by difference.
        return len(self.indexes) - self.applicable_count
308+
309+
310+
class BatchIndexState(BaseModel):
    """State of a single index in batch execution."""

    name: str
    status: str  # pending, in_progress, success, failed, skipped
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    failed_at: Optional[str] = None
    error: Optional[str] = None  # failure message when status is "failed"
    report_path: Optional[str] = None  # where the per-index report was written, if any
320+
321+
322+
class BatchState(BaseModel):
    """Checkpoint state for batch migration execution."""

    batch_id: str
    plan_path: str
    started_at: str
    updated_at: str
    completed: List[BatchIndexState] = Field(default_factory=list)
    current_index: Optional[str] = None
    remaining: List[str] = Field(default_factory=list)

    def _count_status(self, status: str) -> int:
        # Number of completed entries that ended in the given status.
        return sum(entry.status == status for entry in self.completed)

    @property
    def success_count(self) -> int:
        """How many indexes finished successfully."""
        return self._count_status("success")

    @property
    def failed_count(self) -> int:
        """How many indexes failed."""
        return self._count_status("failed")

    @property
    def skipped_count(self) -> int:
        """How many indexes were skipped."""
        return self._count_status("skipped")

    @property
    def is_complete(self) -> bool:
        """True once nothing is queued and no index is mid-flight."""
        return not self.remaining and self.current_index is None
348+
349+
350+
class BatchReportSummary(BaseModel):
    """Summary statistics for batch migration."""

    total_indexes: int = 0
    successful: int = 0
    failed: int = 0
    skipped: int = 0
    total_duration_seconds: float = 0.0  # wall-clock time for the whole batch
358+
359+
360+
class BatchIndexReport(BaseModel):
    """Report for a single index in batch execution."""

    name: str
    status: str  # success, failed, skipped
    duration_seconds: Optional[float] = None
    docs_migrated: Optional[int] = None
    report_path: Optional[str] = None  # path to the full per-index report, if written
    error: Optional[str] = None  # failure message when status is "failed"
369+
370+
371+
class BatchReport(BaseModel):
    """Final report for batch migration execution."""

    version: int = 1  # report format version
    batch_id: str
    status: str  # completed, partial_failure, failed
    summary: BatchReportSummary = Field(default_factory=BatchReportSummary)
    indexes: List[BatchIndexReport] = Field(default_factory=list)  # one entry per index in the batch
    started_at: str
    completed_at: str

0 commit comments

Comments
 (0)