User-Defined Functions

    Java

    The most basic way is to implement one of the provided interfaces, for example MapFunction.
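    Here is a minimal sketch of this approach. It assumes a Flink 1.x release and Flink's MapFunction interface from org.apache.flink.api.common.functions. The class converts each String to an Integer. An instance is then handed to the transformation, e.g. data.map(new MyMapFunction()). Because the function is an ordinary Java object, map() can also be called directly, for instance in a unit test.

```java
import org.apache.flink.api.common.functions.MapFunction;

// Implements Flink's MapFunction interface: one String in, one Integer out.
class MyMapFunction implements MapFunction<String, Integer> {
    @Override
    public Integer map(String value) {
        // Parse the textual value; a malformed value throws NumberFormatException.
        return Integer.parseInt(value);
    }
}
```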

    Anonymous classes

    You can pass a function as an anonymous class:

        data.map(new MapFunction<String, Integer>() {
            public Integer map(String value) { return Integer.parseInt(value); }
        });

    Flink also supports Java 8 lambdas in the Java API:

        data.filter(s -> s.startsWith("http://"));
        data.reduce((i1, i2) -> i1 + i2);

    Rich functions

    All transformations that require a user-defined function can instead take a rich function as an argument. For example, instead of

        class MyMapFunction implements MapFunction<String, Integer> {
            public Integer map(String value) { return Integer.parseInt(value); }
        }

    you can write

        class MyMapFunction extends RichMapFunction<String, Integer> {
            public Integer map(String value) { return Integer.parseInt(value); }
        }

    and pass the function to a map transformation as usual:

        data.map(new MyMapFunction());

    Rich functions can also be defined as an anonymous class:

        data.map(new RichMapFunction<String, Integer>() {
            public Integer map(String value) { return Integer.parseInt(value); }
        });

    Scala

        data.filter { _.startsWith("http://") }

        val data: DataSet[Int] = // [...]
        data.reduce { (i1, i2) => i1 + i2 }
        // or, equivalently
        data.reduce { _ + _ }

    Rich functions

    All transformations that take a lambda function as an argument can instead take a rich function. For example, instead of

        data.map { x => x.toInt }

    you can write

        class MyMapFunction extends RichMapFunction[String, Int] {
            def map(in: String): Int = { in.toInt }
        }

    and pass the function to a map transformation:

        data.map(new MyMapFunction())

    Rich functions can also be defined as an anonymous class:

        data.map(new RichMapFunction[String, Int] {
            def map(in: String): Int = { in.toInt }
        })

    Rich functions provide four methods in addition to the user-defined function (map, reduce, etc.): open, close, getRuntimeContext, and setRuntimeContext. These methods are useful for:

    • parameterizing the function
    • creating and finalizing local state
    • accessing broadcast variables (see Broadcast Variables)
    • accessing runtime information such as accumulators and counters (see below)
    • accessing information on iterations (see Iterations)
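    The following sketch shows the open/close life cycle. It assumes Flink 1.x, where a RichMapFunction can override open(Configuration) and close(). The hypothetical TrimmingParser builds a regular expression once per parallel instance in open(), uses it in map(), and releases it in close(). Calling the three methods by hand, as a unit test would, follows the order in which Flink invokes them: open before any record, close after the last one.

```java
import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical rich function: strips all whitespace, then parses an Integer.
class TrimmingParser extends RichMapFunction<String, Integer> {

    // Local state; transient because it is rebuilt in open() on each instance.
    private transient Pattern whitespace;

    @Override
    public void open(Configuration parameters) {
        // Called once per parallel instance before the first map() call.
        whitespace = Pattern.compile("\\s+");
    }

    @Override
    public Integer map(String value) {
        return Integer.parseInt(whitespace.matcher(value).replaceAll(""));
    }

    @Override
    public void close() {
        // Called once after the last record: release local state here.
        whitespace = null;
    }
}
```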

    Accumulators and counters

    Accumulators are simple constructs with an add operation and a final accumulated result, which is available after the job has ended.

    The most straightforward accumulator is a counter: you can increment it using the Accumulator.add(V value) method. At the end of the job, Flink sums up (merges) all partial results and sends the result to the client. Accumulators are useful during debugging or if you quickly want to find out more about your data.
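    The following sketch illustrates the merge step. It builds two IntCounter instances by hand to stand in for the partial counters of two parallel function instances, then combines them with Accumulator.merge(...). In a real job Flink performs this merging itself. The hypothetical CounterMergeDemo class only imitates it.

```java
import org.apache.flink.api.common.accumulators.IntCounter;

class CounterMergeDemo {

    // Two counters stand in for the partial results of two parallel
    // function instances; merging them mimics the end-of-job step.
    static int mergedCount() {
        IntCounter instanceA = new IntCounter();
        IntCounter instanceB = new IntCounter();

        instanceA.add(3); // instance A counted 3 records
        instanceB.add(4); // instance B counted 4 records

        instanceA.merge(instanceB); // adds B's local value into A
        return instanceA.getLocalValue();
    }
}
```

    CounterMergeDemo.mergedCount() returns 7, the sum of the two partial counts.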

    Flink currently has the following built-in accumulators. Each of them implements the Accumulator interface.

    • IntCounter, LongCounter, and DoubleCounter: see below for an example using a counter.
    • Histogram: a histogram implementation for a discrete number of bins. Internally it is just a map from Integer to Integer. You can use it to compute distributions of values, e.g. the distribution of words per line for a word count program.
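    Here is a sketch of the words-per-line use case. It assumes the built-in Histogram class from org.apache.flink.api.common.accumulators, whose local value is a TreeMap from bin to count. The helper WordsPerLineHistogram is hypothetical. It fills the histogram outside a job, purely to show what the accumulated result looks like.

```java
import java.util.TreeMap;
import org.apache.flink.api.common.accumulators.Histogram;

class WordsPerLineHistogram {

    // Returns a map "words per line" -> "number of lines with that many words".
    static TreeMap<Integer, Integer> of(String... lines) {
        Histogram histogram = new Histogram();
        for (String line : lines) {
            String trimmed = line.trim();
            // An empty line has zero words; otherwise split on runs of whitespace.
            int words = trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
            histogram.add(words); // increments the count of the bin "words"
        }
        return histogram.getLocalValue();
    }
}
```

    For the lines "a b", "c d" and "e", the result is {1=1, 2=2}: one line with one word and two lines with two words.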

    How to use accumulators:

    First, create an accumulator object (here a counter) in the user-defined transformation function where you want to use it.

        private IntCounter numLines = new IntCounter();

    Second, register the accumulator object, typically in the open() method of the rich function. This is also where you define its name.

        getRuntimeContext().addAccumulator("num-lines", this.numLines);

    You can now use the accumulator anywhere in the operator function, including in the open() and close() methods.

        this.numLines.add(1);
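    The three steps fit together in the following minimal sketch of a rich function that counts the lines it sees. It assumes Flink 1.x, and the class name LineCountingMapper is hypothetical. The open() method needs a runtime context, so registration only works when the function runs inside a Flink job. The map() method itself can still be exercised directly.

```java
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical function that passes lines through unchanged and counts them.
class LineCountingMapper extends RichMapFunction<String, String> {

    // Step 1: the accumulator object lives in the function.
    private final IntCounter numLines = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        // Step 2: register it under a job-wide name. getRuntimeContext()
        // is only available once Flink has deployed the function in a job.
        getRuntimeContext().addAccumulator("num-lines", this.numLines);
    }

    @Override
    public String map(String line) {
        // Step 3: use it; each parallel instance counts its own lines.
        this.numLines.add(1);
        return line;
    }
}
```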

    The overall result is stored in the JobExecutionResult object returned by the execute() method of the execution environment (currently this only works if the execution waits for the job to complete).

        myJobExecutionResult.getAccumulatorResult("num-lines");
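    The next sketch shows the driver side. It assumes a Flink 1.x release that still ships the DataSet API, with flink-java and flink-clients on the classpath so that a local environment can run the job. The job maps a few elements through a counting function and writes them to a DiscardingOutputFormat, because a job needs a sink before execute() can run. It then reads the merged counter from the JobExecutionResult. LineCountJob and countLines are hypothetical names.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;

class LineCountJob {

    // Same pattern as in the text: create, register in open(), add in map().
    static class CountingMapper extends RichMapFunction<String, String> {
        private final IntCounter numLines = new IntCounter();

        @Override
        public void open(Configuration parameters) {
            getRuntimeContext().addAccumulator("num-lines", numLines);
        }

        @Override
        public String map(String line) {
            numLines.add(1);
            return line;
        }
    }

    // Runs the job and returns the merged "num-lines" counter.
    static int countLines(String... lines) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(lines)
                .map(new CountingMapper())
                // A sink is required before execute(); this one drops the records.
                .output(new DiscardingOutputFormat<>());
        // execute() blocks until the job finishes, so the result is final.
        JobExecutionResult result = env.execute("count lines");
        Integer numLines = result.getAccumulatorResult("num-lines");
        return numLines;
    }
}
```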

    All accumulators share a single namespace per job. Thus, you can use the same accumulator in different operator functions of your job; Flink internally merges all accumulators with the same name.

    A note on accumulators and iterations: currently, accumulator results are only available after the overall job has ended. We plan to also make the result of the previous iteration available in the next iteration. You can use Aggregators to compute per-iteration statistics and base the termination of iterations on such statistics.

    Custom accumulators:

    To implement your own accumulator, simply write an implementation of the Accumulator interface. Feel free to create a pull request if you think your custom accumulator should be shipped with Flink.

    You can choose to implement either Accumulator or SimpleAccumulator.

    Accumulator<V,R> is the most flexible: it defines a type V for the value to add and a result type R for the final result. For example, for a histogram, V is a number and R is a histogram. SimpleAccumulator is for cases where both types are the same, e.g. for counters.
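    The hypothetical DistinctValues accumulator below is an example of the Accumulator<V,R> case where the two types differ: V is String and R is HashSet<String>. It implements the five methods of Flink's Accumulator interface (add, getLocalValue, resetLocal, merge, clone). It is a sketch only. Because the whole set is shipped to the client, it suits small value domains.

```java
import java.util.HashSet;
import org.apache.flink.api.common.accumulators.Accumulator;

// Hypothetical custom accumulator: V = String (value added),
// R = HashSet<String> (final result, the set of distinct values seen).
class DistinctValues implements Accumulator<String, HashSet<String>> {

    private HashSet<String> values = new HashSet<>();

    @Override
    public void add(String value) {
        values.add(value);
    }

    @Override
    public HashSet<String> getLocalValue() {
        return values;
    }

    @Override
    public void resetLocal() {
        values.clear();
    }

    @Override
    public void merge(Accumulator<String, HashSet<String>> other) {
        // Union of this instance's values and the other partial result.
        values.addAll(other.getLocalValue());
    }

    @Override
    public DistinctValues clone() {
        // Deep enough copy: a fresh set, so later changes do not leak across.
        DistinctValues copy = new DistinctValues();
        copy.values = new HashSet<>(values);
        return copy;
    }
}
```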