Skip to content

Commit ad85ce0

Browse files
Support onSchemaChange for incremental tables
This change introduces support for the onSchemaChange option in incremental tables for the BigQuery adapter. It adds the incrementalSchemaChangeBody() strategy to handle schema changes. End-to-end tests have been acreated to verify the new functionality.
1 parent 0a8902e commit ad85ce0

3 files changed

Lines changed: 429 additions & 18 deletions

File tree

cli/api/BUILD

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package(default_visibility = ["//visibility:public"])
22

33
load("//tools:ts_library.bzl", "ts_library")
4+
load("//testing:index.bzl", "ts_test_suite")
45

56
ts_library(
67
name = "api",
78
srcs = glob(
89
["**/*.ts"],
9-
exclude = ["utils/**/*.*"],
10+
exclude = ["utils/**/*.*", "**/*_test.ts"],
1011
),
1112
deps = [
1213
"//cli/api/utils",
@@ -42,3 +43,16 @@ ts_library(
4243
"@npm//tmp",
4344
],
4445
)
46+
47+
ts_test_suite(
48+
name = "tests",
49+
srcs = ["dbadapters/execution_sql_test.ts"],
50+
deps = [
51+
":api",
52+
"//core",
53+
"//protos:ts",
54+
"//testing",
55+
"@npm//@types/chai",
56+
"@npm//chai",
57+
],
58+
)

cli/api/dbadapters/execution_sql.ts

Lines changed: 288 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import * as crypto from "crypto";
12
import * as semver from "semver";
23

34
import { concatenateQueries, Task, Tasks } from "df/cli/api/dbadapters/tasks";
@@ -121,6 +122,232 @@ from (${query}) as insertions`;
121122
return this.CompilationSql.resolveTarget(target);
122123
}
123124

125+
private createProcedureName(target: dataform.ITarget, uniqueId: string): string {
126+
// Procedure names cannot contain hyphens.
127+
const sanitizedUniqueId = uniqueId.replace(/-/g, "_");
128+
return this.resolveTarget({
129+
...target,
130+
name: `df_osc_${sanitizedUniqueId}`
131+
});
132+
}
133+
134+
private safeCallProcedure(
135+
procedureName: string,
136+
emptyTempTableName: string,
137+
dataTempTableName: string
138+
): string {
139+
return `
140+
BEGIN
141+
CALL ${procedureName}();
142+
EXCEPTION WHEN ERROR THEN
143+
DROP TABLE IF EXISTS ${emptyTempTableName};
144+
DROP TABLE IF EXISTS ${dataTempTableName};
145+
DROP PROCEDURE IF EXISTS ${procedureName};
146+
RAISE;
147+
END;
148+
DROP PROCEDURE IF EXISTS ${procedureName};`;
149+
}
150+
151+
private inferSchemaSql(emptyTempTableName: string, query: string): string {
152+
return `
153+
-- Infer schema of new query.
154+
CREATE OR REPLACE TABLE ${emptyTempTableName} AS (
155+
SELECT * FROM (${query}) AS insertions LIMIT 0
156+
);`;
157+
}
158+
159+
private compareSchemasSql(
160+
database: string,
161+
schema: string,
162+
targetName: string,
163+
shortEmptyTableName: string
164+
): string {
165+
return `
166+
-- Compare schemas
167+
DECLARE dataform_columns ARRAY<STRING>;
168+
DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
169+
DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
170+
DECLARE columns_removed ARRAY<STRING>;
171+
172+
SET dataform_columns = (
173+
SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
174+
FROM \`${database}.${schema}.INFORMATION_SCHEMA.COLUMNS\`
175+
WHERE table_name = '${targetName}'
176+
);
177+
178+
SET temp_table_columns = (
179+
SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
180+
FROM \`${database}.${schema}.INFORMATION_SCHEMA.COLUMNS\`
181+
WHERE table_name = '${shortEmptyTableName}'
182+
);
183+
184+
SET columns_added = (
185+
SELECT IFNULL(ARRAY_AGG(column_info), [])
186+
FROM UNNEST(temp_table_columns) AS column_info
187+
WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
188+
);
189+
SET columns_removed = (
190+
SELECT IFNULL(ARRAY_AGG(column_name), [])
191+
FROM UNNEST(dataform_columns) AS column_name
192+
WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
193+
);`;
194+
}
195+
196+
private applySchemaChangeStrategySql(
197+
table: dataform.ITable,
198+
qualifiedTargetTableName: string
199+
): string {
200+
const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE;
201+
let sql = `
202+
-- Apply schema change strategy (${dataform.OnSchemaChange[onSchemaChange]}).`;
203+
204+
switch (onSchemaChange) {
205+
case dataform.OnSchemaChange.FAIL:
206+
sql += `
207+
IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN
208+
RAISE USING MESSAGE = FORMAT(
209+
"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %t, removed columns: %t",
210+
columns_added,
211+
columns_removed
212+
);
213+
END IF;
214+
`;
215+
break;
216+
case dataform.OnSchemaChange.EXTEND:
217+
sql += `
218+
IF ARRAY_LENGTH(columns_removed) > 0 THEN
219+
RAISE USING MESSAGE = FORMAT(
220+
"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %t",
221+
columns_removed
222+
);
223+
END IF;
224+
225+
FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO
226+
EXECUTE IMMEDIATE FORMAT(
227+
"ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s",
228+
column_info.column_name,
229+
column_info.data_type
230+
);
231+
END FOR;
232+
`;
233+
break;
234+
case dataform.OnSchemaChange.SYNCHRONIZE:
235+
const uniqueKeys = table.uniqueKey || [];
236+
sql += `
237+
FOR removed_column_name IN (SELECT * FROM UNNEST(columns_removed)) DO
238+
IF removed_column_name IN UNNEST(${JSON.stringify(uniqueKeys)}) THEN
239+
RAISE USING MESSAGE = FORMAT(
240+
"Cannot drop column %s as it is part of the unique key for table ${qualifiedTargetTableName}",
241+
removed_column_name
242+
);
243+
ELSE
244+
EXECUTE IMMEDIATE FORMAT(
245+
"ALTER TABLE ${qualifiedTargetTableName} DROP COLUMN IF EXISTS %s",
246+
removed_column_name
247+
);
248+
END IF;
249+
END FOR;
250+
251+
FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO
252+
EXECUTE IMMEDIATE FORMAT(
253+
"ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s",
254+
column_info.column_name,
255+
column_info.data_type
256+
);
257+
END FOR;
258+
`;
259+
break;
260+
}
261+
return sql;
262+
}
263+
264+
private runFinalDmlSql(
265+
table: dataform.ITable,
266+
qualifiedTargetTableName: string,
267+
dataTempTableName: string
268+
): string {
269+
let finalDmlSql = "\n-- Run final MERGE/INSERT.";
270+
271+
// Create temp table for incremental data.
272+
finalDmlSql += `
273+
CREATE OR REPLACE TEMP TABLE ${dataTempTableName} AS (
274+
SELECT * FROM (${table.incrementalQuery || table.query})
275+
);`;
276+
277+
// Generate dynamic column lists from temp_table_columns.
278+
finalDmlSql += `
279+
DECLARE dataform_columns_list STRING;
280+
SET dataform_columns_list = (
281+
SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\`'), ', '), '')
282+
FROM UNNEST(temp_table_columns)
283+
);`;
284+
285+
// Run final MERGE/INSERT.
286+
if (table.uniqueKey && table.uniqueKey.length > 0) {
287+
const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and ");
288+
finalDmlSql += `
289+
DECLARE dataform_columns_merge STRING;
290+
SET dataform_columns_merge = (
291+
SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\` = S.\`', column_name, '\`'), ', '), '')
292+
FROM UNNEST(temp_table_columns)
293+
);
294+
295+
IF ARRAY_LENGTH(temp_table_columns) > 0 THEN
296+
EXECUTE IMMEDIATE (
297+
"MERGE \`${qualifiedTargetTableName}\` T " ||
298+
"USING \`${dataTempTableName}\` S " ||
299+
"ON ${mergeOnClause} " ||
300+
"WHEN MATCHED THEN " ||
301+
" UPDATE SET " || dataform_columns_merge || " " ||
302+
"WHEN NOT MATCHED THEN " ||
303+
" INSERT (" || dataform_columns_list || ") VALUES (" || dataform_columns_list || ")"
304+
);
305+
END IF;
306+
`;
307+
} else {
308+
finalDmlSql += `
309+
IF ARRAY_LENGTH(temp_table_columns) > 0 THEN
310+
EXECUTE IMMEDIATE (
311+
"INSERT INTO \`${qualifiedTargetTableName}\` (" || dataform_columns_list || ") " ||
312+
"SELECT " || dataform_columns_list || " FROM \`${dataTempTableName}\`"
313+
);
314+
END IF;
315+
`;
316+
}
317+
return finalDmlSql;
318+
}
319+
320+
private cleanupSql(emptyTempTableName: string, dataTempTableName: string): string {
321+
return `
322+
-- Cleanup temporary tables.
323+
DROP TABLE IF EXISTS ${emptyTempTableName};
324+
DROP TABLE IF EXISTS ${dataTempTableName};
325+
`;
326+
}
327+
328+
private incrementalSchemaChangeBody(
329+
table: dataform.ITable,
330+
qualifiedTargetTableName: string,
331+
emptyTempTableName: string,
332+
dataTempTableName: string,
333+
shortEmptyTableName: string
334+
): string {
335+
const statements: string[] = [
336+
this.inferSchemaSql(emptyTempTableName, table.incrementalQuery || table.query),
337+
this.compareSchemasSql(
338+
table.target.database,
339+
table.target.schema,
340+
table.target.name,
341+
shortEmptyTableName
342+
),
343+
this.applySchemaChangeStrategySql(table, qualifiedTargetTableName),
344+
this.runFinalDmlSql(table, qualifiedTargetTableName, dataTempTableName),
345+
this.cleanupSql(emptyTempTableName, dataTempTableName)
346+
];
347+
348+
return statements.join("\n\n");
349+
}
350+
124351
public publishTasks(
125352
table: dataform.ITable,
126353
runConfig: dataform.IRunConfig,
@@ -141,23 +368,67 @@ from (${query}) as insertions`;
141368
if (!this.shouldWriteIncrementally(table, runConfig, tableMetadata)) {
142369
tasks.add(Task.statement(this.createOrReplace(table)));
143370
} else {
144-
tasks.add(
145-
Task.statement(
146-
table.uniqueKey && table.uniqueKey.length > 0
147-
? this.mergeInto(
148-
table.target,
149-
tableMetadata?.fields.map(f => f.name),
150-
this.where(table.incrementalQuery || table.query, table.where),
151-
table.uniqueKey,
152-
table.bigquery && table.bigquery.updatePartitionFilter
153-
)
154-
: this.insertInto(
155-
table.target,
156-
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
157-
this.where(table.incrementalQuery || table.query, table.where)
158-
)
159-
)
160-
);
371+
const onSchemaChange = table.onSchemaChange || dataform.OnSchemaChange.IGNORE;
372+
switch (onSchemaChange) {
373+
case dataform.OnSchemaChange.FAIL:
374+
case dataform.OnSchemaChange.EXTEND:
375+
case dataform.OnSchemaChange.SYNCHRONIZE:
376+
const uniqueId = crypto.randomUUID().replace(/-/g, "_");
377+
378+
const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`;
379+
const emptyTempTableName = this.resolveTarget({
380+
...table.target,
381+
name: shortEmptyTableName
382+
});
383+
384+
const shortDataTableName = shortEmptyTableName.replace("_empty", "_data");
385+
const dataTempTableName = this.resolveTarget({
386+
...table.target,
387+
name: shortDataTableName
388+
});
389+
390+
const procedureName = this.createProcedureName(table.target, uniqueId);
391+
const procedureBody = this.incrementalSchemaChangeBody(
392+
table,
393+
this.resolveTarget(table.target),
394+
emptyTempTableName,
395+
dataTempTableName,
396+
shortEmptyTableName
397+
);
398+
399+
const createProcedureSql = `CREATE OR REPLACE PROCEDURE ${procedureName}()
400+
OPTIONS(strict_mode=false)
401+
BEGIN
402+
${procedureBody}
403+
END;`;
404+
const callProcedureSql = this.safeCallProcedure(
405+
procedureName,
406+
emptyTempTableName,
407+
dataTempTableName
408+
);
409+
tasks.add(Task.statement(createProcedureSql + "\n" + callProcedureSql));
410+
break;
411+
case dataform.OnSchemaChange.IGNORE:
412+
default:
413+
tasks.add(
414+
Task.statement(
415+
table.uniqueKey && table.uniqueKey.length > 0
416+
? this.mergeInto(
417+
table.target,
418+
tableMetadata?.fields.map(f => f.name),
419+
this.where(table.incrementalQuery || table.query, table.where),
420+
table.uniqueKey,
421+
table.bigquery && table.bigquery.updatePartitionFilter
422+
)
423+
: this.insertInto(
424+
table.target,
425+
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
426+
this.where(table.incrementalQuery || table.query, table.where)
427+
)
428+
)
429+
);
430+
break;
431+
}
161432
}
162433
} else {
163434
tasks.add(Task.statement(this.createOrReplace(table)));

0 commit comments

Comments
 (0)