Kubernetes Setup

    This page describes how to deploy a standalone Flink cluster on top of Kubernetes, using Flink’s standalone deployment. We generally recommend that new users deploy Flink on Kubernetes using the native Kubernetes deployment instead.

    Preparation

    This guide expects a Kubernetes environment to be present. You can ensure that your Kubernetes setup is working by running a command like kubectl get nodes, which lists all connected Kubelets.

    If you want to run Kubernetes locally, we recommend using minikube.

    Starting a Kubernetes Cluster (Session Mode)

    A Flink Session cluster is executed as a long-running Kubernetes Deployment. You can run multiple Flink jobs on a Session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

    A Flink Session cluster deployment in Kubernetes has at least three components:

    • a Deployment which runs a JobManager
    • a Deployment for a pool of TaskManagers
    • a Service exposing the JobManager’s REST and UI ports

    Using the file contents provided in the common cluster resource definitions at the end of this page, create the following files, and create the respective components with the kubectl command:
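
    For example, assuming the file names used in the resource definition sections below:

    $ kubectl create -f flink-configuration-configmap.yaml
    $ kubectl create -f jobmanager-service.yaml
    $ kubectl create -f jobmanager-session-deployment-non-ha.yaml
    $ kubectl create -f taskmanager-session-deployment.yaml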

    Next, we set up a port forward to access the Flink UI and submit jobs:

    1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your JobManager’s web UI port to local 8081.
    2. Navigate to http://localhost:8081 in your browser.
    3. Moreover, you can use the following command to submit jobs to the cluster:

       $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar

    You can tear down the cluster using the following commands:

    $ kubectl delete -f jobmanager-service.yaml
    $ kubectl delete -f flink-configuration-configmap.yaml
    $ kubectl delete -f taskmanager-session-deployment.yaml
    $ kubectl delete -f jobmanager-session-deployment.yaml

    Application Mode

    A Flink Application cluster is a dedicated cluster which runs a single application, which needs to be available at deployment time.

    A basic Flink Application cluster deployment in Kubernetes has three components:

    • an Application which runs a JobManager
    • a Deployment for a pool of TaskManagers
    • a Service exposing the JobManager’s REST and UI ports

    Check the Application cluster specific resource definitions and adjust them accordingly:

    The args attribute in the jobmanager-job.yaml has to specify the main class of the user job. See also the Flink Docker image documentation on how to specify the JobManager arguments to understand how to pass other args to the Flink image in the jobmanager-job.yaml.

    The job artifacts should be available from the job-artifacts-volume in the resource definition examples. The definition examples mount the volume as a local directory of the host, assuming that you create the components in a minikube cluster. If you do not use a minikube cluster, you can use any other type of volume available in your Kubernetes cluster to supply the job artifacts. Alternatively, you can build a custom image which already contains the artifacts instead.
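
    For instance, a hypothetical PersistentVolumeClaim-backed definition (the claim name job-artifacts-pvc is only an illustration, not part of the provided examples) could replace the hostPath volume:

    volumes:
      - name: job-artifacts-volume
        persistentVolumeClaim:
          claimName: job-artifacts-pvc # hypothetical claim containing the job jars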

    After creating the common cluster components, use the Application cluster specific resource definitions to launch the cluster with the kubectl command:

    $ kubectl create -f jobmanager-job.yaml
    $ kubectl create -f taskmanager-job-deployment.yaml

    To terminate the single application cluster, delete these components along with the common ones using the kubectl command:

    $ kubectl delete -f taskmanager-job-deployment.yaml
    $ kubectl delete -f jobmanager-job.yaml

    Per-Job Mode

    Flink on Standalone Kubernetes does not support the Per-Job Cluster Mode.

    Session Mode

    Deployment of a Session cluster is explained in the guide at the top of this page.

    Configuration

    All configuration options are listed on the configuration page. Configuration options can be added to the flink-conf.yaml section of the flink-configuration-configmap.yaml config map.
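
    For example, to give each TaskManager four task slots, you would change the corresponding line inside the flink-conf.yaml block of the ConfigMap (the value 4 is only an illustration; the provided examples use 2):

    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 4
        ...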

    You can then access the Flink UI and submit jobs in several ways:

    • kubectl proxy:

      1. Run kubectl proxy in a terminal.
      2. Navigate to http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy in your browser.
    • kubectl port-forward:

      1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your JobManager’s web UI port to local 8081.
      2. Navigate to http://localhost:8081 in your browser.
      3. Moreover, you can use the following command to submit jobs to the cluster:

         $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
    • Create a NodePort service on the rest service of jobmanager:

      1. Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on the jobmanager. The example of jobmanager-rest-service.yaml can be found in the Common cluster resource definitions section below.
      2. Run kubectl get svc flink-jobmanager-rest to know the node-port of this service and navigate to http://<public-node-ip>:<node-port> in your browser.
      3. If you use minikube, you can get its public IP by running minikube ip.
      4. Similarly to the port-forward solution, you can use the following command to submit jobs to the cluster:
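
         $ ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/TopSpeedWindowing.jar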

    Debugging and Log Access

    Many common errors are easy to detect by checking Flink’s log files. If you have access to Flink’s web user interface, you can access the JobManager and TaskManager logs from there.

    If there are problems starting Flink, you can also use Kubernetes utilities to access the logs. Use kubectl get pods to see all running pods. For the quickstart example from above, you should see three pods:

    $ kubectl get pods
    NAME                                 READY   STATUS    RESTARTS   AGE
    flink-jobmanager-589967dcfc-m49xv    1/1     Running   3          3m32s
    flink-taskmanager-64847444ff-7rdl4   1/1     Running   3          3m28s
    flink-taskmanager-64847444ff-nnd6m   1/1     Running   3          3m28s

    You can now access the logs by running kubectl logs flink-jobmanager-589967dcfc-m49xv.
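
    To follow the log output instead of printing a one-time snapshot, add the -f flag, for example:

    $ kubectl logs -f flink-jobmanager-589967dcfc-m49xv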

    High-Availability with Standalone Kubernetes

    For high availability on Kubernetes, you can use the existing high availability services.

    Kubernetes High-Availability Services

    Session Mode and Application Mode clusters support using the Kubernetes high-availability services. You need to add the following Flink config options to flink-configuration-configmap.yaml.

    Note: The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to custom Flink image and enable plugins for more information.

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        ...
        kubernetes.cluster-id: <cluster-id>
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: hdfs:///flink/recovery
        restart-strategy: fixed-delay
        restart-strategy.fixed-delay.attempts: 10
        ...

    Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, and delete ConfigMaps. See how to configure a service account with permissions for more information.
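
    As a rough sketch (the account name flink-service-account matches the deployment examples below; binding the broad edit ClusterRole is only one option, and a narrower Role restricted to ConfigMaps is preferable in production):

    $ kubectl create serviceaccount flink-service-account
    $ kubectl create clusterrolebinding flink-role-binding --clusterrole=edit --serviceaccount=default:flink-service-account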

    Standby JobManagers

    Usually, it is enough to start only a single JobManager pod, because Kubernetes will restart it once the pod crashes. If you want to achieve faster recovery, configure the replicas in jobmanager-session-deployment-ha.yaml or parallelism in jobmanager-application-ha.yaml to a value greater than 1 to start standby JobManagers.

    Enabling Queryable State

    You can access the queryable state of a TaskManager if you create a NodePort service for it:

    1. Run kubectl create -f taskmanager-query-state-service.yaml to create the NodePort service for the taskmanager pods. The example of taskmanager-query-state-service.yaml can be found in the Common cluster resource definitions section below.
    2. Run kubectl get svc flink-taskmanager-query-state to get the <node-port> of this service. Then you can create the QueryableStateClient(<public-node-ip>, <node-port>) to submit state queries.

    Using Standalone Kubernetes with Reactive Mode

    Reactive Mode allows running Flink in a mode where the Application Cluster always adjusts the job parallelism to the available resources. In combination with Kubernetes, the replica count of the TaskManager deployment determines the available resources. Increasing the replica count will scale up the job, while reducing it will trigger a scale down. This can also be done automatically by using a Horizontal Pod Autoscaler.
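
    A minimal sketch of such an autoscaler, assuming the flink-taskmanager deployment from the examples below and a CPU-based target (the 80% utilization threshold and the replica bounds are illustrations only):

    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: flink-taskmanager
    spec:
      scaleTargetRef:
        apiVersion: apps/v1
        kind: Deployment
        name: flink-taskmanager
      minReplicas: 1
      maxReplicas: 10
      metrics:
      - type: Resource
        resource:
          name: cpu
          target:
            type: Utilization
            averageUtilization: 80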

    To use Reactive Mode on Kubernetes, follow the same steps as for deploying a job using an Application Cluster, but instead of flink-configuration-configmap.yaml use the flink-reactive-mode-configuration-configmap.yaml config map. It contains the scheduler-mode: reactive setting for Flink.

    Once you have deployed the Application Cluster, you can scale your job up or down by changing the replica count in the flink-taskmanager deployment.
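
    For example, assuming the taskmanager-job-deployment.yaml from below, the following command would scale the job to four TaskManagers:

    $ kubectl scale deployment/flink-taskmanager --replicas=4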

    Common cluster resource definitions

    flink-configuration-configmap.yaml

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 1728m
        parallelism.default: 2
      log4j-console.properties: |+
        # This affects logging for both user code and Flink
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender
        # Uncomment this if you want to _only_ change Flink's logging
        #logger.flink.name = org.apache.flink
        #logger.flink.level = INFO
        # The following lines keep the log level of common libraries/connectors on
        # log level INFO. The root logger does not override this. You have to manually
        # change the log levels here.
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name= org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO
        # Log all infos to the console
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        # Log all infos in the given rolling file
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size=100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10
        # Suppress the irrelevant (wrong) warnings from the Netty channel handler
        logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF

    flink-reactive-mode-configuration-configmap.yaml

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 1728m
        parallelism.default: 2
        scheduler-mode: reactive
        execution.checkpointing.interval: 10s
      log4j-console.properties: |+
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        # Uncomment this if you want to _only_ change Flink's logging
        #logger.flink.name = org.apache.flink
        #logger.flink.level = INFO
        # The following lines keep the log level of common libraries/connectors on
        # log level INFO. The root logger does not override this. You have to manually
        # change the log levels here.
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name= org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO
        # Log all infos to the console
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        # Log all infos in the given rolling file
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size=100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10
        # Suppress the irrelevant (wrong) warnings from the Netty channel handler
        logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF

    jobmanager-service.yaml. Optional service, which is only necessary for non-HA mode.

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob-server
        port: 6124
      - name: webui
        port: 8081
      selector:
        app: flink
        component: jobmanager

    jobmanager-rest-service.yaml. Optional service that exposes the jobmanager rest port as a public Kubernetes node’s port.
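
    A minimal sketch of such a service, mirroring the taskmanager-query-state-service.yaml below and matching the flink-jobmanager-rest name used earlier on this page (the nodePort value 30081 is an assumption):

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-rest
    spec:
      type: NodePort
      ports:
      - name: rest
        port: 8081
        targetPort: 8081
        nodePort: 30081
      selector:
        app: flink
        component: jobmanager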

    taskmanager-query-state-service.yaml. Optional service that exposes the TaskManager port to access the queryable state as a public Kubernetes node’s port.

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-taskmanager-query-state
    spec:
      type: NodePort
      ports:
      - name: query-state
        port: 6125
        targetPort: 6125
        nodePort: 30025
      selector:
        app: flink
        component: taskmanager

    Session cluster resource definitions

    jobmanager-session-deployment-non-ha.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: apache/flink:1.14.4-scala_2.11
            args: ["jobmanager"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

    jobmanager-session-deployment-ha.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1 # Set the value to greater than 1 to start standby JobManagers
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: apache/flink:1.14.4-scala_2.11
            env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
            args: ["jobmanager", "$(POD_IP)"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
          serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

    taskmanager-session-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: apache/flink:1.14.4-scala_2.11
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            securityContext:
              runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

    Application cluster resource definitions

    jobmanager-application-non-ha.yaml

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: flink-jobmanager
    spec:
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          restartPolicy: OnFailure
          containers:
          - name: jobmanager
            image: apache/flink:1.14.4-scala_2.11
            env:
            args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: job-artifacts-volume
              mountPath: /opt/flink/usrlib
            securityContext:
              runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
          - name: job-artifacts-volume
            hostPath:
              path: /host/path/to/job/artifacts

    jobmanager-application-ha.yaml
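
    A sketch of this definition, assuming it mirrors jobmanager-application-non-ha.yaml above combined with the HA additions from jobmanager-session-deployment-ha.yaml (POD_IP environment variable, a JobManager address argument, and the flink-service-account); treat it as a starting point and adjust it to your setup:

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: flink-jobmanager
    spec:
      parallelism: 1 # Set the value to greater than 1 to start standby JobManagers
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          restartPolicy: OnFailure
          containers:
          - name: jobmanager
            image: apache/flink:1.14.4-scala_2.11
            env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
            args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: job-artifacts-volume
              mountPath: /opt/flink/usrlib
            securityContext:
              runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
          serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
          - name: job-artifacts-volume
            hostPath:
              path: /host/path/to/job/artifacts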

    taskmanager-job-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: apache/flink:1.14.4-scala_2.11
            env:
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: job-artifacts-volume
              mountPath: /opt/flink/usrlib
            securityContext:
              runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
          - name: job-artifacts-volume
            hostPath:
              path: /host/path/to/job/artifacts