Skip to content

Commit b7b40ec

Browse files
JiT compilation stage (#2090)
* JiT Contexts - Add original request & simplify constructor Simplify constructor by explicitly passing and exposing the orignal compilation request (which already contains most of required data). * JiT compilation stage implementation Implement JiT compilation stage in Core: - a new entry point jitCompiler(rpcCallback), which instantiates a single compile method, performing a compilation against a supplied RPC implementation. - table/operations/incremental table context(s) construction and invocation of the JiT code.
1 parent 353f0f9 commit b7b40ec

8 files changed

Lines changed: 335 additions & 48 deletions

File tree

core/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ ts_library(
3333
"contextables.ts",
3434
"extension.ts",
3535
"index.ts",
36+
"jit_compiler.ts",
3637
"jit_context.ts",
3738
"main.ts",
3839
"path.ts",
@@ -77,6 +78,7 @@ ts_test_suite(
7778
"actions/table_test.ts",
7879
"actions/test_test.ts",
7980
"actions/view_test.ts",
81+
"jit_compiler_test.ts",
8082
"jit_context_test.ts",
8183
],
8284
data = [

core/contextables.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ export type JitContext<T> = T & {
9292
adapter: dataform.DbAdapter,
9393
/** JiT data object. */
9494
data?: { [k: string]: any },
95+
/** Original JiT compilation request. */
96+
request: dataform.IJitCompilationRequest,
9597
};
9698

9799
/**

core/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { compile as compiler } from "df/core/compilers";
22
import { IDataformExtension } from "df/core/extension";
3+
import { IJitCompiler, jitCompiler } from "df/core/jit_compiler";
34
import { main } from "df/core/main";
45
import { Session } from "df/core/session";
56
import { version } from "df/core/version";
@@ -29,4 +30,4 @@ function indexFileGenerator() {
2930
// These exports constitute the public API of @dataform/core.
3031
// They must also be listed in packages/@dataform/core/index.ts.
3132
// Changes to these will break @dataform/cli, so take care!
32-
export { compiler, IDataformExtension, indexFileGenerator, main, session, supportedFeatures, version };
33+
export { compiler, IDataformExtension, indexFileGenerator, IJitCompiler, jitCompiler, main, session, supportedFeatures, version };

core/jit_compiler.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import * as $protobuf from "protobufjs";
2+
3+
import { JitOperationResult } from "df/core/actions/operation";
4+
import { JitTableResult } from "df/core/actions/table";
5+
import { IActionContext, ITableContext, JitContext } from "df/core/contextables";
6+
import { IncrementalTableJitContext, SqlActionJitContext, TableJitContext } from "df/core/jit_context";
7+
import { dataform } from "df/protos/ts";
8+
9+
function makeMainBody<Context, T>(code: string): (jctx: JitContext<Context>) => Promise<T> {
10+
return (
11+
jctx => {
12+
// tslint:disable-next-line: tsr-detect-eval-with-expression
13+
const body = new Function(
14+
"jctx", `const mainAsync = ${code};\nreturn mainAsync(jctx);`
15+
) as (jctx: JitContext<Context>) => Promise<T>;
16+
return body(jctx);
17+
});
18+
}
19+
20+
function makeJitTableResult(result: JitTableResult): dataform.IJitTableResult {
21+
let jitResult: dataform.IJitTableResult = {};
22+
if (typeof result === "string") {
23+
jitResult.query = result;
24+
} else {
25+
jitResult = result;
26+
}
27+
28+
return dataform.JitTableResult.create(jitResult);
29+
}
30+
31+
function jitCompileOperation(
32+
request: dataform.IJitCompilationRequest,
33+
adapter: dataform.DbAdapter,
34+
): Promise<dataform.IJitOperationResult> {
35+
const mainBody = makeMainBody<IActionContext, JitOperationResult>(request.jitCode);
36+
37+
const jctx: JitContext<IActionContext> = new SqlActionJitContext(
38+
adapter, request,
39+
);
40+
return mainBody(jctx).then(mainResult => {
41+
let queries: string[] | null = [];
42+
if (typeof mainResult === "string") {
43+
queries.push(mainResult);
44+
} else if (Array.isArray(mainResult)) {
45+
queries.push(...mainResult);
46+
} else {
47+
queries = mainResult.queries;
48+
}
49+
50+
return dataform.JitOperationResult.create({ queries });
51+
});
52+
}
53+
54+
function jitCompileTable(
55+
request: dataform.IJitCompilationRequest,
56+
adapter: dataform.DbAdapter,
57+
): Promise<dataform.IJitTableResult> {
58+
const mainBody = makeMainBody<ITableContext, JitTableResult>(request.jitCode);
59+
60+
const jctx: JitContext<ITableContext> = new TableJitContext(
61+
adapter, request,
62+
);
63+
return mainBody(jctx).then(makeJitTableResult);
64+
}
65+
66+
function jitCompileIncrementalTable(
67+
request: dataform.IJitCompilationRequest,
68+
adapter: dataform.DbAdapter,
69+
): Promise<dataform.IJitIncrementalTableResult> {
70+
const mainBody = makeMainBody<ITableContext, JitTableResult>(request.jitCode);
71+
72+
const incrementalJctx = new IncrementalTableJitContext(
73+
adapter, request, true,
74+
);
75+
const regularJctx = new IncrementalTableJitContext(
76+
adapter, request, false,
77+
);
78+
79+
return Promise.all([
80+
mainBody(incrementalJctx),
81+
mainBody(regularJctx),
82+
]).then(([incrementalResult, regularResult]) => {
83+
return dataform.JitIncrementalTableResult.create({
84+
incremental: makeJitTableResult(incrementalResult),
85+
regular: makeJitTableResult(regularResult),
86+
});
87+
});
88+
}
89+
90+
export interface IJitCompiler {
91+
compile: (request: Uint8Array) => Promise<Uint8Array>;
92+
}
93+
94+
/** RPC callback, implementing DbAdapter. */
95+
export type RpcCallback = (method: string, request: Uint8Array, callback: (error: Error | null, response: Uint8Array) => void) => void;
96+
97+
export function jitCompile(request: dataform.IJitCompilationRequest, rpcCallback: RpcCallback): Promise<dataform.IJitCompilationResponse> {
98+
const rpcImpl: $protobuf.RPCImpl = (method, internalRequest, callback) => {
99+
rpcCallback(method.name, internalRequest, callback);
100+
};
101+
const dbAdapter = dataform.DbAdapter.create(rpcImpl);
102+
103+
switch (request.compilationTargetType) {
104+
case dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION:
105+
return jitCompileOperation(request, dbAdapter).then(
106+
operation => dataform.JitCompilationResponse.create({ operation }));
107+
case dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_TABLE:
108+
return jitCompileTable(request, dbAdapter).then(
109+
table => dataform.JitCompilationResponse.create({ table }));
110+
case dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_INCREMENTAL_TABLE:
111+
return jitCompileIncrementalTable(request, dbAdapter).then(
112+
incrementalTable => dataform.JitCompilationResponse.create({ incrementalTable }));
113+
default:
114+
throw new Error(`Unrecognized compilation target type: ${request.compilationTargetType}`);
115+
}
116+
}
117+
118+
/** Main entry point for the JiT compiler. */
119+
export function jitCompiler(rpcCallback: RpcCallback): IJitCompiler {
120+
return {
121+
compile: (request: Uint8Array) => {
122+
const requestMessage = dataform.JitCompilationRequest.decode(request);
123+
return jitCompile(requestMessage, rpcCallback).then(
124+
response => dataform.JitCompilationResponse.encode(response).finish()
125+
);
126+
}
127+
};
128+
}

core/jit_compiler_test.ts

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import { expect } from "chai";
2+
3+
import { jitCompile } from "df/core/jit_compiler";
4+
import { dataform } from "df/protos/ts";
5+
import { suite, test } from "df/testing";
6+
7+
suite("jit_compiler", () => {
8+
const rpcCallback: (method: string, request: Uint8Array, callback: (error: Error | null, response: Uint8Array) => void) => void =
9+
(method, request, callback) => { callback(null, new Uint8Array()); };
10+
11+
const target = dataform.Target.create({
12+
database: "db",
13+
schema: "schema",
14+
name: "name"
15+
});
16+
17+
suite("jitCompileOperation", () => {
18+
test("compiles operation returning string", async () => {
19+
const request = dataform.JitCompilationRequest.create({
20+
jitCode: `async (ctx) => "SELECT 1"`,
21+
target,
22+
jitData: {},
23+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
24+
});
25+
const result = await jitCompile(request, rpcCallback);
26+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
27+
});
28+
29+
test("compiles operation returning array", async () => {
30+
const request = dataform.JitCompilationRequest.create({
31+
jitCode: `async (ctx) => ["SELECT 1", "SELECT 2"]`,
32+
target,
33+
jitData: {},
34+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
35+
});
36+
const result = await jitCompile(request, rpcCallback);
37+
expect(result.operation.queries).to.deep.equal(["SELECT 1", "SELECT 2"]);
38+
});
39+
40+
test("compiles operation returning object", async () => {
41+
const request = dataform.JitCompilationRequest.create({
42+
jitCode: `async (ctx) => ({ queries: ["SELECT 1"] })`,
43+
target,
44+
jitData: {},
45+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
46+
});
47+
const result = await jitCompile(request, rpcCallback);
48+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
49+
});
50+
51+
test("compiles operation using context", async () => {
52+
const request = dataform.JitCompilationRequest.create({
53+
jitCode: `async (ctx) => ({ queries: [\`SELECT "\${ctx.name()}"\`] })`,
54+
target,
55+
jitData: {},
56+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
57+
});
58+
const result = await jitCompile(request, rpcCallback);
59+
expect(result.operation.queries).to.deep.equal(['SELECT "name"']);
60+
});
61+
62+
test("compiles operation with arrow function returning promise", async () => {
63+
const request = dataform.JitCompilationRequest.create({
64+
jitCode: `(ctx) => Promise.resolve("SELECT 1")`,
65+
target,
66+
jitData: {},
67+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
68+
});
69+
const result = await jitCompile(request, rpcCallback);
70+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
71+
});
72+
73+
test("compiles operation with async function", async () => {
74+
const request = dataform.JitCompilationRequest.create({
75+
jitCode: `async function(ctx) { return "SELECT 1"; }`,
76+
target,
77+
jitData: {},
78+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
79+
});
80+
const result = await jitCompile(request, rpcCallback);
81+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
82+
});
83+
84+
test("compiles operation with regular function returning promise", async () => {
85+
const request = dataform.JitCompilationRequest.create({
86+
jitCode: `function(ctx) { return Promise.resolve("SELECT 1"); }`,
87+
target,
88+
jitData: {},
89+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
90+
});
91+
const result = await jitCompile(request, rpcCallback);
92+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
93+
});
94+
});
95+
96+
suite("jitCompileTable", () => {
97+
test("compiles table returning string", async () => {
98+
const request = dataform.JitCompilationRequest.create({
99+
jitCode: `async (ctx) => "SELECT 1"`,
100+
target,
101+
jitData: {},
102+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_TABLE,
103+
});
104+
const result = await jitCompile(request, rpcCallback);
105+
expect(result.table.query).to.equal("SELECT 1");
106+
});
107+
108+
test("compiles table returning object", async () => {
109+
const request = dataform.JitCompilationRequest.create({
110+
jitCode: `async (ctx) => ({ query: "SELECT 1", preOps: ["PRE"], postOps: ["POST"] })`,
111+
target,
112+
jitData: {},
113+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_TABLE,
114+
});
115+
const result = await jitCompile(request, rpcCallback);
116+
expect(result.table.query).to.equal("SELECT 1");
117+
expect(result.table.preOps).to.deep.equal(["PRE"]);
118+
expect(result.table.postOps).to.deep.equal(["POST"]);
119+
});
120+
});
121+
122+
suite("jitCompileIncrementalTable", () => {
123+
test("compiles incremental table", async () => {
124+
const request = dataform.JitCompilationRequest.create({
125+
jitCode: `async (ctx) => {
126+
if (ctx.incremental()) {
127+
return { query: "SELECT INC" };
128+
}
129+
return { query: "SELECT REG" };
130+
}`,
131+
target,
132+
jitData: {},
133+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_INCREMENTAL_TABLE,
134+
});
135+
const result = await jitCompile(request, rpcCallback);
136+
expect(result.incrementalTable.incremental?.query).to.equal("SELECT INC");
137+
expect(result.incrementalTable.regular?.query).to.equal("SELECT REG");
138+
});
139+
});
140+
});

core/jit_context.ts

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,25 @@ function canonicalTargetValue(target: dataform.ITarget): string {
66
return `${target.database}.${target.schema}.${target.name}`;
77
}
88

9-
function schemaTargetValue(target: dataform.ITarget): string {
10-
return `${target.schema}.${target.name}`;
11-
}
12-
139
/** Generate SQL action JiT context. */
1410
export class SqlActionJitContext implements JitContext<IActionContext> {
11+
public readonly data: { [k: string]: any } | undefined;
12+
13+
private readonly target: dataform.ITarget;
1514
private readonly resolvableMap: ResolvableMap<string>;
1615

1716
constructor(
1817
public readonly adapter: dataform.DbAdapter,
19-
public readonly data: { [k: string]: any } | undefined,
20-
private readonly target: dataform.ITarget,
21-
dependencies: dataform.ITarget[],
18+
public readonly request: dataform.IJitCompilationRequest,
2219
) {
23-
const resolvables = [target, ...dependencies];
20+
this.target = request.target;
21+
const dependencies = request.dependencies;
22+
const resolvables = [this.target, ...dependencies];
2423
this.resolvableMap = new ResolvableMap(resolvables.map(dep => ({
2524
actionTarget: dep,
2625
value: canonicalTargetValue(dep)
2726
})));
27+
this.data = request.jitData;
2828
}
2929

3030
public self(): string {
@@ -75,11 +75,9 @@ export class SqlActionJitContext implements JitContext<IActionContext> {
7575
export class TableJitContext extends SqlActionJitContext implements JitContext<ITableContext> {
7676
constructor(
7777
adapter: dataform.DbAdapter,
78-
data: { [k: string]: any } | undefined,
79-
target: dataform.ITarget,
80-
dependencies: dataform.ITarget[],
78+
request: dataform.IJitCompilationRequest,
8179
) {
82-
super(adapter, data, target, dependencies);
80+
super(adapter, request);
8381
}
8482

8583
public when(cond: boolean, trueCase: string, falseCase?: string) {
@@ -94,12 +92,10 @@ export class TableJitContext extends SqlActionJitContext implements JitContext<I
9492
/** JiT context for incremental table actions. */
9593
export class IncrementalTableJitContext extends TableJitContext {
9694
constructor(adapter: dataform.DbAdapter,
97-
data: { [k: string]: any } | undefined,
98-
target: dataform.ITarget,
99-
dependencies: dataform.ITarget[],
95+
request: dataform.IJitCompilationRequest,
10096
private readonly isIncrementalContext: boolean,
10197
) {
102-
super(adapter, data, target, dependencies);
98+
super(adapter, request);
10399
}
104100

105101
public incremental(): boolean {

0 commit comments

Comments
 (0)