Skip to content

Commit c0ea194

Browse files
fix: Address feedback for PR #2101
1 parent 86fe771 commit c0ea194

3 files changed

Lines changed: 118 additions & 114 deletions

File tree

cli/api/BUILD

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
load("//tools:ts_library.bzl", "ts_library")
21
load("//testing:index.bzl", "ts_test_suite")
32
load("//tools:node_modules.bzl", "node_modules")
3+
load("//tools:ts_library.bzl", "ts_library")
44

55
package(default_visibility = ["//visibility:public"])
66

7-
load("//tools:ts_library.bzl", "ts_library")
8-
97
ts_library(
108
name = "api",
119
srcs = glob(
@@ -57,23 +55,11 @@ node_modules(
5755
],
5856
)
5957

60-
# ts_test_suite(
61-
# name = "tests",
62-
# srcs = ["dbadapters/execution_sql_test.ts"],
63-
# deps = [
64-
# ":api",
65-
# "//core",
66-
# "//protos:ts",
67-
# "//testing",
68-
# "@npm//@types/chai",
69-
# "@npm//chai",
70-
# ],
71-
# )
72-
7358
ts_test_suite(
7459
name = "tests",
7560
srcs = [
7661
"utils_test.ts",
62+
"execution_sql_test.ts",
7763
],
7864
data = [
7965
":node_modules",

cli/api/dbadapters/execution_sql.ts

Lines changed: 89 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import * as crypto from "crypto";
21
import * as semver from "semver";
32

43
import { concatenateQueries, Task, Tasks } from "df/cli/api/dbadapters/tasks";
@@ -122,6 +121,10 @@ from (${query}) as insertions`;
122121
return this.CompilationSql.resolveTarget(target);
123122
}
124123

124+
public getIncrementalQuery(table: dataform.ITable): string {
125+
return this.where(table.incrementalQuery || table.query, table.where);
126+
}
127+
125128
public publishTasks(
126129
table: dataform.ITable,
127130
runConfig: dataform.IRunConfig,
@@ -157,14 +160,14 @@ from (${query}) as insertions`;
157160
? this.mergeInto(
158161
table.target,
159162
tableMetadata?.fields.map(f => f.name),
160-
this.where(table.incrementalQuery || table.query, table.where),
163+
this.getIncrementalQuery(table),
161164
table.uniqueKey,
162165
table.bigquery && table.bigquery.updatePartitionFilter
163166
)
164167
: this.insertInto(
165168
table.target,
166169
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
167-
this.where(table.incrementalQuery || table.query, table.where)
170+
this.getIncrementalQuery(table)
168171
)
169172
)
170173
);
@@ -199,26 +202,19 @@ from (${query}) as insertions`;
199202
}
200203

201204
private buildIncrementalSchemaChangeTasks(tasks: Tasks, table: dataform.ITable) {
202-
const uniqueId = crypto.randomUUID().replace(/-/g, "_");
205+
const uniqueId = Math.random().toString(36).substring(2);
203206

204207
const shortEmptyTableName = `${table.target.name}_df_temp_${uniqueId}_empty`;
205208
const emptyTempTableName = this.resolveTarget({
206209
...table.target,
207210
name: shortEmptyTableName
208211
});
209212

210-
const shortDataTableName = shortEmptyTableName.replace("_empty", "_data");
211-
const dataTempTableName = this.resolveTarget({
212-
...table.target,
213-
name: shortDataTableName
214-
});
215-
216213
const procedureName = this.createProcedureName(table.target, uniqueId);
217214
const procedureBody = this.incrementalSchemaChangeBody(
218215
table,
219216
this.resolveTarget(table.target),
220217
emptyTempTableName,
221-
dataTempTableName,
222218
shortEmptyTableName
223219
);
224220

@@ -230,43 +226,37 @@ END;`;
230226

231227
const callProcedureSql = this.safeCallAndDropProcedure(
232228
procedureName,
233-
emptyTempTableName,
234-
dataTempTableName
229+
emptyTempTableName
235230
);
236231
tasks.add(Task.statement(createProcedureSql));
237232
tasks.add(Task.statement(callProcedureSql));
238-
tasks.add(Task.statement(`DROP PROCEDURE IF EXISTS ${procedureName};`));
239233
}
240234

241235
private createProcedureName(target: dataform.ITarget, uniqueId: string): string {
242-
// Procedure names cannot contain hyphens.
243-
const sanitizedUniqueId = uniqueId.replace(/-/g, "_");
244236
return this.resolveTarget({
245237
...target,
246-
name: `df_osc_${sanitizedUniqueId}`
238+
name: `df_osc_${uniqueId}`
247239
});
248240
}
249241

250242
private safeCallAndDropProcedure(
251243
procedureName: string,
252-
emptyTempTableName: string,
253-
dataTempTableName: string
244+
emptyTempTableName: string
254245
): string {
255246
return `
256247
BEGIN
257248
CALL ${procedureName}();
258249
EXCEPTION WHEN ERROR THEN
259250
DROP TABLE IF EXISTS ${emptyTempTableName};
260-
DROP TABLE IF EXISTS ${dataTempTableName};
261251
DROP PROCEDURE IF EXISTS ${procedureName};
262252
RAISE;
263253
END;
264254
DROP PROCEDURE IF EXISTS ${procedureName};`;
265255
}
266256

267-
private inferSchemaSql(emptyTempTableName: string, query: string): string {
257+
private createEmptyTempTableSql(emptyTempTableName: string, query: string): string {
268258
return `
269-
-- Infer schema of new query.
259+
-- Create empty table to extract schema of new query.
270260
CREATE OR REPLACE TABLE ${emptyTempTableName} AS (
271261
SELECT * FROM (${query}) AS insertions LIMIT 0
272262
);`;
@@ -322,7 +312,7 @@ SET columns_removed = (
322312
sql += `
323313
IF ARRAY_LENGTH(columns_added) > 0 OR ARRAY_LENGTH(columns_removed) > 0 THEN
324314
RAISE USING MESSAGE = FORMAT(
325-
"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %t, removed columns: %t",
315+
"Schema mismatch defined by on_schema_change = 'FAIL'. Added columns: %T, removed columns: %T",
326316
columns_added,
327317
columns_removed
328318
);
@@ -333,69 +323,73 @@ END IF;
333323
sql += `
334324
IF ARRAY_LENGTH(columns_removed) > 0 THEN
335325
RAISE USING MESSAGE = FORMAT(
336-
"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %t",
326+
"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T",
337327
columns_removed
338328
);
339329
END IF;
340330
341-
FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO
342-
EXECUTE IMMEDIATE FORMAT(
343-
"ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s",
344-
column_info.column_name,
345-
column_info.data_type
346-
);
347-
END FOR;
331+
${this.alterTableAddColumnsSql(qualifiedTargetTableName)}
348332
`;
349333
break;
350334
case dataform.OnSchemaChange.SYNCHRONIZE:
351335
const uniqueKeys = table.uniqueKey || [];
352336
sql += `
353-
FOR removed_column_name IN (SELECT * FROM UNNEST(columns_removed)) DO
354-
IF removed_column_name IN UNNEST(${JSON.stringify(uniqueKeys)}) THEN
355-
RAISE USING MESSAGE = FORMAT(
356-
"Cannot drop column %s as it is part of the unique key for table ${qualifiedTargetTableName}",
357-
removed_column_name
358-
);
359-
ELSE
360-
EXECUTE IMMEDIATE FORMAT(
361-
"ALTER TABLE ${qualifiedTargetTableName} DROP COLUMN IF EXISTS %s",
362-
removed_column_name
363-
);
364-
END IF;
365-
END FOR;
366-
367-
FOR column_info IN (SELECT * FROM UNNEST(columns_added)) DO
368-
EXECUTE IMMEDIATE FORMAT(
369-
"ALTER TABLE ${qualifiedTargetTableName} ADD COLUMN IF NOT EXISTS %s %s",
370-
column_info.column_name,
371-
column_info.data_type
337+
DECLARE invalid_removed_columns ARRAY<STRING>;
338+
SET invalid_removed_columns = (
339+
SELECT IFNULL(ARRAY_AGG(col), []) FROM UNNEST(columns_removed) AS col WHERE col IN UNNEST(${JSON.stringify(uniqueKeys)})
340+
);
341+
342+
IF ARRAY_LENGTH(invalid_removed_columns) > 0 THEN
343+
RAISE USING MESSAGE = FORMAT(
344+
"Cannot drop columns %T as they are part of the unique key for table ${qualifiedTargetTableName}",
345+
invalid_removed_columns
346+
);
347+
END IF;
348+
349+
IF ARRAY_LENGTH(columns_removed) > 0 THEN
350+
EXECUTE IMMEDIATE (
351+
"ALTER TABLE ${qualifiedTargetTableName} " ||
352+
(
353+
SELECT STRING_AGG(FORMAT("DROP COLUMN IF EXISTS %s", col), ", ")
354+
FROM UNNEST(columns_removed) AS col
355+
)
372356
);
373-
END FOR;
357+
END IF;
358+
359+
${this.alterTableAddColumnsSql(qualifiedTargetTableName)}
374360
`;
375361
break;
376362
}
377363
return sql;
378364
}
379365

366+
private alterTableAddColumnsSql(qualifiedTargetTableName: string): string {
367+
return `IF ARRAY_LENGTH(columns_added) > 0 THEN
368+
EXECUTE IMMEDIATE (
369+
"ALTER TABLE ${qualifiedTargetTableName} " ||
370+
(
371+
SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")
372+
FROM UNNEST(columns_added) AS column_info
373+
)
374+
);
375+
END IF;`;
376+
}
377+
380378
private runFinalDmlSql(
381379
table: dataform.ITable,
382-
qualifiedTargetTableName: string,
383-
dataTempTableName: string
380+
qualifiedTargetTableName: string
384381
): string {
382+
const query = this.getIncrementalQuery(table);
383+
const escapedQuery = query.replace(/\\/g, "\\\\").replace(/"/g, '\\"');
384+
385385
return [
386-
this.createIncrementalDataTempTableSql(table, dataTempTableName),
387386
this.declareDataformColumnsListSql(),
388-
this.executeMergeOrInsertSql(table, qualifiedTargetTableName, dataTempTableName)
387+
table.uniqueKey && table.uniqueKey.length > 0
388+
? this.buildDynamicMergeSql(table, qualifiedTargetTableName, escapedQuery)
389+
: this.buildDynamicInsertSql(table, qualifiedTargetTableName, escapedQuery)
389390
].join("\n");
390391
}
391392

392-
private createIncrementalDataTempTableSql(table: dataform.ITable, dataTempTableName: string): string {
393-
return `
394-
CREATE OR REPLACE TEMP TABLE ${dataTempTableName} AS (
395-
${this.where(table.incrementalQuery || table.query, table.where)}
396-
);`;
397-
}
398-
399393
private declareDataformColumnsListSql(): string {
400394
return `
401395
DECLARE dataform_columns_list STRING;
@@ -405,73 +399,76 @@ SET dataform_columns_list = (
405399
);`;
406400
}
407401

408-
private executeMergeOrInsertSql(
402+
private buildDynamicMergeSql(
409403
table: dataform.ITable,
410404
qualifiedTargetTableName: string,
411-
dataTempTableName: string
405+
escapedQuery: string
412406
): string {
413-
if (table.uniqueKey && table.uniqueKey.length > 0) {
414-
const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and ");
415-
const updatePartitionFilter = table.bigquery && table.bigquery.updatePartitionFilter;
416-
const mergeOnClauseWithFilter = updatePartitionFilter
417-
? `${mergeOnClause} and T.${updatePartitionFilter}`
418-
: mergeOnClause;
419-
420-
return `
407+
const mergeOnClause = table.uniqueKey.map(k => `T.\`${k}\` = S.\`${k}\``).join(" and ");
408+
const updatePartitionFilter = table.bigquery && table.bigquery.updatePartitionFilter;
409+
const mergeOnClauseWithFilter = updatePartitionFilter
410+
? `${mergeOnClause} and T.${updatePartitionFilter}`
411+
: mergeOnClause;
412+
const escapedMergeOnClauseWithFilter = mergeOnClauseWithFilter.replace(/\\/g, "\\\\").replace(/"/g, '\\"');
413+
414+
return `
421415
DECLARE dataform_columns_merge STRING;
422416
SET dataform_columns_merge = (
423417
SELECT IFNULL(STRING_AGG(CONCAT('\`', column_name, '\` = S.\`', column_name, '\`'), ', '), '')
424418
FROM UNNEST(temp_table_columns)
425419
);
426420
427-
IF ARRAY_LENGTH(temp_table_columns) > 0 THEN
428421
EXECUTE IMMEDIATE (
429-
"MERGE \`${qualifiedTargetTableName}\` T " ||
430-
"USING \`${dataTempTableName}\` S " ||
431-
"ON ${mergeOnClauseWithFilter} " ||
422+
"MERGE ${qualifiedTargetTableName} T " ||
423+
"USING (" ||
424+
"""${escapedQuery}""" ||
425+
") S " ||
426+
"ON ${escapedMergeOnClauseWithFilter} " ||
432427
"WHEN MATCHED THEN " ||
433428
" UPDATE SET " || dataform_columns_merge || " " ||
434429
"WHEN NOT MATCHED THEN " ||
435430
" INSERT (" || dataform_columns_list || ") VALUES (" || dataform_columns_list || ")"
436-
);
437-
END IF;`;
438-
} else {
439-
return `
440-
IF ARRAY_LENGTH(temp_table_columns) > 0 THEN
431+
);`;
432+
}
433+
434+
private buildDynamicInsertSql(
435+
table: dataform.ITable,
436+
qualifiedTargetTableName: string,
437+
escapedQuery: string
438+
): string {
439+
return `
441440
EXECUTE IMMEDIATE (
442-
"INSERT INTO \`${qualifiedTargetTableName}\` (" || dataform_columns_list || ") " ||
443-
"SELECT " || dataform_columns_list || " FROM \`${dataTempTableName}\`"
444-
);
445-
END IF;`;
446-
}
441+
"INSERT INTO ${qualifiedTargetTableName} (" || dataform_columns_list || ") " ||
442+
"SELECT " || dataform_columns_list || " FROM (" ||
443+
"""${escapedQuery}""" ||
444+
")"
445+
);`;
447446
}
448447

449-
private cleanupSql(emptyTempTableName: string, dataTempTableName: string): string {
448+
private cleanupSql(emptyTempTableName: string): string {
450449
return `
451450
-- Cleanup temporary tables.
452451
DROP TABLE IF EXISTS ${emptyTempTableName};
453-
DROP TABLE IF EXISTS ${dataTempTableName};
454452
`;
455453
}
456454

457455
private incrementalSchemaChangeBody(
458456
table: dataform.ITable,
459457
qualifiedTargetTableName: string,
460458
emptyTempTableName: string,
461-
dataTempTableName: string,
462459
shortEmptyTableName: string
463460
): string {
464461
const statements: string[] = [
465-
this.inferSchemaSql(emptyTempTableName, table.incrementalQuery || table.query),
462+
this.createEmptyTempTableSql(emptyTempTableName, this.getIncrementalQuery(table)),
466463
this.compareSchemasSql(
467464
table.target.database,
468465
table.target.schema,
469466
table.target.name,
470467
shortEmptyTableName
471468
),
472469
this.applySchemaChangeStrategySql(table, qualifiedTargetTableName),
473-
this.runFinalDmlSql(table, qualifiedTargetTableName, dataTempTableName),
474-
this.cleanupSql(emptyTempTableName, dataTempTableName)
470+
this.runFinalDmlSql(table, qualifiedTargetTableName),
471+
this.cleanupSql(emptyTempTableName)
475472
];
476473

477474
return statements.join("\n\n");

0 commit comments

Comments
 (0)