This repository was archived by the owner on Oct 8, 2020. It is now read-only.
File tree Expand file tree Collapse file tree
sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -32,14 +32,37 @@ object DataSetUtils {
3232 }
3333
3434 /**
35- * Return a DataSet with the elements from this that are not in `other`.
35+ * Returns a DataSet with the elements from this that are not in `other`.
3636 *
3737 * @param other the DataSet containing the element to be subtracted
3838 * @return the DataSet
3939 */
4040 def subtract (other : DataSet [T ]): DataSet [T ] = {
41- dataset.coGroup(other).where(" *" ).equalTo(" *" )(new MinusCoGroupFunction [T ](true )).name(" subtract" )
41+ dataset.coGroup(other).where(" *" ).equalTo(" *" )(
42+ new MinusCoGroupFunction [T ](true ))
43+ .name(" subtract" )
4244 }
45+
46+ import scala .reflect ._
47+ /**
48+ * Returns a DataSet with the elements from this that are not in `other`.
49+ * A key selector function for both datasets has to be given.
50+ *
51+ * @param other the DataSet containing the element to be subtracted
52+ * @return the DataSet
53+ */
54+ def subtract [K : ClassTag : TypeInformation ](other : DataSet [T ], keySelectorThis : (T ) => K , keySelectorOther : (T ) => K ): DataSet [T ] = {
55+
56+ val typeInfo = TypeInformation .of(classTag[K ].runtimeClass).asInstanceOf [TypeInformation [K ]]
57+ dataset.coGroup(other)
58+ .where(keySelectorThis)
59+ .equalTo(keySelectorOther)(typeInfo)(
60+ new MinusCoGroupFunction [T ](true ))
61+ .name(" subtract" )
62+ }
63+
64+
65+
4366 }
4467
4568}
You can’t perform that action at this time.
0 commit comments