Parallel Execution
If you want to use savepoints, you should also consider setting a maximum parallelism. When restoring from a savepoint, you can change the parallelism of specific operators or of the whole program, and the maximum parallelism specifies an upper bound on that parallelism. This bound is required because Flink internally partitions keyed state into key-groups, and the number of key-groups equals the maximum parallelism; an unbounded number of key-groups would be detrimental to performance.
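To illustrate why the maximum parallelism acts as an upper bound, here is a minimal Java sketch that is loosely modeled on key-group assignment but is not Flink's actual code: each key is hashed into one of maxParallelism key groups, and each parallel instance owns a contiguous range of key groups. The class and method names are hypothetical, and String.hashCode() stands in for the hash function Flink really applies.

```java
import java.util.List;

/**
 * Hypothetical sketch of key-group based state partitioning.
 * String.hashCode() with Math.floorMod is a simplification; it is not
 * the hash function Flink uses internally.
 */
public class KeyGroupSketch {

    // Assigns a key to one of maxParallelism key groups.
    static int keyGroupFor(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to the parallel instance that owns it.
    // Each instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (String key : List.of("apple", "banana", "cherry")) {
            int group = keyGroupFor(key, maxParallelism);
            // The key group depends only on maxParallelism; rescaling from
            // 4 to 8 instances only changes which instance owns the group.
            System.out.printf("%s -> key group %d, instance %d of 4, instance %d of 8%n",
                    key, group,
                    operatorIndexFor(group, maxParallelism, 4),
                    operatorIndexFor(group, maxParallelism, 8));
        }
    }
}
```

With maxParallelism = 128 and a parallelism of 4, instance 0 owns key groups 0 to 31, instance 1 owns 32 to 63, and so on. Because whole key groups are the unit that moves between instances, a parallelism larger than the number of key groups would leave instances with no state to own.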
The parallelism of a task can be specified in Flink on different levels:
Operator Level
The parallelism of an individual operator, data source, or data sink can be defined by calling its setParallelism() method. For example:
Scala
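import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = [...] // source elided in the original

val wordCounts = text
  .flatMap{ _.split(" ") map { (_, 1) } }
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .sum(1).setParallelism(5) // applies to this operator only

wordCounts.print()
env.execute("Word Count Example")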
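Execution Environment Level
The default parallelism of an execution environment can be specified by calling its setParallelism() method. This default applies to all operators, data sources, and data sinks of the program unless an operator sets its own parallelism. To execute all operators, data sources, and data sinks with a parallelism of 3, set the default parallelism of the execution environment as follows: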
Scala
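import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3) // default for all operators, sources, and sinks of this job

val text = [...]
val wordCounts = text
  .flatMap{ _.split(" ") map { (_, 1) } }
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .sum(1)

wordCounts.print()
env.execute("Word Count Example")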
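Client Level
The parallelism can be set at the Client when submitting jobs to Flink. The Client can be either a Java or a Scala program. One example of such a Client is Flink’s Command-line Interface (CLI).
For the CLI client, the parallelism parameter can be specified with -p, for example ./bin/flink run -p 10 followed by the path to the job JAR. In a Java program, the parallelism can be set through the client API as follows: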
Java
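// Client API from older Flink releases; check the client API of your Flink version.
// file, args, and jobManagerAddress are assumed to be defined by the surrounding code.
try {
    PackagedProgram program = new PackagedProgram(file, args);
    Configuration config = new Configuration();
    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
    // set the parallelism to 10 here
    client.run(program, 10, true);
} catch (ProgramInvocationException e) {
    e.printStackTrace();
}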
System Level
A system-wide default parallelism for all execution environments can be defined by setting the parallelism.default property in ./conf/flink-conf.yaml. See the Flink configuration documentation for details.
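The levels above can be read as a precedence order: an operator's own setting wins, then the execution environment default, then the client setting, then the system-wide default. The following Java sketch models that order with plain values. The class and method names are hypothetical, not Flink APIs, and the precedence it encodes is this sketch's reading of the levels described above rather than Flink's implementation.

```java
import java.util.OptionalInt;

/**
 * Hypothetical model of resolving an operator's effective parallelism.
 * Assumed precedence: operator > execution environment > client > system.
 */
public class ParallelismResolution {

    static int effectiveParallelism(OptionalInt operatorLevel,
                                    OptionalInt environmentLevel,
                                    OptionalInt clientLevel,
                                    int systemDefault) {
        if (operatorLevel.isPresent()) {
            return operatorLevel.getAsInt();
        }
        if (environmentLevel.isPresent()) {
            return environmentLevel.getAsInt();
        }
        if (clientLevel.isPresent()) {
            return clientLevel.getAsInt();
        }
        // Corresponds to parallelism.default in the system configuration.
        return systemDefault;
    }

    public static void main(String[] args) {
        // An operator set to 5 ignores the environment default of 3: prints 5
        System.out.println(effectiveParallelism(
                OptionalInt.of(5), OptionalInt.of(3), OptionalInt.of(10), 1));
        // An operator without its own setting inherits the environment default: prints 3
        System.out.println(effectiveParallelism(
                OptionalInt.empty(), OptionalInt.of(3), OptionalInt.of(10), 1));
        // With no program-level settings, the client's -p value applies: prints 10
        System.out.println(effectiveParallelism(
                OptionalInt.empty(), OptionalInt.empty(), OptionalInt.of(10), 1));
    }
}
```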
Setting the Maximum Parallelism
The maximum parallelism can be set wherever a parallelism can be set, except at the client level and the system level. Instead of calling setParallelism(), you call setMaxParallelism() to set the maximum parallelism.
The default setting for the maximum parallelism is derived from the operator parallelism: operatorParallelism + (operatorParallelism / 2), rounded up to the next power of two, with a lower bound of 128 and an upper bound of 32768.
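The default rule can be checked with a small Java sketch. It assumes this computation: take operatorParallelism + (operatorParallelism / 2), round it up to the next power of two, and clamp the result to the range [128, 32768]. The rounding step and the lower bound of 128 reflect our understanding of Flink's defaults, and the class and helper names are hypothetical.

```java
/**
 * Hypothetical sketch of the default maximum parallelism rule:
 * round up (p + p / 2) to a power of two, then clamp to [128, 32768].
 */
public class DefaultMaxParallelism {

    static final int LOWER_BOUND = 128;   // 2^7
    static final int UPPER_BOUND = 32768; // 2^15

    // Smallest power of two >= x, valid for 1 <= x <= 2^30.
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    static int defaultMaxParallelism(int operatorParallelism) {
        int candidate = roundUpToPowerOfTwo(operatorParallelism + operatorParallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 10, 100, 1000, 30000}) {
            System.out.println(p + " -> " + defaultMaxParallelism(p));
        }
    }
}
```

For example, an operator parallelism of 100 gives 150, which rounds up to a default maximum parallelism of 256, while any parallelism of 85 or less falls back to the lower bound of 128.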