与 Confluent Cloud 和 Snowflake 进行数据集成
- 快速搭建包含 TiCDC 的 TiDB 集群
- 创建将数据输出到 Confluent Cloud 的 changefeed
- 创建将数据从 Confluent Cloud 输出到 Snowflake、ksqlDB 和 SQL Server 的连接器 (Connector)
- 使用 go-tpc 写入数据到上游 TiDB,并观察 Snowflake、ksqlDB 和 SQL Server 中的数据
上述过程将会基于实验环境进行,你也可以参考上述执行步骤,搭建生产级别的集群。
部署包含 TiCDC 的 TiDB 集群。
在实验或测试环境中,可以使用 TiUP Playground 功能快速部署 TiCDC,命令如下:
如果尚未安装 TiUP,可以参考安装 TiUP。在生产环境下,可以参考,完成 TiCDC 集群部署工作。
注册 Confluent Cloud 并创建 Confluent 集群。
创建 Basic 集群并开放 Internet 访问,详见 Quick Start for Confluent Cloud。
第 2 步:创建 Access Key Pair
创建 Cluster API Key。
在 Confluent 集群控制面板中依次点击 Data integration > API keys > Create key。在弹出的 Select scope for API key 页面,选择 Global access。
创建成功后会得到一个 Key Pair 文件,内容如下:
API key:
L5WWA4GK4NAT2EQV
API secret:
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Bootstrap server:
xxx-xxxxx.ap-east-1.aws.confluent.cloud:9092
记录 Schema Registry Endpoints。
在 Confluent 集群控制面板中,选择 Schema Registry > API endpoint,记录 Schema Registry Endpoints,如下:
https://yyy-yyyyy.us-east-2.aws.confluent.cloud
创建 Schema Registry API key。
在 Confluent 集群控制面板中,选择 Schema Registry > API credentials,点击 Edit 和 Create key。
创建成功后会得到一个 Key Pair 文件,内容如下:
=== Confluent Cloud API key: yyy-yyyyy ===
API key:
7NBH2CAFM2LMGTH7
API secret:
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
以上步骤也可以通过 Confluent CLI 实现,详见 Connect Confluent CLI to Confluent Cloud Cluster。
创建 changefeed 配置文件。
根据 Avro 协议和 Confluent Connector 的要求和规范,每张表的增量数据需要发送到独立的 Topic 中,并且每个事件需要按照主键值分发 Partition。因此,需要创建一个名为
changefeed.conf
的配置文件,填写如下内容:[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
]
关于配置文件中
dispatchers
的详细解释,参考自定义 Kafka Sink 的 Topic 和 Partition 的分发规则。创建一个 changefeed,将增量数据输出到 Confluent Cloud:
tiup ctl:v<CLUSTER_VERSION> cdc changefeed create --server="http://127.0.0.1:8300" --sink-uri="kafka://<broker_endpoint>/ticdc-meta?protocol=avro&replication-factor=3&enable-tls=true&auto-create-topic=true&sasl-mechanism=plain&sasl-user=<broker_api_key>&sasl-password=<broker_api_secret>" --schema-registry="https://<schema_registry_api_key>:<schema_registry_api_secret>@<schema_registry_endpoint>" --changefeed-id="confluent-changefeed" --config changefeed.conf
将如下字段替换为中创建和记录的值:
<broker_endpoint>
<broker_api_key>
<broker_api_secret>
<schema_registry_api_key>
<schema_registry_api_secret>
<schema_registry_endpoint>
其中
<schema_registry_api_secret>
需要经过 HTML URL 编码后再替换,替换完毕后示例如下:如果命令执行成功,将会返回被创建的 changefeed 的相关信息,包含被创建的 changefeed 的 ID 以及相关信息,内容如下:
Create changefeed successfully!
ID: confluent-changefeed
Info: {... changfeed info json struct ...}
如果命令长时间没有返回,请检查当前执行命令所在服务器到 Confluent Cloud 之间网络可达性,参考 。
第 4 步:写入数据以产生变更日志
完成以上步骤后,TiCDC 会将上游 TiDB 的增量数据变更日志发送到 Confluent Cloud。本小节将对 TiDB 写入数据,以产生增量数据变更日志。
模拟业务负载。
在测试实验环境下,可以使用 go-tpc 向上游 TiDB 集群写入数据,以让 TiDB 产生事件变更数据。执行以下命令,会首先在上游 TiDB 创建名为
tpcc
的数据库,然后使用 TiUP bench 写入数据到这个数据库中。tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s
关于 go-tpc 的更多详细内容,可以参考。
观察 Confluent 中数据传输情况。
在 Confluent 集群控制面板中,可以观察到相应的 Topic 已经被自动创建,并有数据正在写入。至此,TiDB 数据库中的增量数据就被成功输出到了 Confluent Cloud。
Snowflake 是一种云原生数据仓库。借助 Confluent 的能力,你只需要创建 Snowflake Sink Connector,就可以将 TiDB 的增量数据输出到 Snowflake。
注册和创建 Snowflake 集群,参考 Getting Started with Snowflake。
连接到 Snowflake 前,为 Snowflake 添加 Private Key,参考 。
集成步骤
在 Snowflake 中创建 Database 和 Schema。
在 Snowflake 控制面板中,选择 Data > Database。创建名为
TPCC
的 Database 和名为TiCDC
的 Schema。在 Confluent 集群控制面板中,选择 Data integration > Connectors > Snowflake Sink,进入如下页面:
选择需要同步到 Snowflake 的 Topic 后,进入下一页面:
填写 Snowflake 连接认证信息,其中 Database name 和 Schema name 填写在上一步创建的 Database 和 Schema 名,随后进入下一页面:
在 Configuration 页面中,
record value format
和record key format
都选择AVRO
,点击 Continue,直到 Connector 创建完成。等待 Connector 状态变为RUNNING
,这个过程可能持续数分钟。在 Snowflake 控制面板中,选择 Data > Database > TPCC > TiCDC,可以观察到 TiDB 中的增量数据实时同步到了 Snowflake,如上图。但 Snowflake 中的表结构和 TiDB 中的表结构不同,数据也以“追加”的方式插入 Snowflake 表。在大多数业务场景中,都希望 Snowflake 中的表数据是 TiDB 表的一个副本,而不是存储 TiDB 表的变更日志。该问题将在下一章节解决。
在上一章节,TiDB 的增量变更日志已经被同步到 Snowflake 中,本章节将介绍如何借助 Snowflake 的 TASK 和 STREAM 功能,将实时写入 Snowflake 的数据变更日志根据 INSERT
、UPDATE
和 DELETE
等事件类型分别处理,写入一个与上游 TiDB 结构相同的表中,从而在 Snowflake 中创建一个数据副本。下面以 ITEM
表为例。
`i_id` int(11) NOT NULL,
`i_im_id` int(11) DEFAULT NULL,
`i_name` varchar(24) DEFAULT NULL,
`i_price` decimal(5,2) DEFAULT NULL,
`i_data` varchar(50) DEFAULT NULL,
PRIMARY KEY (`i_id`)
);
Snowflake 中存在一张名为 TIDB_TEST_ITEM
的表,这张表是 Confluent Snowflake Sink Connector 自动创建的,表结构如下:
create or replace TABLE TIDB_TEST_ITEM (
RECORD_METADATA VARIANT,
RECORD_CONTENT VARIANT
);
根据 TiDB 中的表结构,在 Snowflake 中创建结构相同的表:
为
TIDB_TEST_ITEM
创建一个 STREAM,将append_only
设为true
,表示仅接收INSERT
事件。创建的 STREAM 可以实时捕获TIDB_TEST_ITEM
的INSERT
事件,也就是说,当 TiDB 中ITEM
有新的变更日志时,变更日志将会被插入到TIDB_TEST_ITEM
表,然后被 STREAM 捕获。create or replace stream TEST_ITEM_STREAM on table TIDB_TEST_ITEM append_only=true;
处理 STREAM 中的数据,根据不同的事件类型,在
TEST_ITEM
表中插入、更新或删除 STREAM 数据。--将数据合并到 TEST_ITEM 表
merge into TEST_ITEM n
using
-- 查询 TEST_ITEM_STREAM
(SELECT RECORD_METADATA:key as k, RECORD_CONTENT:val as v from TEST_ITEM_STREAM) stm
-- 以 i_id 相等为条件将流和表做匹配
on k:i_id = n.i_id
-- 如果 TEST_ITEM 表中存在匹配 i_id 的记录,并且 v 为空,则删除这条记录
when matched and IS_NULL_VALUE(v) = true then
delete
-- 如果 TEST_ITEM 表中存在匹配 i_id 的记录,并且 v 不为空,则更新这条记录
when matched and IS_NULL_VALUE(v) = false then
update set n.i_data = v:i_data, n.i_im_id = v:i_im_id, n.i_name = v:i_name, n.i_price = v:i_price
-- 如果 TEST_ITEM 表中不存在匹配 i_id 的记录,则插入这条记录
when not matched then
insert
(i_data, i_id, i_im_id, i_name, i_price)
values
(v:i_data, v:i_id, v:i_im_id, v:i_name, v:i_price)
;
在上面的语句中,我们使用了 Snowflake 的
MERGE INTO
语句,这个语句可以根据条件将流和表做匹配,然后根据不同的匹配结果,执行不同的操作,比如删除、更新或插入。在这个例子中,我们使用了三个WHEN
子句,分别对应了三种情况:- 当流和表匹配时,且流中的数据为空,则删除表中的记录
- 当流和表匹配时,且流中的数据不为空,则更新表中的记录
- 当流和表不匹配时,则插入流中的数据
周期性执行第三步中的语句,以保证数据的实时性。可通过 Snowflake 的
SCHEDULED TASK
来实现:-- 创建一个 TASK,周期性执行 MERGE INTO 语句
-- 每分钟执行一次
schedule = '1 minute'
when
-- 当 TEST_ITEM_STREAM 中无数据时跳过
system$stream_has_data('TEST_ITEM_STREAM')
as
-- 将数据合并到 TEST_ITEM 表,和上文中的 merge into 语句相同
merge into TEST_ITEM n
using
(select RECORD_METADATA:key as k, RECORD_CONTENT:val as v from TEST_ITEM_STREAM) stm
on k:i_id = n.i_id
when matched and IS_NULL_VALUE(v) = true then
delete
when matched and IS_NULL_VALUE(v) = false then
update set n.i_data = v:i_data, n.i_im_id = v:i_im_id, n.i_name = v:i_name, n.i_price = v:i_price
when not matched then
insert
(i_data, i_id, i_im_id, i_name, i_price)
values
(v:i_data, v:i_id, v:i_im_id, v:i_name, v:i_price)
;
至此,你就建立了一条具备一定 ETL 能力的数据通路,使得 TiDB 的增量数据变更日志能够被输出到 Snowflake,并且维护一个 TiDB 表的数据副本,实现在 Snowflake 中使用 TiDB 表的数据。最后一步操作是定期清理 TIDB_TEST_ITEM
表中的无用数据:
-- 每两小时清空表 TIDB_TEST_ITEM
create or replace task TRUNCATE_TIDB_TEST_ITEM
warehouse = test
schedule = '120 minute'
when
system$stream_has_data('TIDB_TEST_ITEM')
as
TRUNCATE table TIDB_TEST_ITEM;
ksqlDB 是一种面向流式数据处理的数据库。你可以直接在 Confluent Cloud 上创建 ksqlDB 集群,并且直接读取 TiCDC 输出到 Confluent 的增量数据。
在 Confluent 集群控制面板中选择 ksqlDB,按照引导创建 ksqlDB 集群。
等待集群状态为 Running 后,进入下一步操作,这个过程可能持续数分钟。
在 ksqlDB Editor 中执行如下命令,创建一个用于读取
tidb_tpcc_orders
Topic 的 STREAM。CREATE STREAM orders (o_id INTEGER, o_d_id INTEGER, o_w_id INTEGER, o_c_id INTEGER, o_entry_d STRING, o_carrier_id INTEGER, o_ol_cnt INTEGER, o_all_local INTEGER) WITH (kafka_topic='tidb_tpcc_orders', partitions=3, value_format='AVRO');
执行如下命令查询 orders STREAM 数据:
可以观察到 TiDB 中的增量数据实时同步到了 ksqlDB,如上图。至此,就完成了 TiDB 与 ksqlDB 的数据集成。
SQL Server 是 Microsoft 推出的关系型数据库软件。借助 Confluent 的能力,你只需要创建 SQL Server Sink Connector,就可以将 TiDB 的增量数据输出到 SQL Server。
连接 SQL Server 服务器,创建名为
tpcc
的数据库:[ec2-user@ip-172-1-1-1 bin]$ sqlcmd -S 10.61.43.14,1433 -U admin
Password:
1> create database tpcc
2> go
1> select name from master.dbo.sysdatabases
2> go
name
----------------------------------------------------------------------
master
tempdb
model
msdb
rdsadmin
tpcc
(6 rows affected)
在 Confluent 集群控制面板中,选择 Data integration > Connectors > Microsoft SQL Server Sink,进入如下页面:
选择需要同步到 SQL Server 的 Topic 后,进入下一页面:
在填写 SQL Server 的连接和认证信息后,进入下一页面。
在 Configuration 界面,按下表进行配置:
配置完成后,选择 Continue,等待 Connector 状态变为 RUNNING,这个过程可能持续数分钟。