Skip to content

Commit 4246ebb

Browse files
JiT compilation support in JS API (#2086)
JS API for JiT compilation actions definition (mutually exclusive with AoT properties) and JiT context/result typings.
1 parent 9b4e4e6 commit 4246ebb

11 files changed

Lines changed: 591 additions & 115 deletions

core/actions/incremental_table.ts

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import {
88
TableType
99
} from "df/core/actions";
1010
import { Assertion } from "df/core/actions/assertion";
11-
import { Table } from "df/core/actions/table";
11+
import { JitTableResult, Table } from "df/core/actions/table";
1212
import { View } from "df/core/actions/view";
1313
import { ColumnDescriptors } from "df/core/column_descriptors";
14-
import { Contextable, ITableContext, Resolvable } from "df/core/contextables";
14+
import { Contextable, ITableContext, JitContextable, Resolvable } from "df/core/contextables";
1515
import * as Path from "df/core/path";
1616
import { Session } from "df/core/session";
1717
import {
@@ -32,6 +32,7 @@ import {
3232
strictKeysOf,
3333
toResolvable,
3434
validateConnectionFormat,
35+
validateNoMixedCompilationMode,
3536
validateQueryString,
3637
validateStorageUriFormat,
3738
} from "df/core/utils";
@@ -86,6 +87,7 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
8687
/** @hidden We delay contextification until the final compile step, so hold these here for now. */
8788
public contextableQuery: Contextable<ITableContext, string>;
8889
private contextableWhere: Contextable<ITableContext, string>;
90+
private contextableJitCode: JitContextable<ITableContext, JitTableResult> | undefined;
8991
private contextablePreOps: Array<Contextable<ITableContext, string | string[]>> = [];
9092
private contextablePostOps: Array<Contextable<ITableContext, string | string[]>> = [];
9193

@@ -269,6 +271,15 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
269271
return this;
270272
}
271273

274+
public jitCode(jitCode: JitContextable<ITableContext, JitTableResult>) {
275+
if (!this.proto.actionDescriptor) {
276+
this.proto.actionDescriptor = {};
277+
}
278+
this.proto.actionDescriptor.compilationMode = dataform.ActionCompilationMode.ACTION_COMPILATION_MODE_JIT;
279+
this.contextableJitCode = jitCode;
280+
return this;
281+
}
282+
272283
/**
273284
* Sets a pre-operation to run before the query is run. This is often used for temporarily
274285
* granting permission to access source tables.
@@ -517,6 +528,45 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
517528

518529
/** @hidden */
519530
public compile() {
531+
if (this.contextableJitCode) {
532+
this.compileJit();
533+
} else {
534+
this.compileAot();
535+
}
536+
537+
538+
if (this.proto.bigquery?.connection) {
539+
validateConnectionFormat(this.proto.bigquery.connection);
540+
}
541+
542+
if (this.proto.bigquery?.storageUri) {
543+
validateStorageUriFormat(this.proto.bigquery.storageUri);
544+
}
545+
546+
return verifyObjectMatchesProto(
547+
dataform.Table,
548+
this.proto,
549+
VerifyProtoErrorBehaviour.SUGGEST_REPORTING_TO_DATAFORM_TEAM
550+
);
551+
}
552+
553+
private compileJit() {
554+
validateNoMixedCompilationMode(
555+
this.session,
556+
this.getFileName(),
557+
this.contextableQuery,
558+
this.contextableWhere,
559+
this.contextablePostOps,
560+
this.contextablePreOps
561+
);
562+
563+
if (!this.proto.actionDescriptor) {
564+
this.proto.actionDescriptor = {};
565+
}
566+
this.proto.jitCode = this.contextableJitCode.toString();
567+
}
568+
569+
private compileAot() {
520570
const context = new IncrementalTableContext(this);
521571
const incrementalContext = new IncrementalTableContext(this, true);
522572

@@ -540,20 +590,6 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
540590

541591
validateQueryString(this.session, this.proto.query, this.proto.fileName);
542592
validateQueryString(this.session, this.proto.incrementalQuery, this.proto.fileName);
543-
544-
if (this.proto.bigquery?.connection) {
545-
validateConnectionFormat(this.proto.bigquery.connection);
546-
}
547-
548-
if (this.proto.bigquery?.storageUri) {
549-
validateStorageUriFormat(this.proto.bigquery.storageUri);
550-
}
551-
552-
return verifyObjectMatchesProto(
553-
dataform.Table,
554-
this.proto,
555-
VerifyProtoErrorBehaviour.SUGGEST_REPORTING_TO_DATAFORM_TEAM
556-
);
557593
}
558594

559595
/** @hidden */
@@ -633,15 +669,15 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
633669
}
634670
}
635671
if (unverifiedConfig.iceberg) {
636-
if (
637-
unverifiedConfig.iceberg.fileFormat &&
638-
unverifiedConfig.iceberg.fileFormat.toUpperCase() !== 'PARQUET'
639-
) {
640-
throw new ReferenceError(
641-
`Unexpected file format; only "PARQUET" is allowed, got "${unverifiedConfig.iceberg.fileFormat}".`
642-
);
643-
}
672+
if (
673+
unverifiedConfig.iceberg.fileFormat &&
674+
unverifiedConfig.iceberg.fileFormat.toUpperCase() !== 'PARQUET'
675+
) {
676+
throw new ReferenceError(
677+
`Unexpected file format; only "PARQUET" is allowed, got "${unverifiedConfig.iceberg.fileFormat}".`
678+
);
644679
}
680+
}
645681

646682
const config = verifyObjectMatchesProto(
647683
dataform.ActionConfig.IncrementalTableConfig,
@@ -699,7 +735,7 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
699735
* @hidden
700736
*/
701737
export class IncrementalTableContext implements ITableContext {
702-
constructor(private incrementalTable: IncrementalTable, private isIncremental = false) {}
738+
constructor(private incrementalTable: IncrementalTable, private isIncremental = false) { }
703739

704740
public self(): string {
705741
return this.resolve(this.incrementalTable.getTarget());

core/actions/incremental_table_test.ts

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
exampleBuiltInAssertions,
99
exampleBuiltInAssertionsAsYaml
1010
} from "df/core/actions/index_test";
11-
import {dataform} from "df/protos/ts";
11+
import { dataform } from "df/protos/ts";
1212
import { asPlainObject, suite, test } from "df/testing";
1313
import { TmpDirFixture } from "df/testing/fixtures";
1414
import {
@@ -396,7 +396,7 @@ defaultIcebergConfig:
396396
}
397397
}`,
398398
expected: {
399-
target: {name: "incremental_table1", schema: "dataset1", database: "defaultProject"},
399+
target: { name: "incremental_table1", schema: "dataset1", database: "defaultProject" },
400400
bigquery: {
401401
tableFormat: "ICEBERG",
402402
fileFormat: "PARQUET",
@@ -423,7 +423,7 @@ defaultIcebergConfig:
423423
}
424424
}`,
425425
expected: {
426-
target: {name: "incremental_table2", schema: "dataset2", database: "defaultProject"},
426+
target: { name: "incremental_table2", schema: "dataset2", database: "defaultProject" },
427427
bigquery: {
428428
tableFormat: "ICEBERG",
429429
fileFormat: "PARQUET",
@@ -449,7 +449,7 @@ defaultIcebergConfig:
449449
}
450450
}`,
451451
expected: {
452-
target: {name: "incremental_table3", schema: "dataset3", database: "defaultProject"},
452+
target: { name: "incremental_table3", schema: "dataset3", database: "defaultProject" },
453453
bigquery: {
454454
tableFormat: "ICEBERG",
455455
fileFormat: "PARQUET",
@@ -475,7 +475,7 @@ defaultIcebergConfig:
475475
}
476476
}`,
477477
expected: {
478-
target: {name: "my-incremental", schema: "my-dataset", database: "defaultProject"},
478+
target: { name: "my-incremental", schema: "my-dataset", database: "defaultProject" },
479479
bigquery: {
480480
tableFormat: "ICEBERG",
481481
fileFormat: "PARQUET",
@@ -500,7 +500,7 @@ defaultIcebergConfig:
500500
}
501501
}`,
502502
expected: {
503-
target: {name: "my-incremental", schema: "defaultDataset", database: "defaultProject"},
503+
target: { name: "my-incremental", schema: "defaultDataset", database: "defaultProject" },
504504
bigquery: {
505505
tableFormat: "ICEBERG",
506506
fileFormat: "PARQUET",
@@ -526,7 +526,7 @@ defaultIcebergConfig:
526526
}
527527
}`,
528528
expected: {
529-
target: {name: "incremental_table6", schema: "dataset6", database: "defaultProject"},
529+
target: { name: "incremental_table6", schema: "dataset6", database: "defaultProject" },
530530
bigquery: {
531531
tableFormat: "ICEBERG",
532532
fileFormat: "PARQUET",
@@ -552,7 +552,7 @@ defaultIcebergConfig:
552552
}
553553
}`,
554554
expected: {
555-
target: {name: "incremental_table7", schema: "dataset7", database: "defaultProject"},
555+
target: { name: "incremental_table7", schema: "dataset7", database: "defaultProject" },
556556
bigquery: {
557557
tableFormat: "ICEBERG",
558558
fileFormat: "PARQUET",
@@ -579,7 +579,7 @@ defaultIcebergConfig:
579579
}
580580
}`,
581581
expected: {
582-
target: {name: "incremental_table8", schema: "dataset8", database: "defaultProject"},
582+
target: { name: "incremental_table8", schema: "dataset8", database: "defaultProject" },
583583
bigquery: {
584584
tableFormat: "ICEBERG",
585585
fileFormat: "PARQUET",
@@ -657,16 +657,16 @@ defaultIcebergConfig:
657657
}
658658
}`,
659659
expected: {
660-
target: {name: "iceberg_incremental_mixed", schema: "mixed_dataset", database: "defaultProject"},
660+
target: { name: "iceberg_incremental_mixed", schema: "mixed_dataset", database: "defaultProject" },
661661
bigquery: {
662662
tableFormat: "ICEBERG",
663663
fileFormat: "PARQUET",
664664
connection: "gcp.us.conn-id",
665665
storageUri: "gs://my-bucket/my-root/my-subpath",
666666
partitionBy: "partition_col",
667667
clusterBy: ["cluster_col1", "cluster_col2"],
668-
labels: {"env": "test", "type": "iceberg"},
669-
additionalOptions: {"key1": "val1", "key2": "val2"},
668+
labels: { "env": "test", "type": "iceberg" },
669+
additionalOptions: { "key1": "val1", "key2": "val2" },
670670
},
671671
},
672672
expectError: false,
@@ -829,7 +829,7 @@ defaultIcebergConfig:
829829
},
830830
expectError: false,
831831
},
832-
{
832+
{
833833
testName: "bucketName not defined in config or workspace settings",
834834
configBlock: `
835835
type: "incremental",
@@ -927,4 +927,60 @@ select \${incremental()} as is_incremental`
927927
expect(compiledTable.target.name).equals("incremental_table_without_default_project");
928928
expect(compiledTable.target.database).equals("");
929929
});
930+
931+
suite("jit compilation", () => {
932+
test("jit compilation is supported", () => {
933+
const projectDir = tmpDirFixture.createNewTmpDir();
934+
fs.writeFileSync(path.join(projectDir, "workflow_settings.yaml"), VALID_WORKFLOW_SETTINGS_YAML);
935+
fs.mkdirSync(path.join(projectDir, "definitions"));
936+
fs.writeFileSync(
937+
path.join(projectDir, "definitions/incremental.js"),
938+
`publish("incremental", {type: "incremental"}).jitCode((ctx) => Promise.resolve({query: "select 1", incrementalQuery: "select 1"}))`
939+
);
940+
941+
const result = runMainInVm(coreExecutionRequestFromPath(projectDir));
942+
943+
expect(result.compile.compiledGraph.graphErrors.compilationErrors).deep.equals([]);
944+
expect(asPlainObject(result.compile.compiledGraph.tables)).deep.equals([
945+
{
946+
target: {
947+
database: "defaultProject",
948+
schema: "defaultDataset",
949+
name: "incremental"
950+
},
951+
canonicalTarget: {
952+
database: "defaultProject",
953+
schema: "defaultDataset",
954+
name: "incremental"
955+
},
956+
type: "incremental",
957+
enumType: "INCREMENTAL",
958+
disabled: false,
959+
protected: false,
960+
hermeticity: "NON_HERMETIC",
961+
onSchemaChange: "IGNORE",
962+
fileName: "definitions/incremental.js",
963+
jitCode: '(ctx) => Promise.resolve({query: "select 1", incrementalQuery: "select 1"})',
964+
actionDescriptor: {
965+
compilationMode: "ACTION_COMPILATION_MODE_JIT"
966+
}
967+
}
968+
]);
969+
});
970+
971+
test("jit compilation fails if query is also provided", () => {
972+
const projectDir = tmpDirFixture.createNewTmpDir();
973+
fs.writeFileSync(path.join(projectDir, "workflow_settings.yaml"), VALID_WORKFLOW_SETTINGS_YAML);
974+
fs.mkdirSync(path.join(projectDir, "definitions"));
975+
fs.writeFileSync(
976+
path.join(projectDir, "definitions/incremental.js"),
977+
`publish("incremental", {type: "incremental"}).jitCode((ctx) => ({query: "select 1", incrementalQuery: "select 1"})).query("select 1")`
978+
);
979+
980+
const result = runMainInVm(coreExecutionRequestFromPath(projectDir));
981+
982+
expect(result.compile.compiledGraph.graphErrors.compilationErrors.length).greaterThan(0);
983+
expect(result.compile.compiledGraph.graphErrors.compilationErrors.some(e => e.message.includes("Cannot mix AoT and JiT compilation"))).equals(true);
984+
});
985+
});
930986
});

core/actions/operation.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { verifyObjectMatchesProto, VerifyProtoErrorBehaviour } from "df/common/protos";
22
import { ActionBuilder } from "df/core/actions";
33
import { ColumnDescriptors } from "df/core/column_descriptors";
4-
import { Contextable, IActionContext, Resolvable } from "df/core/contextables";
4+
import { Contextable, IActionContext, JitContextable, Resolvable } from "df/core/contextables";
55
import * as Path from "df/core/path";
66
import { Session } from "df/core/session";
77
import {
@@ -30,6 +30,8 @@ interface ILegacyOperationConfig extends dataform.ActionConfig.OperationConfig {
3030
type: string;
3131
}
3232

33+
export type JitOperationResult = string | string[] | dataform.IJitOperationResult;
34+
3335
/**
3436
* Operations define custom SQL operations that don't fit into the Dataform model of publishing a
3537
* table or writing an assertion.
@@ -89,6 +91,7 @@ export class Operation extends ActionBuilder<dataform.Operation> {
8991

9092
/** @hidden We delay contextification until the final compile step, so hold these here for now. */
9193
private contextableQueries: Contextable<IActionContext, string | string[]>;
94+
private contextableJitCode: JitContextable<IActionContext, JitOperationResult>|undefined;
9295

9396
/** @hidden */
9497
constructor(session?: Session, unverifiedConfig?: any, configPath?: string) {
@@ -170,6 +173,15 @@ export class Operation extends ActionBuilder<dataform.Operation> {
170173
return this;
171174
}
172175

176+
public jitCode(jitCode: JitContextable<IActionContext, JitOperationResult>) {
177+
if (!this.proto.actionDescriptor) {
178+
this.proto.actionDescriptor = {};
179+
}
180+
this.proto.actionDescriptor.compilationMode = dataform.ActionCompilationMode.ACTION_COMPILATION_MODE_JIT;
181+
this.contextableJitCode = jitCode;
182+
return this;
183+
}
184+
173185
/**
174186
* @deprecated Deprecated in favor of
175187
* [OperationConfig.dependencies](configs#dataform-ActionConfig-OperationConfig).
@@ -342,10 +354,14 @@ export class Operation extends ActionBuilder<dataform.Operation> {
342354
);
343355
}
344356

345-
const context = new OperationContext(this);
346357

347-
const appliedQueries = context.apply(this.contextableQueries);
348-
this.proto.queries = typeof appliedQueries === "string" ? [appliedQueries] : appliedQueries;
358+
if (this.contextableJitCode) {
359+
this.compileJit();
360+
} else {
361+
const context = new OperationContext(this);
362+
const appliedQueries = context.apply(this.contextableQueries);
363+
this.proto.queries = typeof appliedQueries === "string" ? [appliedQueries] : appliedQueries;
364+
}
349365

350366
return verifyObjectMatchesProto(
351367
dataform.Operation,
@@ -354,6 +370,18 @@ export class Operation extends ActionBuilder<dataform.Operation> {
354370
);
355371
}
356372

373+
private compileJit() {
374+
if (!!this.contextableQueries) {
375+
const err = new Error(`Cannot mix AoT and JiT compilation in action: ${this.contextableQueries}`);
376+
this.session.compileError(err, this.getFileName());
377+
throw err;
378+
}
379+
if (!this.proto.actionDescriptor) {
380+
this.proto.actionDescriptor = {};
381+
}
382+
this.proto.jitCode = this.contextableJitCode.toString();
383+
}
384+
357385
/**
358386
* @hidden Verify config checks that the constructor provided config matches the expected proto
359387
* structure, or the previously accepted legacy structure. If the legacy structure is used, it is

0 commit comments

Comments
 (0)