Support onSchemaChange for incremental tables#2101

Open
SuchodolskiEdvin wants to merge 4 commits into main from on_schema_change

Conversation

@SuchodolskiEdvin
Collaborator

This change introduces support for the onSchemaChange option in incremental tables for the BigQuery adapter. It adds the incrementalSchemaChangeBody() strategy to handle schema changes.

End-to-end tests have been created to verify the new functionality.

@SuchodolskiEdvin SuchodolskiEdvin requested a review from a team as a code owner March 3, 2026 14:21
@SuchodolskiEdvin SuchodolskiEdvin requested review from krushangSk17 and removed request for a team March 3, 2026 14:21
@apilaskowski
Collaborator

/gcbrun

Contributor

@kolina left a comment

I haven't taken a deep look, but in the current logic we add granular SQL statements as separate tasks.

It'd be great to preserve this consistency (as incremental schema change generates quite a lot of statements).

Comment thread cli/api/dbadapters/execution_sql.ts Outdated

// Create temp table for incremental data.
finalDmlSql += `
CREATE OR REPLACE TEMP TABLE ${dataTempTableName} AS (

Not a maintainer of this repo but:

I've noticed this in the GCP Dataform compiled version of an EXTEND schema change incremental table, but for the life of me I cannot discern why this intermediate table is needed. It doesn't seem to be used for anything other than the MERGE statement, and the INSERT on line 420. Could the query be used directly in the MERGE, and in the INSERT? Materializing into an intermediate table increases the cost of the query, sometimes by quite a lot.

Collaborator Author

The dataTempTableName is used to ensure the columns selected for the MERGE or INSERT operation exactly match the schema determined after potential ALTER TABLE operations, as reflected in the temp_table_columns variable. The final DML statement is constructed dynamically within EXECUTE IMMEDIATE using column lists derived from temp_table_columns (e.g., dataform_columns_list). To use these dynamic column lists in the SELECT part of the MERGE's USING clause or the INSERT's SELECT statement, the source data must be in a table format. Using the raw table.incrementalQuery directly would not allow dynamic column selection based on dataform_columns_list within the EXECUTE IMMEDIATE context.
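To make the constraint concrete, here is a minimal TypeScript sketch of the dynamic-DML pattern being described (the function name `buildDynamicInsert` is hypothetical, not the actual adapter code): the column list only exists as a SQL variable at BigQuery script run time, so the SELECT inside EXECUTE IMMEDIATE has to read from a concrete table rather than an arbitrary inline query.

```typescript
// Hypothetical sketch of the dynamic-DML pattern discussed above.
// dataform_columns_list is a SQL variable whose value is only known at
// script run time (after the ALTER TABLE statements), so EXECUTE IMMEDIATE
// formats it into the statement, and the source data must already be
// materialized as a table that the generated SELECT can reference.
function buildDynamicInsert(target: string, dataTempTableName: string): string {
  return [
    "EXECUTE IMMEDIATE FORMAT(",
    '  "INSERT INTO `%s` (%s) SELECT %s FROM `%s`",',
    `  "${target}", dataform_columns_list,`,
    `  dataform_columns_list, "${dataTempTableName}");`,
  ].join("\n");
}

const sql = buildDynamicInsert("proj.ds.events", "proj.ds.df_temp_events");
console.log(sql);
```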

Member

Hey, we actually don't have a specific reason for doing it. It is a cleanup leftover from when we were using a stored procedure inside the EXECUTE IMMEDIATE. @SuchodolskiEdvin it might be worth running some tests on the engine directly, with the generated queries using table.incrementalQuery instead of the temp table.

@spatel11 Thanks for pointing this out. Please feel free to open a feature request in our public tracker and we will take a look and plan this change.
https://issuetracker.google.com/issues?q=status:open%20componentid:1193995&s=created_time:desc


Comment thread cli/api/dbadapters/execution_sql.ts Outdated
});
}

private safeCallProcedure(
Collaborator

The name should also describe the fact that the procedure is removed in all cases.

Comment thread cli/api/dbadapters/execution_sql.ts Outdated
)
)
);
const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE;
Collaborator

The ?? operator is more appropriate here.
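The difference matters because proto enum defaults are 0, so an explicitly set zero-valued option is falsy. A small sketch (the enum values below are assumptions for illustration, mirroring the shape of dataform.OnSchemaChange):

```typescript
// Why ?? matters here: if IGNORE = 0, an explicitly set IGNORE is falsy
// and `||` would silently replace it with the fallback.
enum OnSchemaChange {
  IGNORE = 0,
  EXTEND = 1,
  FAIL = 2,
}

const explicit: OnSchemaChange | undefined = OnSchemaChange.IGNORE; // 0

const withOr = explicit || OnSchemaChange.FAIL;      // FAIL: 0 treated as missing
const withNullish = explicit ?? OnSchemaChange.FAIL; // IGNORE: 0 preserved

console.log(OnSchemaChange[withOr], OnSchemaChange[withNullish]); // FAIL IGNORE
```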

@SuchodolskiEdvin
Collaborator Author

I haven't taken a deep look, but in the current logic we add granular SQL statements as separate tasks.

It'd be great to preserve this consistency (as incremental schema change generates quite a lot of statements).

@SuchodolskiEdvin
Collaborator Author

@kolina

WDYT about such a division:

Task 1: Create the Procedure.
Task 2: Execute the block that Calls the Procedure and handles exceptions.
Task 3: Drop the Procedure.
This structure aligns with the CLI's granular task pattern for operations sent to BigQuery.

Task 1 (the CREATE PROCEDURE statement) still contains the main, complex SQL logic from incrementalSchemaChangeBody(). Trying to divide the contents of the procedure body into multiple separate Task.statement calls from the CLI side is indeed hard to achieve and not feasible. This is because BigQuery SQL Scripting features (like DECLARE, SET, variable scope, control flow) require the entire logic to run within a single script execution context, which the BEGIN...END block of the procedure provides.
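The proposed split could be sketched roughly like this (the Task shape and function names are illustrative, not the actual CLI types):

```typescript
// Illustrative sketch of the three-task division proposed above.
interface Task {
  statement: string;
}

function buildIncrementalSchemaChangeTasks(procName: string, bodySql: string): Task[] {
  return [
    // Task 1: the full scripting logic (DECLARE/SET/control flow) stays in
    // one procedure body, which gives it a single script execution context.
    { statement: `CREATE OR REPLACE PROCEDURE \`${procName}\`()\nBEGIN\n${bodySql}\nEND` },
    // Task 2: call the procedure and handle exceptions.
    {
      statement: [
        "BEGIN",
        `  CALL \`${procName}\`();`,
        "EXCEPTION WHEN ERROR THEN",
        "  RAISE USING MESSAGE = @@error.message;",
        "END",
      ].join("\n"),
    },
    // Task 3: drop the procedure in all cases.
    { statement: `DROP PROCEDURE IF EXISTS \`${procName}\`` },
  ];
}

const tasks = buildIncrementalSchemaChangeTasks("proj.ds.df_osc_tmp", "SELECT 1;");
console.log(tasks.length); // 3
```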

@kolina
Contributor

kolina commented Mar 17, 2026

@kolina

WDYT about such a division:

Task 1: Create the Procedure. Task 2: Execute the block that Calls the Procedure and handles exceptions. Task 3: Drop the Procedure. This structure aligns with the CLI's granular task pattern for operations sent to BigQuery.

Task 1 (the CREATE PROCEDURE statement) still contains the main, complex SQL logic from incrementalSchemaChangeBody(). Trying to divide the contents of the procedure body into multiple separate Task.statement calls from the CLI side is indeed hard to achieve and not feasible. This is because BigQuery SQL Scripting features (like DECLARE, SET, variable scope, control flow) require the entire logic to run within a single script execution context, which the BEGIN...END block of the procedure provides.

This sounds reasonable to me. I'd recommend refactoring it a bit so it's more structured and ordered:

  • separate SQL generation pieces into separate small sub-functions
  • have high-level functions that will be calling these sub-functions so the overall algorithm (i.e. steps to assemble a query) and SQL generation pieces are not intertwined
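The suggested layering might look roughly like this (function names are hypothetical):

```typescript
// Hypothetical sketch of the suggested structure: small SQL-generation
// helpers, composed by one high-level function that reads as the algorithm.
function addColumnsSql(table: string, columns: string[]): string {
  return columns.map(c => `ALTER TABLE \`${table}\` ADD COLUMN ${c};`).join("\n");
}

function dropColumnsSql(table: string, columns: string[]): string {
  return columns.map(c => `ALTER TABLE \`${table}\` DROP COLUMN ${c};`).join("\n");
}

// High-level function: the overall steps read top to bottom, while the
// string-assembly details live in the helpers above.
function incrementalSchemaChangeBody(table: string, added: string[], removed: string[]): string {
  return [addColumnsSql(table, added), dropColumnsSql(table, removed)]
    .filter(s => s.length > 0)
    .join("\n");
}

const body = incrementalSchemaChangeBody("proj.ds.t", ["b INT64"], ["c"]);
console.log(body);
```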

Contributor

@kolina left a comment

Ideally we should add an e2e test checking a generated query like here

Comment on lines +220 to +222
emptyTempTableName,
dataTempTableName,
shortEmptyTableName
Contributor

Do we need to pass both shortEmptyTableName and emptyTempTableName here?

Collaborator Author

We could remove emptyTempTableName from the parameters and just compute it inside incrementalSchemaChangeBody() using shortEmptyTableName. However, the parent function buildIncrementalSchemaChangeTasks() still needs emptyTempTableName anyway to pass it to safeCallAndDropProcedure(). Since the parent has to compute it, passing it down to the child prevents us from duplicating the resolveTarget logic in two different places. So I would stay with this implementation.

Contributor

Can you pass ITarget instances as function arguments and call resolveTarget or name where you actually need a specific string?

It'll be more readable than passing these strings that have quite similar names
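A sketch of the suggestion (the ITarget shape is simplified and resolveTarget here is a stand-in for the real helper):

```typescript
// Simplified sketch: pass targets around as structured values and render
// strings only at the point of use, instead of threading several
// similarly named strings through the call chain.
interface ITarget {
  database?: string;
  schema?: string;
  name: string;
}

function resolveTarget(t: ITarget): string {
  return "`" + [t.database, t.schema, t.name].filter(Boolean).join(".") + "`";
}

function incrementalSchemaChangeBody(emptyTempTable: ITarget, dataTempTable: ITarget): string {
  // Each call site picks the exact string form it needs.
  return [
    `CREATE OR REPLACE TEMP TABLE ${resolveTarget(dataTempTable)} AS (SELECT 1);`,
    `-- short name available where needed: ${emptyTempTable.name}`,
  ].join("\n");
}

const rendered = resolveTarget({ database: "proj", schema: "ds", name: "t" });
console.log(rendered); // `proj.ds.t`
```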

);`;
}

private executeMergeOrInsertSql(
Contributor

So it seems that you added new logic for generating MERGE or INSERT statements, separate from what we had before in the mergeInto / insertInto functions.

I think we should avoid duplicating this logic and avoid divergent behaviour depending on whether incremental schema change is enabled or disabled. So let's consolidate them.

Collaborator Author

Consolidating them entirely is tricky because they operate differently: mergeInto/insertInto generate static SQL, while executeMergeOrInsertSql generates dynamic SQL via EXECUTE IMMEDIATE. Forcing both into one function hurts readability.

To reduce duplication I suggest we extract the shared ON clause logic (handling uniqueKey and updatePartitionFilter) into a single helper method, and rename the original methods to clarify they are for static SQL.

WDYT?
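The shared helper might look something like this (a sketch; the name and alias convention are illustrative, not the actual code):

```typescript
// Hypothetical shared helper for the MERGE ON clause, usable by both the
// static (mergeInto) and dynamic (EXECUTE IMMEDIATE) code paths, covering
// uniqueKey equality plus an optional updatePartitionFilter predicate.
function mergeOnClause(uniqueKey: string[], updatePartitionFilter?: string): string {
  const conditions = uniqueKey.map(k => `T.${k} = S.${k}`);
  if (updatePartitionFilter) {
    conditions.push(`(${updatePartitionFilter})`);
  }
  return conditions.join(" AND ");
}

console.log(mergeOnClause(["id", "region"], "T.updated_at > '2024-01-01'"));
```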

Contributor

Consolidating them entirely is tricky because they operate differently: mergeInto/insertInto generate static SQL, while executeMergeOrInsertSql generates dynamic SQL via EXECUTE IMMEDIATE. Forcing both into one function hurts readability.

I assume this logic with EXECUTE IMMEDIATE was copied from GCP code? If the static SQL in the CLI didn't use it before, I don't think you should introduce it as part of your changes: merge / insert logic should stay consistent across all modes of schema updates, including IGNORE.

We can bring in EXECUTE IMMEDIATE to synchronize GCP and the CLI, but that can be done later.

Comment thread cli/api/dbadapters/execution_sql.ts Outdated
Contributor

These tests don't seem to check a lot; for example, there are no checks on how the merge / insert logic or the actual schema-altering statements are generated.

Collaborator Author

I enhanced the tests. They now also check the STRING_AGG formatting for the ADD COLUMN and DROP COLUMN commands, and they verify the structural generation of the procedural MERGE (when a uniqueKey is present) and INSERT (when a uniqueKey is absent) queries.

This change introduces support for the onSchemaChange option in incremental tables for the BigQuery adapter. It adds the incrementalSchemaChangeBody() strategy to handle schema changes.

End-to-end tests have been created to verify the new functionality.
- Split SQL queries into sub-functions
- Fixed function names
@SuchodolskiEdvin
Collaborator Author

Ideally we should add an e2e test checking a generated query like here

I was wondering if we can actually create an e2e test for the onSchemaChange parameter? During a dataform run --dry-run, Dataform connects to BigQuery to fetch the current warehouseState. Because our example_incremental table doesn't actually exist in the test BigQuery project yet, tableMetadata comes back as undefined. When that happens, shouldWriteIncrementally() correctly evaluates to false, and Dataform natively falls back to generating a standard CREATE OR REPLACE TABLE.

Because the CLI does not expose a flag to inject a fake warehouseState JSON, I am not sure how we can force the e2e test to simulate a pre-existing table. I can still create a test to verify the parser and the dry-run fallback.

Technically we could run a real dataform run first to create the table in BigQuery, followed by a --dry-run to check the generated MERGE query. However, this would mutate the test BigQuery project, potentially introducing flakiness if tests run concurrently, and it would slow down the suite. The SQL generation is already mostly covered in the execution_sql_test.ts unit tests by mocking tableMetadata.

WDYT?

@ikholopov-omni
Collaborator

Ideally we should add an e2e test checking a generated query like here

I was wondering if we can actually create an e2e test for the onSchemaChange parameter? During a dataform run --dry-run, Dataform connects to BigQuery to fetch the current warehouseState. Because our example_incremental table doesn't actually exist in the test BigQuery project yet, tableMetadata comes back as undefined. When that happens, shouldWriteIncrementally() correctly evaluates to false, and Dataform natively falls back to generating a standard CREATE OR REPLACE TABLE.

Because the CLI does not expose a flag to inject a fake warehouseState JSON, I am not sure how we can force the E2E test to simulate a pre-existing table. I still can create the test to verify the parser and the dry-run fallback.

Technically we could run a real dataform run first to create the table in BigQuery, followed by a --dry-run to check the generated MERGE query. However, this would mutate the test BigQuery project, potentially introducing flakiness if tests run concurrently, and it would slow down the suite. The SQL generation is already mostly covered in the execution_sql_test.ts unit tests by mocking tableMetadata.

WDYT?

you can create a separate dataset per-run and prepare the table with the desired state in operation action before running the one you actually want to test. Just make sure to have a tear-down mechanism.
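This pattern could be sketched as setup/teardown SQL generation (dataset naming and table schema here are illustrative):

```typescript
// Illustrative per-run dataset scaffold for the e2e test: setup pre-creates
// the incremental target so warehouseState sees an existing table, and
// teardown drops the whole dataset regardless of the test outcome.
function e2eScaffold(runId: string) {
  const dataset = `df_e2e_${runId}`;
  return {
    dataset,
    setupSql: [
      `CREATE SCHEMA IF NOT EXISTS \`${dataset}\``,
      `CREATE TABLE \`${dataset}.example_incremental\` (id INT64, ts TIMESTAMP)`,
    ],
    teardownSql: `DROP SCHEMA IF EXISTS \`${dataset}\` CASCADE`,
  };
}

const { setupSql, teardownSql } = e2eScaffold("20260414_001");
console.log(setupSql.length, teardownSql);
```

Running setup as an operation action before the incremental action under test, and teardown in an always-executed cleanup step, keeps concurrent runs isolated from each other.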

@kolina
Contributor

kolina commented Apr 14, 2026

you can create a separate dataset per-run and prepare the table with the desired state in operation action before running the one you actually want to test. Just make sure to have a tear-down mechanism.

Igor's proposal is good


Comment on lines +266 to +268
database: string,
schema: string,
targetName: string,
Contributor

You can also just pass ITarget here

): string {
const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE;
let sql = `
-- Apply schema change strategy (${dataform.OnSchemaChange[onSchemaChange]}).`;
Contributor

what is the purpose of this?

table: dataform.ITable,
qualifiedTargetTableName: string
): string {
const query = this.getIncrementalQuery(table);
Contributor

Do I understand correctly that this query will be inserted twice into generated SQL? Could we avoid it?


Comment on lines +50 to +57
/create or replace procedure `project-id.dataset-id.df_osc_.*`\(\)\s+options\(strict_mode=false\)/i
);
expect(procedureSql).to.include(
`"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %T, removed columns: %T"`
);
expect(procedureSql).to.match(/call `project-id.dataset-id.df_osc_.*`\(\)/i);
expect(procedureSql).to.include("EXCEPTION WHEN ERROR THEN");
expect(procedureSql).to.match(/drop procedure if exists `project-id.dataset-id.df_osc_.*`/i);
Contributor

Can we just match the whole query?

If there are non-deterministic places in the logic, you can inject an interface for UUID generation to get deterministic output in tests.

For readability, you can put the expected "golden" SQL into a separate file and read it in the tests for comparison.
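The injection idea could be sketched like this (interface and class names are hypothetical):

```typescript
// Hypothetical sketch: inject the ID source so production code uses random
// UUIDs while tests get a fixed sequence and can golden-match whole queries.
interface IdGenerator {
  next(): string;
}

class SequentialIds implements IdGenerator {
  private n = 0;
  next(): string {
    return `fixed_${this.n++}`;
  }
}

function procedureName(projectDataset: string, ids: IdGenerator): string {
  return "`" + projectDataset + ".df_osc_" + ids.next() + "`";
}

const name = procedureName("project-id.dataset-id", new SequentialIds());
console.log(name); // `project-id.dataset-id.df_osc_fixed_0`
```

With the generator fixed, the full generated script becomes byte-for-byte reproducible and a whole-query comparison against a golden file is straightforward.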


6 participants