Transaction Restrictions

    TiDB supports two isolation levels: RC (Read Committed) and SI (Snapshot Isolation). SI is essentially equivalent to the RR (Repeatable Read) isolation level.

    SI can prevent phantom reads

    TiDB's SI isolation level prevents the phantom read anomaly, whereas RR as defined in the ANSI/ISO SQL standard does not.

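    A phantom read occurs as follows: transaction A first runs a query with some condition and gets n rows. Transaction B then changes m rows outside those n rows, or inserts m new rows, so that they match transaction A's query condition. When transaction A issues the query again, it finds n+m matching rows. That is a phantom read.

    For example, system administrator A changes the grades of all students in the database from numeric scores to the letter grades A to E, while system administrator B inserts a record with a numeric score at the same moment. When administrator A finishes, one record turns out to be unconverted, as if it had appeared out of nowhere. This is a phantom read.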

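    TiDB's SI isolation level cannot prevent the write skew anomaly. Use the SELECT FOR UPDATE syntax to prevent it.

    Write skew occurs when two concurrent transactions read different but related records, each transaction then updates the data it read, and both transactions commit. If the related records are subject to a constraint that concurrent modifications by multiple transactions must not break, the final result violates that constraint.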

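    For example, suppose you are writing a shift management program for a hospital. The hospital usually requires several doctors to be on call at the same time, and at minimum one doctor must be on call. A doctor can give up a shift (for example, when feeling ill) as long as at least one colleague remains on call for that shift.

    Now consider the following situation. Alice and Bob are the two doctors on call. Both feel unwell, so both decide to request leave. Unfortunately, they click the button to go off call at the same time. The following program simulates this process.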


    In Golang, first wrap a utility package that adapts TiDB transactions, then write the following code:

    package main

    import (
        "database/sql"
        "fmt"
        "sync"

        "github.com/pingcap-inc/tidb-example-golang/util"

        _ "github.com/go-sql-driver/mysql"
    )

    func main() {
        openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
            writeSkew(db)
        })
    }

    // openDB opens a connection pool, runs runnable, and closes the pool afterwards.
    func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
        db, err := sql.Open(driverName, dataSourceName)
        if err != nil {
            panic(err)
        }
        defer db.Close()

        runnable(db)
    }

    // writeSkew lets two doctors request leave in two concurrent transactions.
    func writeSkew(db *sql.DB) {
        if err := prepareData(db); err != nil {
            panic(err)
        }

        waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}

        waitGroup.Add(1)
        go func() {
            defer waitGroup.Done()
            // err is local to the goroutine to avoid a data race.
            if err := askForLeave(db, waitingChan, 1, 1); err != nil {
                panic(err)
            }
        }()

        waitGroup.Add(1)
        go func() {
            defer waitGroup.Done()
            if err := askForLeave(db, waitingChan, 2, 2); err != nil {
                panic(err)
            }
        }()

        waitGroup.Wait()
    }

    func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
        // Indent the log lines of txn 2 so the two transactions are easy to tell apart.
        txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
        if goroutineID != 1 {
            txnComment = "\t" + txnComment
        }

        txn, err := util.TiDBSqlBegin(db, true)
        if err != nil {
            return err
        }
        fmt.Println(txnComment + "start txn")

        // Txn 1 should be waiting until txn 2 is done.
        if goroutineID == 1 {
            <-waitingChan
        }

        txnFunc := func() error {
            queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"
            rows, err := txn.Query(queryCurrentOnCall, true, 123)
            if err != nil {
                return err
            }
            defer rows.Close()
            fmt.Println(txnComment + queryCurrentOnCall + " successful")

            count := 0
            if rows.Next() {
                err = rows.Scan(&count)
                if err != nil {
                    return err
                }
            }
            // Close the result set before issuing the next statement in this transaction.
            rows.Close()

            if count < 2 {
                return fmt.Errorf("at least one doctor is on call")
            }

            shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
            _, err = txn.Exec(shift, false, doctorID, 123)
            if err == nil {
                fmt.Println(txnComment + shift + " successful")
            }
            return err
        }

        err = txnFunc()
        if err == nil {
            if err = txn.Commit(); err != nil {
                fmt.Printf("[runTxn] commit failed: %+v\n", err)
            } else {
                fmt.Println("[runTxn] commit success")
            }
        } else {
            txn.Rollback()
            fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
        }

        // Txn 2 is done. Let txn 1 run again.
        if goroutineID == 2 {
            waitingChan <- true
        }

        return nil
    }

    func prepareData(db *sql.DB) error {
        err := createDoctorTable(db)
        if err != nil {
            return err
        }

        err = createDoctor(db, 1, "Alice", true, 123)
        if err != nil {
            return err
        }
        err = createDoctor(db, 2, "Bob", true, 123)
        if err != nil {
            return err
        }
        err = createDoctor(db, 3, "Carol", false, 123)
        if err != nil {
            return err
        }
        return nil
    }

    func createDoctorTable(db *sql.DB) error {
        _, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
            " `id` int(11) NOT NULL," +
            " `name` varchar(255) DEFAULT NULL," +
            " `on_call` tinyint(1) DEFAULT NULL," +
            " `shift_id` int(11) DEFAULT NULL," +
            " PRIMARY KEY (`id`)," +
            " KEY `idx_shift_id` (`shift_id`)" +
            " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
        return err
    }

    func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
        _, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
            id, name, onCall, shiftID)
        return err
    }

    SQL log:

    /* txn 1 */ BEGIN
        /* txn 2 */ BEGIN
        /* txn 2 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 AND `shift_id` = 123
        /* txn 2 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 2 AND `shift_id` = 123
        /* txn 2 */ COMMIT
    /* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 AND `shift_id` = 123
    /* txn 1 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 1 AND `shift_id` = 123
    /* txn 1 */ COMMIT

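    Execution result:

    In both transactions, the application first checks whether two or more doctors are currently on call. If so, it assumes that one doctor can safely go off call. Because the database uses snapshot isolation, both checks return 2, so both transactions proceed to the next stage. Alice updates her own record to go off call, and Bob does the same. Both transactions commit successfully, and now no doctor is on call, which violates the requirement that at least one doctor be on call. The following figure (quoted from "Designing Data-Intensive Applications") shows what actually happened:

    [Figure: Write Skew]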

    Now change the example program to use SELECT FOR UPDATE to prevent the write skew problem:


    The following Java example uses SELECT FOR UPDATE to prevent the write skew problem:

    package com.pingcap.txn.write.skew;

    import com.zaxxer.hikari.HikariDataSource;

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    public class EffectWriteSkew {
        public static void main(String[] args) throws SQLException, InterruptedException {
            HikariDataSource ds = new HikariDataSource();
            ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
            ds.setUsername("root");

            // prepare data
            try (Connection connection = ds.getConnection()) {
                createDoctorTable(connection);
                createDoctor(connection, 1, "Alice", true, 123);
                createDoctor(connection, 2, "Bob", true, 123);
                createDoctor(connection, 3, "Carol", false, 123);
            }

            Semaphore txn1Pass = new Semaphore(0);
            CountDownLatch countDownLatch = new CountDownLatch(2);
            ExecutorService threadPool = Executors.newFixedThreadPool(2);

            threadPool.execute(() -> {
                askForLeave(ds, txn1Pass, 1, 1);
                countDownLatch.countDown();
            });

            threadPool.execute(() -> {
                askForLeave(ds, txn1Pass, 2, 2);
                countDownLatch.countDown();
            });

            countDownLatch.await();

            // Shut down the thread pool and the data source so the JVM can exit.
            threadPool.shutdown();
            ds.close();
        }

        public static void createDoctorTable(Connection connection) throws SQLException {
            connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
                    " `id` int(11) NOT NULL," +
                    " `name` varchar(255) DEFAULT NULL," +
                    " `on_call` tinyint(1) DEFAULT NULL," +
                    " `shift_id` int(11) DEFAULT NULL," +
                    " PRIMARY KEY (`id`)," +
                    " KEY `idx_shift_id` (`shift_id`)" +
                    " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
        }

        public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
            PreparedStatement insert = connection.prepareStatement(
                    "INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
            insert.setInt(1, id);
            insert.setString(2, name);
            insert.setBoolean(3, onCall);
            insert.setInt(4, shiftID);
            insert.executeUpdate();
        }

        public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
            try (Connection connection = ds.getConnection()) {
                try {
                    connection.setAutoCommit(false);

                    // Indent the statements of txn 2 so the two transactions are easy to tell apart.
                    String comment = (txnID == 2 ? "\t" : "") + "/* txn " + txnID + " */ ";
                    connection.createStatement().executeUpdate(comment + "BEGIN");

                    // Txn 1 should be waiting until txn 2 is done.
                    if (txnID == 1) {
                        txn1Pass.acquire();
                    }

                    PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
                            "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ? FOR UPDATE");
                    currentOnCallQuery.setBoolean(1, true);
                    currentOnCallQuery.setInt(2, 123);
                    ResultSet res = currentOnCallQuery.executeQuery();

                    if (!res.next()) {
                        throw new RuntimeException("error query");
                    } else {
                        int count = res.getInt("count");
                        if (count >= 2) {
                            // If two or more doctors are on call, this doctor can leave.
                            PreparedStatement update = connection.prepareStatement(comment +
                                    "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
                            update.setBoolean(1, false);
                            update.setInt(2, doctorID);
                            update.setInt(3, 123);
                            update.executeUpdate();

                            connection.commit();
                        } else {
                            throw new RuntimeException("At least one doctor is on call");
                        }
                    }
                } catch (Exception e) {
                    // If got any error, you should roll back, data is priceless
                    connection.rollback();
                    e.printStackTrace();
                } finally {
                    // Txn 2 is done (committed or rolled back). Let txn 1 run again.
                    if (txnID == 2) {
                        txn1Pass.release();
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    The following Golang example uses SELECT FOR UPDATE to prevent the write skew problem:

    package main

    import (
        "database/sql"
        "fmt"
        "sync"

        "github.com/pingcap-inc/tidb-example-golang/util"

        _ "github.com/go-sql-driver/mysql"
    )

    func main() {
        openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
            writeSkew(db)
        })
    }

    // openDB opens a connection pool, runs runnable, and closes the pool afterwards.
    func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
        db, err := sql.Open(driverName, dataSourceName)
        if err != nil {
            panic(err)
        }
        defer db.Close()

        runnable(db)
    }

    // writeSkew lets two doctors request leave in two concurrent transactions.
    func writeSkew(db *sql.DB) {
        if err := prepareData(db); err != nil {
            panic(err)
        }

        waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}

        waitGroup.Add(1)
        go func() {
            defer waitGroup.Done()
            // err is local to the goroutine to avoid a data race.
            if err := askForLeave(db, waitingChan, 1, 1); err != nil {
                panic(err)
            }
        }()

        waitGroup.Add(1)
        go func() {
            defer waitGroup.Done()
            if err := askForLeave(db, waitingChan, 2, 2); err != nil {
                panic(err)
            }
        }()

        waitGroup.Wait()
    }

    func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
        // Indent the log lines of txn 2 so the two transactions are easy to tell apart.
        txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
        if goroutineID != 1 {
            txnComment = "\t" + txnComment
        }

        txn, err := util.TiDBSqlBegin(db, true)
        if err != nil {
            return err
        }
        fmt.Println(txnComment + "start txn")

        // Txn 1 should be waiting until txn 2 is done.
        if goroutineID == 1 {
            <-waitingChan
        }

        txnFunc := func() error {
            // FOR UPDATE locks the rows that were read, so the check and the update cannot interleave.
            queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ? FOR UPDATE"
            rows, err := txn.Query(queryCurrentOnCall, true, 123)
            if err != nil {
                return err
            }
            defer rows.Close()
            fmt.Println(txnComment + queryCurrentOnCall + " successful")

            count := 0
            if rows.Next() {
                err = rows.Scan(&count)
                if err != nil {
                    return err
                }
            }
            // Close the result set before issuing the next statement in this transaction.
            rows.Close()

            if count < 2 {
                return fmt.Errorf("at least one doctor is on call")
            }

            shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
            _, err = txn.Exec(shift, false, doctorID, 123)
            if err == nil {
                fmt.Println(txnComment + shift + " successful")
            }
            return err
        }

        err = txnFunc()
        if err == nil {
            if err = txn.Commit(); err != nil {
                fmt.Printf("[runTxn] commit failed: %+v\n", err)
            } else {
                fmt.Println("[runTxn] commit success")
            }
        } else {
            txn.Rollback()
            fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
        }

        // Txn 2 is done. Let txn 1 run again.
        if goroutineID == 2 {
            waitingChan <- true
        }

        return nil
    }

    func prepareData(db *sql.DB) error {
        err := createDoctorTable(db)
        if err != nil {
            return err
        }

        err = createDoctor(db, 1, "Alice", true, 123)
        if err != nil {
            return err
        }
        err = createDoctor(db, 2, "Bob", true, 123)
        if err != nil {
            return err
        }
        err = createDoctor(db, 3, "Carol", false, 123)
        if err != nil {
            return err
        }
        return nil
    }

    func createDoctorTable(db *sql.DB) error {
        _, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
            " `id` int(11) NOT NULL," +
            " `name` varchar(255) DEFAULT NULL," +
            " `on_call` tinyint(1) DEFAULT NULL," +
            " `shift_id` int(11) DEFAULT NULL," +
            " PRIMARY KEY (`id`)," +
            " KEY `idx_shift_id` (`shift_id`)" +
            " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
        return err
    }

    func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
        _, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
            id, name, onCall, shiftID)
        return err
    }

    Execution result:

    mysql> SELECT * FROM doctors;
    +----+-------+---------+----------+
    | id | name  | on_call | shift_id |
    +----+-------+---------+----------+
    |  1 | Alice |       1 |      123 |
    |  2 | Bob   |       0 |      123 |
    |  3 | Carol |       0 |      123 |
    +----+-------+---------+----------+

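    Support for savepoints and nested transactions

    The PROPAGATION_NESTED propagation behavior supported by Spring starts a nested transaction, which is a child transaction started independently on top of the current transaction. A savepoint is recorded when the nested transaction starts. If the nested transaction fails, the transaction rolls back to the state at the savepoint. The nested transaction is part of the outer transaction and is committed together with it when the outer transaction commits. The following example shows the savepoint mechanism: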

    mysql> BEGIN;
    mysql> INSERT INTO T2 VALUES(100);
    mysql> SAVEPOINT svp1;
    mysql> INSERT INTO T2 VALUES(200);
    mysql> ROLLBACK TO SAVEPOINT svp1;
    mysql> RELEASE SAVEPOINT svp1;
    mysql> COMMIT;
    mysql> SELECT * FROM T2;
    +------+
    | ID   |
    +------+
    |  100 |
    +------+

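    Note

    TiDB supports the savepoint feature starting from v6.2.0, so TiDB versions earlier than v6.2.0 do not support the PROPAGATION_NESTED propagation behavior. If an application based on the Java Spring framework uses PROPAGATION_NESTED on such a version, adjust it on the application side by removing the nested transaction logic.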

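    The basic principle is to limit the size of a transaction. TiDB limits the size of a single transaction, and this limit applies at the KV level. At the SQL level, put simply, one row of data maps to one KV entry, and each additional index adds one more KV entry. At the SQL level, the limit is therefore:

    • The maximum size of a single transaction is 10 GB (in TiDB v4.0 and later, this can be adjusted with the tidb-server configuration item performance.txn-total-size-limit; in versions earlier than TiDB v4.0, the maximum size of a single transaction is 100 MB).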

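    Also note that for both the size limit and the row count limit, you need to account for the overhead of the encoding that TiDB performs and of the extra keys of the transaction during execution. For the best performance, it is recommended to write 100 to 500 rows per transaction.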

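    Autocommitted SELECT FOR UPDATE statements do not wait for locks

    Currently, a SELECT FOR UPDATE statement executed in autocommit mode does not acquire locks. The effect is shown in the following figure:

    [Figure: Behavior in TiDB]

    This is a known incompatibility with MySQL.

    You can work around this issue by wrapping the statement in an explicit BEGIN; ... COMMIT; transaction.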