Command-Line Interface

    A prerequisite for the commands listed in this section is a running Flink deployment, such as Kubernetes, YARN, or any other available option. Feel free to start a Flink cluster locally to try the commands on your own machine.

    Submitting a job means uploading the job’s JAR and related dependencies to the Flink cluster and initiating the job execution. For the sake of this example, we select a long-running job like examples/streaming/StateMachineExample.jar. Feel free to select any other JAR archive from the examples/ folder or deploy your own job.

    Submitting the job using --detached will make the command return after the submission is done. The output contains (besides other things) the ID of the newly submitted job.

    $ ./bin/flink run --detached ./examples/streaming/StateMachineExample.jar
    Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]
    Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]
    Options for both the above setups:
        [--backend <file|rocks>]
        [--checkpoint-dir <filepath>]
        [--async-checkpoints <true|false>]
        [--incremental-checkpoints <true|false>]
        [--output <filepath> OR null for stdout]
    Using standalone source with error rate 0.000000 and sleep delay 1 millis
    Job has been submitted with JobID cca7bc1061d61cf15238e92312c2fc20

    The usage information printed lists job-related parameters that can be added to the end of the job submission command if necessary. For the purpose of readability, we assume that the returned JobID is stored in a variable JOB_ID for the commands below:

    $ export JOB_ID="cca7bc1061d61cf15238e92312c2fc20"

    There is another action called run-application available to run the job in Application Mode. This documentation does not address this action individually, as it works similarly to the run action in terms of the CLI frontend.

    The run and run-application commands support passing additional configuration parameters via the -D argument. For example, setting the maximum parallelism for a job can be done with -Dpipeline.max-parallelism=120. This argument is very useful for configuring per-job or application mode clusters, because you can pass any configuration parameter to the cluster without changing the configuration file.
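
    As a minimal sketch reusing the example job from above, several configuration parameters can be combined in a single submission (the memory setting shown is only illustrative):

    $ ./bin/flink run \
        --detached \
        -Dpipeline.max-parallelism=120 \
        -Dtaskmanager.memory.process.size=2048m \
        ./examples/streaming/StateMachineExample.jar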

    When submitting a job to an existing session cluster, only execution (job-level) configuration parameters are supported, since the cluster itself is already configured.

    Job Monitoring

    You can monitor any running jobs using the list action:

    $ ./bin/flink list
    Waiting for response...
    ------------------ Running/Restarting Jobs -------------------
    30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING)
    --------------------------------------------------------------
    No scheduled jobs.

    Jobs that have been submitted but not yet started are listed under “Scheduled Jobs”.
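
    The list action also accepts flags to narrow the output. As a sketch, the following commands show only running or only scheduled jobs:

    $ ./bin/flink list --running
    $ ./bin/flink list --scheduled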

    Creating a Savepoint

    Savepoints can be created to save the current state a job is in. All that’s needed is the JobID:

    $ ./bin/flink savepoint \
        $JOB_ID \
        /tmp/flink-savepoints
    Triggering savepoint for job cca7bc1061d61cf15238e92312c2fc20.
    Waiting for response...
    Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
    You can resume your program from this savepoint with the run command.

    The savepoint folder is optional; it only needs to be specified if state.savepoints.dir isn’t set.
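
    As a minimal sketch, a default savepoint directory can be set in the cluster configuration (the path below is just an example), after which the folder argument can be omitted:

    # conf/flink-conf.yaml
    state.savepoints.dir: file:///tmp/flink-savepoints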

    The path to the savepoint can be used later on to restart the Flink job from that savepoint.

    Disposing a Savepoint

    A savepoint can be removed by adding --dispose with the corresponding savepoint path:

    $ ./bin/flink savepoint \
        --dispose \
        /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
        $JOB_ID

    If you use custom state instances (for example custom reducing state or RocksDB state), you have to specify the path to the program JAR with which the savepoint was triggered. Otherwise, you will run into a ClassNotFoundException:

    $ ./bin/flink savepoint \
        --dispose <savepointPath> \
        --jarfile <jarFile>

    Triggering the savepoint disposal through the savepoint action not only removes the data from the storage but also makes Flink clean up the savepoint-related metadata.

    Terminating a Job

    Stopping a Job Gracefully Creating a Final Savepoint

    Another action for stopping a job is stop. It is a more graceful way of stopping a running streaming job, as the stop signal flows from source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their cancel() method.

    $ ./bin/flink stop \
        --savepointPath /tmp/flink-savepoints \
        $JOB_ID
    Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
    Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab

    We have to use --savepointPath to specify the savepoint folder if state.savepoints.dir isn’t set.

    If the --drain flag is specified, then a MAX_WATERMARK will be emitted before the last checkpoint barrier. This will make all registered event-time timers fire, thus flushing out any state that is waiting for a specific watermark, e.g. windows. The job will keep running until all sources have properly shut down. This allows the job to finish processing all in-flight data, which can produce some records to process after the savepoint that was taken while stopping.
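
    As a sketch, draining the example job while taking the final savepoint only requires adding the flag to the stop command shown above:

    $ ./bin/flink stop \
        --drain \
        --savepointPath /tmp/flink-savepoints \
        $JOB_ID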

    Cancelling a Job Ungracefully

    Cancelling a job can be achieved through the cancel action:

    $ ./bin/flink cancel $JOB_ID
    Cancelling job cca7bc1061d61cf15238e92312c2fc20.
    Cancelled job cca7bc1061d61cf15238e92312c2fc20.

    The corresponding job’s state will be transitioned from Running to Cancelled. Any computations will be stopped.

    Starting a Job from a Savepoint

    Starting a job from a savepoint can be achieved using the run (and run-application) action.

    $ ./bin/flink run \
        --detached \
        --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
        ./examples/streaming/StateMachineExample.jar
    Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]
    Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]
    Options for both the above setups:
        [--backend <file|rocks>]
        [--checkpoint-dir <filepath>]
        [--async-checkpoints <true|false>]
        [--incremental-checkpoints <true|false>]
        [--output <filepath> OR null for stdout]
    Using standalone source with error rate 0.000000 and sleep delay 1 millis
    Job has been submitted with JobID 97b20a0a8ffd5c1d656328b0cd6436a6

    See how the command is equal to the initial run command except for the --fromSavepoint parameter, which is used to refer to the state of the previously stopped job. A new JobID is generated that can be used to maintain the job.

    By default, we try to match the whole savepoint state to the job being submitted. If you want to allow skipping savepoint state that cannot be restored with the new job, you can set the --allowNonRestoredState flag. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered and you still want to use the savepoint.
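
    As a sketch building on the restore command above, the flag is simply added to the run invocation:

    $ ./bin/flink run \
        --detached \
        --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
        --allowNonRestoredState \
        ./examples/streaming/StateMachineExample.jar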

    An overview of the actions supported by Flink’s CLI tool, as well as a fine-grained description of their parameters, can be accessed through bin/flink --help or the usage information of each individual action: bin/flink <action> --help.

    REST API

    The Flink cluster can also be managed using the REST API. The commands described in the previous sections are a subset of what is offered by Flink’s REST endpoints. Therefore, tools like curl can be used to get even more out of Flink.
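
    As a minimal sketch, assuming the JobManager’s REST endpoint is reachable on localhost:8081, the jobs listed and the savepoint triggered above could also be handled via curl:

    # list jobs
    $ curl http://localhost:8081/jobs
    # trigger a savepoint for the job (returns a trigger ID that can be polled for completion)
    $ curl -X POST -H "Content-Type: application/json" \
        -d '{"target-directory": "/tmp/flink-savepoints", "cancel-job": false}' \
        http://localhost:8081/jobs/$JOB_ID/savepoints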

    Selecting Deployment Targets

    Flink is compatible with multiple cluster management frameworks like Kubernetes or YARN, which are described in more detail in the Resource Provider section. Jobs can be submitted in different Deployment Modes. The parameterization of a job submission differs based on the underlying framework and Deployment Mode.

    bin/flink offers a parameter --target to handle the different options. In addition to that, jobs have to be submitted using either run (for Session and Per-Job Mode) or run-application (for Application Mode). See the following summary of parameter combinations:

    • YARN
      • ./bin/flink run --target yarn-session: Submission to an already running Flink on YARN cluster
      • ./bin/flink run --target yarn-per-job: Submission spinning up a Flink on YARN cluster in Per-Job Mode
      • ./bin/flink run-application --target yarn-application: Submission spinning up a Flink on YARN cluster in Application Mode
    • Kubernetes
      • ./bin/flink run --target kubernetes-session: Submission to an already running Flink on Kubernetes cluster
      • ./bin/flink run-application --target kubernetes-application: Submission spinning up a Flink on Kubernetes cluster in Application Mode
    • Standalone
      • ./bin/flink run --target local: Local submission using a MiniCluster in Session Mode
      • ./bin/flink run --target remote: Submission to an already running Flink cluster

    The --target will overwrite the execution.target specified in conf/flink-conf.yaml.
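
    For example, a default target could be pinned in the configuration (a sketch; yarn-session is just one of the values listed above):

    # conf/flink-conf.yaml
    execution.target: yarn-session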

    For more details on the commands and the available options, please refer to the Resource Provider-specific pages of the documentation.

    Submitting PyFlink Jobs

    Currently, users are able to submit a PyFlink job via the CLI. It does not require specifying the JAR file path or the entry main class, which is different from Java job submission.

    When submitting a Python job via flink run, Flink will invoke the python command. Run the following to confirm that the python executable in the current environment points to a supported version:

    $ python --version
    # the version printed here must be 3.6+

    The following commands show different PyFlink job submission use-cases:

    • Run a PyFlink job:
    $ ./bin/flink run --python examples/python/table/word_count.py
    • Run a PyFlink job with additional source and resource files. Files specified in --pyFiles will be added to the PYTHONPATH and, therefore, available in the Python code.
    $ ./bin/flink run \
        --python examples/python/table/word_count.py \
        --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
    • Run a PyFlink job which references Java UDFs or external connectors. The JAR file specified in --jarfile will be uploaded to the cluster.
    $ ./bin/flink run \
        --python examples/python/table/word_count.py \
        --jarfile <jarFile>
    • Run a PyFlink job with pyFiles and the main entry module specified in --pyModule:
    $ ./bin/flink run \
        --pyModule table.word_count \
        --pyFiles examples/python/table
    • Submit a PyFlink job on a specific JobManager running on host <jobmanagerHost> (adapt the command accordingly):
    $ ./bin/flink run \
        --jobmanager <jobmanagerHost>:8081 \
        --python examples/python/table/word_count.py
    • Run a PyFlink job using a YARN cluster in Per-Job Mode:
    $ ./bin/flink run \
        --target yarn-per-job \
        --python examples/python/table/word_count.py
    • Run a PyFlink application on a native Kubernetes cluster having the cluster ID <ClusterId>. This requires a Docker image with PyFlink installed; please refer to Enabling PyFlink in Docker. A sketch of such a submission is shown below.
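    # A sketch, not a verbatim command from this guide: <PyFlinkImageName> and the
    # path to the Python script inside the image are placeholders to adapt to your setup.
    $ ./bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=<ClusterId> \
        -Dkubernetes.container.image=<PyFlinkImageName> \
        --pyModule word_count \
        --pyFiles /opt/flink/examples/python/table/word_count.py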

    To learn more available options, please refer to Kubernetes and YARN, which are described in more detail in the Resource Provider section.

    Besides --pyFiles, --pyModule and --python mentioned above, there are also some other Python related options. Here’s an overview of all the Python related options for the actions run and run-application supported by Flink’s CLI tool:

    • -py,--python: Python script with the program entry point. The dependent resources can be configured with the --pyFiles option.
    • -pym,--pyModule: Python module with the program entry point. This option must be used in conjunction with --pyFiles.
    • -pyfs,--pyFiles: Attach custom files for the job. Standard resource file suffixes such as .py/.egg/.zip/.whl, as well as directories, are supported. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Files suffixed with .zip will be extracted and added to the PYTHONPATH. A comma (',') can be used as the separator to specify multiple files (e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
    • -pyarch,--pyArchives: Add python archive files for the job. The archive files will be extracted to the working directory of the python UDF worker. For each archive file, a target directory can be specified. If the target directory name is specified, the archive file will be extracted to a directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name as the archive file. The files uploaded via this option are accessible via relative paths. '#' can be used as the separator between the archive file path and the target directory name, and a comma (',') can be used as the separator to specify multiple archive files. This option can be used to upload a virtual environment or data files used in Python UDFs (e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files can then be accessed in Python UDFs, e.g.: f = open('data/data.txt', 'r').
    • -pyclientexec,--pyClientExecutable: The path of the Python interpreter used to launch the Python process when submitting Python jobs via "flink run" or compiling Java/Scala jobs containing Python UDFs (e.g., --pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python).
    • -pyexec,--pyExecutable: Specify the path of the python interpreter used to execute the python UDF worker (e.g., --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
    • -pyreq,--pyRequirements: Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory containing the installation packages of these dependencies can optionally be specified, using '#' as the separator (e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).