与 Apache Kafka 和 Apache Flink 进行数据集成

    1. 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群
    2. 创建 changefeed,将 TiDB 增量数据输出至 Kafka
    3. 使用 go-tpc 写入数据到上游 TiDB
    4. 使用 Kafka console consumer 观察数据被写入到指定的 Topic
    5. (可选)配置 Flink 集群消费 Kafka 内数据

    上述过程将会基于实验环境进行。你也可以参考上述执行步骤,搭建生产级别的集群。

    1. 部署包含 TiCDC 的 TiDB 集群。

      在实验或测试环境中,可以使用 TiUP Playground 功能,快速部署 TiCDC,命令如下:

      如果尚未安装 TiUP,可以参考。在生产环境下,可以参考 TiUP 安装部署 TiCDC 集群,完成 TiCDC 集群部署工作。

    2. 部署 Kafka 集群。

      • 实验环境,可以参考 启动 Kafka 集群。
    3. (可选)部署 Flink 集群。

      • 实验环境,可以参考 Apache Flink First steps 启动 Flink 集群。
      • 生产环境,可以参考 部署 Flink 生产集群。
    1. 创建 changefeed 配置文件。

      根据 Flink 的要求和规范,每张表的增量数据需要发送到独立的 Topic 中,并且每个事件需要按照主键值分发 Partition。因此,需要创建一个名为 的配置文件,填写如下内容:

      1. [sink]
      2. dispatchers = [
      3. {matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
      4. ]

      关于配置文件中 dispatchers 的详细解释,参考。

    2. 创建一个 changefeed,将增量数据输出到 Kafka:

      • 如果命令长时间没有返回,你需要检查当前执行命令所在服务器到 sink-uri 中指定的 Kafka 机器的网络可达性,保证二者之间的网络连接正常。

      生产环境下 Kafka 集群通常有多个 broker 节点,你可以在 sink-uri 中配置多个 broker 的访问地址,这有助于提升 changefeed 到 Kafka 集群访问的稳定性,当部分被配置的 Kafka 节点故障的时候,changefeed 依旧可以正常工作。假设 Kafka 集群中有 3 个 broker 节点,地址分别为 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092,可以参考如下 sink-uri 创建 changefeed:

      1. Changefeed 创建成功后,执行如下命令,查看 changefeed 的状态:

        1. tiup ctl:v<CLUSTER_VERSION> cdc changefeed list --server="http://127.0.0.1:8300"

        可以参考管理 Changefeed,对 changefeed 状态进行管理。

      完成以上步骤后,TiCDC 会将上游 TiDB 的增量数据变更日志发送到 Kafka,下面对 TiDB 写入数据,以产生增量数据变更日志。

      1. 模拟业务负载。

        在测试实验环境下,可以使用 go-tpc 向上游 TiDB 集群写入数据,以让 TiDB 产生事件变更数据。如下命令,首先在上游 TiDB 创建名为 tpcc 的数据库,然后使用 TiUP bench 写入数据到这个数据库中。

        关于 go-tpc 的更多详细内容,可以参考如何对 TiDB 进行 TPC-C 测试

      2. 消费 Kafka Topic 中的数据。

        changefeed 正常运行时,会向 Kafka Topic 写入数据,你可以通过由 Kafka 提供的 kafka-console-consumer.sh,观测到数据成功被写入到 Kafka Topic 中:

        1. ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`
      1. 安装 Flink Kafka Connector。

        在 Flink 生态中,Flink Kafka Connector 用于消费 Kafka 中的数据并输出到 Flink 中。Flink Kafka Connector 并不是内建的,因此在 Flink 安装完毕后,还需要将 Flink Kafka Connector 及其依赖项添加到 Flink 安装目录中。下载下列 jar 文件至 Flink 安装目录下的 lib 目录中,如果你已经运行了 Flink 集群,请重启集群以加载新的插件。

      2. 创建一个表。

        你可以在 Flink 的安装目录执行如下命令,启动 Flink SQL 交互式客户端:

        随后,执行如下语句创建一个名为 tpcc_orders 的表:

        请将 topicproperties.bootstrap.servers 参数替换为环境中的实际值。

      3. 查询表内容。

        执行如下命令,查询 tpcc_orders 表中的数据:

          执行成功后,可以观察到有数据输出,如下图: