Scala API Extensions

    If you want to enjoy the full Scala experience, you can choose to opt in to extensions that enhance the Scala API via implicit conversions.

    To use all the available extensions, you can just add a simple import for the DataStream API:

    ```scala
    import org.apache.flink.streaming.api.scala.extensions._
    ```

    Normally, the DataStream API does not accept anonymous pattern matching functions to deconstruct tuples, case classes or collections, like the following:

    ```scala
    val data: DataStream[(Int, String, Double)] = // [...]
    data.map {
      case (id, name, temperature) => // [...]
      // The previous line causes the following compilation error:
      // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
    }
    ```
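    Without the extensions, a common workaround is to bind the whole tuple to a named parameter and destructure it inside the function body, so the lambda's parameter type is fully known to the compiler. The sketch below illustrates this on a plain Scala `Seq` for brevity (the `Workaround` object and `describe` helper are illustrative names, not part of the Flink API); the same shape applies to a `DataStream.map`:

    ```scala
    object Workaround {
      // Bind the tuple to a single named parameter `t`, then destructure
      // it with a val pattern inside the body; no partial function is
      // needed, so overload resolution succeeds.
      def describe(data: Seq[(Int, String, Double)]): Seq[String] =
        data.map { t =>
          val (id, name, temperature) = t
          s"$id:$name:$temperature"
        }

      def main(args: Array[String]): Unit =
        println(describe(Seq((1, "a", 20.0), (2, "b", 25.0))))
    }
    ```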

    This extension introduces new methods in the DataStream Scala API that correspond one-to-one to methods in the standard API. These delegating methods do support anonymous pattern matching functions.
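    The mechanism behind these delegating methods can be sketched in plain Scala. The `DemoStream` class below is a hypothetical stand-in for `DataStream`, not the real Flink implementation: because `mapWith` is a single, non-overloaded method taking a `Function1`, the compiler can infer the lambda's argument type, so an anonymous pattern matching function is accepted where the overloaded `map` would reject it:

    ```scala
    // Sketch only: DemoStream stands in for DataStream to show why a
    // non-overloaded delegating method accepts pattern matching lambdas.
    class DemoStream[T](private val elems: Seq[T]) {
      // Flink's map is overloaded (MapFunction and Function1 variants),
      // which blocks type inference for `case`-style lambdas. A single
      // Function1-taking method restores inference and simply delegates.
      def mapWith[R](fun: T => R): DemoStream[R] =
        new DemoStream(elems.map(fun))

      def toSeq: Seq[T] = elems
    }

    object DemoStreamExample {
      def main(args: Array[String]): Unit = {
        val ds = new DemoStream(Seq((1, "a"), (2, "b")))
        // The anonymous pattern matching function compiles here.
        val out = ds.mapWith { case (id, name) => s"$id-$name" }.toSeq
        println(out)
      }
    }
    ```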

    DataStream API

    To use this extension exclusively, you can add the following import:

    ```scala
    import org.apache.flink.api.scala.extensions.acceptPartialFunctions
    ```

    for the DataSet extensions and

    ```scala
    import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
    ```

    for the DataStream extensions.

    The following snippet shows a minimal example of how to use these extension methods together:

    ```scala
    object Main {
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.streaming.api.scala.extensions._

      case class Point(x: Double, y: Double)

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
        ds.filterWith {
          case Point(x, _) => x > 1
        }.reduceWith {
          case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
        }.mapWith {
          case Point(x, y) => (x, y)
        }.flatMapWith {
          case (x, y) => Seq("x" -> x, "y" -> y)
        }.keyingBy {
          case (id, value) => id
        }
      }
    }
    ```