与 Confluent Cloud 和 Snowflake 进行数据集成

    • 快速搭建包含 TiCDC 的 TiDB 集群
    • 创建将数据输出到 Confluent Cloud 的 changefeed
    • 创建将数据从 Confluent Cloud 输出到 Snowflake、ksqlDB 和 SQL Server 的连接器 (Connector)
    • 使用 go-tpc 写入数据到上游 TiDB,并观察 Snowflake、ksqlDB 和 SQL Server 中的数据

    上述过程将会基于实验环境进行,你也可以参考上述执行步骤,搭建生产级别的集群。

    1. 部署包含 TiCDC 的 TiDB 集群。

      在实验或测试环境中,可以使用 TiUP Playground 功能快速部署 TiCDC,命令如下:

      如果尚未安装 TiUP,可以参考安装 TiUP。在生产环境下,可以参考,完成 TiCDC 集群部署工作。

    2. 注册 Confluent Cloud 并创建 Confluent 集群。

      创建 Basic 集群并开放 Internet 访问,详见 Quick Start for Confluent Cloud

    第 2 步:创建 Access Key Pair

    1. 创建 Cluster API Key。

      在 Confluent 集群控制面板中依次点击 Data integration > API keys > Create key。在弹出的 Select scope for API key 页面,选择 Global access

      创建成功后会得到一个 Key Pair 文件,内容如下:

      1. API key:
      2. L5WWA4GK4NAT2EQV
      3. API secret:
      4. xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
      5. Bootstrap server:
      6. xxx-xxxxx.ap-east-1.aws.confluent.cloud:9092
    2. 记录 Schema Registry Endpoints。

      在 Confluent 集群控制面板中,选择 Schema Registry > API endpoint,记录 Schema Registry Endpoints,如下:

      1. https://yyy-yyyyy.us-east-2.aws.confluent.cloud
    3. 创建 Schema Registry API key。

      在 Confluent 集群控制面板中,选择 Schema Registry > API credentials,点击 EditCreate key

      创建成功后会得到一个 Key Pair 文件,内容如下:

      1. === Confluent Cloud API key: yyy-yyyyy ===
      2. API key:
      3. 7NBH2CAFM2LMGTH7
      4. API secret:
      5. xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

      以上步骤也可以通过 Confluent CLI 实现,详见 Connect Confluent CLI to Confluent Cloud Cluster

    1. 创建 changefeed 配置文件。

      根据 Avro 协议和 Confluent Connector 的要求和规范,每张表的增量数据需要发送到独立的 Topic 中,并且每个事件需要按照主键值分发 Partition。因此,需要创建一个名为 changefeed.conf 的配置文件,填写如下内容:

      1. [sink]
      2. dispatchers = [
      3. {matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
      4. ]

      关于配置文件中 dispatchers 的详细解释,参考自定义 Kafka Sink 的 Topic 和 Partition 的分发规则

    2. 创建一个 changefeed,将增量数据输出到 Confluent Cloud:

      1. tiup ctl:v<CLUSTER_VERSION> cdc changefeed create --server="http://127.0.0.1:8300" --sink-uri="kafka://<broker_endpoint>/ticdc-meta?protocol=avro&replication-factor=3&enable-tls=true&auto-create-topic=true&sasl-mechanism=plain&sasl-user=<broker_api_key>&sasl-password=<broker_api_secret>" --schema-registry="https://<schema_registry_api_key>:<schema_registry_api_secret>@<schema_registry_endpoint>" --changefeed-id="confluent-changefeed" --config changefeed.conf

      将如下字段替换为中创建和记录的值:

      • <broker_endpoint>
      • <broker_api_key>
      • <broker_api_secret>
      • <schema_registry_api_key>
      • <schema_registry_api_secret>
      • <schema_registry_endpoint>

      其中 <schema_registry_api_secret> 需要经过 HTML URL 编码后再替换,替换完毕后示例如下:

      • 如果命令执行成功,将会返回被创建的 changefeed 的相关信息,包含被创建的 changefeed 的 ID 以及相关信息,内容如下:

        1. Create changefeed successfully!
        2. ID: confluent-changefeed
        3. Info: {... changfeed info json struct ...}
      • 如果命令长时间没有返回,请检查当前执行命令所在服务器到 Confluent Cloud 之间网络可达性,参考 。

    第 4 步:写入数据以产生变更日志

    完成以上步骤后,TiCDC 会将上游 TiDB 的增量数据变更日志发送到 Confluent Cloud。本小节将对 TiDB 写入数据,以产生增量数据变更日志。

    1. 模拟业务负载。

      在测试实验环境下,可以使用 go-tpc 向上游 TiDB 集群写入数据,以让 TiDB 产生事件变更数据。执行以下命令,会首先在上游 TiDB 创建名为 tpcc 的数据库,然后使用 TiUP bench 写入数据到这个数据库中。

      1. tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
      2. tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s

      关于 go-tpc 的更多详细内容,可以参考。

    2. 观察 Confluent 中数据传输情况。

      在 Confluent 集群控制面板中,可以观察到相应的 Topic 已经被自动创建,并有数据正在写入。至此,TiDB 数据库中的增量数据就被成功输出到了 Confluent Cloud。

    Snowflake 是一种云原生数据仓库。借助 Confluent 的能力,你只需要创建 Snowflake Sink Connector,就可以将 TiDB 的增量数据输出到 Snowflake。

    集成步骤

    1. 在 Snowflake 中创建 Database 和 Schema。

      在 Snowflake 控制面板中,选择 Data > Database。创建名为 TPCC 的 Database 和名为 TiCDC 的 Schema。

    2. 在 Confluent 集群控制面板中,选择 Data integration > Connectors > Snowflake Sink,进入如下页面:

      Add snowflake sink connector

    3. 选择需要同步到 Snowflake 的 Topic 后,进入下一页面:

    4. 填写 Snowflake 连接认证信息,其中 Database name 和 Schema name 填写在上一步创建的 Database 和 Schema 名,随后进入下一页面:

      Configuration

    5. Configuration 页面中,record value formatrecord key format 都选择 AVRO,点击 Continue,直到 Connector 创建完成。等待 Connector 状态变为 RUNNING,这个过程可能持续数分钟。

    6. 在 Snowflake 控制面板中,选择 Data > Database > TPCC > TiCDC,可以观察到 TiDB 中的增量数据实时同步到了 Snowflake,如上图。但 Snowflake 中的表结构和 TiDB 中的表结构不同,数据也以“追加”的方式插入 Snowflake 表。在大多数业务场景中,都希望 Snowflake 中的表数据是 TiDB 表的一个副本,而不是存储 TiDB 表的变更日志。该问题将在下一章节解决。

    在上一章节,TiDB 的增量变更日志已经被同步到 Snowflake 中,本章节将介绍如何借助 Snowflake 的 TASK 和 STREAM 功能,将实时写入 Snowflake 的数据变更日志根据 INSERTUPDATEDELETE 等事件类型分别处理,写入一个与上游 TiDB 结构相同的表中,从而在 Snowflake 中创建一个数据副本。下面以 ITEM 表为例。

    1. `i_id` int(11) NOT NULL,
    2. `i_im_id` int(11) DEFAULT NULL,
    3. `i_name` varchar(24) DEFAULT NULL,
    4. `i_price` decimal(5,2) DEFAULT NULL,
    5. `i_data` varchar(50) DEFAULT NULL,
    6. PRIMARY KEY (`i_id`)
    7. );

    Snowflake 中存在一张名为 TIDB_TEST_ITEM 的表,这张表是 Confluent Snowflake Sink Connector 自动创建的,表结构如下:

    1. create or replace TABLE TIDB_TEST_ITEM (
    2. RECORD_METADATA VARIANT,
    3. RECORD_CONTENT VARIANT
    4. );
    1. 根据 TiDB 中的表结构,在 Snowflake 中创建结构相同的表:

    2. TIDB_TEST_ITEM 创建一个 STREAM,将 append_only 设为 true,表示仅接收 INSERT 事件。创建的 STREAM 可以实时捕获 TIDB_TEST_ITEMINSERT 事件,也就是说,当 TiDB 中 ITEM 有新的变更日志时,变更日志将会被插入到 TIDB_TEST_ITEM 表,然后被 STREAM 捕获。

      1. create or replace stream TEST_ITEM_STREAM on table TIDB_TEST_ITEM append_only=true;
    3. 处理 STREAM 中的数据,根据不同的事件类型,在 TEST_ITEM 表中插入、更新或删除 STREAM 数据。

      1. --将数据合并到 TEST_ITEM
      2. merge into TEST_ITEM n
      3. using
      4. -- 查询 TEST_ITEM_STREAM
      5. (SELECT RECORD_METADATA:key as k, RECORD_CONTENT:val as v from TEST_ITEM_STREAM) stm
      6. -- i_id 相等为条件将流和表做匹配
      7. on k:i_id = n.i_id
      8. -- 如果 TEST_ITEM 表中存在匹配 i_id 的记录,并且 v 为空,则删除这条记录
      9. when matched and IS_NULL_VALUE(v) = true then
      10. delete
      11. -- 如果 TEST_ITEM 表中存在匹配 i_id 的记录,并且 v 不为空,则更新这条记录
      12. when matched and IS_NULL_VALUE(v) = false then
      13. update set n.i_data = v:i_data, n.i_im_id = v:i_im_id, n.i_name = v:i_name, n.i_price = v:i_price
      14. -- 如果 TEST_ITEM 表中不存在匹配 i_id 的记录,则插入这条记录
      15. when not matched then
      16. insert
      17. (i_data, i_id, i_im_id, i_name, i_price)
      18. values
      19. (v:i_data, v:i_id, v:i_im_id, v:i_name, v:i_price)
      20. ;

      在上面的语句中,我们使用了 Snowflake 的 MERGE INTO 语句,这个语句可以根据条件将流和表做匹配,然后根据不同的匹配结果,执行不同的操作,比如删除、更新或插入。在这个例子中,我们使用了三个 WHEN 子句,分别对应了三种情况:

      • 当流和表匹配时,且流中的数据为空,则删除表中的记录
      • 当流和表匹配时,且流中的数据不为空,则更新表中的记录
      • 当流和表不匹配时,则插入流中的数据
    4. 周期性执行第三步中的语句,以保证数据的实时性。可通过 Snowflake 的 SCHEDULED TASK 来实现:

      1. -- 创建一个 TASK,周期性执行 MERGE INTO 语句
      2. -- 每分钟执行一次
      3. schedule = '1 minute'
      4. when
      5. -- TEST_ITEM_STREAM 中无数据时跳过
      6. system$stream_has_data('TEST_ITEM_STREAM')
      7. as
      8. -- 将数据合并到 TEST_ITEM 表,和上文中的 merge into 语句相同
      9. merge into TEST_ITEM n
      10. using
      11. (select RECORD_METADATA:key as k, RECORD_CONTENT:val as v from TEST_ITEM_STREAM) stm
      12. on k:i_id = n.i_id
      13. when matched and IS_NULL_VALUE(v) = true then
      14. delete
      15. when matched and IS_NULL_VALUE(v) = false then
      16. update set n.i_data = v:i_data, n.i_im_id = v:i_im_id, n.i_name = v:i_name, n.i_price = v:i_price
      17. when not matched then
      18. insert
      19. (i_data, i_id, i_im_id, i_name, i_price)
      20. values
      21. (v:i_data, v:i_id, v:i_im_id, v:i_name, v:i_price)
      22. ;

    至此,你就建立了一条具备一定 ETL 能力的数据通路,使得 TiDB 的增量数据变更日志能够被输出到 Snowflake,并且维护一个 TiDB 表的数据副本,实现在 Snowflake 中使用 TiDB 表的数据。最后一步操作是定期清理 TIDB_TEST_ITEM 表中的无用数据:

    1. -- 每两小时清空表 TIDB_TEST_ITEM
    2. create or replace task TRUNCATE_TIDB_TEST_ITEM
    3. warehouse = test
    4. schedule = '120 minute'
    5. when
    6. system$stream_has_data('TIDB_TEST_ITEM')
    7. as
    8. TRUNCATE table TIDB_TEST_ITEM;

    ksqlDB 是一种面向流式数据处理的数据库。你可以直接在 Confluent Cloud 上创建 ksqlDB 集群,并且直接读取 TiCDC 输出到 Confluent 的增量数据。

    1. 在 Confluent 集群控制面板中选择 ksqlDB,按照引导创建 ksqlDB 集群。

      等待集群状态为 Running 后,进入下一步操作,这个过程可能持续数分钟。

    2. 在 ksqlDB Editor 中执行如下命令,创建一个用于读取 tidb_tpcc_orders Topic 的 STREAM。

      1. CREATE STREAM orders (o_id INTEGER, o_d_id INTEGER, o_w_id INTEGER, o_c_id INTEGER, o_entry_d STRING, o_carrier_id INTEGER, o_ol_cnt INTEGER, o_all_local INTEGER) WITH (kafka_topic='tidb_tpcc_orders', partitions=3, value_format='AVRO');
    3. 执行如下命令查询 orders STREAM 数据:

      Select from orders

    可以观察到 TiDB 中的增量数据实时同步到了 ksqlDB,如上图。至此,就完成了 TiDB 与 ksqlDB 的数据集成。

    SQL Server 是 Microsoft 推出的关系型数据库软件。借助 Confluent 的能力,你只需要创建 SQL Server Sink Connector,就可以将 TiDB 的增量数据输出到 SQL Server。

    1. 连接 SQL Server 服务器,创建名为 tpcc 的数据库:

      1. [ec2-user@ip-172-1-1-1 bin]$ sqlcmd -S 10.61.43.14,1433 -U admin
      2. Password:
      3. 1> create database tpcc
      4. 2> go
      5. 1> select name from master.dbo.sysdatabases
      6. 2> go
      7. name
      8. ----------------------------------------------------------------------
      9. master
      10. tempdb
      11. model
      12. msdb
      13. rdsadmin
      14. tpcc
      15. (6 rows affected)
    2. 在 Confluent 集群控制面板中,选择 Data integration > Connectors > Microsoft SQL Server Sink,进入如下页面:

    3. 选择需要同步到 SQL Server 的 Topic 后,进入下一页面:

      Authentication

    4. 在填写 SQL Server 的连接和认证信息后,进入下一页面。

    5. Configuration 界面,按下表进行配置:

    6. 配置完成后,选择 Continue,等待 Connector 状态变为 RUNNING,这个过程可能持续数分钟。