Kubernetes Setup

    This page describes deploying a standalone Flink cluster on top of Kubernetes, using Flink’s standalone deployment. We generally recommend new users to deploy Flink on Kubernetes using .

    Preparation

    This guide expects a Kubernetes environment to be present. You can ensure that your Kubernetes setup is working by running a command like , which lists all connected Kubelets.

    If you want to run Kubernetes locally, we recommend using .

    Starting a Kubernetes Cluster (Session Mode)

    A Flink Session cluster is executed as a long-running Kubernetes Deployment. You can run multiple Flink jobs on a Session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

    A Flink Session cluster deployment in Kubernetes has at least three components:

    • a Deployment which runs a
    • a Deployment for a pool of TaskManagers
    • a Service exposing the JobManager’s REST and UI ports

    Using the file contents provided in the , create the following files, and create the respective components with the kubectl command:

    Next, we set up a port forward to access the Flink UI and submit jobs:

    1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager’s web ui port to local 8081.
    2. Navigate to http://localhost:8081 in your browser.
    3. Moreover, you could use the following command below to submit jobs to the cluster:
    1. $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar

    You can tear down the cluster using the following commands:

    1. $ kubectl delete -f jobmanager-service.yaml
    2. $ kubectl delete -f flink-configuration-configmap.yaml
    3. $ kubectl delete -f taskmanager-session-deployment.yaml
    4. $ kubectl delete -f jobmanager-session-deployment.yaml

    Application Mode

    A Flink Application cluster is a dedicated cluster which runs a single application, which needs to be available at deployment time.

    A basic Flink Application cluster deployment in Kubernetes has three components:

    • an Application which runs a JobManager
    • a Deployment for a pool of TaskManagers
    • a Service exposing the JobManager’s REST and UI ports

    Check and adjust them accordingly:

    The args attribute in the jobmanager-job.yaml has to specify the main class of the user job. See also how to specify the JobManager arguments to understand how to pass other args to the Flink image in the jobmanager-job.yaml.

    The job artifacts should be available from the job-artifacts-volume in . The definition examples mount the volume as a local directory of the host assuming that you create the components in a minikube cluster. If you do not use a minikube cluster, you can use any other type of volume, available in your Kubernetes cluster, to supply the job artifacts. Alternatively, you can build a custom image which already contains the artifacts instead.

    After creating , use the Application cluster specific resource definitions to launch the cluster with the kubectl command:

    1. $ kubectl create -f jobmanager-job.yaml
    2. $ kubectl create -f taskmanager-job-deployment.yaml
    1. $ kubectl delete -f taskmanager-job-deployment.yaml
    2. $ kubectl delete -f jobmanager-job.yaml

    Session Mode

    Deployment of a Session cluster is explained in the Getting Started guide at the top of this page.

    All configuration options are listed on the configuration page. Configuration options can be added to the flink-conf.yaml section of the flink-configuration-configmap.yaml config map.

    You can then access the Flink UI and submit jobs via different ways:

    • kubectl proxy:

      1. Run kubectl proxy in a terminal.
      2. Navigate to http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy in your browser.
    • kubectl port-forward:

      1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager’s web ui port to local 8081.
      2. Navigate to in your browser.
      3. Moreover, you can use the following command below to submit jobs to the cluster:
      1. $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
    • Create a NodePort service on the rest service of jobmanager:

      1. Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on jobmanager. The example of jobmanager-rest-service.yaml can be found in appendix.
      2. Run kubectl get svc flink-jobmanager-rest to know the node-port of this service and navigate to in your browser.
      3. If you use minikube, you can get its public ip by running minikube ip.
      4. Similarly to the port-forward solution, you can also use the following command below to submit jobs to the cluster:
      1. $ ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/TopSpeedWindowing.jar

    Debugging and Log Access

    Many common errors are easy to detect by checking Flink’s log files. If you have access to Flink’s web user interface, you can access the JobManager and TaskManager logs from there.

    If there are problems starting Flink, you can also use Kubernetes utilities to access the logs. Use kubectl get pods to see all running pods. For the quickstart example from above, you should see three pods:

    You can now access the logs by running kubectl logs flink-jobmanager-589967dcfc-m49xv

    High-Availability with Standalone Kubernetes

    For high availability on Kubernetes, you can use the existing high availability services.

    Kubernetes High-Availability Services

    Session Mode and Application Mode clusters support using the Kubernetes high availability service. You need to add the following Flink config options to .

    Note The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to custom Flink image and for more information.

    1. apiVersion: v1
    2. kind: ConfigMap
    3. metadata:
    4. name: flink-config
    5. labels:
    6. app: flink
    7. data:
    8. flink-conf.yaml: |+
    9. ...
    10. kubernetes.cluster-id: <cluster-id>
    11. high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    12. high-availability.storageDir: hdfs:///flink/recovery
    13. restart-strategy: fixed-delay
    14. restart-strategy.fixed-delay.attempts: 10
    15. ...

    Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, delete ConfigMaps. See how to configure service accounts for pods for more information.

    When High-Availability is enabled, Flink will use its own HA-services for service discovery. Therefore, JobManager pods should be started with their IP address instead of a Kubernetes service as its jobmanager.rpc.address. Refer to the for full configuration.

    Standby JobManagers

    Usually, it is enough to only start a single JobManager pod, because Kubernetes will restart it once the pod crashes. If you want to achieve faster recovery, configure the replicas in jobmanager-session-deployment-ha.yaml or parallelism in jobmanager-application-ha.yaml to a value greater than 1 to start standby JobManagers.

    Enabling Queryable State

    1. Run kubectl create -f taskmanager-query-state-service.yaml to create the NodePort service for the taskmanager pod. The example of taskmanager-query-state-service.yaml can be found in appendix.
    2. Run kubectl get svc flink-taskmanager-query-state to get the <node-port> of this service. Then you can create the to submit state queries.

    Reactive Mode allows to run Flink in a mode, where the Application Cluster is always adjusting the job parallelism to the available resources. In combination with Kubernetes, the replica count of the TaskManager deployment determines the available resources. Increasing the replica count will scale up the job, reducing it will trigger a scale down. This can also be done automatically by using a .

    To use Reactive Mode on Kubernetes, follow the same steps as for deploying a job using an Application Cluster. But instead of flink-configuration-configmap.yaml use this config map: flink-reactive-mode-configuration-configmap.yaml. It contains the scheduler-mode: reactive setting for Flink.

    Once you have deployed the Application Cluster, you can scale your job up or down by changing the replica count in the flink-taskmanager deployment.

    Enabling Local Recovery Across Pod Restarts

    In order to speed up recoveries in case of pod failures, you can leverage Flink’s working directory feature together with local recovery. If the working directory is configured to reside on a persistent volume that gets remounted to a restarted TaskManager pod, then Flink is able to recover state locally. With the , Kubernetes gives you the exact tool you need to map a pod to a persistent volume.

    Deploying TaskManagers as a StatefulSet, allows you to configure a volume claim template that is used to mount persistent volumes to the TaskManagers. Additionally, you need to configure a deterministic taskmanager.resource-id. A suitable value is the pod name, that you expose using environment variables. For an example StatefulSet configuration take a look at the .

    Back to top

    Common cluster resource definitions

    flink-configuration-configmap.yaml

    1. apiVersion: v1
    2. kind: ConfigMap
    3. metadata:
    4. name: flink-config
    5. labels:
    6. app: flink
    7. data:
    8. flink-conf.yaml: |+
    9. jobmanager.rpc.address: flink-jobmanager
    10. taskmanager.numberOfTaskSlots: 2
    11. blob.server.port: 6124
    12. jobmanager.rpc.port: 6123
    13. taskmanager.rpc.port: 6122
    14. queryable-state.proxy.ports: 6125
    15. jobmanager.memory.process.size: 1600m
    16. taskmanager.memory.process.size: 1728m
    17. parallelism.default: 2
    18. log4j-console.properties: |+
    19. # This affects logging for both user code and Flink
    20. rootLogger.level = INFO
    21. rootLogger.appenderRef.console.ref = ConsoleAppender
    22. rootLogger.appenderRef.rolling.ref = RollingFileAppender
    23. # Uncomment this if you want to _only_ change Flink's logging
    24. #logger.flink.name = org.apache.flink
    25. #logger.flink.level = INFO
    26. # The following lines keep the log level of common libraries/connectors on
    27. # log level INFO. The root logger does not override this. You have to manually
    28. # change the log levels here.
    29. logger.akka.name = akka
    30. logger.akka.level = INFO
    31. logger.kafka.name= org.apache.kafka
    32. logger.kafka.level = INFO
    33. logger.hadoop.name = org.apache.hadoop
    34. logger.hadoop.level = INFO
    35. logger.zookeeper.name = org.apache.zookeeper
    36. logger.zookeeper.level = INFO
    37. # Log all infos to the console
    38. appender.console.name = ConsoleAppender
    39. appender.console.type = CONSOLE
    40. appender.console.layout.type = PatternLayout
    41. appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    42. # Log all infos in the given rolling file
    43. appender.rolling.name = RollingFileAppender
    44. appender.rolling.type = RollingFile
    45. appender.rolling.append = false
    46. appender.rolling.fileName = ${sys:log.file}
    47. appender.rolling.filePattern = ${sys:log.file}.%i
    48. appender.rolling.layout.type = PatternLayout
    49. appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    50. appender.rolling.policies.type = Policies
    51. appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    52. appender.rolling.policies.size.size=100MB
    53. appender.rolling.strategy.type = DefaultRolloverStrategy
    54. appender.rolling.strategy.max = 10
    55. # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    56. logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    57. logger.netty.level = OFF

    flink-reactive-mode-configuration-configmap.yaml

    1. apiVersion: v1
    2. kind: ConfigMap
    3. metadata:
    4. name: flink-config
    5. labels:
    6. app: flink
    7. data:
    8. flink-conf.yaml: |+
    9. jobmanager.rpc.address: flink-jobmanager
    10. taskmanager.numberOfTaskSlots: 2
    11. blob.server.port: 6124
    12. jobmanager.rpc.port: 6123
    13. taskmanager.rpc.port: 6122
    14. queryable-state.proxy.ports: 6125
    15. jobmanager.memory.process.size: 1600m
    16. taskmanager.memory.process.size: 1728m
    17. parallelism.default: 2
    18. scheduler-mode: reactive
    19. execution.checkpointing.interval: 10s
    20. log4j-console.properties: |+
    21. # This affects logging for both user code and Flink
    22. rootLogger.level = INFO
    23. rootLogger.appenderRef.console.ref = ConsoleAppender
    24. rootLogger.appenderRef.rolling.ref = RollingFileAppender
    25. # Uncomment this if you want to _only_ change Flink's logging
    26. #logger.flink.name = org.apache.flink
    27. #logger.flink.level = INFO
    28. # The following lines keep the log level of common libraries/connectors on
    29. # log level INFO. The root logger does not override this. You have to manually
    30. # change the log levels here.
    31. logger.akka.name = akka
    32. logger.akka.level = INFO
    33. logger.kafka.name= org.apache.kafka
    34. logger.kafka.level = INFO
    35. logger.hadoop.name = org.apache.hadoop
    36. logger.hadoop.level = INFO
    37. logger.zookeeper.name = org.apache.zookeeper
    38. logger.zookeeper.level = INFO
    39. # Log all infos to the console
    40. appender.console.name = ConsoleAppender
    41. appender.console.type = CONSOLE
    42. appender.console.layout.type = PatternLayout
    43. appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    44. # Log all infos in the given rolling file
    45. appender.rolling.name = RollingFileAppender
    46. appender.rolling.type = RollingFile
    47. appender.rolling.append = false
    48. appender.rolling.fileName = ${sys:log.file}
    49. appender.rolling.filePattern = ${sys:log.file}.%i
    50. appender.rolling.layout.type = PatternLayout
    51. appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    52. appender.rolling.policies.type = Policies
    53. appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    54. appender.rolling.policies.size.size=100MB
    55. appender.rolling.strategy.type = DefaultRolloverStrategy
    56. appender.rolling.strategy.max = 10
    57. # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    58. logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline

    jobmanager-service.yaml Optional service, which is only necessary for non-HA mode.

    1. apiVersion: v1
    2. kind: Service
    3. metadata:
    4. name: flink-jobmanager
    5. spec:
    6. type: ClusterIP
    7. ports:
    8. - name: rpc
    9. port: 6123
    10. - name: blob-server
    11. port: 6124
    12. - name: webui
    13. port: 8081
    14. selector:
    15. app: flink
    16. component: jobmanager

    jobmanager-rest-service.yaml. Optional service, that exposes the jobmanager rest port as public Kubernetes node’s port.

    1. apiVersion: v1
    2. kind: Service
    3. metadata:
    4. name: flink-jobmanager-rest
    5. spec:
    6. type: NodePort
    7. ports:
    8. - name: rest
    9. port: 8081
    10. targetPort: 8081
    11. nodePort: 30081
    12. selector:
    13. app: flink
    14. component: jobmanager

    taskmanager-query-state-service.yaml. Optional service, that exposes the TaskManager port to access the queryable state as a public Kubernetes node’s port.

    1. apiVersion: v1
    2. kind: Service
    3. metadata:
    4. name: flink-taskmanager-query-state
    5. spec:
    6. type: NodePort
    7. ports:
    8. - name: query-state
    9. port: 6125
    10. targetPort: 6125
    11. nodePort: 30025
    12. selector:
    13. app: flink
    14. component: taskmanager

    Session cluster resource definitions

    jobmanager-session-deployment-non-ha.yaml

    jobmanager-session-deployment-ha.yaml

    1. apiVersion: apps/v1
    2. kind: Deployment
    3. metadata:
    4. name: flink-jobmanager
    5. spec:
    6. replicas: 1 # Set the value to greater than 1 to start standby JobManagers
    7. selector:
    8. matchLabels:
    9. app: flink
    10. component: jobmanager
    11. template:
    12. metadata:
    13. labels:
    14. app: flink
    15. component: jobmanager
    16. spec:
    17. containers:
    18. - name: jobmanager
    19. image: apache/flink:1.15.0-scala_2.12
    20. env:
    21. - name: POD_IP
    22. valueFrom:
    23. fieldRef:
    24. apiVersion: v1
    25. fieldPath: status.podIP
    26. # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
    27. args: ["jobmanager", "$(POD_IP)"]
    28. ports:
    29. - containerPort: 6123
    30. name: rpc
    31. - containerPort: 6124
    32. name: blob-server
    33. - containerPort: 8081
    34. name: webui
    35. livenessProbe:
    36. tcpSocket:
    37. port: 6123
    38. initialDelaySeconds: 30
    39. periodSeconds: 60
    40. volumeMounts:
    41. - name: flink-config-volume
    42. mountPath: /opt/flink/conf
    43. securityContext:
    44. runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
    45. serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
    46. volumes:
    47. - name: flink-config-volume
    48. configMap:
    49. name: flink-config
    50. items:
    51. - key: flink-conf.yaml
    52. path: flink-conf.yaml
    53. - key: log4j-console.properties
    54. path: log4j-console.properties

    taskmanager-session-deployment.yaml

    1. apiVersion: apps/v1
    2. kind: Deployment
    3. metadata:
    4. name: flink-taskmanager
    5. spec:
    6. replicas: 2
    7. selector:
    8. matchLabels:
    9. app: flink
    10. component: taskmanager
    11. template:
    12. metadata:
    13. labels:
    14. app: flink
    15. component: taskmanager
    16. spec:
    17. containers:
    18. - name: taskmanager
    19. image: apache/flink:1.15.0-scala_2.12
    20. args: ["taskmanager"]
    21. ports:
    22. - containerPort: 6122
    23. name: rpc
    24. - containerPort: 6125
    25. name: query-state
    26. livenessProbe:
    27. tcpSocket:
    28. port: 6122
    29. initialDelaySeconds: 30
    30. periodSeconds: 60
    31. volumeMounts:
    32. - name: flink-config-volume
    33. mountPath: /opt/flink/conf/
    34. securityContext:
    35. runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
    36. volumes:
    37. - name: flink-config-volume
    38. configMap:
    39. name: flink-config
    40. items:
    41. - key: flink-conf.yaml
    42. path: flink-conf.yaml
    43. - key: log4j-console.properties
    44. path: log4j-console.properties

    Application cluster resource definitions

    jobmanager-application-non-ha.yaml

    1. apiVersion: batch/v1
    2. kind: Job
    3. metadata:
    4. name: flink-jobmanager
    5. spec:
    6. template:
    7. metadata:
    8. labels:
    9. app: flink
    10. component: jobmanager
    11. spec:
    12. restartPolicy: OnFailure
    13. containers:
    14. - name: jobmanager
    15. image: apache/flink:1.15.0-scala_2.12
    16. env:
    17. args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
    18. ports:
    19. - containerPort: 6123
    20. name: rpc
    21. - containerPort: 6124
    22. name: blob-server
    23. - containerPort: 8081
    24. name: webui
    25. livenessProbe:
    26. tcpSocket:
    27. port: 6123
    28. initialDelaySeconds: 30
    29. periodSeconds: 60
    30. volumeMounts:
    31. - name: flink-config-volume
    32. mountPath: /opt/flink/conf
    33. - name: job-artifacts-volume
    34. mountPath: /opt/flink/usrlib
    35. securityContext:
    36. runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
    37. volumes:
    38. - name: flink-config-volume
    39. configMap:
    40. name: flink-config
    41. items:
    42. - key: flink-conf.yaml
    43. path: flink-conf.yaml
    44. - key: log4j-console.properties
    45. path: log4j-console.properties
    46. - name: job-artifacts-volume
    47. hostPath:

    jobmanager-application-ha.yaml

    1. kind: Job
    2. metadata:
    3. name: flink-jobmanager
    4. spec:
    5. parallelism: 1 # Set the value to greater than 1 to start standby JobManagers
    6. template:
    7. metadata:
    8. labels:
    9. app: flink
    10. component: jobmanager
    11. spec:
    12. restartPolicy: OnFailure
    13. containers:
    14. - name: jobmanager
    15. image: apache/flink:1.15.0-scala_2.12
    16. env:
    17. - name: POD_IP
    18. valueFrom:
    19. fieldRef:
    20. apiVersion: v1
    21. fieldPath: status.podIP
    22. # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
    23. args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
    24. ports:
    25. - containerPort: 6123
    26. name: rpc
    27. - containerPort: 6124
    28. name: blob-server
    29. - containerPort: 8081
    30. name: webui
    31. livenessProbe:
    32. tcpSocket:
    33. port: 6123
    34. initialDelaySeconds: 30
    35. periodSeconds: 60
    36. volumeMounts:
    37. - name: flink-config-volume
    38. mountPath: /opt/flink/conf
    39. - name: job-artifacts-volume
    40. mountPath: /opt/flink/usrlib
    41. securityContext:
    42. runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
    43. serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
    44. volumes:
    45. - name: flink-config-volume
    46. configMap:
    47. name: flink-config
    48. items:
    49. - key: flink-conf.yaml
    50. path: flink-conf.yaml
    51. - key: log4j-console.properties
    52. path: log4j-console.properties
    53. - name: job-artifacts-volume
    54. hostPath:
    55. path: /host/path/to/job/artifacts

    taskmanager-job-deployment.yaml

    1. apiVersion: apps/v1
    2. kind: Deployment
    3. metadata:
    4. name: flink-taskmanager
    5. spec:
    6. replicas: 2
    7. selector:
    8. matchLabels:
    9. app: flink
    10. component: taskmanager
    11. template:
    12. metadata:
    13. labels:
    14. app: flink
    15. component: taskmanager
    16. spec:
    17. containers:
    18. - name: taskmanager
    19. image: apache/flink:1.15.0-scala_2.12
    20. env:
    21. args: ["taskmanager"]
    22. ports:
    23. - containerPort: 6122
    24. name: rpc
    25. - containerPort: 6125
    26. name: query-state
    27. livenessProbe:
    28. tcpSocket:
    29. port: 6122
    30. initialDelaySeconds: 30
    31. periodSeconds: 60
    32. volumeMounts:
    33. - name: flink-config-volume
    34. mountPath: /opt/flink/conf/
    35. - name: job-artifacts-volume
    36. mountPath: /opt/flink/usrlib
    37. securityContext:
    38. runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
    39. volumes:
    40. - name: flink-config-volume
    41. configMap:
    42. name: flink-config
    43. items:
    44. - key: flink-conf.yaml
    45. path: flink-conf.yaml
    46. - key: log4j-console.properties
    47. path: log4j-console.properties
    48. - name: job-artifacts-volume
    49. hostPath:
    50. path: /host/path/to/job/artifacts
    1. apiVersion: v1
    2. kind: ConfigMap
    3. metadata:
    4. name: flink-config
    5. labels:
    6. app: flink
    7. data:
    8. flink-conf.yaml: |+
    9. jobmanager.rpc.address: flink-jobmanager
    10. taskmanager.numberOfTaskSlots: 2
    11. blob.server.port: 6124
    12. jobmanager.rpc.port: 6123
    13. taskmanager.rpc.port: 6122
    14. state.backend.local-recovery: true
    15. process.taskmanager.working-dir: /pv
    16. ---
    17. apiVersion: v1
    18. kind: Service
    19. metadata:
    20. name: taskmanager-hl
    21. spec:
    22. clusterIP: None
    23. selector:
    24. app: flink
    25. component: taskmanager
    26. ---
    27. apiVersion: apps/v1
    28. kind: StatefulSet
    29. metadata:
    30. name: flink-taskmanager
    31. spec:
    32. serviceName: taskmanager-hl
    33. replicas: 2
    34. selector:
    35. matchLabels:
    36. app: flink
    37. component: taskmanager
    38. template:
    39. metadata:
    40. labels:
    41. app: flink
    42. component: taskmanager
    43. spec:
    44. securityContext:
    45. runAsUser: 9999
    46. fsGroup: 9999
    47. containers:
    48. - name: taskmanager
    49. image: apache/flink:1.15.0-scala_2.12
    50. env:
    51. - name: POD_NAME
    52. valueFrom:
    53. fieldRef:
    54. fieldPath: metadata.name
    55. args: ["taskmanager", "-Dtaskmanager.resource-id=$(POD_NAME)"]
    56. ports:
    57. - containerPort: 6122
    58. name: rpc
    59. - containerPort: 6125
    60. name: query-state
    61. - containerPort: 6121
    62. name: metrics
    63. livenessProbe:
    64. tcpSocket:
    65. port: 6122
    66. initialDelaySeconds: 30
    67. periodSeconds: 60
    68. volumeMounts:
    69. - name: flink-config-volume
    70. mountPath: /opt/flink/conf/
    71. - name: pv
    72. mountPath: /pv
    73. volumes:
    74. - name: flink-config-volume
    75. configMap:
    76. name: flink-config
    77. items:
    78. - key: flink-conf.yaml
    79. path: flink-conf.yaml
    80. - key: log4j-console.properties
    81. path: log4j-console.properties
    82. volumeClaimTemplates:
    83. - metadata:
    84. name: pv
    85. spec:
    86. accessModes: [ "ReadWriteOnce" ]
    87. resources:
    88. requests: