@@ -122,7 +122,116 @@ from (${query}) as insertions`;
122122 return this . CompilationSql . resolveTarget ( target ) ;
123123 }
124124
125- private createProcedureName ( target : dataform . ITarget , uniqueId : string ) : string {
125+ public publishTasks (
126+ table : dataform . ITable ,
127+ runConfig : dataform . IRunConfig ,
128+ tableMetadata ?: dataform . ITableMetadata
129+ ) : Tasks {
130+ const tasks = new Tasks ( ) ;
131+
132+ this . preOps ( table , runConfig , tableMetadata ) . forEach ( statement => tasks . add ( statement ) ) ;
133+
134+ const baseTableType = this . baseTableType ( table . enumType ) ;
135+ if ( tableMetadata && tableMetadata . type !== baseTableType ) {
136+ tasks . add (
137+ Task . statement ( this . dropIfExists ( table . target , this . oppositeTableType ( baseTableType ) ) )
138+ ) ;
139+ }
140+
141+ if ( table . enumType === dataform . TableType . INCREMENTAL ) {
142+ if ( ! this . shouldWriteIncrementally ( table , runConfig , tableMetadata ) ) {
143+ tasks . add ( Task . statement ( this . createOrReplace ( table ) ) ) ;
144+ } else {
145+ const onSchemaChange = table . onSchemaChange || dataform . OnSchemaChange . IGNORE ;
146+ switch ( onSchemaChange ) {
147+ case dataform . OnSchemaChange . FAIL :
148+ case dataform . OnSchemaChange . EXTEND :
149+ case dataform . OnSchemaChange . SYNCHRONIZE :
150+ const uniqueId = crypto . randomUUID ( ) . replace ( / - / g, "_" ) ;
151+
152+ const shortEmptyTableName = `${ table . target . name } _df_temp_${ uniqueId } _empty` ;
153+ const emptyTempTableName = this . resolveTarget ( {
154+ ...table . target ,
155+ name : shortEmptyTableName
156+ } ) ;
157+
158+ const shortDataTableName = shortEmptyTableName . replace ( "_empty" , "_data" ) ;
159+ const dataTempTableName = this . resolveTarget ( {
160+ ...table . target ,
161+ name : shortDataTableName
162+ } ) ;
163+
164+ const procedureName = this . createProcedureName ( table . target , uniqueId ) ;
165+ const procedureBody = this . incrementalSchemaChangeBody (
166+ table ,
167+ this . resolveTarget ( table . target ) ,
168+ emptyTempTableName ,
169+ dataTempTableName ,
170+ shortEmptyTableName
171+ ) ;
172+
173+ const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${ procedureName } ()
174+ OPTIONS(strict_mode=false)
175+ BEGIN
176+ ${ procedureBody }
177+ END;` ;
178+ const callProcedureSql = this . safeCallProcedure (
179+ procedureName ,
180+ emptyTempTableName ,
181+ dataTempTableName
182+ ) ;
183+ tasks . add ( Task . statement ( createProcedureSql + "\n" + callProcedureSql ) ) ;
184+ break ;
185+ case dataform . OnSchemaChange . IGNORE :
186+ default :
187+ tasks . add (
188+ Task . statement (
189+ table . uniqueKey && table . uniqueKey . length > 0
190+ ? this . mergeInto (
191+ table . target ,
192+ tableMetadata ?. fields . map ( f => f . name ) ,
193+ this . where ( table . incrementalQuery || table . query , table . where ) ,
194+ table . uniqueKey ,
195+ table . bigquery && table . bigquery . updatePartitionFilter
196+ )
197+ : this . insertInto (
198+ table . target ,
199+ tableMetadata ?. fields . map ( f => f . name ) . map ( column => `\`${ column } \`` ) ,
200+ this . where ( table . incrementalQuery || table . query , table . where )
201+ )
202+ )
203+ ) ;
204+ break ;
205+ }
206+ }
207+ } else {
208+ tasks . add ( Task . statement ( this . createOrReplace ( table ) ) ) ;
209+ }
210+
211+ this . postOps ( table , runConfig , tableMetadata ) . forEach ( statement => tasks . add ( statement ) ) ;
212+
213+ return tasks . concatenate ( ) ;
214+ }
215+
216+ public assertTasks (
217+ assertion : dataform . IAssertion ,
218+ projectConfig : dataform . IProjectConfig ,
219+ ) : Tasks {
220+ const tasks = new Tasks ( ) ;
221+ const target = assertion . target ;
222+ // Create the view to check syntax of assertion
223+ tasks . add ( Task . statement ( this . createOrReplaceView ( target , assertion . query ) ) ) ;
224+
225+ // Add assertion check
226+ tasks . add ( Task . assertion ( `select sum(1) as row_count from ${ this . resolveTarget ( target ) } ` ) ) ;
227+ return tasks ;
228+ }
229+
230+ public dropIfExists ( target : dataform . ITarget , type : dataform . TableMetadata . Type ) {
231+ return `drop ${ this . tableTypeAsSql ( type ) } if exists ${ this . resolveTarget ( target ) } ` ;
232+ }
233+
234+ private createProcedureName ( target : dataform . ITarget , uniqueId : string ) : string {
126235 // Procedure names cannot contain hyphens.
127236 const sanitizedUniqueId = uniqueId . replace ( / - / g, "_" ) ;
128237 return this . resolveTarget ( {
@@ -348,115 +457,6 @@ DROP TABLE IF EXISTS ${dataTempTableName};
348457 return statements . join ( "\n\n" ) ;
349458 }
350459
351- public publishTasks (
352- table : dataform . ITable ,
353- runConfig : dataform . IRunConfig ,
354- tableMetadata ?: dataform . ITableMetadata
355- ) : Tasks {
356- const tasks = new Tasks ( ) ;
357-
358- this . preOps ( table , runConfig , tableMetadata ) . forEach ( statement => tasks . add ( statement ) ) ;
359-
360- const baseTableType = this . baseTableType ( table . enumType ) ;
361- if ( tableMetadata && tableMetadata . type !== baseTableType ) {
362- tasks . add (
363- Task . statement ( this . dropIfExists ( table . target , this . oppositeTableType ( baseTableType ) ) )
364- ) ;
365- }
366-
367- if ( table . enumType === dataform . TableType . INCREMENTAL ) {
368- if ( ! this . shouldWriteIncrementally ( table , runConfig , tableMetadata ) ) {
369- tasks . add ( Task . statement ( this . createOrReplace ( table ) ) ) ;
370- } else {
371- const onSchemaChange = table . onSchemaChange || dataform . OnSchemaChange . IGNORE ;
372- switch ( onSchemaChange ) {
373- case dataform . OnSchemaChange . FAIL :
374- case dataform . OnSchemaChange . EXTEND :
375- case dataform . OnSchemaChange . SYNCHRONIZE :
376- const uniqueId = crypto . randomUUID ( ) . replace ( / - / g, "_" ) ;
377-
378- const shortEmptyTableName = `${ table . target . name } _df_temp_${ uniqueId } _empty` ;
379- const emptyTempTableName = this . resolveTarget ( {
380- ...table . target ,
381- name : shortEmptyTableName
382- } ) ;
383-
384- const shortDataTableName = shortEmptyTableName . replace ( "_empty" , "_data" ) ;
385- const dataTempTableName = this . resolveTarget ( {
386- ...table . target ,
387- name : shortDataTableName
388- } ) ;
389-
390- const procedureName = this . createProcedureName ( table . target , uniqueId ) ;
391- const procedureBody = this . incrementalSchemaChangeBody (
392- table ,
393- this . resolveTarget ( table . target ) ,
394- emptyTempTableName ,
395- dataTempTableName ,
396- shortEmptyTableName
397- ) ;
398-
399- const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${ procedureName } ()
400- OPTIONS(strict_mode=false)
401- BEGIN
402- ${ procedureBody }
403- END;` ;
404- const callProcedureSql = this . safeCallProcedure (
405- procedureName ,
406- emptyTempTableName ,
407- dataTempTableName
408- ) ;
409- tasks . add ( Task . statement ( createProcedureSql + "\n" + callProcedureSql ) ) ;
410- break ;
411- case dataform . OnSchemaChange . IGNORE :
412- default :
413- tasks . add (
414- Task . statement (
415- table . uniqueKey && table . uniqueKey . length > 0
416- ? this . mergeInto (
417- table . target ,
418- tableMetadata ?. fields . map ( f => f . name ) ,
419- this . where ( table . incrementalQuery || table . query , table . where ) ,
420- table . uniqueKey ,
421- table . bigquery && table . bigquery . updatePartitionFilter
422- )
423- : this . insertInto (
424- table . target ,
425- tableMetadata ?. fields . map ( f => f . name ) . map ( column => `\`${ column } \`` ) ,
426- this . where ( table . incrementalQuery || table . query , table . where )
427- )
428- )
429- ) ;
430- break ;
431- }
432- }
433- } else {
434- tasks . add ( Task . statement ( this . createOrReplace ( table ) ) ) ;
435- }
436-
437- this . postOps ( table , runConfig , tableMetadata ) . forEach ( statement => tasks . add ( statement ) ) ;
438-
439- return tasks . concatenate ( ) ;
440- }
441-
442- public assertTasks (
443- assertion : dataform . IAssertion ,
444- projectConfig : dataform . IProjectConfig ,
445- ) : Tasks {
446- const tasks = new Tasks ( ) ;
447- const target = assertion . target ;
448- // Create the view to check syntax of assertion
449- tasks . add ( Task . statement ( this . createOrReplaceView ( target , assertion . query ) ) ) ;
450-
451- // Add assertion check
452- tasks . add ( Task . assertion ( `select sum(1) as row_count from ${ this . resolveTarget ( target ) } ` ) ) ;
453- return tasks ;
454- }
455-
456- public dropIfExists ( target : dataform . ITarget , type : dataform . TableMetadata . Type ) {
457- return `drop ${ this . tableTypeAsSql ( type ) } if exists ${ this . resolveTarget ( target ) } ` ;
458- }
459-
460460 private createOrReplace ( table : dataform . ITable ) {
461461 const options = [ ] ;
462462 if ( table . bigquery && table . bigquery . partitionBy && table . bigquery . partitionExpirationDays ) {
0 commit comments