Scala API Extensions
If you want to enjoy the full Scala experience, you can choose to opt in to extensions that enhance the Scala API via implicit conversions.
To use all the available extensions, you can simply add the following import for the DataStream API:
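import org.apache.flink.streaming.api.scala.extensions._

Alternatively, you can import individual extensions to use only the ones you prefer.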
Normally, the DataStream API does not accept anonymous pattern matching functions to deconstruct tuples, case classes or collections, like the following:
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
  // The previous line causes the following compilation error:
  // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
}
This extension introduces new methods in the DataStream Scala API, each with a one-to-one correspondence to a method in the standard API. These delegating methods do support anonymous pattern matching functions.
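As a minimal sketch (the source stream and sensor-style values here are only illustrative), the map call that fails to compile above can be written with mapWith, assuming the wildcard extensions import shown earlier:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val data: DataStream[(Int, String, Double)] =
  env.fromElements((1, "sensor-a", 21.5), (2, "sensor-b", 19.0))
// mapWith delegates to map but accepts an anonymous pattern matching function,
// so the tuple can be deconstructed directly in the case clause:
data.mapWith {
  case (id, name, temperature) => s"$name ($id): $temperature"
}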
DataStream API
The delegating methods on a DataStream include mapWith (delegating to map), flatMapWith (flatMap), filterWith (filter), keyingBy (keyBy) and reduceWith (reduce on a keyed stream), as used in the example at the end of this section.
To use this extension exclusively, you can add the following import:

import org.apache.flink.api.scala.extensions.acceptPartialFunctions

for the DataSet extensions and

import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions

for the DataStream extensions.

The following snippet shows a minimal example of how to use these extension methods together with the DataStream API:
object Main {
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.scala.extensions._

  case class Point(x: Double, y: Double)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    ds.filterWith {
      // keep only the points whose x coordinate is greater than 1
      case Point(x, _) => x > 1
    }.mapWith {
      // deconstruct each point into a tuple of coordinates
      case Point(x, y) => (x, y)
    }.flatMapWith {
      // emit one labelled value per coordinate
      case (x, y) => Seq("x" -> x, "y" -> y)
    }.keyingBy {
      // key the stream by the coordinate label
      case (label, _) => label
    }.reduceWith {
      // sum the values per label; reduceWith is available once the stream is keyed
      case ((label, value1), (_, value2)) => (label, value1 + value2)
    }
  }
}