Skip to content

Commit 85f47d4

Browse files
authored
Handling for Data Preparation actions (#1789)
* Add logic to handle Data preparation definitions and parse data preparation YAML into the proto representation of the operation. * Addressed PR Comments * Fixed lint warnings
1 parent 6635abc commit 85f47d4

9 files changed

Lines changed: 329 additions & 16 deletions

File tree

core/BUILD

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
load("//tools:ts_library.bzl", "ts_library")
2-
load("//tools:expand_template.bzl", "expand_template")
31
load("//:version.bzl", "DF_VERSION")
42
load("//testing:index.bzl", "ts_test_suite")
3+
load("//tools:expand_template.bzl", "expand_template")
54
load("//tools:node_modules.bzl", "node_modules")
5+
load("//tools:ts_library.bzl", "ts_library")
66

77
package(default_visibility = ["//visibility:public"])
88

@@ -19,14 +19,15 @@ ts_library(
1919
name = "core",
2020
srcs = [
2121
"actions/assertion.ts",
22+
"actions/data_preparation.ts",
2223
"actions/declaration.ts",
23-
"actions/view.ts",
2424
"actions/incremental_table.ts",
2525
"actions/index.ts",
26-
"actions/operation.ts",
2726
"actions/notebook.ts",
27+
"actions/operation.ts",
2828
"actions/table.ts",
2929
"actions/test.ts",
30+
"actions/view.ts",
3031
"column_descriptors.ts",
3132
"common.ts",
3233
"compilers.ts",

core/actions/data_preparation.ts

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import { verifyObjectMatchesProto, VerifyProtoErrorBehaviour } from "df/common/protos";
2+
import { ActionBuilder } from "df/core/actions";
3+
import { Resolvable } from "df/core/common";
4+
import * as Path from "df/core/path";
5+
import { Session } from "df/core/session";
6+
import {
7+
addDependenciesToActionDependencyTargets,
8+
nativeRequire,
9+
resolveActionsConfigFilename
10+
} from "df/core/utils";
11+
import { dataform } from "df/protos/ts";
12+
13+
/**
14+
* @hidden
15+
*/
16+
export class DataPreparation extends ActionBuilder<dataform.DataPreparation> {
17+
public session: Session;
18+
19+
// TODO: make this field private, to enforce proto update logic to happen in this class.
20+
public proto: dataform.IDataPreparation = dataform.DataPreparation.create();
21+
22+
constructor(
23+
session?: Session,
24+
config?: dataform.ActionConfig.DataPreparationConfig,
25+
configPath?: string
26+
) {
27+
super(session);
28+
this.session = session;
29+
30+
if (!config.name) {
31+
config.name = Path.basename(config.filename);
32+
}
33+
34+
config.filename = resolveActionsConfigFilename(config.filename, configPath);
35+
const dataPreparationContents = nativeRequire(config.filename).asJson;
36+
const dataPreparationDefinition = parseDataPreparationDefinitionJson(dataPreparationContents);
37+
this.proto.dataPreparation = dataPreparationDefinition;
38+
39+
// Find targets
40+
const targets = getTargets(dataPreparationDefinition)
41+
this.proto.targets = targets.map((target) => this.applySessionToTarget(
42+
target,
43+
session.projectConfig,
44+
config.filename,
45+
true
46+
));
47+
this.proto.canonicalTargets = targets.map((target) => this.applySessionToTarget(target, session.canonicalProjectConfig));
48+
49+
// Set the unique target key as the first target defined.
50+
// TODO: Remove once multiple targets are supported.
51+
this.proto.target = this.proto.targets[0];
52+
this.proto.canonicalTarget = this.proto.canonicalTargets[0];
53+
54+
this.proto.tags = config.tags;
55+
this.dependencies(config.dependencyTargets);
56+
this.proto.fileName = config.filename;
57+
if (config.disabled) {
58+
this.proto.disabled = config.disabled;
59+
}
60+
}
61+
62+
/**
63+
* @hidden
64+
*/
65+
public config(config: any) {
66+
return this;
67+
}
68+
69+
/**
70+
* @hidden
71+
*/
72+
public dependencies(value: Resolvable | Resolvable[]) {
73+
const newDependencies = Array.isArray(value) ? value : [value];
74+
newDependencies.forEach(resolvable =>
75+
addDependenciesToActionDependencyTargets(this, resolvable)
76+
);
77+
return this;
78+
}
79+
80+
/**
81+
* @hidden
82+
*/
83+
public getFileName() {
84+
return this.proto.fileName;
85+
}
86+
87+
/**
88+
* @hidden
89+
*/
90+
public getTarget() {
91+
// Return only the first target for now.
92+
return dataform.Target.create(this.proto.target);
93+
}
94+
95+
public compile() {
96+
return verifyObjectMatchesProto(
97+
dataform.DataPreparation,
98+
this.proto,
99+
VerifyProtoErrorBehaviour.SUGGEST_REPORTING_TO_DATAFORM_TEAM
100+
);
101+
}
102+
}
103+
104+
function parseDataPreparationDefinitionJson(
105+
dataPreparationAsJson: { [key: string]: unknown }): dataform.DataPreparationDefinition {
106+
try {
107+
return dataform.DataPreparationDefinition.create(
108+
verifyObjectMatchesProto(
109+
dataform.DataPreparationDefinition,
110+
dataPreparationAsJson as {
111+
[key: string]: any;
112+
},
113+
VerifyProtoErrorBehaviour.SHOW_DOCS_LINK
114+
)
115+
);
116+
} catch (e) {
117+
if (e instanceof ReferenceError) {
118+
throw ReferenceError(`Data Preparation parsing error: ${e.message}`);
119+
}
120+
throw e;
121+
}
122+
}
123+
124+
function getTargets(
125+
definition: dataform.DataPreparationDefinition
126+
): dataform.Target[] {
127+
const targets: dataform.Target[] = [];
128+
129+
definition.nodes.forEach(node => {
130+
const table = node.destination?.table;
131+
if (table) {
132+
const compiledGraphTarget: dataform.ITarget = {
133+
database: table.project,
134+
schema:table.dataset,
135+
name: table.table
136+
};
137+
targets.push(dataform.Target.create(compiledGraphTarget))
138+
}
139+
});
140+
141+
return targets;
142+
}

core/actions/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Assertion } from "df/core/actions/assertion";
2+
import { DataPreparation } from "df/core/actions/data_preparation";
23
import { Declaration } from "df/core/actions/declaration";
34
import { IncrementalTable } from "df/core/actions/incremental_table";
45
import { Notebook } from "df/core/actions/notebook";
@@ -15,7 +16,8 @@ export type Action =
1516
| Operation
1617
| Assertion
1718
| Declaration
18-
| Notebook;
19+
| Notebook
20+
| DataPreparation;
1921

2022
// TODO(ekrekr): In v4, make all method on inheritors of this private, forcing users to use
2123
// constructors in order to populate actions.

core/main.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
VerifyProtoErrorBehaviour
66
} from "df/common/protos";
77
import { Assertion } from "df/core/actions/assertion";
8+
import { DataPreparation } from "df/core/actions/data_preparation";
89
import { Declaration } from "df/core/actions/declaration";
910
import { IncrementalTable } from "df/core/actions/incremental_table";
1011
import { Notebook } from "df/core/actions/notebook";
@@ -179,6 +180,14 @@ function loadActionConfigs(session: Session, filePaths: string[]) {
179180
actionConfigsPath
180181
)
181182
);
183+
} else if (actionConfig.dataPreparation) {
184+
session.actions.push(
185+
new DataPreparation(
186+
session,
187+
dataform.ActionConfig.DataPreparationConfig.create(actionConfig.dataPreparation),
188+
actionConfigsPath
189+
)
190+
);
182191
} else {
183192
throw Error("Empty action configs are not permitted.");
184193
}

core/main_test.ts

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,155 @@ defaultNotebookRuntimeOptions:
858858
});
859859
});
860860

861+
suite("data preparations", () => {
862+
const createSimpleDataPreparationProject = (
863+
workflowSettingsYaml = VALID_WORKFLOW_SETTINGS_YAML
864+
): string => {
865+
const projectDir = tmpDirFixture.createNewTmpDir();
866+
fs.writeFileSync(path.join(projectDir, "workflow_settings.yaml"), workflowSettingsYaml);
867+
fs.mkdirSync(path.join(projectDir, "definitions"));
868+
fs.writeFileSync(
869+
path.join(projectDir, "definitions/actions.yaml"),
870+
`
871+
actions:
872+
- dataPreparation:
873+
filename: data_preparation.yaml`
874+
);
875+
return projectDir;
876+
};
877+
878+
test(`data preparations can be loaded via an actions config file`, () => {
879+
const projectDir = createSimpleDataPreparationProject();
880+
fs.writeFileSync(
881+
path.join(projectDir, "definitions/data_preparation.yaml"),
882+
`
883+
nodes:
884+
- id: node1
885+
source:
886+
table:
887+
project: prj
888+
dataset: ds
889+
table: src
890+
destination:
891+
table:
892+
project: prj
893+
dataset: ds
894+
table: dest
895+
generated:
896+
outputSchema:
897+
field:
898+
- name: a
899+
type: INT64
900+
mode: NULLABLE
901+
sourceGenerated:
902+
sourceSchema:
903+
tableSchema:
904+
field:
905+
- name: a
906+
type: STRING
907+
mode: NULLABLE
908+
destinationGenerated:
909+
schema:
910+
field:
911+
- name: a
912+
type: STRING
913+
mode: NULLABLE
914+
`
915+
);
916+
917+
const result = runMainInVm(coreExecutionRequestFromPath(projectDir));
918+
919+
expect(result.compile.compiledGraph.graphErrors.compilationErrors).deep.equals([]);
920+
expect(asPlainObject(result.compile.compiledGraph.dataPreparations)).deep.equals(
921+
asPlainObject([
922+
{
923+
target: {
924+
database: "prj",
925+
schema: "ds",
926+
name: "dest"
927+
},
928+
canonicalTarget: {
929+
database: "prj",
930+
schema: "ds",
931+
name: "dest"
932+
},
933+
targets: [
934+
{
935+
database: "prj",
936+
schema: "ds",
937+
name: "dest"
938+
}
939+
],
940+
canonicalTargets: [
941+
{
942+
database: "prj",
943+
schema: "ds",
944+
name: "dest"
945+
}
946+
],
947+
fileName: "definitions/data_preparation.yaml",
948+
dataPreparation: {
949+
nodes: [
950+
{
951+
id: "node1",
952+
source: {
953+
table: {
954+
dataset: "ds",
955+
project: "prj",
956+
table: "src"
957+
}
958+
},
959+
destination: {
960+
table: {
961+
dataset: "ds",
962+
project: "prj",
963+
table: "dest"
964+
}
965+
},
966+
generated: {
967+
destinationGenerated: {
968+
schema: {
969+
field: [
970+
{
971+
mode: "NULLABLE",
972+
name: "a",
973+
type: "STRING"
974+
}
975+
]
976+
}
977+
},
978+
outputSchema: {
979+
field: [
980+
{
981+
mode: "NULLABLE",
982+
name: "a",
983+
type: "INT64"
984+
}
985+
]
986+
},
987+
sourceGenerated: {
988+
sourceSchema: {
989+
tableSchema: {
990+
field: [
991+
{
992+
mode: "NULLABLE",
993+
name: "a",
994+
type: "STRING"
995+
}
996+
]
997+
}
998+
}
999+
}
1000+
}
1001+
}
1002+
]
1003+
}
1004+
}
1005+
])
1006+
);
1007+
});
1008+
});
1009+
8611010
suite("action configs", () => {
8621011
test(`operations can be loaded`, () => {
8631012
const projectDir = tmpDirFixture.createNewTmpDir();

0 commit comments

Comments
 (0)