-
Notifications
You must be signed in to change notification settings - Fork 377
Fix: Regression that caused view snapshots not to be migrated #5389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| Snapshot, | ||
| SnapshotTableInfo, | ||
| SnapshotId, | ||
| snapshots_to_dag, | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -248,6 +249,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]: | |
| stored_snapshots = self.state_reader.get_snapshots(plan.environment.snapshots) | ||
| snapshots = {**new_snapshots, **stored_snapshots} | ||
| snapshots_by_name = {s.name: s for s in snapshots.values()} | ||
| dag = snapshots_to_dag(snapshots.values()) | ||
|
|
||
| all_selected_for_backfill_snapshots = { | ||
| s.snapshot_id for s in snapshots.values() if plan.is_selected_for_backfill(s.name) | ||
|
|
@@ -271,8 +273,17 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]: | |
| after_promote_snapshots = all_selected_for_backfill_snapshots - before_promote_snapshots | ||
| deployability_index = DeployabilityIndex.all_deployable() | ||
|
|
||
| snapshot_ids_with_schema_migration = [ | ||
| s.snapshot_id for s in snapshots.values() if s.requires_schema_migration_in_prod | ||
| ] | ||
| # Include all upstream dependencies of snapshots that require schema migration to make sure | ||
| # the upstream tables are created before the schema updates are applied | ||
| snapshots_with_schema_migration = [ | ||
| s for s in snapshots.values() if s.requires_schema_migration_in_prod | ||
| snapshots[s_id] | ||
| for s_id in dag.subdag(*snapshot_ids_with_schema_migration) | ||
| if snapshots[s_id].is_paused | ||
| and snapshots[s_id].is_model | ||
| and not snapshots[s_id].is_symbolic | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just to help me learn: Why couldn't these if conditions be applied when creating
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh I see, it is part of
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Correct. So the snapshots in the subdag may not return True for
||
| ] | ||
|
|
||
| snapshots_to_intervals = self._missing_intervals( | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -489,15 +489,14 @@ def migrate( | |||||
| allow_destructive_snapshots = allow_destructive_snapshots or set() | ||||||
| allow_additive_snapshots = allow_additive_snapshots or set() | ||||||
| snapshots_by_name = {s.name: s for s in snapshots.values()} | ||||||
| snapshots_with_data_objects = [snapshots[s_id] for s_id in target_data_objects] | ||||||
| with self.concurrent_context(): | ||||||
| # Only migrate snapshots for which there's an existing data object | ||||||
| concurrent_apply_to_snapshots( | ||||||
| snapshots_with_data_objects, | ||||||
| snapshots_by_name.values(), | ||||||
| lambda s: self._migrate_snapshot( | ||||||
| s, | ||||||
| snapshots_by_name, | ||||||
| target_data_objects[s.snapshot_id], | ||||||
| target_data_objects.get(s.snapshot_id), | ||||||
|
Comment on lines
+495
to
+499
|
||||||
| allow_destructive_snapshots, | ||||||
| allow_additive_snapshots, | ||||||
| self.get_adapter(s.model_gateway), | ||||||
|
|
@@ -1059,7 +1058,7 @@ def _migrate_snapshot( | |||||
| adapter: EngineAdapter, | ||||||
| deployability_index: DeployabilityIndex, | ||||||
| ) -> None: | ||||||
| if not snapshot.requires_schema_migration_in_prod: | ||||||
| if not snapshot.is_model or snapshot.is_symbolic: | ||||||
| return | ||||||
|
|
||||||
| deployability_index = DeployabilityIndex.all_deployable() | ||||||
|
|
@@ -1081,20 +1080,32 @@ def _migrate_snapshot( | |||||
| ): | ||||||
| table_exists = False | ||||||
|
|
||||||
| rendered_physical_properties = snapshot.model.render_physical_properties( | ||||||
| **render_kwargs | ||||||
| ) | ||||||
|
|
||||||
| if table_exists: | ||||||
| self._migrate_target_table( | ||||||
| target_table_name=target_table_name, | ||||||
| snapshot=snapshot, | ||||||
| snapshots=snapshots, | ||||||
| deployability_index=deployability_index, | ||||||
| render_kwargs=render_kwargs, | ||||||
| rendered_physical_properties=snapshot.model.render_physical_properties( | ||||||
| **render_kwargs | ||||||
| ), | ||||||
| rendered_physical_properties=rendered_physical_properties, | ||||||
| allow_destructive_snapshots=allow_destructive_snapshots, | ||||||
| allow_additive_snapshots=allow_additive_snapshots, | ||||||
| run_pre_post_statements=True, | ||||||
| ) | ||||||
| else: | ||||||
| self._execute_create( | ||||||
| snapshot=snapshot, | ||||||
| table_name=snapshot.table_name(is_deployable=True), | ||||||
| is_table_deployable=True, | ||||||
| deployability_index=deployability_index, | ||||||
| create_render_kwargs=render_kwargs, | ||||||
| rendered_physical_properties=rendered_physical_properties, | ||||||
| dry_run=True, | ||||||
|
||||||
| dry_run=True, | |
| dry_run=False, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is just a bad comment all around
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reasoning for having this logic outside of `EvaluatablePlan`? For example, should it have properties
`evaluatable_snapshots` and `dag` which would return this info? Reading this over, I was surprised I couldn't consult the plan itself to get this information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because an `EvaluatablePlan` instance only contains data which can be (and will be) serialized. We could consider making it a property method of the `EvaluatablePlan`, but currently there isn't really any other consumer for this. Plus, we need access to the StateReader to fetch all snapshots, which `EvaluatablePlan` doesn't (and shouldn't) possess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes the state reader. Thanks.