Binlog Consumer Client 用户文档
修改 Drainer 的配置文件,设置输出为 Kafka,相关配置如下:
自定义开发
首先需要了解 Drainer 写入到 Kafka 中的数据格式:
message Column {
// 数据是否为 null
optional bool is_null = 1 [ default = false ];
// 保存 int 类型的数据
optional int64 int64_value = 2;
// 保存 uint、enum, set 类型的数据
optional uint64 uint64_value = 3;
// 保存 float、double 类型的数据
optional double double_value = 4;
// 保存 bit、blob、binary、json 类型的数据
optional bytes bytes_value = 5;
// 保存 date、time、decimal、text、char 类型的数据
optional string string_value = 6;
}
// ColumnInfo 保存列的信息,包括列名、类型、是否为主键
message ColumnInfo {
optional string name = 1 [ (gogoproto.nullable) = false ];
// MySQL 中小写的列字段类型
// https://dev.mysql.com/doc/refman/8.0/en/data-types.html
// numeric 类型:int bigint smallint tinyint float double decimal bit
// string 类型:text longtext mediumtext char tinytext varchar
// blob longblob mediumblob binary tinyblob varbinary
// enum set
// json 类型:json
optional string mysql_type = 2 [ (gogoproto.nullable) = false ];
optional bool is_primary_key = 3 [ (gogoproto.nullable) = false ];
}
// Row 保存一行的具体数据
message Row { repeated Column columns = 1; }
// MutationType 表示 DML 的类型
enum MutationType {
Insert = 0;
Update = 1;
Delete = 2;
}
// Table 包含一个表的数据变更
message Table {
optional string schema_name = 1;
optional string table_name = 2;
repeated ColumnInfo column_info = 3;
repeated TableMutation mutations = 4;
}
// TableMutation 保存一行数据的变更
message TableMutation {
required MutationType type = 1;
// 修改后的数据
required Row row = 2;
// 修改前的数据,只对 Update MutationType 有效
optional Row change_row = 3;
}
// DMLData 保存一个事务所有的 DML 造成的数据变更
// `tables` 包含事务中所有表的数据变更
repeated Table tables = 1;
}
message DDLData {
// 当前使用的数据库
optional string schema_name = 1;
// 相关表
optional string table_name = 2;
// `ddl_query` 是原始的 DDL 语句 query
optional bytes ddl_query = 3;
}
// BinlogType 为 Binlog 的类型,分为 DML 和 DDL
enum BinlogType {
DML = 0; // Has `dml_data`
DDL = 1; // Has `ddl_query`
}
// Binlog 保存一个事务所有的变更,Kafka 中保存的数据为该结构数据序列化后的结果
message Binlog {
optional BinlogType type = 1 [ (gogoproto.nullable) = false ];
optional int64 commit_ts = 2 [ (gogoproto.nullable) = false ];
optional DMLData dml_data = 3;
optional DDLData ddl_data = 4;
}
Driver
TiDB-Tools 项目提供了用于读取 Kafka 中 binlog 数据的 Driver,具有如下功能:
- 读取 Kafka 的数据
- 根据 commit ts 查找 binlog 在 kafka 中的储存位置
使用该 Driver 时,用户需要配置如下信息:
- KafkaAddr:Kafka 集群的地址
- CommitTS:从哪个 commit ts 开始读取 binlog
- Offset:从 Kafka 哪个 offset 开始读取,如果设置了 CommitTS 就不用配置该参数
- ClusterID:TiDB 集群的 cluster ID
- Topic: Kafka Topic 名称,如果 Topic 名称为空,将会使用 drainer
<ClusterID>_obinlog
中的默认名称
- 使用该 Driver 将数据同步到 MySQL,该示例包含将 binlog 转化为 SQL 的具体方法
- 使用该 Driver 将数据打印出来
Driver 项目地址:Binlog Slave Driver。
- 示例代码仅仅用于示范如何使用 Driver,如果需要用于生产环境需要优化代码。
- 目前仅提供了 golang 版本的 Driver 以及示例代码。如果需要使用其他语言,用户需要根据 binlog 的 proto 文件生成相应语言的代码文件,并自行开发程序读取 Kafka 中的 binlog 数据、解析数据、输出到下游。也欢迎用户优化 example 代码,以及提交其他语言的示例代码到 。