TiCDC Open Protocol

    TiCDC Open Protocol 以 Event 为基本单位向下游复制数据变更事件,Event 分为三类:

    • Row Changed Event:代表一行的数据变化,在行发生变更时该 Event 被发出,包含变更后该行的相关信息。
    • DDL Event:代表 DDL 变更,在上游成功执行 DDL 后发出,DDL Event 会广播到每一个 MQ Partition 中。
    • Resolved Event:代表一个特殊的时间点,表示在这个时间点前的收到的 Event 是完整的。

    协议约束

    • 在绝大多数情况下,一个版本的 Row Changed Event 只会发出一次,但是特殊情况(节点故障、网络分区等)下,同一版本的 Row Changed Event 可能会多次发送。
    • 同一张表中的每一个版本第一次发出的 Row Changed Event 在 Event 流中一定是按 TS (timestamp) 顺序递增的。
    • Resolved Event 会被周期性的广播到各个 MQ Partition,Resolved Event 意味着任何 TS 小于 Resolved Event TS 的 Event 已经发送给下游。
    • DDL Event 将被广播到各个 MQ Partition。
    • 一行数据的多个 Row Changed Event 一定会被发送到同一个 MQ Partition 中。

    Message 格式定义

    一个 Message 中包含一个或多个 Event,按照以下格式排列:

    Key:

    Value:

    Offset(Byte)0~78~(7+长度1)
    参数长度1Event Value1长度NEvent ValueN
    • 代表第 N 个 Key/Value 的长度
    • 长度及协议版本号均为大端序 int64 类型
    • 当前协议版本号为 1

    本部分介绍 Row Changed Event、DDL Event 和 Resolved Event 的格式定义。

    • Key:

      参数类型说明
      TSNumber造成 Row 变更的事务的 TS
      Schema NameStringRow 所在的 Schema 的名字
      Table NameStringRow 所在的 Table 的名字
    • Value:

      Insert 事件,输出新增的行数据。

      1. {
      2. "u":{
      3. <Column Name>:{
      4. "t":<Column Type>,
      5. "h":<Where Handle>,
      6. "f":<Flag>,
      7. "v":<Column Value>
      8. },
      9. <Column Name>:{
      10. "t":<Column Type>,
      11. "h":<Where Handle>,
      12. "f":<Flag>,
      13. "v":<Column Value>
      14. }
      15. }
      16. }
      1. {
      2. "u":{
      3. <Column Name>:{
      4. "h":<Where Handle>,
      5. "f":<Flag>,
      6. "v":<Column Value>
      7. },
      8. <Column Name>:{
      9. "t":<Column Type>,
      10. "h":<Where Handle>,
      11. "f":<Flag>,
      12. "v":<Column Value>
      13. }
      14. },
      15. <Column Name>:{
      16. "t":<Column Type>,
      17. "h":<Where Handle>,
      18. "f":<Flag>,
      19. "v":<Column Value>
      20. },
      21. <Column Name>:{
      22. "t":<Column Type>,
      23. "h":<Where Handle>,
      24. "f":<Flag>,
      25. "v":<Column Value>
      26. }
      27. }
      28. }

      Delete 事件,输出被删除的行数据。当 Old Value 特性开启时,Delete 事件中包含被删除的行数据中的所有列;当 Old Value 特性关闭时,Delete 事件中仅包含 列。

      1. {
      2. "d":{
      3. <Column Name>:{
      4. "t":<Column Type>,
      5. "h":<Where Handle>,
      6. "f":<Flag>,
      7. "v":<Column Value>
      8. },
      9. <Column Name>:{
      10. "t":<Column Type>,
      11. "h":<Where Handle>,
      12. "f":<Flag>,
      13. "v":<Column Value>
      14. }
      15. }
      16. }
    • Key:

      1. {
      2. "tbl":<Table Name>,
      3. "t":2
      4. }
      参数类型说明
      TSNumber进行 DDL 变更的事务的 TS
      Schema NameStringDDL 变更的 Schema 的名字,可能为空字符串
      Table NameStringDDL 变更的 Table 的名字,可能为空字符串
    • Value:

      参数类型说明
      DDL QueryStringDDL Query SQL
      DDL TypeStringDDL 类型,详见:
    • Key:

      1. {
      2. "ts":<TS>,
      3. "t":3
      4. }
    • Value: None

    Event 流的输出示例

    本部分展示并描述 Event 流的输出日志。

    假设在上游执行以下 SQL 语句,MQ Partition 数量为 2:

    1. CREATE TABLE test.t1(id int primary key, val varchar(16));

    如以下执行日志中的 Log 1、Log 3 所示,DDL Event 将被广播到所有 MQ Partition,Resolved Event 会被周期性地广播到各个 MQ Partition:

    1. 1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
    2. 2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
    3. 3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
    4. 4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
    1. BEGIN;
    2. INSERT INTO test.t1(id, val) VALUES (1, 'aa');
    3. INSERT INTO test.t1(id, val) VALUES (2, 'aa');
    4. UPDATE test.t1 SET val = 'bb' WHERE id = 2;
    5. INSERT INTO test.t1(id, val) VALUES (3, 'cc');
    6. COMMIT;
    • 如以下执行日志中的 Log 5 和 Log 6 所示,同一张表内的 Row Changed Event 可能会根据主键被分派到不同的 Partition,但同一行的变更一定会分派到同一个 Partition,方便下游并发处理。
    • 如 Log 6 所示,在一个事务内对同一行进行多次修改,只会发出一个 Row Changed Event。
    • Log 8 是 Log 7 的重复 Event。Row Changed Event 可能重复,但每个版本的 Event 第一次发出的次序一定是有序的。

    在上游执行以下 SQL 语句:

    1. BEGIN;
    2. DELETE FROM test.t1 WHERE id = 1;
    3. UPDATE test.t1 SET val = 'dd' WHERE id = 3;
    4. UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2;
    5. COMMIT;
    • Log 9 是 Delete 类型的 Row Changed Event,这种类型的 Event 只包含主键列或唯一索引列。
    • Log 13 和 Log 14 是 Resolved Event。Resolved Event 意味着在这个 Partition 中,任意小于 Resolved TS 的 Event(包括 Row Changed Event 和 DDL Event)已经发送完毕。
    1. 9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"]
    2. 10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"]
    3. 11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"]
    4. 12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"]
    5. 13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]
    6. 14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]

    消费端协议解析

    目前 TiCDC 没有提供 Open Protocol 协议解析的标准实现,但是提供了 Golang 版本和 Java 版本的解析例子。你可以参考本文档提供的数据格式和以下例子实现消费端协议解析。

    Column 的类型码用于标识 Row Changed Event 中列的数据类型。

    类型Code输出示例说明
    TINYINT/BOOL1{“t”:1,”v”:1}
    SMALLINT2{“t”:2,”v”:1}
    INT3{“t”:3,”v”:123}
    FLOAT4{“t”:4,”v”:153.123}
    DOUBLE5{“t”:5,”v”:153.123}
    NULL6{“t”:6,”v”:null}
    TIMESTAMP7{“t”:7,”v”:”1973-12-30 15:30:00”}
    BIGINT8{“t”:8,”v”:123}
    MEDIUMINT9{“t”:9,”v”:123}
    DATE10/14{“t”:10,”v”:”2000-01-01”}
    TIME11{“t”:11,”v”:”23:59:59”}
    DATETIME12{“t”:12,”v”:”2015-12-20 23:58:58”}
    YEAR13{“t”:13,”v”:1970}
    VARCHAR/VARBINARY15/253{“t”:15,”v”:”测试”} / {“t”:15,”v”:”\x89PNG\r\n\x1a\n”}value 编码为 UTF-8;当上游类型为 VARBINARY 时,将对不可见的字符转义
    BIT16{“t”:16,”v”:81}
    JSON245{“t”:245,”v”:”{\”key1\”: \”value1\”}”}
    DECIMAL246{“t”:246,”v”:”129012.1230000”}
    ENUM247{“t”:247,”v”:1}
    SET248{“t”:248,”v”:3}
    TINYTEXT/TINYBLOB249{“t”:249,”v”:”5rWL6K+VdGV4dA==”}value 编码为 Base64
    MEDIUMTEXT/MEDIUMBLOB250{“t”:250,”v”:”5rWL6K+VdGV4dA==”}value 编码为 Base64
    LONGTEXT/LONGBLOB251{“t”:251,”v”:”5rWL6K+VdGV4dA==”}value 编码为 Base64
    TEXT/BLOB252{“t”:252,”v”:”5rWL6K+VdGV4dA==”}value 编码为 Base64
    CHAR/BINARY254{“t”:254,”v”:”测试”} / {“t”:254,”v”:”\x89PNG\r\n\x1a\n”}value 编码为 UTF-8;当上游类型为 BINARY 时,将对不可见的字符转义
    GEOMETRY255尚不支持

    DDL 的类型码

    DDL 的类型码用于标识 DDL Event 中的 DDL 语句的类型。

    类型Code
    Create Schema1
    Drop Schema2
    Create Table3
    Drop Table4
    Add Column5
    Drop Column6
    Add Index7
    Drop Index8
    Add Foreign Key9
    Drop Foreign Key10
    Truncate Table11
    Modify Column12
    Rebase Auto ID13
    Rename Table14
    Set Default Value15
    Shard RowID16
    Modify Table Comment17
    Rename Index18
    Add Table Partition19
    Drop Table Partition20
    Create View21
    Modify Table Charset And Collate22
    Truncate Table Partition23
    Drop View24
    Recover Table25
    Modify Schema Charset And Collate26
    Lock Table27
    Unlock Table28
    Repair Table29
    Set TiFlash Replica30
    Update TiFlash Replica Status31
    Add Primary Key32
    Drop Primary Key33
    Create Sequence34
    Alter Sequence35
    Drop Sequence36

    列标志位

    列标志位以 Bit flags 形式标记列的相关属性。

    示例:

    若某列 Flag 值为 85,则代表这一列为可空列、唯一索引列、生成列、二进制编码列。

    1. 85 == 0b_101_0101
    2. == NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag

    若某列 Flag 值为 46,则代表这一列为组合索引列、主键列、生成列、Handle 列。

    1. 46 == 0b_010_1110
    2. == MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag

    注意

    • 若要同步上游的一张表,TiCDC 会选择一个有效索引作为 Handle Index。Handle Index 包含的列的 HandleKeyFlag 置 1