Support onSchemaChange for incremental tables #2101
SuchodolskiEdvin wants to merge 4 commits into main
Conversation
/gcbrun
kolina left a comment:
I haven't taken a deep look, but in the current logic we add granular SQL statements as separate tasks.
It'd be great to preserve this consistency (as an incremental schema change generates quite a lot of statements).
```ts
// Create temp table for incremental data.
finalDmlSql += `
CREATE OR REPLACE TEMP TABLE ${dataTempTableName} AS (
```
Not a maintainer of this repo but:
I've noticed this in the GCP Dataform compiled version of an EXTEND schema change incremental table, but for the life of me I cannot discern why this intermediate table is needed. It doesn't seem to be used for anything other than the MERGE statement, and the INSERT on line 420. Could the query be used directly in the MERGE, and in the INSERT? Materializing into an intermediate table increases the cost of the query, sometimes by quite a lot.
The dataTempTableName is used to ensure the columns selected for the MERGE or INSERT operation exactly match the schema determined after potential ALTER TABLE operations, as reflected in the temp_table_columns variable. The final DML statement is constructed dynamically within EXECUTE IMMEDIATE using column lists derived from temp_table_columns (e.g., dataform_columns_list). To use these dynamic column lists in the SELECT part of the MERGE's USING clause or the INSERT's SELECT statement, the source data must be in a table format. Using the raw table.incrementalQuery directly would not allow dynamic column selection based on dataform_columns_list within the EXECUTE IMMEDIATE context.
Hey, we actually don't have a specific reason for doing it. It is a cleanup leftover from when we were using a stored procedure inside the EXECUTE IMMEDIATE. @SuchodolskiEdvin it might be worth running some tests on the engine directly with the generated queries using table.incrementalQuery instead of the temp table.
@spatel11 Thanks for pointing this out. Please feel free to open a feature request in our public tracker and we will take a look and plan this change.
https://issuetracker.google.com/issues?q=status:open%20componentid:1193995&s=created_time:desc
Done, thank you: https://issuetracker.google.com/issues/493440483
```ts
  });
}

private safeCallProcedure(
```
The name should also describe the fact that the procedure is being removed in all cases.
```ts
    )
  )
);
const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE;
```
?? operator is more appropriate here.
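The distinction matters here because the enum's zero value is falsy in JavaScript. A minimal sketch of the difference (the enum below is a hypothetical stand-in for dataform.OnSchemaChange; the exact numeric values are assumptions):

```typescript
// Hypothetical enum standing in for dataform.OnSchemaChange; values are assumptions.
enum OnSchemaChange {
  IGNORE = 0,
  FAIL = 1,
  EXTEND = 2,
}

// `||` falls back whenever the left operand is falsy, so an explicitly set
// IGNORE (0) is treated the same as "not set at all":
const explicitlySet: OnSchemaChange | undefined = OnSchemaChange.IGNORE;
const viaOr = explicitlySet || OnSchemaChange.FAIL; // FAIL, because 0 is falsy

// `??` only falls back on null/undefined, preserving a deliberate 0:
const viaNullish = explicitlySet ?? OnSchemaChange.FAIL; // IGNORE
```

With proto-generated enums the zero member is the default, so `||` happens to coincide with the intended fallback today, but `??` states the intent correctly and survives a caller passing 0 explicitly.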
WDYT about such a division: Task 1: Create the Procedure. Task 1 (the CREATE PROCEDURE statement) still contains the main, complex SQL logic from incrementalSchemaChangeBody(). Trying to divide the contents of the procedure body into multiple separate Task.statement calls from the CLI side is not feasible, because BigQuery SQL scripting features (like DECLARE, SET, variable scope, control flow) require the entire logic to run within a single script execution context, which the BEGIN...END block of the procedure provides.
This sounds reasonable to me. I'd recommend refactoring it a bit so it'll be a bit more structured and ordered
Force-pushed from c1c02cc to 5e3f9f9
```ts
  emptyTempTableName,
  dataTempTableName,
  shortEmptyTableName
```
Do we need to pass both shortEmptyTableName and emptyTempTableName here?
We could remove emptyTempTableName from the parameters and just compute it inside incrementalSchemaChangeBody() using shortEmptyTableName. However, the parent function buildIncrementalSchemaChangeTasks() still needs emptyTempTableName anyway to pass it to safeCallAndDropProcedure(). Since the parent has to compute it, passing it down to the child prevents us from duplicating the resolveTarget logic in two different places. So I would stay with this implementation.
Can you pass ITarget instances as function arguments and call resolveTarget or name where you actually need a specific string?
It'll be more readable than passing these strings that have quite similar names
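A rough sketch of what that suggestion could look like (the ITarget shape and resolveTarget here are simplified assumptions, not the actual Dataform types):

```typescript
// Simplified stand-in for dataform.ITarget; the real interface has more fields.
interface ITarget {
  database: string;
  schema: string;
  name: string;
}

// Hypothetical resolver producing a fully qualified BigQuery name.
function resolveTarget(target: ITarget): string {
  return `\`${target.database}.${target.schema}.${target.name}\``;
}

// Instead of threading shortEmptyTableName / emptyTempTableName strings through
// every signature, pass the target and derive the string where it is used:
function dropEmptyTempTableSql(emptyTempTarget: ITarget): string {
  return `DROP TABLE IF EXISTS ${resolveTarget(emptyTempTarget)};`;
}
```

Each call site then makes it obvious which table it is talking about, instead of juggling several similarly named strings.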
```ts
  );`;
}

private executeMergeOrInsertSql(
```
So it seems that you added new logic for generating MERGE or INSERT statements, separate from what we had before in the mergeInto / insertInto functions.
I think we should avoid duplicating this logic and having different behaviour when incremental schema change is enabled / disabled. So let's consolidate them.
Consolidating them entirely is tricky because they operate differently: mergeInto/insertInto generate static SQL, while executeMergeOrInsertSql generates dynamic SQL via EXECUTE IMMEDIATE. Forcing both into one function hurts readability.
To reduce duplication I suggest we extract the shared ON clause logic (handling uniqueKey and updatePartitionFilter) into a single helper method, and rename the original methods to clarify they are for static SQL.
WDYT?
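The extracted helper might look roughly like this (the function name and exact clause shape are assumptions; the real methods build the ON clause from uniqueKey and updatePartitionFilter):

```typescript
// Hypothetical shared helper for the ON clause, usable by both the static
// mergeInto path and the dynamic EXECUTE IMMEDIATE path.
function buildMergeOnClause(uniqueKey: string[], updatePartitionFilter?: string): string {
  // Match target (T) and source (S) rows on every unique-key column.
  const keyEquality = uniqueKey.map(key => `T.${key} = S.${key}`).join(" AND ");
  // Optionally restrict updates to a partition to limit scanned data.
  return updatePartitionFilter
    ? `(${keyEquality}) AND ${updatePartitionFilter}`
    : keyEquality;
}
```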
> Consolidating them entirely is tricky because they operate differently: mergeInto/insertInto generate static SQL, while executeMergeOrInsertSql generates dynamic SQL via EXECUTE IMMEDIATE. Forcing both into one function hurts readability.
I assume this logic with EXECUTE IMMEDIATE was copied from GCP code? If static SQL in CLI didn't use it before, I don't think you should introduce it as part of your changes: merge / insert logic should stay consistent for all modes of schema updates including IGNORE.
We can bring EXECUTE IMMEDIATE to synchronize GCP and CLI, but it can be done later.
These tests don't seem to check a lot; for example, there are no checks on how the merge / insert logic or the actual schema altering is generated.
I enhanced the tests. Additionally, they check the STRING_AGG formatting for the ADD COLUMN and DROP COLUMN commands, and they verify the structural generation of the procedural MERGE (when a uniqueKey is present) and INSERT (when a uniqueKey is absent) queries.
This change introduces support for the onSchemaChange option in incremental tables for the BigQuery adapter. It adds the incrementalSchemaChangeBody() strategy to handle schema changes. End-to-end tests have been created to verify the new functionality.
- Split SQL queries into sub-functions
- Fixed function names
Force-pushed from 5e3f9f9 to c0ea194
I was wondering if we can actually create an e2e test for the onSchemaChange parameter. During a dataform run --dry-run, Dataform connects to BigQuery to fetch the current warehouseState. Because our example_incremental table doesn't actually exist in the test BigQuery project yet, tableMetadata comes back as undefined. When that happens, shouldWriteIncrementally() correctly evaluates to false, and Dataform natively falls back to generating a standard CREATE OR REPLACE TABLE.

Because the CLI does not expose a flag to inject a fake warehouseState JSON, I am not sure how we can force the E2E test to simulate a pre-existing table. I can still create the test to verify the parser and the dry-run fallback.

Technically we could run a real dataform run first to create the table in BigQuery, followed by a --dry-run to check the generated MERGE query. However, this would mutate the test BigQuery project, potentially introducing flakiness if tests run concurrently, and it would slow down the suite. The SQL generation is already mostly covered in the execution_sql_test.ts unit tests by mocking tableMetadata. WDYT?
You can create a separate dataset per run and prepare the table with the desired state in an operation action before running the one you actually want to test. Just make sure to have a tear-down mechanism.
Igor's proposal is good
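In test-harness code, the per-run dataset with guaranteed teardown could be sketched like this (the naming scheme and the injected create/drop callbacks are assumptions; a real test would wire them to the BigQuery client):

```typescript
// Hypothetical per-run dataset name, unique enough to avoid concurrent-run clashes.
function perRunDataset(prefix: string, runId: string): string {
  return `${prefix}_${runId}`;
}

// Run a test body against a freshly created dataset and always tear it down,
// even when the body throws. create/drop are injected so tests can stub them.
async function withPerRunDataset(
  runId: string,
  create: (dataset: string) => Promise<void>,
  drop: (dataset: string) => Promise<void>,
  body: (dataset: string) => Promise<void>
): Promise<void> {
  const dataset = perRunDataset("df_e2e", runId);
  await create(dataset);
  try {
    // e.g. an operation action seeds example_incremental here, then the
    // incremental run under test executes against it.
    await body(dataset);
  } finally {
    await drop(dataset); // the tear-down mechanism the reviewer asked for
  }
}
```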
```ts
  database: string,
  schema: string,
  targetName: string,
```
You can also just pass ITarget here
```ts
): string {
  const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE;
  let sql = `
    -- Apply schema change strategy (${dataform.OnSchemaChange[onSchemaChange]}).`;
```
what is the purpose of this?
```ts
  table: dataform.ITable,
  qualifiedTargetTableName: string
): string {
  const query = this.getIncrementalQuery(table);
```
Do I understand correctly that this query will be inserted twice into generated SQL? Could we avoid it?
```ts
  /create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i
);
expect(procedureSql).to.include(
  `"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %T, removed columns: %T"`
);
expect(procedureSql).to.match(/call `project-id.dataset-id.df_osc_.*`\(\)/i);
expect(procedureSql).to.include("EXCEPTION WHEN ERROR THEN");
expect(procedureSql).to.match(/drop procedure if exists `project-id.dataset-id.df_osc_.*`/i);
```
Can we just match the whole query?
If there are non-deterministic places in the logic, you can inject an interface for UUID generation to get deterministic output in tests.
For readability you can put the expected "golden" SQL into a separate file and read it in tests for comparison.
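The injection idea could look roughly like this (the interface and class names are hypothetical, not the existing adapter API):

```typescript
// Hypothetical generator interface: production code uses random IDs, while
// tests substitute a fixed one so the generated SQL is fully deterministic.
interface IdGenerator {
  next(): string;
}

class RandomIdGenerator implements IdGenerator {
  next(): string {
    // Short random suffix; real code might use a proper UUID library.
    return Math.random().toString(36).slice(2, 10);
  }
}

class FixedIdGenerator implements IdGenerator {
  constructor(private readonly id: string) {}
  next(): string {
    return this.id;
  }
}

// Procedure-name builder parameterized on the generator (the df_osc_ prefix is
// taken from the test expectations above).
function oscProcedureName(ids: IdGenerator): string {
  return `df_osc_${ids.next()}`;
}
```

In tests, `oscProcedureName(new FixedIdGenerator("test"))` always yields `df_osc_test`, so the whole generated script can be compared against a golden file instead of a set of loose regex matches.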
This change introduces support for the onSchemaChange option in incremental tables for the BigQuery adapter. It adds the incrementalSchemaChangeBody() strategy to handle schema changes.
End-to-end tests have been created to verify the new functionality.