Docker Setup

    Docker is a popular container runtime. There are official Docker images for Apache Flink available on Docker Hub. You can use the Docker images to deploy a Session or Application cluster on Docker. This page focuses on the setup of Flink on Docker, Docker Swarm and Docker Compose.

    Deployment into managed containerized environments, such as standalone Kubernetes or native Kubernetes, is described on separate pages.

    Starting a Session Cluster on Docker

    A Flink Session cluster can be used to run multiple jobs. Each job needs to be submitted to the cluster after the cluster has been deployed. To deploy a Flink Session cluster with Docker, you need to start a JobManager container. To enable communication between the containers, we first set a required Flink configuration property and create a network:
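
    $ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
    $ docker network create flink-network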

    Then we launch the JobManager:

    $ docker run \
        --rm \
        --name=jobmanager \
        --network flink-network \
        --publish 8081:8081 \
        --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
        flink:1.14.4-scala_2.11 jobmanager

    and one or more TaskManager containers:

    $ docker run \
        --rm \
        --name=taskmanager \
        --network flink-network \
        --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
        flink:1.14.4-scala_2.11 taskmanager

    The web interface is now available at http://localhost:8081.

    Submission of a job is now possible like this (assuming you have a local distribution of Flink available):

    $ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

    To shut down the cluster, either terminate (e.g. with CTRL-C) the JobManager and TaskManager processes, or use docker ps to identify and docker stop to terminate the containers.
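
    For example, since the containers above were started with --name, you can stop them by name; the --rm flag removes them once they are stopped, and the network can be removed afterwards:

    $ docker stop taskmanager jobmanager
    $ docker network rm flink-network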

    The Flink image contains a regular Flink distribution with its default configuration and a standard entry point script. You can run its entry point in the following modes:

    • JobManager for a Session cluster
    • JobManager for an Application cluster
    • TaskManager for any cluster

    This allows you to deploy a standalone cluster (Session or Application Mode) in any containerised environment, for example standalone Kubernetes or native Kubernetes.

    Note The native Kubernetes deployment also runs the same image by default and deploys TaskManagers on demand, so that you do not have to do this manually.

    The next chapters describe how to start a single Flink Docker container for various purposes.

    Once you’ve started Flink on Docker, you can access the Flink web frontend on http://localhost:8081 or submit jobs like this: ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar.

    We recommend using Docker Compose or Docker Swarm for deploying Flink in Session Mode to ease system configuration.

    Application Mode

    A Flink Application cluster is a dedicated cluster which runs a single job. In this case, you deploy the cluster together with the job as one step; thus, no extra job submission is needed.

    The job artifacts are included in the class path of Flink’s JVM process within the container and consist of:

    • your job JAR, which you would normally submit to a Session cluster, and
    • all other necessary dependencies or resources that are not included in Flink.

    To deploy a cluster for a single job with Docker, you need to

    • make the job artifacts available locally in all containers under /opt/flink/usrlib,
    • start a JobManager container in Application cluster mode, and
    • start the required number of TaskManager containers.

    To make the job artifacts available locally in the container, you can

    • either mount a volume (or multiple volumes) with the artifacts to /opt/flink/usrlib when you start the JobManager and TaskManagers:

      $ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
      $ docker network create flink-network

      $ docker run \
          --mount type=bind,src=/host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1 \
          --mount type=bind,src=/host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2 \
          --rm \
          --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
          --name=jobmanager \
          --network flink-network \
          flink:1.14.4-scala_2.11 standalone-job \
          --job-classname com.job.ClassName \
          [--job-id <job id>] \
          [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
          [job arguments]

      $ docker run \
          --mount type=bind,src=/host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1 \
          --mount type=bind,src=/host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2 \
          --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
          --network flink-network \
          flink:1.14.4-scala_2.11 taskmanager
    • or extend the Flink image by writing a custom Dockerfile, build it and use it for starting the JobManager and TaskManagers:

      FROM flink
      ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
      ADD /host/path/to/job/artifacts/2 /opt/flink/usrlib/artifacts/2

      $ docker build --tag flink_with_job_artifacts .

      $ docker run \
          flink_with_job_artifacts standalone-job \
          --job-classname com.job.ClassName \
          [--job-id <job id>] \
          [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
          [job arguments]

      $ docker run flink_with_job_artifacts taskmanager

    The standalone-job argument starts a JobManager container in Application Mode.

    JobManager additional command line arguments

    You can provide the following additional command line arguments to the cluster entrypoint:

    • --job-classname <job class name>: Class name of the job to run.

      By default, Flink scans its class path for a JAR with a Main-Class or program-class manifest entry and chooses it as the job class. Use this command line argument to manually set the job class. This argument is required if no JAR, or more than one JAR, with such a manifest entry is available on the class path.

    • --job-id <job id> (optional): Manually set a Flink job ID for the job (default: 00000000000000000000000000000000)

    • --fromSavepoint /path/to/savepoint (optional): Restore from a savepoint

    • --allowNonRestoredState (optional): Skip broken savepoint state

      Additionally, you can specify this argument to allow skipping savepoint state that cannot be restored.

    If the main function of the user job main class accepts arguments, you can also pass them at the end of the docker run command.
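
    For example, a minimal sketch showing job arguments appended after the entry point options (the --input and --output flags are hypothetical arguments consumed by your job's main method):

    $ docker run \
        --mount type=bind,src=/host/path/to/job/artifacts,target=/opt/flink/usrlib \
        --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
        --network flink-network \
        flink:1.14.4-scala_2.11 standalone-job \
        --job-classname com.job.ClassName \
        --input /opt/flink/usrlib/input.txt \
        --output /opt/flink/usrlib/output.txt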

    Per-Job Mode

    Per-Job Mode is not supported by Flink on Docker.

    Session Mode

    Local deployment in Session Mode has already been described in the Starting a Session Cluster on Docker section above.

    There are two distribution channels for the Flink Docker images:

    1. Official Flink images on Docker Hub (reviewed by Docker)

    2. Flink images on Docker Hub apache/flink (managed by the Flink developers)

    We recommend using the official images on Docker Hub, as they are reviewed by Docker. The images on apache/flink are provided in case of delays in the review process by Docker.

    Launching an image named flink:latest will pull the latest image from Docker Hub. In order to use the images hosted in apache/flink, replace flink by apache/flink. Any of the image tags (starting from Flink 1.11.3) are available on apache/flink as well.
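
    For example, a sketch of pulling the same release from the apache/flink repository instead, using the tag already used elsewhere on this page:

    $ docker pull apache/flink:1.14.4-scala_2.11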

    Image tags

    The Flink Docker repository is hosted on Docker Hub and serves images of Flink version 1.2.1 and later. The source for these images can be found in the apache/flink-docker repository on GitHub.

    Images for each supported combination of Flink and Scala versions are available, and tag aliases are provided for convenience.

    For example, you can use the following aliases:

    • flink:latest → flink:<latest-flink>-scala_<latest-scala>
    • flink:1.11 → flink:1.11.<latest-flink-1.11>-scala_2.11

    Note It is recommended to always use an explicit version tag of the docker image that specifies both the needed Flink and Scala versions (for example flink:1.11-scala_2.12). This will avoid some class conflicts that can occur if the Flink and/or Scala versions used in the application are different from the versions provided by the docker image.

    Note Prior to Flink 1.5, Hadoop dependencies were always bundled with Flink, and certain image tags include the Hadoop version (e.g. -hadoop28). Beginning with Flink 1.5, image tags that omit the Hadoop version correspond to Hadoop-free releases of Flink that do not include a bundled Hadoop distribution.

    Passing configuration via environment variables

    When you run the Flink image, you can also change its configuration options by setting the environment variable FLINK_PROPERTIES:

    $ FLINK_PROPERTIES="jobmanager.rpc.address: host
    taskmanager.numberOfTaskSlots: 3
    "
    $ docker run --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" flink:1.14.4-scala_2.11 <jobmanager|standalone-job|taskmanager>

    The jobmanager.rpc.address option must be configured; all other options are optional.

    The environment variable FLINK_PROPERTIES should contain a list of Flink cluster configuration options separated by new line, the same way as in the flink-conf.yaml. FLINK_PROPERTIES takes precedence over configurations in flink-conf.yaml.

    Provide custom configuration

    The configuration files (flink-conf.yaml, logging, hosts etc) are located in the /opt/flink/conf directory in the Flink image. To provide a custom location for the Flink configuration files, you can

    • either mount a volume with the custom configuration files to this path /opt/flink/conf when you run the Flink image:

      $ docker run \
          --mount type=bind,src=/host/path/to/custom/conf,target=/opt/flink/conf \
          flink:1.14.4-scala_2.11 <jobmanager|standalone-job|taskmanager>
    • or add them to your custom Flink image, build and run it:
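
      A minimal sketch of such a Dockerfile (the host paths are placeholders for your own configuration files):

      FROM flink
      ADD /host/path/to/flink-conf.yaml /opt/flink/conf/flink-conf.yaml
      ADD /host/path/to/log4j.properties /opt/flink/conf/log4j.properties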

    Using filesystem plugins

    As described in the plugins documentation page: in order to use plugins, they must be copied to the correct location in the Flink installation in the Docker container for them to work.

    If you want to enable plugins provided with Flink (in the opt/ directory of the Flink distribution), you can pass the environment variable ENABLE_BUILT_IN_PLUGINS when you run the Flink image. ENABLE_BUILT_IN_PLUGINS should contain a list of plugin jar file names separated by ;. A valid plugin name is for example flink-s3-fs-hadoop-1.14.4.jar.

    $ docker run \
        --env ENABLE_BUILT_IN_PLUGINS="flink-plugin1.jar;flink-plugin2.jar" \
        flink:1.14.4-scala_2.11 <jobmanager|standalone-job|taskmanager>

    There are also more advanced ways for customizing the Flink image.

    To build a custom image which has Python and PyFlink prepared, you can refer to the following Dockerfile:

    FROM flink:1.14.4

    # install python3: Debian 11 ships Python 3.9, so install Python 3.7 from source,
    # as PyFlink currently only officially supports Python 3.6, 3.7 and 3.8.
    RUN apt-get update -y && \
        apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
        wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
        tar -xvf Python-3.7.9.tgz && \
        cd Python-3.7.9 && \
        ./configure --without-tests --enable-shared && \
        make -j6 && \
        make install && \
        ldconfig /usr/local/lib && \
        cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
        ln -s /usr/local/bin/python3 /usr/local/bin/python && \
        apt-get clean && \
        rm -rf /var/lib/apt/lists/*

    # install PyFlink
    RUN pip3 install apache-flink==1.14.4

    Note For Debian 10 and below, Python 3 can alternatively be installed as follows:

    RUN apt-get update -y && \
        apt-get install -y python3.7 python3-pip python3.7-dev && rm -rf /var/lib/apt/lists/*
    RUN ln -s /usr/bin/python3 /usr/bin/python

    Build the image:

    $ docker build --tag pyflink:latest .

    Switch memory allocator

    Flink introduced jemalloc as the default memory allocator to resolve a memory fragmentation problem (please refer to FLINK-19125).

    You can switch back to glibc as the memory allocator to restore the old behavior, or if you observe any unexpected memory consumption or problems (please report the issue via JIRA or the mailing list if you do), by setting the environment variable DISABLE_JEMALLOC to true:

    $ docker run \
        --env DISABLE_JEMALLOC=true \
        flink:1.14.4-scala_2.11 <jobmanager|standalone-job|taskmanager>

    For users that are still using the glibc memory allocator, the known glibc memory fragmentation bug can easily be reproduced, especially while savepoints or full checkpoints are created with the RocksDBStateBackend. Setting the environment variable MALLOC_ARENA_MAX can avoid unlimited memory growth:

    $ docker run \
        --env MALLOC_ARENA_MAX=1 \
        flink:1.14.4-scala_2.11 <jobmanager|standalone-job|taskmanager>

    Advanced customization

    There are several ways in which you can further customize the Flink image:

    • install custom software (e.g. python)
    • enable (symlink) optional libraries or plugins from /opt/flink/opt into /opt/flink/lib or /opt/flink/plugins
    • add other libraries to /opt/flink/lib (e.g. Hadoop)
    • add other plugins to /opt/flink/plugins

    You can customize the Flink image in several ways:

    • override the container entry point with a custom script where you can run any bootstrap actions. At the end you can call the standard /docker-entrypoint.sh script of the Flink image with the same arguments as described above.

      The following example creates a custom entry point script which enables more libraries and plugins. The custom script, custom library and plugin are provided from a mounted volume. Then it runs the standard entry point script of the Flink image:

      # create custom_lib.jar
      # create custom_plugin.jar

      $ echo "
      # enable an optional library
      ln -fs /opt/flink/opt/flink-queryable-state-runtime-*.jar /opt/flink/lib/
      # enable a custom library
      ln -fs /mnt/custom_lib.jar /opt/flink/lib/
      mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
      # enable an optional plugin
      ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/
      mkdir -p /opt/flink/plugins/custom_plugin
      # enable a custom plugin
      ln -fs /mnt/custom_plugin.jar /opt/flink/plugins/custom_plugin/
      /docker-entrypoint.sh <jobmanager|standalone-job|taskmanager>
      " > custom_entry_point_script.sh

      $ chmod 755 custom_entry_point_script.sh

      $ docker run \
          --mount type=bind,src=$(pwd),target=/mnt \
          flink:1.14.4-scala_2.11 /mnt/custom_entry_point_script.sh
    • extend the Flink image by writing a custom Dockerfile and build a custom image:

      FROM flink
      RUN set -ex; apt-get update; apt-get -y install python
      ADD /host/path/to/flink-conf.yaml /container/local/path/to/custom/conf/flink-conf.yaml
      ADD /host/path/to/log4j.properties /container/local/path/to/custom/conf/log4j.properties
      RUN ln -fs /opt/flink/opt/flink-queryable-state-runtime-*.jar /opt/flink/lib/.
      RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
      RUN ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/.
      ENV VAR_NAME value

      Commands for building:
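
      A sketch of the build (and optional push) commands; the image name custom_flink_image is a placeholder:

      $ docker build --tag custom_flink_image .
      # optionally push the image to your registry, e.g. if you run a distributed setup and the image is not available locally
      $ docker push custom_flink_image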

    Flink with Docker Compose

    Docker Compose is a way to run a group of Docker containers locally. The next sections show examples of configuration files to run Flink.

    Usage

    • Create the yaml files with the container configuration. Examples for an Application cluster and a Session cluster are provided below.

      See also the Flink Docker image tags and how to customize the Flink Docker image for usage in the configuration files.

    • Launch a cluster in the foreground (use -d for background)

      $ docker-compose up
    • Scale the cluster up or down to N TaskManagers

      $ docker-compose scale taskmanager=<N>
    • Kill the cluster
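
      $ docker-compose kill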

    • Access Web UI

      When the cluster is running, you can visit the web UI at http://localhost:8081. You can also use the web UI to submit a job to a Session cluster.

    • To submit a job to a Session cluster via the command line, you can either

      • use Flink CLI on the host if it is installed:

        $ ./bin/flink run --detached --class ${JOB_CLASS_NAME} /job.jar
      • or copy the JAR to the JobManager container and submit the job using the Flink CLI from there, for example:

        $ JOB_CLASS_NAME="com.job.ClassName"
        $ JM_CONTAINER=$(docker ps --filter name=jobmanager --format={{.ID}})
        $ docker cp path/to/jar "${JM_CONTAINER}":/job.jar
        $ docker exec -t -i "${JM_CONTAINER}" flink run -d -c ${JOB_CLASS_NAME} /job.jar

    Here, we provide the docker-compose.yml for an Application cluster.

    Note: For the Application Mode cluster, the artifacts must be available in the Flink containers; check the Application Mode section above for details. See also how to pass the JobManager command line arguments in the command for the jobmanager service.

    version: "2.2"
    services:
      jobmanager:
        image: flink:1.14.4-scala_2.11
        ports:
          - "8081:8081"
        command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] [job arguments]
        volumes:
          - /host/path/to/job/artifacts:/opt/flink/usrlib
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
            parallelism.default: 2
      taskmanager:
        image: flink:1.14.4-scala_2.11
        depends_on:
          - jobmanager
        command: taskmanager
        scale: 1
        volumes:
          - /host/path/to/job/artifacts:/opt/flink/usrlib
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
            taskmanager.numberOfTaskSlots: 2
            parallelism.default: 2

    And here is the docker-compose.yml for a Session cluster:

    version: "2.2"
    services:
      jobmanager:
        image: flink:1.14.4-scala_2.11
        ports:
          - "8081:8081"
        command: jobmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
      taskmanager:
        image: flink:1.14.4-scala_2.11
        depends_on:
          - jobmanager
        command: taskmanager
        scale: 1
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
            taskmanager.numberOfTaskSlots: 2

    Flink with Docker Swarm

    Docker Swarm is a container orchestration tool that allows you to manage multiple containers deployed across multiple host machines.

    The following chapters contain examples of how to configure and start JobManager and TaskManager containers. You can adjust them accordingly to start a cluster. See also the Flink Docker image tags and how to customize the Flink Docker image for usage in the provided scripts.

    Port 8081 is exposed for Flink Web UI access. If you run the swarm locally, you can visit the web UI at http://localhost:8081 after starting the cluster.

    Session Cluster with Docker Swarm
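
    A sketch of a Session cluster deployment on Docker Swarm, analogous to the Application cluster example below; the service and network names (flink-session, flink-session-jobmanager, flink-session-taskmanager) are placeholders you can choose freely:

    $ FLINK_PROPERTIES="jobmanager.rpc.address: flink-session-jobmanager
    taskmanager.numberOfTaskSlots: 2
    "

    # Create overlay network
    $ docker network create -d overlay flink-session

    # Create the JobManager service
    $ docker service create \
        --name flink-session-jobmanager \
        --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
        --publish 8081:8081 \
        --network flink-session \
        flink:1.14.4-scala_2.11 \
        jobmanager

    # Create the TaskManager service (scale this out as needed)
    $ docker service create \
        --name flink-session-taskmanager \
        --replicas 2 \
        --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
        --network flink-session \
        flink:1.14.4-scala_2.11 \
        taskmanager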

    Application Cluster with Docker Swarm

    $ FLINK_PROPERTIES="jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    "

    # Create overlay network
    $ docker network create -d overlay flink-job

    # Create the JobManager service
    $ docker service create \
        --name flink-jobmanager \
        --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
        --mount type=bind,source=/host/path/to/job/artifacts,target=/opt/flink/usrlib \
        --publish 8081:8081 \
        --network flink-job \
        flink:1.14.4-scala_2.11 \
        standalone-job \
        --job-classname com.job.ClassName \
        [--job-id <job id>] \
        [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
        [job arguments]

    # Create the TaskManager service (scale this out as needed)
    $ docker service create \
        --name flink-job-taskmanager \
        --replicas 2 \
        --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
        --mount type=bind,source=/host/path/to/job/artifacts,target=/opt/flink/usrlib \
        --network flink-job \
        flink:1.14.4-scala_2.11 \
        taskmanager

    The example assumes that you run the swarm locally and expects the job artifacts to be in /host/path/to/job/artifacts. It also mounts the host path with the artifacts as a volume to the container’s path /opt/flink/usrlib.