Handling Application Parameters
Flink provides a simple utility called ParameterTool that offers basic tooling for solving these problems. Please note that you don’t have to use the ParameterTool described here. Other frameworks such as Commons CLI also work well with Flink.
The ParameterTool provides a set of predefined static methods for reading the configuration. Internally, the tool expects a Map<String, String>, so it’s very easy to integrate it with your own configuration style.
From .properties files
The ParameterTool.fromPropertiesFile method reads a .properties file and provides its key/value pairs.
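To illustrate what reading a .properties file into key/value pairs involves, here is a minimal sketch that uses only the JDK. It is not Flink's implementation: the class PropertiesSketch and the method loadAsMap are hypothetical names made up for this example. With Flink on the classpath, a map like the one returned here could be handed to ParameterTool.fromMap.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Flink: loads .properties content into a Map<String, String>.
final class PropertiesSketch {

    static Map<String, String> loadAsMap(Reader reader) {
        Properties props = new Properties();
        try {
            // Parses key=value lines, skipping comments and blank lines.
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            result.put(key, props.getProperty(key));
        }
        return result;
    }

    public static void main(String[] args) {
        // Inline content stands in for a real file; a FileReader would read one from disk.
        String content = "input=hdfs:///mydata\nelements=42\n";
        Map<String, String> params = loadAsMap(new StringReader(content));
        System.out.println(params.get("input"));
        System.out.println(params.get("elements"));
    }
}
```

Copying into a plain Map<String, String> matches the shape the ParameterTool expects internally, which is why a custom configuration source can be plugged in this way.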
From the command line arguments
This allows getting arguments like --input hdfs:///mydata --elements 42 from the command line.
public static void main(String[] args) {
    ParameterTool parameters = ParameterTool.fromArgs(args);
}
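As a rough sketch of how arguments such as --input hdfs:///mydata --elements 42 can be turned into key/value pairs, the plain-Java example below parses them into a Map<String, String>. The class ArgsSketch is hypothetical and deliberately simplified. It only accepts keys prefixed with --, and Flink's own parser may treat edge cases differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified parser for "--key value" arguments; not Flink's implementation.
final class ArgsSketch {

    static Map<String, String> parse(String[] args) {
        Map<String, String> result = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            String arg = args[i];
            if (!arg.startsWith("--") || arg.length() == 2) {
                throw new IllegalArgumentException("Expected --key but got: " + arg);
            }
            String key = arg.substring(2);
            // Assumption of this sketch: a key followed by another key (or by nothing)
            // is stored as a flag with an empty value.
            if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
                result.put(key, args[i + 1]);
                i += 2;
            } else {
                result.put(key, "");
                i += 1;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> params =
                parse(new String[] {"--input", "hdfs:///mydata", "--elements", "42"});
        System.out.println(params.get("input"));
        System.out.println(Integer.parseInt(params.get("elements")));
    }
}
```

All values are kept as strings and converted only when read, which mirrors the Map<String, String> representation described earlier.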
From system properties
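System properties are passed when starting a JVM with -Dkey=value, for example -Dinput=hdfs:///mydata, and the ParameterTool can be initialized from them with ParameterTool.fromSystemProperties(). As a rough illustration of the idea using only the JDK, the hypothetical helper below (SystemPropertiesSketch is a made-up name, not Flink code) copies the system properties into a Map<String, String>.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Flink: exposes JVM system properties as a Map<String, String>.
final class SystemPropertiesSketch {

    static Map<String, String> asMap() {
        Properties props = System.getProperties();
        Map<String, String> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            result.put(key, props.getProperty(key));
        }
        return result;
    }

    public static void main(String[] args) {
        // Stands in for starting the JVM with -Dinput=hdfs:///mydata
        System.setProperty("input", "hdfs:///mydata");
        System.out.println(asMap().get("input"));
    }
}
```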
Using the parameters in your Flink program
Now that we’ve got the parameters from somewhere (see above), we can use them in various ways.
Directly from the ParameterTool
The ParameterTool itself has methods for accessing the values.
ParameterTool parameters = // ...
parameters.getRequired("input");
parameters.get("output", "myDefaultValue");
parameters.getLong("expectedCount", -1L);
parameters.getNumberOfParameters();
// .. there are more methods available.
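To make the behavior of these accessors concrete (a required key, a string with a default, a typed value with a default), here is a minimal stand-in written in plain Java. ParamsSketch is a hypothetical class, not Flink's ParameterTool, and the exception type thrown for a missing required key is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in that mimics the accessor style shown above; not Flink's ParameterTool.
final class ParamsSketch {
    private final Map<String, String> data;

    ParamsSketch(Map<String, String> data) {
        this.data = new HashMap<>(data);
    }

    // Fails fast when a mandatory key is missing (exception type is an assumption).
    String getRequired(String key) {
        String value = data.get(key);
        if (value == null) {
            throw new IllegalArgumentException("No data for required key '" + key + "'");
        }
        return value;
    }

    // Returns the stored value, or the default when the key is absent.
    String get(String key, String defaultValue) {
        String value = data.get(key);
        return value != null ? value : defaultValue;
    }

    // Parses the stored string as a long, or returns the default when the key is absent.
    long getLong(String key, long defaultValue) {
        String value = data.get(key);
        return value != null ? Long.parseLong(value) : defaultValue;
    }

    int getNumberOfParameters() {
        return data.size();
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("input", "hdfs:///mydata");
        ParamsSketch parameters = new ParamsSketch(raw);
        System.out.println(parameters.getRequired("input"));
        System.out.println(parameters.get("output", "myDefaultValue"));
        System.out.println(parameters.getLong("expectedCount", -1L));
        System.out.println(parameters.getNumberOfParameters());
    }
}
```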
You can use the return values of these methods directly in the main() method of the client submitting the application. For example, you could set the parallelism of an operator like this:
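ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.getInt("mapParallelism", 2); // hypothetical key; pass the result to setParallelism(...)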
Because the ParameterTool is serializable, you can also pass it to a function and then use it inside the function to get values from the command line.
Register the parameters globally
Parameters registered as global job parameters in the ExecutionConfig can be accessed as configuration values from the JobManager web interface and in all functions defined by the user.
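Register the parameters globally by passing them to setGlobalJobParameters on the ExecutionConfig, for example env.getConfig().setGlobalJobParameters(parameters), where env is the execution environment.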
Access them in any rich user function:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // The global job parameters are the ParameterTool registered on the ExecutionConfig.
        ParameterTool parameters = (ParameterTool)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        // .. do more ..
    }
}