@@ -142,45 +142,12 @@ from (${query}) as insertions`;
142142 if ( ! this . shouldWriteIncrementally ( table , runConfig , tableMetadata ) ) {
143143 tasks . add ( Task . statement ( this . createOrReplace ( table ) ) ) ;
144144 } else {
145- const onSchemaChange = table . onSchemaChange || dataform . OnSchemaChange . IGNORE ;
145+ const onSchemaChange = table . onSchemaChange ?? dataform . OnSchemaChange . IGNORE ;
146146 switch ( onSchemaChange ) {
147147 case dataform . OnSchemaChange . FAIL :
148148 case dataform . OnSchemaChange . EXTEND :
149149 case dataform . OnSchemaChange . SYNCHRONIZE :
150- const uniqueId = crypto . randomUUID ( ) . replace ( / - / g, "_" ) ;
151-
152- const shortEmptyTableName = `${ table . target . name } _df_temp_${ uniqueId } _empty` ;
153- const emptyTempTableName = this . resolveTarget ( {
154- ...table . target ,
155- name : shortEmptyTableName
156- } ) ;
157-
158- const shortDataTableName = shortEmptyTableName . replace ( "_empty" , "_data" ) ;
159- const dataTempTableName = this . resolveTarget ( {
160- ...table . target ,
161- name : shortDataTableName
162- } ) ;
163-
164- const procedureName = this . createProcedureName ( table . target , uniqueId ) ;
165- const procedureBody = this . incrementalSchemaChangeBody (
166- table ,
167- this . resolveTarget ( table . target ) ,
168- emptyTempTableName ,
169- dataTempTableName ,
170- shortEmptyTableName
171- ) ;
172-
173- const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${ procedureName } ()
174- OPTIONS(strict_mode=false)
175- BEGIN
176- ${ procedureBody }
177- END;` ;
178- const callProcedureSql = this . safeCallProcedure (
179- procedureName ,
180- emptyTempTableName ,
181- dataTempTableName
182- ) ;
183- tasks . add ( Task . statement ( createProcedureSql + "\n" + callProcedureSql ) ) ;
150+ this . buildIncrementalSchemaChangeTasks ( tasks , table ) ;
184151 break ;
185152 case dataform . OnSchemaChange . IGNORE :
186153 default :
@@ -231,7 +198,47 @@ END;`;
231198 return `drop ${ this . tableTypeAsSql ( type ) } if exists ${ this . resolveTarget ( target ) } ` ;
232199 }
233200
234- private createProcedureName ( target : dataform . ITarget , uniqueId : string ) : string {
201+ private buildIncrementalSchemaChangeTasks ( tasks : Tasks , table : dataform . ITable ) {
202+ const uniqueId = crypto . randomUUID ( ) . replace ( / - / g, "_" ) ;
203+
204+ const shortEmptyTableName = `${ table . target . name } _df_temp_${ uniqueId } _empty` ;
205+ const emptyTempTableName = this . resolveTarget ( {
206+ ...table . target ,
207+ name : shortEmptyTableName
208+ } ) ;
209+
210+ const shortDataTableName = shortEmptyTableName . replace ( "_empty" , "_data" ) ;
211+ const dataTempTableName = this . resolveTarget ( {
212+ ...table . target ,
213+ name : shortDataTableName
214+ } ) ;
215+
216+ const procedureName = this . createProcedureName ( table . target , uniqueId ) ;
217+ const procedureBody = this . incrementalSchemaChangeBody (
218+ table ,
219+ this . resolveTarget ( table . target ) ,
220+ emptyTempTableName ,
221+ dataTempTableName ,
222+ shortEmptyTableName
223+ ) ;
224+
225+ const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${ procedureName } ()
226+ OPTIONS(strict_mode=false)
227+ BEGIN
228+ ${ procedureBody }
229+ END;` ;
230+
231+ const callProcedureSql = this . safeCallAndDropProcedure (
232+ procedureName ,
233+ emptyTempTableName ,
234+ dataTempTableName
235+ ) ;
236+ tasks . add ( Task . statement ( createProcedureSql ) ) ;
237+ tasks . add ( Task . statement ( callProcedureSql ) ) ;
238+ tasks . add ( Task . statement ( `DROP PROCEDURE IF EXISTS ${ procedureName } ;` ) ) ;
239+ }
240+
241+ private createProcedureName ( target : dataform . ITarget , uniqueId : string ) : string {
235242 // Procedure names cannot contain hyphens.
236243 const sanitizedUniqueId = uniqueId . replace ( / - / g, "_" ) ;
237244 return this . resolveTarget ( {
@@ -240,7 +247,7 @@ END;`;
240247 } ) ;
241248 }
242249
243- private safeCallProcedure (
250+ private safeCallAndDropProcedure (
244251 procedureName : string ,
245252 emptyTempTableName : string ,
246253 dataTempTableName : string
@@ -375,26 +382,42 @@ END FOR;
375382 qualifiedTargetTableName : string ,
376383 dataTempTableName : string
377384 ) : string {
378- let finalDmlSql = "\n-- Run final MERGE/INSERT." ;
385+ return [
386+ this . createIncrementalDataTempTableSql ( table , dataTempTableName ) ,
387+ this . declareDataformColumnsListSql ( ) ,
388+ this . executeMergeOrInsertSql ( table , qualifiedTargetTableName , dataTempTableName )
389+ ] . join ( "\n" ) ;
390+ }
379391
380- // Create temp table for incremental data.
381- finalDmlSql += `
392+ private createIncrementalDataTempTableSql ( table : dataform . ITable , dataTempTableName : string ) : string {
393+ return `
382394CREATE OR REPLACE TEMP TABLE ${ dataTempTableName } AS (
383- SELECT * FROM ( ${ table . incrementalQuery || table . query } )
395+ ${ this . where ( table . incrementalQuery || table . query , table . where ) }
384396);` ;
397+ }
385398
386- // Generate dynamic column lists from temp_table_columns.
387- finalDmlSql += `
399+ private declareDataformColumnsListSql ( ) : string {
400+ return `
388401DECLARE dataform_columns_list STRING;
389402SET dataform_columns_list = (
390403 SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\`'), ', '), '')
391404 FROM UNNEST(temp_table_columns)
392405);` ;
406+ }
393407
394- // Run final MERGE/INSERT.
408+ private executeMergeOrInsertSql (
409+ table : dataform . ITable ,
410+ qualifiedTargetTableName : string ,
411+ dataTempTableName : string
412+ ) : string {
395413 if ( table . uniqueKey && table . uniqueKey . length > 0 ) {
396414 const mergeOnClause = table . uniqueKey . map ( k => `T.\`${ k } \` = S.\`${ k } \`` ) . join ( " and " ) ;
397- finalDmlSql += `
415+ const updatePartitionFilter = table . bigquery && table . bigquery . updatePartitionFilter ;
416+ const mergeOnClauseWithFilter = updatePartitionFilter
417+ ? `${ mergeOnClause } and T.${ updatePartitionFilter } `
418+ : mergeOnClause ;
419+
420+ return `
398421DECLARE dataform_columns_merge STRING;
399422SET dataform_columns_merge = (
400423 SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\` = S.\`', column_name, '\`'), ', '), '')
@@ -405,25 +428,22 @@ IF ARRAY_LENGTH(temp_table_columns) > 0 THEN
405428 EXECUTE IMMEDIATE (
406429 "MERGE \`${ qualifiedTargetTableName } \` T " ||
407430 "USING \`${ dataTempTableName } \` S " ||
408- "ON ${ mergeOnClause } " ||
431+ "ON ${ mergeOnClauseWithFilter } " ||
409432 "WHEN MATCHED THEN " ||
410433 " UPDATE SET " || dataform_columns_merge || " " ||
411434 "WHEN NOT MATCHED THEN " ||
412435 " INSERT (" || dataform_columns_list || ") VALUES (" || dataform_columns_list || ")"
413436 );
414- END IF;
415- ` ;
437+ END IF;` ;
416438 } else {
417- finalDmlSql += `
439+ return `
418440IF ARRAY_LENGTH(temp_table_columns) > 0 THEN
419441 EXECUTE IMMEDIATE (
420442 "INSERT INTO \`${ qualifiedTargetTableName } \` (" || dataform_columns_list || ") " ||
421443 "SELECT " || dataform_columns_list || " FROM \`${ dataTempTableName } \`"
422444 );
423- END IF;
424- ` ;
445+ END IF;` ;
425446 }
426- return finalDmlSql ;
427447 }
428448
429449 private cleanupSql ( emptyTempTableName : string , dataTempTableName : string ) : string {
0 commit comments