Side Outputs
When using side outputs, you first need to define an OutputTag
that will be used to identify a side output stream:
Scala
val outputTag = OutputTag[String]("side-output")
Emitting data to a side output is possible from the following functions:
- ProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
You can use the Context parameter, which is exposed to users in the above functions, to emit data to a side output identified by an OutputTag. Here is an example of emitting side output data from a ProcessFunction:
Scala
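import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val input: DataStream[Int] = ...
val mainDataStream = input
  .process(new ProcessFunction[Int, Int] {
    override def processElement(
        value: Int,
        ctx: ProcessFunction[Int, Int]#Context,
        out: Collector[Int]): Unit = {
      // emit data to regular output
      out.collect(value)
      // emit data to the side output identified by outputTag
      ctx.output(outputTag, "sideout-" + String.valueOf(value))
    }
  })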
To retrieve the side output stream, call getSideOutput(OutputTag) on the result of the DataStream operation. This gives you a DataStream that is typed to the result of the side output stream:
Scala
val outputTag = OutputTag[String]("side-output")
val mainDataStream = ...
val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
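The three steps above (define the tag, emit through the Context, retrieve with getSideOutput) can be combined into one small job. The following is a minimal sketch, not a reference implementation: the object name SideOutputExample, the sample input 1, 2, 3, and the job name are illustrative choices that do not come from the text above, and the sketch assumes a Flink release that still ships the Scala DataStream API.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical object name, chosen for illustration only.
object SideOutputExample {

  // The type parameter is the element type of the side output stream.
  val outputTag: OutputTag[String] = OutputTag[String]("side-output")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Sample input, assumed for the sake of a runnable example.
    val input: DataStream[Int] = env.fromElements(1, 2, 3)

    val mainDataStream: DataStream[Int] = input
      .process(new ProcessFunction[Int, Int] {
        override def processElement(
            value: Int,
            ctx: ProcessFunction[Int, Int]#Context,
            out: Collector[Int]): Unit = {
          // Regular output: forward the element unchanged.
          out.collect(value)
          // Side output: emit a String for the same element.
          ctx.output(outputTag, "sideout-" + value)
        }
      })

    // The side output stream is typed by the tag, not by the main stream.
    val sideOutputStream: DataStream[String] =
      mainDataStream.getSideOutput(outputTag)

    mainDataStream.print()
    sideOutputStream.print()

    env.execute("Side output example")
  }
}
```

Note that the main stream carries Int values while the side output carries String values. The element type of a side output is determined by its OutputTag and does not have to match the main output.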