Side Outputs

    When using side outputs, you first need to define an OutputTag that will be used to identify a side output stream:

    Scala

    val outputTag = OutputTag[String]("side-output")
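
    The tag's type parameter describes the elements of the side output, so it can differ from the element type of the main stream. A minimal sketch, assuming the Flink Scala API is on the classpath; the object name, tag ids, and element types below are hypothetical and chosen only for illustration:

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical tags for illustration; the ids and element types are assumptions.
object ExampleTags {
  // Side output carrying plain strings, e.g. human-readable error messages.
  val errorsTag: OutputTag[String] = OutputTag[String]("errors")

  // Side output carrying (key, count) pairs; a job can use several tags at once.
  val countsTag: OutputTag[(String, Long)] = OutputTag[(String, Long)]("counts")
}
```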

    Emitting data to a side output is possible from functions that expose a Context parameter, such as ProcessFunction.

    You can use this Context parameter to emit data to a side output identified by an OutputTag. Here is an example of emitting side output data from a ProcessFunction:

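    Scala

    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    // outputTag is the OutputTag[String] defined above
    val input: DataStream[Int] = ...
    val mainDataStream = input
      .process(new ProcessFunction[Int, Int] {
        override def processElement(
            value: Int,
            ctx: ProcessFunction[Int, Int]#Context,
            out: Collector[Int]): Unit = {
          // emit data to regular output
          out.collect(value)

          // emit data to side output
          ctx.output(outputTag, "sideout-" + String.valueOf(value))
        }
      })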
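
    A common use of ctx.output is to route each element to either the regular output or a side output, rather than to both. A minimal sketch of that pattern, assuming the Flink Scala API; the names NegativeSplit, negativesTag, and SplitNegatives are hypothetical:

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical example: the object, tag, and class names are assumptions.
object NegativeSplit {
  // Tag identifying the side output that receives negative values as strings.
  val negativesTag: OutputTag[String] = OutputTag[String]("negatives")

  class SplitNegatives extends ProcessFunction[Int, Int] {
    override def processElement(
        value: Int,
        ctx: ProcessFunction[Int, Int]#Context,
        out: Collector[Int]): Unit = {
      if (value < 0) {
        // Negative values go only to the side output.
        ctx.output(negativesTag, s"negative: $value")
      } else {
        // All other values go only to the regular output.
        out.collect(value)
      }
    }
  }
}
```

    It could be applied with input.process(new NegativeSplit.SplitNegatives), with the diverted values read back through getSideOutput(NegativeSplit.negativesTag).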

    To retrieve the side output stream, call getSideOutput(OutputTag) on the result of the DataStream operation. This returns a DataStream typed to the element type of the side output:

    Scala

    val outputTag = OutputTag[String]("side-output")
    val mainDataStream = ...
    val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
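
    Putting the pieces together, the sketch below wires the snippets from this section into one small job. It assumes the Flink Scala API; the object name, the in-memory input created with fromElements, and the job name are hypothetical and only there to make the example self-contained:

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical job object; only the tag id and the processing logic come from the text.
object SideOutputJob {
  val outputTag: OutputTag[String] = OutputTag[String]("side-output")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumed in-memory input, used only so the sketch can run on its own.
    val input: DataStream[Int] = env.fromElements(1, 2, 3)

    val mainDataStream: DataStream[Int] = input
      .process(new ProcessFunction[Int, Int] {
        override def processElement(
            value: Int,
            ctx: ProcessFunction[Int, Int]#Context,
            out: Collector[Int]): Unit = {
          // Emit to the regular output.
          out.collect(value)
          // Emit a string version of the same element to the side output.
          ctx.output(outputTag, "sideout-" + value)
        }
      })

    // The side output stream is typed by the tag, here DataStream[String].
    val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)

    mainDataStream.print()
    sideOutputStream.print()

    env.execute("side-output-sketch")
  }
}
```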