Native Kubernetes

    This Getting Started section guides you through setting up a fully functional Flink Cluster on Kubernetes.

    Kubernetes is a popular container-orchestration system for automating computer application deployment, scaling, and management. Flink’s native Kubernetes integration allows you to directly deploy Flink on a running Kubernetes cluster. Moreover, Flink is able to dynamically allocate and de-allocate TaskManagers depending on the required resources because it can directly talk to Kubernetes.

    Preparation

    The Getting Started section assumes a running Kubernetes cluster fulfilling the following requirements:

    • Kubernetes >= 1.9.
    • KubeConfig, which has access to list, create, and delete pods and services. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods.
    • Enabled Kubernetes DNS.
    • A default service account with permissions to create and delete pods.
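
    The permission check mentioned in the list above can be scripted. The following is a minimal sketch, not part of Flink: the function name check_pod_permissions is hypothetical, and it relies only on kubectl auth can-i returning a zero exit status when the action is allowed.

```shell
#!/usr/bin/env bash
# Sketch: verify that the current KubeConfig may perform each verb on pods.
# check_pod_permissions is a hypothetical helper name, not a Flink command.
check_pod_permissions() {
  local verb missing=0
  for verb in list create edit delete; do
    # `kubectl auth can-i` exits with status 0 when the action is allowed.
    if kubectl auth can-i "$verb" pods >/dev/null 2>&1; then
      echo "ok: $verb pods"
    else
      echo "missing: $verb pods"
      missing=1
    fi
  done
  # Non-zero return value if at least one permission is missing.
  return "$missing"
}
```

    Calling check_pod_permissions prints one line per verb and returns a non-zero status if any permission is missing, so it can be used as a guard before starting a cluster.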

    If you have problems setting up a Kubernetes cluster, take a look at how to set up a Kubernetes cluster.

    Once you have your Kubernetes cluster running and kubectl is configured to point to it, you can launch a Flink cluster in Session Mode with bin/kubernetes-session.sh and submit a job to it.
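
    A session launch and job submission could look like the following sketch. It assumes that FLINK_HOME points at an unpacked Flink distribution; the helper names start_session and submit_to_session are hypothetical, while the cluster id option and the kubernetes-session target follow the commands used elsewhere on this page.

```shell
#!/usr/bin/env bash
# Sketch: start a session cluster and submit a job to it.
# Assumption: FLINK_HOME points at an unpacked Flink distribution.
FLINK_HOME="${FLINK_HOME:-.}"

# Hypothetical helper: start a session cluster with the given cluster id.
start_session() {   # $1: cluster id
  "$FLINK_HOME/bin/kubernetes-session.sh" -Dkubernetes.cluster-id="$1"
}

# Hypothetical helper: submit a job jar to the running session cluster.
submit_to_session() {   # $1: cluster id, $2: path to the job jar
  "$FLINK_HOME/bin/flink" run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id="$1" \
    "$2"
}
```

    For example, start_session my-first-flink-cluster followed by submit_to_session my-first-flink-cluster ./examples/streaming/TopSpeedWindowing.jar runs one of the example jobs shipped with the distribution. The session can be removed afterwards with kubectl delete deployment/my-first-flink-cluster.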

    Congratulations! You have successfully run a Flink application by deploying Flink on Kubernetes.

    For production use, we recommend deploying Flink Applications in Application Mode, as it provides better isolation for the Applications.

    Application Mode

    Application Mode requires that the user code is bundled together with the Flink image because it runs the user code’s main() method on the cluster. Application Mode makes sure that all Flink components are properly cleaned up after the termination of the application.

    The Flink community provides a base Docker image which can be used to bundle the user code:

    FROM flink
    RUN mkdir -p $FLINK_HOME/usrlib
    COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar

    After creating and publishing the Docker image under custom-image-name, you can start an Application cluster with the following command:

    $ ./bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=my-first-application-cluster \
        -Dkubernetes.container.image=custom-image-name \
        local:///opt/flink/usrlib/my-flink-job.jar

    Note that local is the only supported scheme in Application Mode.

    The kubernetes.cluster-id option specifies the cluster name and must be unique. If you do not specify this option, then Flink will generate a random name.

    The kubernetes.container.image option specifies the image to start the pods with.

    Once the application cluster is deployed you can interact with it:

    # List running jobs on the cluster
    $ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
    # Cancel running job
    $ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>

    You can override configurations set in conf/flink-conf.yaml by passing key-value pairs -Dkey=value to bin/flink.

    Per-Job Mode

    Flink on Kubernetes does not support Per-Job Cluster Mode.

    Session Mode

    You have seen the deployment of a Session cluster in the guide at the top of this page.

    The Session Mode can be executed in two modes:

    • detached mode (default): The kubernetes-session.sh deploys the Flink cluster on Kubernetes and then terminates.

    • attached mode (-Dexecution.attached=true): The kubernetes-session.sh stays alive and allows entering commands to control the running Flink cluster. For example, stop stops the running Session cluster. Type help to list all supported commands.

    In order to re-attach to a running Session cluster with the cluster id my-first-flink-cluster use the following command:

    $ ./bin/kubernetes-session.sh \
        -Dkubernetes.cluster-id=my-first-flink-cluster \
        -Dexecution.attached=true

    You can override configurations set in conf/flink-conf.yaml by passing key-value pairs -Dkey=value to bin/kubernetes-session.sh.

    Stop a Running Session Cluster

    In order to stop a running Session Cluster with cluster id my-first-flink-cluster you can either delete the Flink deployment or send the stop command to kubernetes-session.sh in attached mode.
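
    Sending the stop command without an interactive terminal can be sketched by piping it into the attached session script. The helper name stop_session is hypothetical, and FLINK_HOME is assumed to point at an unpacked Flink distribution.

```shell
#!/usr/bin/env bash
# Sketch: stop a session cluster by re-attaching and sending `stop` on stdin.
# Assumption: FLINK_HOME points at an unpacked Flink distribution.
stop_session() {   # $1: cluster id
  echo 'stop' | "${FLINK_HOME:-.}/bin/kubernetes-session.sh" \
    -Dkubernetes.cluster-id="$1" \
    -Dexecution.attached=true
}
```

    For example, stop_session my-first-flink-cluster stops the cluster used throughout this page.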

    The Kubernetes-specific configuration options are listed on the configuration page.

    Flink uses the Fabric8 Kubernetes client to communicate with the Kubernetes APIServer to create/delete Kubernetes resources (e.g. Deployment, Pod, ConfigMap, Service, etc.), as well as to watch the Pods and ConfigMaps. In addition to the Flink config options above, some expert options of the Fabric8 Kubernetes client can be configured via system properties or environment variables.

    For example, users can use either of the following Flink config options to set the maximum number of concurrent requests.

    containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200
    env.java.opts.jobmanager: "-Dkubernetes.max.concurrent.requests=200"

    Flink’s Web UI and REST endpoint can be exposed in several ways via the kubernetes.rest-service.exposed.type configuration option.

    • ClusterIP: Exposes the service on a cluster-internal IP. The Service is only reachable within the cluster. If you want to access the JobManager UI or submit a job to the existing session, you need to start a local proxy. You can then use localhost:8081 to submit a Flink job to the session or view the dashboard.

      $ kubectl port-forward service/<ServiceName> 8081

    • NodePort: Exposes the service on each Node’s IP at a static port (the NodePort). <NodeIP>:<NodePort> can be used to contact the JobManager service.

    • LoadBalancer: Exposes the service externally using a cloud provider’s load balancer. Since the cloud provider and Kubernetes need some time to prepare the load balancer, the JobManager Web Interface address printed in the client log may not be the load balancer address yet. You can use kubectl get services/<cluster-id>-rest to get the EXTERNAL-IP and construct the load balancer JobManager Web Interface address manually: http://<EXTERNAL-IP>:8081.
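
    The manual lookup described for the LoadBalancer type can be sketched as follows. The helper name rest_url is hypothetical. It assumes the load balancer reports an IP address in the service status; some cloud providers report a hostname instead, in which case the jsonpath expression would need to read the hostname field.

```shell
#!/usr/bin/env bash
# Sketch: build the Web UI URL from the EXTERNAL-IP of the <cluster-id>-rest service.
# rest_url is a hypothetical helper name.
rest_url() {   # $1: cluster id
  local ip
  # Read the first ingress IP assigned by the load balancer, if any.
  ip="$(kubectl get "services/$1-rest" \
        -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
  if [ -z "$ip" ]; then
    echo "EXTERNAL-IP for $1-rest is not assigned yet" >&2
    return 1
  fi
  echo "http://${ip}:8081"
}
```

    Calling rest_url my-first-flink-cluster prints the URL once the load balancer is ready and returns a non-zero status before that, so it can be retried in a loop.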

    Please refer to the official Kubernetes documentation on Services for more information.

    Logging

    The Kubernetes integration exposes conf/log4j-console.properties and conf/logback-console.xml as a ConfigMap to the pods. Changes to these files will be visible to a newly started cluster.

    Accessing the Logs

    By default, the JobManager and TaskManager will output the logs to the console and /opt/flink/log in each pod simultaneously. The STDOUT and STDERR output will only be redirected to the console. You can access them via

    $ kubectl logs <pod-name>

    If the pod is running, you can also use kubectl exec -it <pod-name> bash to tunnel in and view the logs or debug the process.

    Accessing the Logs of the TaskManagers

    Flink will automatically de-allocate idle TaskManagers in order not to waste resources. This behaviour can make it harder to access the logs of the respective pods. You can increase the time before idle TaskManagers are released by raising the corresponding idle timeout (see the configuration page) so that you have more time to inspect the log files.

    Changing the Log Level Dynamically

    If you have configured your logger to detect configuration changes automatically, then you can dynamically adapt the log level by changing the respective ConfigMap (assuming that the cluster id is my-first-flink-cluster):

    $ kubectl edit cm flink-config-my-first-flink-cluster

    Using Plugins

    In order to use plugins, you must copy them to the correct location in the Flink JobManager/TaskManager pod. You can use the built-in plugins without mounting a volume or building a custom Docker image. For example, the built-in S3 plugin can be enabled for a Flink session cluster by passing the corresponding options to bin/kubernetes-session.sh.
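
    One way to enable a built-in plugin is sketched below. It assumes that the image's entrypoint understands the ENABLE_BUILT_IN_PLUGINS environment variable, as the official Flink image does, and that the variable is passed to both the JobManager and the TaskManager containers. The helper name builtin_plugin_opts is hypothetical, and the plugin jar file name is version-specific and left as a placeholder.

```shell
#!/usr/bin/env bash
# Sketch: print the -D options that ask the image to enable a built-in plugin
# in both the JobManager and the TaskManager containers.
# Assumption: the image entrypoint honours ENABLE_BUILT_IN_PLUGINS.
builtin_plugin_opts() {   # $1: plugin jar file name (version-specific placeholder)
  printf '%s\n' \
    "-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=$1" \
    "-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=$1"
}
```

    The printed options can be appended to the session start command, for example ./bin/kubernetes-session.sh $(builtin_plugin_opts "flink-s3-fs-hadoop-<version>.jar"), with <version> replaced by the Flink version of the image.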

    Custom Docker Image

    If you want to use a custom Docker image, then you can specify it via the configuration option kubernetes.container.image. The Flink community provides a rich Flink Docker image which can be a good starting point. See the Flink Docker image documentation for how to enable plugins, add dependencies, and set other options.

    Using Secrets

    A Kubernetes Secret is an object that contains a small amount of sensitive data such as a password, a token, or a key. Such information might otherwise be put in a pod specification or in an image. Flink on Kubernetes can use Secrets in two ways:

    • Using Secrets as files from a pod;

    • Using Secrets as environment variables;

    Using Secrets as Files From a Pod

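    The following command will mount the secret mysecret under the path /path/to/secret in the started pods:

    $ ./bin/kubernetes-session.sh -Dkubernetes.secrets=mysecret:/path/to/secret

    The username and password of the secret mysecret can then be found stored in the files /path/to/secret/username and /path/to/secret/password. For more details see the official Kubernetes documentation.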

    Using Secrets as Environment Variables

    The following command will expose the secret mysecret as environment variable in the started pods:

    $ ./bin/kubernetes-session.sh \
        '-Dkubernetes.env.secretKeyRef=env:SECRET_USERNAME,secret:mysecret,key:username;env:SECRET_PASSWORD,secret:mysecret,key:password'

    The env variable SECRET_USERNAME contains the username and the env variable SECRET_PASSWORD contains the password of the secret mysecret. The option value is quoted because the shell would otherwise treat the semicolon as a command separator. For more details see the official Kubernetes documentation.
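
    The value of kubernetes.env.secretKeyRef is a list of entries separated by semicolons, each with the fields env, secret, and key. A small sketch that assembles this value from ENV_NAME:key pairs is shown below. The helper name secret_key_ref is hypothetical and not part of Flink.

```shell
#!/usr/bin/env bash
# Sketch: build the value of kubernetes.env.secretKeyRef for one secret.
# secret_key_ref is a hypothetical helper name.
secret_key_ref() {   # $1: secret name, remaining args: ENV_NAME:key pairs
  local secret="$1" pair value=""
  shift
  for pair in "$@"; do
    # Entries are separated by ';', fields inside an entry by ','.
    value="${value:+$value;}env:${pair%%:*},secret:${secret},key:${pair#*:}"
  done
  echo "$value"
}
```

    The result has to be passed as a single quoted argument, for example ./bin/kubernetes-session.sh "-Dkubernetes.env.secretKeyRef=$(secret_key_ref mysecret SECRET_USERNAME:username SECRET_PASSWORD:password)".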

    For high availability on Kubernetes, you can use the existing high availability services.

    Set the number of JobManager replicas to a value greater than 1 to start standby JobManagers, which helps to achieve faster recovery. Note that high availability should be enabled when starting standby JobManagers.

    Manual Resource Cleanup

    Flink uses Kubernetes OwnerReferences to clean up all cluster components. All the resources created by Flink, including ConfigMap, Service, and Pod, have their OwnerReference set to deployment/<cluster-id>. When the deployment is deleted, all related resources will be deleted automatically.

    $ kubectl delete deployment/<cluster-id>

    Supported Kubernetes Versions

    Currently, all Kubernetes versions >= 1.9 are supported.

    Namespaces

    Namespaces in Kubernetes divide cluster resources between multiple users via resource quotas. Flink on Kubernetes can use namespaces to launch Flink clusters. The namespace can be configured via kubernetes.namespace.

    RBAC

    Role-based access control (RBAC) is a method of regulating access to compute or network resources based on the roles of individual users within an enterprise. Users can configure RBAC roles and service accounts used by JobManager to access the Kubernetes API server within the Kubernetes cluster.

    Every namespace has a default service account. However, the default service account may not have the permission to create or delete pods within the Kubernetes cluster. Users may need to update the permission of the default service account or specify another service account that has the right role bound.

    $ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default

    If you do not want to use the default service account, create a new service account named flink-service-account and set the role binding for it. Then use the config option -Dkubernetes.service-account=flink-service-account to make the JobManager pod use the flink-service-account service account to create/delete TaskManager pods and leader ConfigMaps. This also allows the TaskManager to watch leader ConfigMaps to retrieve the address of the JobManager and ResourceManager.

    Please refer to the official Kubernetes documentation on RBAC authorization for more information.
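
    Creating the dedicated service account and its role binding can be sketched with two kubectl commands. The helper name create_flink_service_account and the binding name flink-role-binding-flink are assumptions chosen to mirror the default binding shown above. The namespace parameter is likewise an addition for illustration.

```shell
#!/usr/bin/env bash
# Sketch: create a dedicated service account and bind it to the `edit` cluster role.
# The helper name and the binding name flink-role-binding-flink are assumptions.
create_flink_service_account() {   # $1: namespace (defaults to "default")
  local ns="${1:-default}"
  kubectl create serviceaccount flink-service-account --namespace "$ns" &&
  kubectl create clusterrolebinding flink-role-binding-flink \
    --clusterrole=edit \
    --serviceaccount="${ns}:flink-service-account"
}
```

    After running create_flink_service_account, start the cluster with -Dkubernetes.service-account=flink-service-account.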

    Pod Template

    Flink allows users to define the JobManager and TaskManager pods via template files. This makes it possible to use advanced features that Flink does not support directly. Use kubernetes.pod-template-file to specify a local file that contains the pod definition. It will be used to initialize the JobManager and TaskManager. The main container should be defined with the name flink-main-container. Please refer to the pod template example for more information.

    Some fields of the pod template will be overwritten by Flink. The mechanism for resolving effective field values can be categorized as follows:

    • Defined by Flink: User cannot configure it.

    • Defined by the user: User can freely specify this value. Flink framework won’t set any additional values and the effective value derives from the config option and the template.

      Precedence order: First an explicit config option value is taken, then the value in pod template and at last the default value of a config option if nothing is specified.

    • Merged with Flink: Flink will merge values for a setting with a user defined value (see precedence order for “Defined by the user”). Flink values have precedence in case of same name fields.
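
    The precedence order for values "Defined by the user" can be illustrated with a small sketch. It is not Flink code: the function name effective_value is hypothetical, and an empty string stands for "not specified".

```shell
#!/usr/bin/env bash
# Sketch of the "Defined by the user" precedence: explicit config option first,
# then the pod template value, then the default of the config option.
# An empty string is used here to mean "not specified" (an assumption).
effective_value() {   # $1: explicit config value, $2: pod template value, $3: default
  if [ -n "$1" ]; then
    echo "$1"
  elif [ -n "$2" ]; then
    echo "$2"
  else
    echo "$3"
  fi
}
```

    For example, effective_value "" "from-template" "from-default" prints from-template, because no explicit config option value was given.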

    Refer to the following tables for the full list of pod fields that will be overwritten. All the fields defined in the pod template that are not listed in the tables will be unaffected.

    Pod Metadata

    Pod Spec

    Main Container Spec

    Example of Pod Template

    apiVersion: v1
    kind: Pod
    metadata:
      name: jobmanager-pod-template
    spec:
      initContainers:
        - name: artifacts-fetcher
          image: artifacts-fetcher:latest
          # Use wget or other tools to get user jars from remote storage
          command: [ 'wget', 'https://path/of/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
          volumeMounts:
            - mountPath: /flink-artifact
              name: flink-artifact
      containers:
        # Do not change the main container name
        - name: flink-main-container
          resources:
            requests:
              ephemeral-storage: 2048Mi
            limits:
              ephemeral-storage: 2048Mi
          volumeMounts:
            - mountPath: /opt/flink/volumes/hostpath
              name: flink-volume-hostpath
            - mountPath: /opt/flink/artifacts
              name: flink-artifact
            - mountPath: /opt/flink/log
              name: flink-logs
        # Use sidecar container to push logs to remote storage or do some other debugging things
        - name: sidecar-log-collector
          image: sidecar-log-collector:latest
          command: [ 'command-to-upload', '/remote/path/of/flink-logs/' ]
          volumeMounts:
            - mountPath: /flink-logs
              name: flink-logs
      volumes:
        - name: flink-volume-hostpath
          hostPath:
            path: /tmp
            type: Directory
        - name: flink-artifact
          emptyDir: { }
        - name: flink-logs
          emptyDir: { }