Skip to content

Commit faea8c6

Browse files
authored
feat:add support python udf (#172)
1 parent a46c383 commit faea8c6

46 files changed

Lines changed: 1410 additions & 18 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
name: Python Package
2+
on:
3+
release:
4+
types: [created]
5+
6+
jobs:
7+
publishPythonZip:
8+
runs-on: ubuntu-latest
9+
steps:
10+
- uses: actions/checkout@v2
11+
- name: Zip Python Udf
12+
run: |
13+
cd dagger-py-functions
14+
zip -r python_udfs.zip udfs -x "*/__init__.py"
15+
zip -jr data.zip data
16+
zip -r dagger-py-functions.zip requirements.txt data.zip python_udfs.zip
17+
- name: Upload Release
18+
uses: ncipollo/release-action@v1
19+
with:
20+
artifacts: dagger-py-functions/dagger-py-functions.zip
21+
allowUpdates: true
22+
omitNameDuringUpdate: true
23+
omitBodyDuringUpdate: true
24+
omitPrereleaseDuringUpdate: true
25+
token: ${{ secrets.GITHUB_TOKEN }}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
name: Python Validation
2+
3+
on: push
4+
5+
jobs:
6+
pythonValidation:
7+
runs-on: ubuntu-latest
8+
steps:
9+
- uses: actions/checkout@v3
10+
- name: Set up Python 3.8
11+
uses: actions/setup-python@v3
12+
with:
13+
python-version: '3.8'
14+
- name: Install dependencies
15+
run: |
16+
python -m pip install --upgrade pip
17+
pip install apache-flink==1.14.3
18+
cd dagger-py-functions
19+
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
20+
- name: Lint with flake8
21+
run: |
22+
# stop the build if there are Python syntax errors or undefined names
23+
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
24+
- name: Test with pytest
25+
run: |
26+
cd dagger-py-functions
27+
pytest --disable-warnings

dagger-core/env/local.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,7 @@ METRIC_TELEMETRY_ENABLE=true
2626
# == Others ==
2727
FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.FunctionFactory
2828
FLINK_ROWTIME_ATTRIBUTE_NAME=rowtime
29+
30+
# == Python Udf ==
31+
PYTHON_UDF_ENABLE=false
32+
PYTHON_UDF_CONFIG={"PYTHON_FILES":"/path/to/files.zip", "PYTHON_REQUIREMENTS": "requirements.txt", "PYTHON_FN_EXECUTION_BUNDLE_SIZE": "1000"}

dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.odpf.dagger.core.source.Stream;
2020
import io.odpf.dagger.core.source.StreamsFactory;
2121
import io.odpf.dagger.core.utils.Constants;
22+
import io.odpf.dagger.functions.udfs.python.PythonUdfConfig;
23+
import io.odpf.dagger.functions.udfs.python.PythonUdfManager;
2224
import org.apache.flink.streaming.api.CheckpointingMode;
2325
import org.apache.flink.streaming.api.datastream.DataStream;
2426
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -28,12 +30,15 @@
2830
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
2931
import org.apache.flink.types.Row;
3032

33+
import java.io.IOException;
3134
import java.lang.reflect.Constructor;
3235
import java.lang.reflect.InvocationTargetException;
3336
import java.time.Duration;
3437
import java.util.List;
3538

3639
import static io.odpf.dagger.core.utils.Constants.*;
40+
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_DEFAULT;
41+
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_KEY;
3742
import static org.apache.flink.table.api.Expressions.$;
3843

3944
/**
@@ -138,7 +143,13 @@ private ApiExpression[] getApiExpressions(StreamInfo streamInfo) {
138143
*
139144
* @return the stream manager
140145
*/
141-
public StreamManager registerFunctions() {
146+
public StreamManager registerFunctions() throws IOException {
147+
if (configuration.getBoolean(PYTHON_UDF_ENABLE_KEY, PYTHON_UDF_ENABLE_DEFAULT)) {
148+
PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration);
149+
PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig);
150+
pythonUdfManager.registerPythonFunctions();
151+
}
152+
142153
String[] functionFactoryClasses = configuration
143154
.getString(Constants.FUNCTION_FACTORY_CLASSES_KEY, Constants.FUNCTION_FACTORY_CLASSES_DEFAULT)
144155
.split(",");

dagger-functions/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ sourceSets {
4646
dependencies {
4747
compileOnly 'org.projectlombok:lombok:1.18.8'
4848
annotationProcessor 'org.projectlombok:lombok:1.18.8'
49+
4950
compileOnly project(path: ':dagger-common', configuration: 'minimalCommonJar')
5051
compileOnly project(path: ':dagger-common', configuration: 'dependenciesCommonJar')
5152
compileOnly 'org.apache.flink:flink-streaming-java_2.11:' + flinkVersion
@@ -62,7 +63,7 @@ dependencies {
6263
dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1'
6364
dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1'
6465
dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '1.67.0'
65-
66+
6667
testImplementation project(':dagger-common').sourceSets.test.output
6768
testImplementation group: 'junit', name: 'junit', version: '4.12'
6869
testImplementation 'org.mockito:mockito-core:2.0.99-beta'

dagger-functions/src/main/java/io/odpf/dagger/functions/common/Constants.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,17 @@ public class Constants {
77
public static final String UDF_DART_GCS_PROJECT_ID_DEFAULT = "";
88
public static final String UDF_DART_GCS_BUCKET_ID_KEY = "UDF_DART_GCS_BUCKET_ID";
99
public static final String UDF_DART_GCS_BUCKET_ID_DEFAULT = "";
10+
11+
public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG";
12+
public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE";
13+
public static final boolean PYTHON_UDF_ENABLE_DEFAULT = false;
14+
public static final String PYTHON_FILES_KEY = "PYTHON_FILES";
15+
public static final String PYTHON_REQUIREMENTS_KEY = "PYTHON_REQUIREMENTS";
16+
public static final String PYTHON_ARCHIVES_KEY = "PYTHON_ARCHIVES";
17+
public static final String PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY = "PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE";
18+
public static final Integer PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT = 10000;
19+
public static final String PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY = "PYTHON_FN_EXECUTION_BUNDLE_SIZE";
20+
public static final Integer PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT = 100000;
21+
public static final String PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY = "PYTHON_FN_EXECUTION_BUNDLE_TIME";
22+
public static final long PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT = 1000;
1023
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.odpf.dagger.functions.exceptions;
2+
3+
/**
4+
* The type Python files empty exception.
5+
*/
6+
public class PythonFilesEmptyException extends RuntimeException {
7+
8+
/**
9+
* Instantiates a new Python files empty exception.
10+
*
11+
* @param message the message
12+
*/
13+
public PythonFilesEmptyException(String message) {
14+
super(message);
15+
}
16+
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.odpf.dagger.functions.exceptions;
2+
3+
/**
4+
* The type Python files format exception.
5+
*/
6+
public class PythonFilesFormatException extends RuntimeException {
7+
8+
/**
9+
* Instantiates a new Python files format exception.
10+
*
11+
* @param message the message
12+
*/
13+
public PythonFilesFormatException(String message) {
14+
super(message);
15+
}
16+
17+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.odpf.dagger.functions.udfs.python;
2+
3+
import com.google.gson.Gson;
4+
import com.google.gson.GsonBuilder;
5+
import com.google.gson.annotations.SerializedName;
6+
import io.odpf.dagger.common.configuration.Configuration;
7+
import lombok.Getter;
8+
9+
import static io.odpf.dagger.functions.common.Constants.*;
10+
11+
/**
12+
* The type Python udf config.
13+
*/
14+
public class PythonUdfConfig {
15+
private static final Gson GSON = new GsonBuilder()
16+
.enableComplexMapKeySerialization()
17+
.setPrettyPrinting()
18+
.create();
19+
20+
@SerializedName(PYTHON_FILES_KEY)
21+
private String pythonFiles;
22+
23+
@SerializedName(PYTHON_REQUIREMENTS_KEY)
24+
@Getter
25+
private String pythonRequirements;
26+
27+
@SerializedName(PYTHON_ARCHIVES_KEY)
28+
private String pythonArchives;
29+
30+
@SerializedName(PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY)
31+
private Integer pythonArrowBatchSize;
32+
33+
@SerializedName(PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY)
34+
private Integer pythonBundleSize;
35+
36+
@SerializedName(PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY)
37+
private Long pythonBundleTime;
38+
39+
/**
40+
* Gets python files.
41+
*
42+
* @return the python files
43+
*/
44+
public String getPythonFiles() {
45+
if (pythonFiles != null) {
46+
return pythonFiles.replaceAll("\\s+", "");
47+
}
48+
return null;
49+
}
50+
51+
/**
52+
* Gets python archives.
53+
*
54+
* @return the python archives
55+
*/
56+
public String getPythonArchives() {
57+
if (pythonArchives != null) {
58+
return pythonArchives.replaceAll("\\s+", "");
59+
}
60+
return null;
61+
}
62+
63+
/**
64+
* Gets python arrow batch size.
65+
*
66+
* @return the python arrow batch size
67+
*/
68+
public int getPythonArrowBatchSize() {
69+
if (pythonArrowBatchSize == null) {
70+
return PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT;
71+
}
72+
return pythonArrowBatchSize;
73+
}
74+
75+
/**
76+
* Gets python bundle size.
77+
*
78+
* @return the python bundle size
79+
*/
80+
public int getPythonBundleSize() {
81+
if (pythonBundleSize == null) {
82+
return PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT;
83+
}
84+
return pythonBundleSize;
85+
}
86+
87+
/**
88+
* Gets python bundle time.
89+
*
90+
* @return the python bundle time
91+
*/
92+
public long getPythonBundleTime() {
93+
if (pythonBundleTime == null) {
94+
return PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT;
95+
}
96+
return pythonBundleTime;
97+
}
98+
99+
/**
100+
* Parse python udf config.
101+
*
102+
* @param configuration the configuration
103+
* @return the python udf config
104+
*/
105+
public static PythonUdfConfig parse(Configuration configuration) {
106+
String jsonString = configuration.getString(PYTHON_UDF_CONFIG, "");
107+
108+
return GSON.fromJson(jsonString, PythonUdfConfig.class);
109+
}
110+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.odpf.dagger.functions.udfs.python;
2+
3+
import io.odpf.dagger.functions.exceptions.PythonFilesEmptyException;
4+
import io.odpf.dagger.functions.udfs.python.file.type.FileType;
5+
import io.odpf.dagger.functions.udfs.python.file.type.FileTypeFactory;
6+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
7+
8+
import java.io.IOException;
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
12+
/**
13+
* The type Python udf manager.
14+
*/
15+
public class PythonUdfManager {
16+
17+
private StreamTableEnvironment tableEnvironment;
18+
private PythonUdfConfig pythonUdfConfig;
19+
20+
/**
21+
* Instantiates a new Python udf manager.
22+
*
23+
* @param tableEnvironment the table environment
24+
* @param pythonUdfConfig the python udf config
25+
*/
26+
public PythonUdfManager(StreamTableEnvironment tableEnvironment, PythonUdfConfig pythonUdfConfig) {
27+
this.tableEnvironment = tableEnvironment;
28+
this.pythonUdfConfig = pythonUdfConfig;
29+
}
30+
31+
/**
32+
* Register python functions.
33+
*/
34+
public void registerPythonFunctions() throws IOException {
35+
String inputFiles = pythonUdfConfig.getPythonFiles();
36+
String[] pythonFiles;
37+
if (inputFiles != null) {
38+
registerPythonConfig();
39+
pythonFiles = inputFiles.split(",");
40+
} else {
41+
throw new PythonFilesEmptyException("Python files can not be null");
42+
}
43+
44+
for (String pythonFile : pythonFiles) {
45+
FileType fileType = FileTypeFactory.getFileType(pythonFile);
46+
List<String> fileNames = fileType.getFileNames();
47+
List<String> sqlQueries = createQuery(fileNames);
48+
executeSql(sqlQueries);
49+
}
50+
}
51+
52+
private void registerPythonConfig() {
53+
if (pythonUdfConfig.getPythonRequirements() != null) {
54+
tableEnvironment.getConfig().getConfiguration().setString("python.requirements", pythonUdfConfig.getPythonRequirements());
55+
}
56+
if (pythonUdfConfig.getPythonArchives() != null) {
57+
tableEnvironment.getConfig().getConfiguration().setString("python.archives", pythonUdfConfig.getPythonArchives());
58+
}
59+
tableEnvironment.getConfig().getConfiguration().setString("python.files", pythonUdfConfig.getPythonFiles());
60+
tableEnvironment.getConfig().getConfiguration().setInteger("python.fn-execution.arrow.batch.size", pythonUdfConfig.getPythonArrowBatchSize());
61+
tableEnvironment.getConfig().getConfiguration().setInteger("python.fn-execution.bundle.size", pythonUdfConfig.getPythonBundleSize());
62+
tableEnvironment.getConfig().getConfiguration().setLong("python.fn-execution.bundle.time", pythonUdfConfig.getPythonBundleTime());
63+
}
64+
65+
private void executeSql(List<String> sqlQueries) {
66+
for (String query : sqlQueries) {
67+
tableEnvironment.executeSql(query);
68+
}
69+
}
70+
71+
private List<String> createQuery(List<String> fileNames) {
72+
List<String> sqlQueries = new ArrayList<>();
73+
for (String fileName : fileNames) {
74+
fileName = fileName.replace(".py", "").replace("/", ".");
75+
String functionName = fileName.substring(fileName.lastIndexOf(".") + 1);
76+
String query = "CREATE TEMPORARY FUNCTION " + functionName.toUpperCase() + " AS '" + fileName + "." + functionName + "' LANGUAGE PYTHON";
77+
sqlQueries.add(query);
78+
}
79+
return sqlQueries;
80+
}
81+
}

0 commit comments

Comments
 (0)