Kubernetes 安装

    本文描述了如何使用 Flink standalone 部署模式在 Kubernetes 上部署 standalone 模式的 Flink 集群。通常我们建议新用户使用 模式在 Kubernetes上部署 Flink。

    准备

    本指南假设存在一个 Kubernets 的运行环境。你可以通过运行 命令来确保 Kubernetes 环境运行正常,该命令展示所有连接到 Kubernets 集群的 node 节点信息。

    如果你想在本地运行 Kubernetes,建议使用 。

    Flink session 集群 是以一种长期运行的 Kubernetes Deployment 形式执行的。你可以在一个 session 集群 上运行多个 Flink 作业。当然,只有 session 集群部署好以后才可以在上面提交 Flink 作业。

    在 Kubernetes 上部署一个基本的 Flink session 集群 时,一般包括下面三个组件:

    • 运行 的 Deployment
    • 运行 TaskManagersDeployment
    • 暴露 JobManager 上 REST 和 UI 端口的 Service

    使用中提供的文件内容来创建以下文件,并使用 kubectl 命令来创建相应的组件:

    接下来,我们设置端口转发以访问 Flink UI 页面并提交作业:

    1. 运行 kubectl port-forward ${flink-jobmanager-pod} 8081:8081 将 jobmanager 的 web ui 端口映射到本地 8081。
    2. 在浏览器中导航到 http://localhost:8081 页面。
    3. 此外,也可以使用如下命令向集群提交作业:
    1. $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar

    可以使用以下命令停止运行 flink 集群:

    1. $ kubectl delete -f jobmanager-service.yaml
    2. $ kubectl delete -f flink-configuration-configmap.yaml
    3. $ kubectl delete -f taskmanager-session-deployment.yaml
    4. $ kubectl delete -f jobmanager-session-deployment.yaml

    Application 集群模式

    Flink Application 集群 是运行单个 Application 的专用集群,部署集群时要保证该 Application 可用。

    在 Kubernetes 上部署一个基本的 Flink Application 集群 时,一般包括下面三个组件:

    • 一个运行 JobManagerApplication
    • 运行若干个 TaskManager 的 Deployment;
    • 暴露 JobManager 上 REST 和 UI 端口的 Service;

    检查 Application 集群资源定义 并做出相应的调整:

    jobmanager-job.yaml 中的 args 属性必须指定用户作业的主类。也可以参考来了解如何将额外的 args 传递给 jobmanager-job.yaml 配置中指定的 Flink 镜像。

    job artifacts 参数必须可以从 资源定义示例 中的 job-artifacts-volume 处获取。假如是在 minikube 集群中创建这些组件,那么定义示例中的 job-artifacts-volume 可以挂载为主机的本地目录。如果不使用 minikube 集群,那么可以使用 Kubernetes 集群中任何其它可用类型的 volume 来提供 job artifacts。此外,还可以构建一个已经包含 job artifacts 参数的。

    在创建通用集群组件后,指定 文件,执行 kubectl 命令来启动 Flink Application 集群:

    1. $ kubectl create -f jobmanager-job.yaml
    2. $ kubectl create -f taskmanager-job-deployment.yaml
    1. $ kubectl delete -f taskmanager-job-deployment.yaml
    2. $ kubectl delete -f jobmanager-job.yaml

    Per-Job 集群模式

    在 Kubernetes 上部署 Standalone 集群时不支持 Per-Job 集群模式。

    本文档开始部分的指南中描述了 Session 集群模式的部署。

    Configuration

    所有配置项都展示在上。在 config map 配置文件 flink-configuration-configmap.yaml 中,可以将配置添加在 flink-conf.yaml 部分。

    接下来可以访问 Flink UI 页面并通过不同的方式提交作业:

    • kubectl proxy:

      1. 在终端运行 kubectl proxy 命令。
      2. 在浏览器中导航到 。
    • kubectl port-forward:

      1. 运行 kubectl port-forward ${flink-jobmanager-pod} 8081:8081 将 jobmanager 的 web ui 端口映射到本地的 8081。
      2. 在浏览器中导航到 http://localhost:8081
      3. 此外,也可以使用如下命令向集群提交作业:
      1. $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
    • 基于 jobmanager 的 rest 服务上创建 NodePort service:

      1. 运行 kubectl create -f jobmanager-rest-service.yaml 来基于 jobmanager 创建 NodePort service。jobmanager-rest-service.yaml 的示例文件可以在 中找到。
      2. 运行 kubectl get svc flink-jobmanager-rest 来查询 server 的 node-port,然后再浏览器导航到 http://<public-node-ip>:<node-port>
      3. 如果使用 minikube 集群,可以执行 minikube ip 命令来查看 public ip。
      4. port-forward 方案类似,也可以使用如下命令向集群提交作业。

    调试和访问日志

    通过查看 Flink 的日志文件,可以很轻松地发现许多常见错误。如果你有权访问 Flink 的 Web 用户界面,那么可以在页面上访问 JobManager 和 TaskManager 日志。

    如果启动 Flink 出现问题,也可以使用 Kubernetes 工具集访问日志。使用 kubectl get pods 命令查看所有运行的 pods 资源。针对上面的快速入门示例,你可以看到三个 pod:

    1. $ kubectl get pods
    2. NAME READY STATUS RESTARTS AGE
    3. flink-jobmanager-589967dcfc-m49xv 1/1 Running 3 3m32s
    4. flink-taskmanager-64847444ff-7rdl4 1/1 Running 3 3m28s
    5. flink-taskmanager-64847444ff-nnd6m 1/1 Running 3 3m28s

    现在你可以通过运行 kubectl logs flink-jobmanager-589967dcfc-m49xv 来访问日志。

    高可用的 Standalone Kubernetes

    对于在 Kubernetes 上实现HA,可以参考当前的 。

    Kubernetes 高可用 Services

    Session 模式和 Application 模式集群都支持使用 。需要在 flink-configuration-configmap.yaml 中添加如下 Flink 配置项。

    Note 配置了 HA 存储目录相对应的文件系统必须在运行时可用。请参阅和启用文件系统插件获取更多相关信息。

    1. apiVersion: v1
    2. kind: ConfigMap
    3. metadata:
    4. name: flink-config
    5. labels:
    6. app: flink
    7. data:
    8. flink-conf.yaml: |+
    9. ...
    10. kubernetes.cluster-id: <cluster-id>
    11. high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    12. high-availability.storageDir: hdfs:///flink/recovery
    13. restart-strategy: fixed-delay
    14. restart-strategy.fixed-delay.attempts: 10
    15. ...

    此外,你必须使用具有创建、编辑、删除 ConfigMap 权限的 service 账号启动 JobManager 和 TaskManager pod。请查看获取更多信息。

    当启用了高可用,Flink 会使用自己的 HA 服务进行服务发现。因此,JobManager Pod 会使用 IP 地址而不是 Kubernetes 的 service 名称来作为 jobmanager.rpc.address 的配置项启动。完整配置请参考附录

    Standby JobManagers

    如果你为 TaskManager 创建了 NodePort service,那么你就可以访问 TaskManager 的 Queryable State 服务:

    1. 运行 kubectl create -f taskmanager-query-state-service.yaml 来为 taskmanager pod 创建 NodePort service。taskmanager-query-state-service.yaml 的示例文件可以从附录中找到。
    2. 运行 kubectl get svc flink-taskmanager-query-state 来查询 service 对应 node-port 的端口号。然后你就可以创建 来提交状态查询。

    在 Reactive 模式下使用 Standalone Kubernetes

    允许在 Application 集群 始终根据可用资源调整作业并行度的模式下运行 Flink。与 Kubernetes 结合使用,TaskManager 部署的副本数决定了可用资源。增加副本数将扩大作业规模,而减少副本数将会触发缩减作业规模。通过使用 Horizontal Pod Autoscaler 也可以自动实现该功能。

    要在 Kubernetes 上使用 Reactive Mode,请按照 完成相同的步骤。但是要使用 flink-reactive-mode-configuration-configmap.yaml 配置文件来代替 flink-configuration-configmap.yaml。该文件包含了针对 Flink 的 scheduler-mode: reactive 配置。

    一旦你部署了 Application 集群,就可以通过修改 flink-taskmanager 的部署副本数量来扩大或缩小作业的并行度。

    通用集群资源定义

    flink-configuration-configmap.yaml

    1. apiVersion: v1
    2. kind: ConfigMap
    3. metadata:
    4. name: flink-config
    5. labels:
    6. app: flink
    7. data:
    8. flink-conf.yaml: |+
    9. jobmanager.rpc.address: flink-jobmanager
    10. taskmanager.numberOfTaskSlots: 2
    11. blob.server.port: 6124
    12. jobmanager.rpc.port: 6123
    13. taskmanager.rpc.port: 6122
    14. queryable-state.proxy.ports: 6125
    15. jobmanager.memory.process.size: 1600m
    16. taskmanager.memory.process.size: 1728m
    17. parallelism.default: 2
    18. log4j-console.properties: |+
    19. # 如下配置会同时影响用户代码和 Flink 的日志行为
    20. rootLogger.level = INFO
    21. rootLogger.appenderRef.console.ref = ConsoleAppender
    22. rootLogger.appenderRef.rolling.ref = RollingFileAppender
    23. # 如果你只想改变 Flink 的日志行为则可以取消如下的注释部分
    24. #logger.flink.name = org.apache.flink
    25. #logger.flink.level = INFO
    26. # 下面几行将公共 libraries 或 connectors 的日志级别保持在 INFO 级别。
    27. # root logger 的配置不会覆盖此处配置。
    28. # 你必须手动修改这里的日志级别。
    29. logger.akka.name = akka
    30. logger.akka.level = INFO
    31. logger.kafka.name= org.apache.kafka
    32. logger.kafka.level = INFO
    33. logger.hadoop.name = org.apache.hadoop
    34. logger.hadoop.level = INFO
    35. logger.zookeeper.name = org.apache.zookeeper
    36. logger.zookeeper.level = INFO
    37. # 将所有 info 级别的日志输出到 console
    38. appender.console.name = ConsoleAppender
    39. appender.console.type = CONSOLE
    40. appender.console.layout.type = PatternLayout
    41. appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    42. # 将所有 info 级别的日志输出到指定的 rolling file
    43. appender.rolling.name = RollingFileAppender
    44. appender.rolling.type = RollingFile
    45. appender.rolling.append = false
    46. appender.rolling.fileName = ${sys:log.file}
    47. appender.rolling.filePattern = ${sys:log.file}.%i
    48. appender.rolling.layout.type = PatternLayout
    49. appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    50. appender.rolling.policies.type = Policies
    51. appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    52. appender.rolling.policies.size.size=100MB
    53. appender.rolling.strategy.type = DefaultRolloverStrategy
    54. appender.rolling.strategy.max = 10
    55. # 关闭 Netty channel handler 中不相关的(错误)警告
    56. logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    57. logger.netty.level = OFF

    flink-reactive-mode-configuration-configmap.yaml

    1. apiVersion: v1
    2. kind: ConfigMap
    3. metadata:
    4. name: flink-config
    5. labels:
    6. app: flink
    7. data:
    8. flink-conf.yaml: |+
    9. jobmanager.rpc.address: flink-jobmanager
    10. taskmanager.numberOfTaskSlots: 2
    11. blob.server.port: 6124
    12. jobmanager.rpc.port: 6123
    13. taskmanager.rpc.port: 6122
    14. queryable-state.proxy.ports: 6125
    15. jobmanager.memory.process.size: 1600m
    16. taskmanager.memory.process.size: 1728m
    17. parallelism.default: 2
    18. scheduler-mode: reactive
    19. execution.checkpointing.interval: 10s
    20. log4j-console.properties: |+
    21. # 如下配置会同时影响用户代码和 Flink 的日志行为
    22. rootLogger.appenderRef.console.ref = ConsoleAppender
    23. rootLogger.appenderRef.rolling.ref = RollingFileAppender
    24. # 如果你只想改变 Flink 的日志行为则可以取消如下的注释部分
    25. #logger.flink.name = org.apache.flink
    26. #logger.flink.level = INFO
    27. # 下面几行将公共 libraries 或 connectors 的日志级别保持在 INFO 级别。
    28. # root logger 的配置不会覆盖此处配置。
    29. # 你必须手动修改这里的日志级别。
    30. logger.akka.name = akka
    31. logger.akka.level = INFO
    32. logger.kafka.name= org.apache.kafka
    33. logger.kafka.level = INFO
    34. logger.hadoop.name = org.apache.hadoop
    35. logger.hadoop.level = INFO
    36. logger.zookeeper.name = org.apache.zookeeper
    37. logger.zookeeper.level = INFO
    38. # 将所有 info 级别的日志输出到 console
    39. appender.console.name = ConsoleAppender
    40. appender.console.type = CONSOLE
    41. appender.console.layout.type = PatternLayout
    42. appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    43. # 将所有 info 级别的日志输出到指定的 rolling file
    44. appender.rolling.name = RollingFileAppender
    45. appender.rolling.type = RollingFile
    46. appender.rolling.append = false
    47. appender.rolling.fileName = ${sys:log.file}
    48. appender.rolling.filePattern = ${sys:log.file}.%i
    49. appender.rolling.layout.type = PatternLayout
    50. appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    51. appender.rolling.policies.type = Policies
    52. appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    53. appender.rolling.policies.size.size=100MB
    54. appender.rolling.strategy.type = DefaultRolloverStrategy
    55. appender.rolling.strategy.max = 10
    56. # 关闭 Netty channel handler 中不相关的(错误)警告
    57. logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    58. logger.netty.level = OFF

    jobmanager-service.yaml 。可选的 service,仅在非 HA 模式下需要。

    1. apiVersion: v1
    2. kind: Service
    3. metadata:
    4. name: flink-jobmanager
    5. spec:
    6. type: ClusterIP
    7. ports:
    8. - name: rpc
    9. port: 6123
    10. - name: blob-server
    11. port: 6124
    12. - name: webui
    13. port: 8081
    14. selector:
    15. app: flink
    16. component: jobmanager

    jobmanager-rest-service.yaml。可选的 service,该 service 将 jobmanager 的 rest 端口暴露为公共 Kubernetes node 的节点端口。

    taskmanager-query-state-service.yaml。可选的 service,该 service 将 TaskManager 的端口暴露为公共 Kubernetes node 的节点端口,通过该端口来访问 queryable state 服务。

    1. apiVersion: v1
    2. kind: Service
    3. metadata:
    4. name: flink-taskmanager-query-state
    5. spec:
    6. type: NodePort
    7. ports:
    8. - name: query-state
    9. port: 6125
    10. targetPort: 6125
    11. nodePort: 30025
    12. selector:
    13. app: flink
    14. component: taskmanager

    Session 集群资源定义

    jobmanager-session-deployment-non-ha.yaml

    1. apiVersion: apps/v1
    2. kind: Deployment
    3. metadata:
    4. name: flink-jobmanager
    5. spec:
    6. replicas: 1
    7. selector:
    8. matchLabels:
    9. app: flink
    10. component: jobmanager
    11. template:
    12. metadata:
    13. labels:
    14. app: flink
    15. component: jobmanager
    16. spec:
    17. containers:
    18. - name: jobmanager
    19. image: apache/flink:1.15.0-scala_2.12
    20. args: ["jobmanager"]
    21. ports:
    22. - containerPort: 6123
    23. name: rpc
    24. - containerPort: 6124
    25. name: blob-server
    26. - containerPort: 8081
    27. name: webui
    28. livenessProbe:
    29. tcpSocket:
    30. port: 6123
    31. initialDelaySeconds: 30
    32. periodSeconds: 60
    33. volumeMounts:
    34. - name: flink-config-volume
    35. mountPath: /opt/flink/conf
    36. securityContext:
    37. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
    38. volumes:
    39. - name: flink-config-volume
    40. configMap:
    41. name: flink-config
    42. items:
    43. - key: flink-conf.yaml
    44. path: flink-conf.yaml
    45. - key: log4j-console.properties
    46. path: log4j-console.properties

    jobmanager-session-deployment-ha.yaml

    1. apiVersion: apps/v1
    2. kind: Deployment
    3. metadata:
    4. name: flink-jobmanager
    5. spec:
    6. replicas: 1 # 通过设置大于 1 的整型值来开启 Standby JobManager
    7. selector:
    8. matchLabels:
    9. app: flink
    10. component: jobmanager
    11. template:
    12. metadata:
    13. labels:
    14. app: flink
    15. component: jobmanager
    16. spec:
    17. containers:
    18. - name: jobmanager
    19. image: apache/flink:1.15.0-scala_2.12
    20. env:
    21. - name: POD_IP
    22. valueFrom:
    23. fieldRef:
    24. apiVersion: v1
    25. fieldPath: status.podIP
    26. # 下面的 args 参数会使用 POD_IP 对应的值覆盖 config mapjobmanager.rpc.address 的属性值。
    27. args: ["jobmanager", "$(POD_IP)"]
    28. ports:
    29. - containerPort: 6123
    30. name: rpc
    31. - containerPort: 6124
    32. name: blob-server
    33. - containerPort: 8081
    34. name: webui
    35. livenessProbe:
    36. tcpSocket:
    37. port: 6123
    38. initialDelaySeconds: 30
    39. volumeMounts:
    40. - name: flink-config-volume
    41. mountPath: /opt/flink/conf
    42. securityContext:
    43. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
    44. serviceAccountName: flink-service-account # 拥有创建、编辑、删除 ConfigMap 权限的 Service 账号
    45. volumes:
    46. - name: flink-config-volume
    47. configMap:
    48. name: flink-config
    49. items:
    50. - key: flink-conf.yaml
    51. path: flink-conf.yaml
    52. - key: log4j-console.properties
    53. path: log4j-console.properties

    taskmanager-session-deployment.yaml

    1. apiVersion: apps/v1
    2. kind: Deployment
    3. metadata:
    4. name: flink-taskmanager
    5. spec:
    6. replicas: 2
    7. selector:
    8. matchLabels:
    9. app: flink
    10. component: taskmanager
    11. template:
    12. metadata:
    13. labels:
    14. app: flink
    15. component: taskmanager
    16. spec:
    17. containers:
    18. - name: taskmanager
    19. image: apache/flink:1.15.0-scala_2.12
    20. args: ["taskmanager"]
    21. ports:
    22. - containerPort: 6122
    23. name: rpc
    24. - containerPort: 6125
    25. name: query-state
    26. livenessProbe:
    27. tcpSocket:
    28. port: 6122
    29. initialDelaySeconds: 30
    30. periodSeconds: 60
    31. volumeMounts:
    32. - name: flink-config-volume
    33. mountPath: /opt/flink/conf/
    34. securityContext:
    35. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
    36. volumes:
    37. - name: flink-config-volume
    38. configMap:
    39. name: flink-config
    40. items:
    41. - key: flink-conf.yaml
    42. path: flink-conf.yaml
    43. - key: log4j-console.properties
    44. path: log4j-console.properties

    Application 集群资源定义

    jobmanager-application-non-ha.yaml

    1. apiVersion: batch/v1
    2. kind: Job
    3. metadata:
    4. name: flink-jobmanager
    5. spec:
    6. template:
    7. metadata:
    8. labels:
    9. app: flink
    10. component: jobmanager
    11. spec:
    12. restartPolicy: OnFailure
    13. containers:
    14. - name: jobmanager
    15. image: apache/flink:1.15.0-scala_2.12
    16. env:
    17. args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # 可选的参数项: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
    18. ports:
    19. - containerPort: 6123
    20. name: rpc
    21. - containerPort: 6124
    22. name: blob-server
    23. - containerPort: 8081
    24. name: webui
    25. livenessProbe:
    26. tcpSocket:
    27. port: 6123
    28. initialDelaySeconds: 30
    29. periodSeconds: 60
    30. volumeMounts:
    31. - name: flink-config-volume
    32. mountPath: /opt/flink/conf
    33. - name: job-artifacts-volume
    34. mountPath: /opt/flink/usrlib
    35. securityContext:
    36. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
    37. volumes:
    38. - name: flink-config-volume
    39. configMap:
    40. name: flink-config
    41. items:
    42. - key: flink-conf.yaml
    43. path: flink-conf.yaml
    44. - key: log4j-console.properties
    45. path: log4j-console.properties
    46. - name: job-artifacts-volume
    47. hostPath:
    48. path: /host/path/to/job/artifacts

    jobmanager-application-ha.yaml

    taskmanager-job-deployment.yaml

    1. apiVersion: apps/v1
    2. kind: Deployment
    3. metadata:
    4. name: flink-taskmanager
    5. spec:
    6. replicas: 2
    7. selector:
    8. matchLabels:
    9. app: flink
    10. component: taskmanager
    11. template:
    12. metadata:
    13. labels:
    14. app: flink
    15. component: taskmanager
    16. spec:
    17. containers:
    18. - name: taskmanager
    19. image: apache/flink:1.15.0-scala_2.12
    20. env:
    21. args: ["taskmanager"]
    22. ports:
    23. - containerPort: 6122
    24. name: rpc
    25. - containerPort: 6125
    26. name: query-state
    27. livenessProbe:
    28. tcpSocket:
    29. port: 6122
    30. initialDelaySeconds: 30
    31. periodSeconds: 60
    32. volumeMounts:
    33. - name: flink-config-volume
    34. mountPath: /opt/flink/conf/
    35. - name: job-artifacts-volume
    36. mountPath: /opt/flink/usrlib
    37. securityContext:
    38. runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改
    39. volumes:
    40. - name: flink-config-volume
    41. configMap:
    42. name: flink-config
    43. items:
    44. - key: flink-conf.yaml
    45. path: flink-conf.yaml
    46. - key: log4j-console.properties
    47. path: log4j-console.properties
    48. - name: job-artifacts-volume
    49. hostPath: