ZooKeeper HA Services

    Flink uses ZooKeeper for distributed coordination between all running JobManager instances. ZooKeeper is a service separate from Flink that provides highly reliable distributed coordination through leader election and lightweight, consistent state storage. See the ZooKeeper documentation for more information about ZooKeeper. Flink includes scripts to bootstrap a simple ZooKeeper installation.

    To start an HA cluster, you have to set the following configuration keys:

    • high-availability (required): The high-availability option has to be set to zookeeper.

    • high-availability.storageDir (required): JobManager metadata is persisted in the file system directory given by high-availability.storageDir; only a pointer to this state is stored in ZooKeeper.

      high-availability.storageDir: hdfs:///flink/recovery

      The storageDir stores all metadata needed to recover from a JobManager failure.

    • high-availability.zookeeper.quorum (required): A ZooKeeper quorum is a replicated group of ZooKeeper servers that provides the distributed coordination service.

    • high-availability.zookeeper.path.root (recommended): The root ZooKeeper node, under which all cluster nodes are placed.

      high-availability.zookeeper.path.root: /flink

    Configure high availability mode and ZooKeeper quorum in conf/flink-conf.yaml:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: localhost:2181
    high-availability.zookeeper.path.root: /flink
    high-availability.cluster-id: /cluster_one # important: customize per cluster
    high-availability.storageDir: hdfs:///flink/recovery
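
    Before starting the cluster, it can help to check that the required keys are present. The following is a minimal sketch, not part of Flink: parse_flat_conf and check_zookeeper_ha are hypothetical helper names, and the parser assumes the configuration consists only of flat "key: value" lines with optional "#" comments.

```python
# Hypothetical helper, not shipped with Flink. It only understands flat
# "key: value" lines, which is an assumption about the configuration file.
REQUIRED_HA_KEYS = (
    "high-availability",
    "high-availability.storageDir",
    "high-availability.zookeeper.quorum",
)
RECOMMENDED_KEY = "high-availability.zookeeper.path.root"


def parse_flat_conf(text):
    """Parse flat 'key: value' lines, skipping blank lines and '#' comments."""
    conf = {}
    for raw in text.splitlines():
        line = raw.split(" #", 1)[0].strip()  # drop a trailing comment, if any
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)  # split once: values may contain ':'
        conf[key.strip()] = value.strip()
    return conf


def check_zookeeper_ha(conf):
    """Return a list of problems; an empty list means the keys look complete."""
    problems = [
        f"missing required key: {key}"
        for key in REQUIRED_HA_KEYS
        if not conf.get(key)
    ]
    mode = conf.get("high-availability")
    if mode and mode != "zookeeper":
        problems.append("high-availability must be set to zookeeper")
    if RECOMMENDED_KEY not in conf:
        problems.append(f"recommended key not set: {RECOMMENDED_KEY}")
    return problems
```

    Calling check_zookeeper_ha(parse_flat_conf(text)) on the contents of conf/flink-conf.yaml returns one message per missing or unexpected setting. The check is purely syntactic; it does not contact ZooKeeper or the storage directory.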

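    If ZooKeeper is running in secure mode with Kerberos, you can override the following configurations in flink-conf.yaml as necessary:

    zookeeper.sasl.service-name: zookeeper     # default is "zookeeper"; set this if the ZooKeeper quorum uses a different service name
    zookeeper.sasl.login-context-name: Client  # default is "Client"; must match one of the values in security.kerberos.login.contexts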

    For more information on Flink configuration for Kerberos security, refer to the security section of the Flink configuration documentation. The Flink documentation also describes how Flink sets up Kerberos-based security internally.

    Tolerating Suspended ZooKeeper Connections

    By default, Flink's ZooKeeper client treats a suspended ZooKeeper connection as an error, in the same way as a lost connection. This behaviour might be too disruptive in some cases (e.g., in an unstable network environment). If you are willing to take a more aggressive approach, you can tolerate suspended ZooKeeper connections and treat only lost connections as an error by enabling high-availability.zookeeper.client.tolerate-suspended-connections. Enabling this feature makes Flink more resilient against temporary connection problems, but it also increases the risk of running into ZooKeeper timing problems.

    For more information, take a look at Curator's error handling documentation.
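
    The effect of the option can be summarised as a small decision rule. The sketch below is illustrative only and is not Flink's actual implementation: ConnectionState and treated_as_error are hypothetical names, and the CONNECTED state is added just to make the example complete.

```python
from enum import Enum


class ConnectionState(Enum):
    # Hypothetical state names for illustration.
    CONNECTED = "connected"
    SUSPENDED = "suspended"  # connection temporarily interrupted
    LOST = "lost"            # connection considered gone


def treated_as_error(state, tolerate_suspended_connections=False):
    """Return True if the given connection state counts as an error."""
    if state is ConnectionState.LOST:
        # A lost connection is an error regardless of the option.
        return True
    if state is ConnectionState.SUSPENDED:
        # Suspended connections are an error unless they are tolerated.
        return not tolerate_suspended_connections
    return False
```

    With the option left at its default, treated_as_error(ConnectionState.SUSPENDED) is True; with tolerate_suspended_connections=True it is False, while a lost connection remains an error in both cases.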

    Flink ships with separate ZooKeeper clients for 3.4 and 3.5, with 3.4 being in the lib directory of the distribution and thus used by default, whereas 3.5 is placed in the opt directory.

    The 3.5 client allows you to secure the ZooKeeper connection via SSL, but may not work with ZooKeeper installations of version 3.4 or older.

    You can control which version is used by Flink by placing either jar in the lib directory.

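    If you don't have a running ZooKeeper installation, you can use the helper scripts that ship with Flink.

    The ZooKeeper configuration is read from conf/zoo.cfg. List the hosts to run ZooKeeper on with server.X entries, where X is a unique ID for each server:

    server.X=addressX:peerPort:leaderPort
    server.Y=addressY:peerPort:leaderPort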

    The script bin/start-zookeeper-quorum.sh will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from conf/zoo.cfg and makes sure to set some required configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.
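
    To see which hosts a given configuration refers to, the server.X entries shown above can be read with a few lines of code. This is a rough sketch under stated assumptions: zookeeper_hosts is a hypothetical helper (not one of the Flink scripts), and it assumes every entry uses exactly the three-field address:peerPort:leaderPort form.

```python
def zookeeper_hosts(zoo_cfg_text):
    """Return {server_id: (address, peer_port, leader_port)} from server.X lines."""
    servers = {}
    for raw in zoo_cfg_text.splitlines():
        line = raw.strip()
        # Only "server.<id>=<value>" lines are of interest; skip everything else.
        if not line.startswith("server.") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        server_id = key[len("server."):].strip()
        # Split from the right so that the last two fields are the ports.
        address, peer_port, leader_port = value.strip().rsplit(":", 2)
        servers[server_id] = (address, int(peer_port), int(leader_port))
    return servers
```

    For example, a file containing server.1=host-a:2888:3888 (host name and port numbers are example values) yields {"1": ("host-a", 2888, 3888)}. Entries in any other format raise a ValueError instead of being silently skipped.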