Optimistic and pessimistic transactions

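    The optimistic transaction model suits workloads with a low conflict rate: committing directly will most likely succeed because conflicts are rare. When a conflict does occur, however, rolling back is relatively expensive.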

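    The pessimistic transaction model suits workloads with a high conflict rate, where locking up front costs less than rolling back afterwards. It also resolves, at relatively low cost, the situation in which several concurrent transactions keep conflicting with one another so that none of them can succeed. In workloads with a low conflict rate, however, pessimistic transactions are less efficient than optimistic ones.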
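
    The difference between the two models can be sketched without a database. The following is a minimal in-memory illustration, not how TiDB implements either model: Row, VersionConflict, buy_optimistic and buy_pessimistic are hypothetical names introduced here. The optimistic path reads without locking and validates a version counter at commit time; the pessimistic path takes the lock before reading.

```python
import threading


class VersionConflict(Exception):
    """Hypothetical error: the row changed between read and commit."""


class Row:
    """An in-memory stand-in for one database row (an assumption of this sketch)."""

    def __init__(self, stock):
        self.stock = stock
        self.version = 0
        self.lock = threading.Lock()


def buy_optimistic(row, quantity, on_read=None):
    # Read without locking and remember the version that was seen.
    seen_version, seen_stock = row.version, row.stock
    if on_read:
        on_read()  # test hook: lets a concurrent writer slip in after the read
    if seen_stock < quantity:
        return False
    # Commit step: succeed only if nobody changed the row since the read.
    with row.lock:
        if row.version != seen_version:
            raise VersionConflict("row changed since it was read")
        row.stock = seen_stock - quantity
        row.version += 1
    return True


def buy_pessimistic(row, quantity):
    # Lock first, then read and write; concurrent buyers wait at the lock.
    with row.lock:
        if row.stock < quantity:
            return False
        row.stock -= quantity
        row.version += 1
        return True
```

    In this sketch a conflicting optimistic buyer discovers the problem only at commit and has to start over, while a pessimistic buyer pays the locking cost on every purchase, even when nobody else is buying.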

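    In terms of application-side complexity, pessimistic transactions are more intuitive and easier to implement, whereas optimistic transactions need a more elaborate application-side retry mechanism to be reliable.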
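
    An application-side retry mechanism might look roughly like the sketch below. It is an assumption-laden illustration rather than a TiDB API: RetryableTxnError and run_with_retry are hypothetical names, and the exponential backoff with jitter is one common choice, not something this document prescribes. The key point is that the whole transaction body is re-run, not only the commit.

```python
import random
import time


class RetryableTxnError(Exception):
    """Hypothetical stand-in for a retryable failure such as a write conflict."""


def run_with_retry(txn_func, max_retries=5, base_delay=0.01):
    """Run txn_func; on a retryable error, re-run the whole transaction body."""
    for attempt in range(max_retries + 1):
        try:
            return txn_func()
        except RetryableTxnError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the error to the caller
            # Back off with jitter so competing transactions do not retry in lockstep.
            time.sleep(base_delay * (2 ** attempt) * random.random())
```

    A caller would wrap the complete read-check-write sequence in txn_func, so that every retry re-reads fresh data instead of reusing values from the failed attempt.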

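    The following example uses the tables in the bookshop database to implement a book purchase, demonstrating the differences between optimistic and pessimistic transactions and their respective strengths and weaknesses. The purchase flow consists of:

    1. Update the stock quantity
    2. Create the order
    3. Make the payment

    These three operations must either all succeed or all fail, and under concurrency the book must never be oversold.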

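    The code below uses pessimistic transactions, with two threads simulating two users buying the same book concurrently. The bookshop has 10 copies left; Bob buys 6 and Alice buys 4. Both complete their orders at almost the same time, and the remaining stock ends at zero.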

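    When multiple threads simulate several users inserting data at the same time, the connection object must be thread-safe. The Java example uses HikariCP, a currently popular Java connection pool.

    Golang's sql.DB is safe for concurrent use, so no external package is needed.

    Wrap a utility package that adapts TiDB transactions and keep it ready for later use; helper.go below imports it as github.com/pingcap-inc/tidb-example-golang/util.

    The Python example uses the mysqlclient driver and opens a separate connection for each thread. Connections are never shared between threads, which keeps the code thread-safe.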

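    Configuration file

    If you use Maven for package management in Java, add the following dependency to the <dependencies> node of pom.xml to bring in HikariCP, and also set the packaging target and the main class that the JAR starts with. The complete pom.xml is shown below: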

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>com.pingcap</groupId>
      <artifactId>plain-java-txn</artifactId>
      <version>0.0.1</version>

      <name>plain-java-jdbc</name>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
      </properties>

      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.13.2</version>
          <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>8.0.28</version>
        </dependency>

        <dependency>
          <groupId>com.zaxxer</groupId>
          <artifactId>HikariCP</artifactId>
          <version>5.0.1</version>
        </dependency>
      </dependencies>

      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass>com.pingcap.txn.TxnExample</mainClass>
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>

    Code

    Then write the code:

    package com.pingcap.txn;

    import com.zaxxer.hikari.HikariDataSource;

    import java.math.BigDecimal;
    import java.sql.*;
    import java.util.Arrays;
    import java.util.concurrent.*;

    public class TxnExample {
        public static void main(String[] args) throws SQLException, InterruptedException {
            System.out.println(Arrays.toString(args));
            int aliceQuantity = 0;
            int bobQuantity = 0;

            // arguments are passed as ALICE_NUM=<n> BOB_NUM=<n>
            for (String arg : args) {
                if (arg.startsWith("ALICE_NUM")) {
                    aliceQuantity = Integer.parseInt(arg.replace("ALICE_NUM=", ""));
                }

                if (arg.startsWith("BOB_NUM")) {
                    bobQuantity = Integer.parseInt(arg.replace("BOB_NUM=", ""));
                }
            }

            HikariDataSource ds = new HikariDataSource();
            ds.setJdbcUrl("jdbc:mysql://localhost:4000/bookshop?useServerPrepStmts=true&cachePrepStmts=true");
            ds.setUsername("root");
            ds.setPassword("");

            // prepare data; the connection goes back to the pool when the block ends
            try (Connection connection = ds.getConnection()) {
                createBook(connection, 1L, "Designing Data-Intensive Application", "Science & Technology",
                        Timestamp.valueOf("2018-09-01 00:00:00"), new BigDecimal(100), 10);
                createUser(connection, 1L, "Bob", new BigDecimal(10000));
                createUser(connection, 2L, "Alice", new BigDecimal(10000));
            }

            CountDownLatch countDownLatch = new CountDownLatch(2);
            ExecutorService threadPool = Executors.newFixedThreadPool(2);

            final int finalBobQuantity = bobQuantity;
            threadPool.execute(() -> {
                buy(ds, 1, 1000L, 1L, 1L, finalBobQuantity);
                countDownLatch.countDown();
            });
            final int finalAliceQuantity = aliceQuantity;
            threadPool.execute(() -> {
                buy(ds, 2, 1001L, 1L, 2L, finalAliceQuantity);
                countDownLatch.countDown();
            });

            countDownLatch.await(5, TimeUnit.SECONDS);

            // without shutdown() the non-daemon pool threads keep the JVM alive
            threadPool.shutdown();
            ds.close();
        }

        public static void createUser(Connection connection, Long id, String nickname, BigDecimal balance) throws SQLException {
            PreparedStatement insert = connection.prepareStatement(
                    "INSERT INTO `users` (`id`, `nickname`, `balance`) VALUES (?, ?, ?)");
            insert.setLong(1, id);
            insert.setString(2, nickname);
            insert.setBigDecimal(3, balance);
            insert.executeUpdate();
        }

        public static void createBook(Connection connection, Long id, String title, String type, Timestamp publishedAt, BigDecimal price, Integer stock) throws SQLException {
            PreparedStatement insert = connection.prepareStatement(
                    "INSERT INTO `books` (`id`, `title`, `type`, `published_at`, `price`, `stock`) values (?, ?, ?, ?, ?, ?)");
            insert.setLong(1, id);
            insert.setString(2, title);
            insert.setString(3, type);
            insert.setTimestamp(4, publishedAt);
            insert.setBigDecimal(5, price);
            insert.setInt(6, stock);

            insert.executeUpdate();
        }

        public static void buy(HikariDataSource ds, Integer threadID,
                               Long orderID, Long bookID, Long userID, Integer quantity) {
            String txnComment = "/* txn " + threadID + " */ ";

            try (Connection connection = ds.getConnection()) {
                try {
                    connection.setAutoCommit(false);
                    connection.createStatement().executeUpdate(txnComment + "begin pessimistic");

                    // wait for the other thread to run its 'begin pessimistic' statement
                    TimeUnit.SECONDS.sleep(1);

                    BigDecimal price = null;

                    // read the price of the book and lock the row
                    PreparedStatement selectBook = connection.prepareStatement(txnComment + "select price from books where id = ? for update");
                    selectBook.setLong(1, bookID);
                    ResultSet res = selectBook.executeQuery();
                    if (!res.next()) {
                        throw new RuntimeException("book not exist");
                    } else {
                        price = res.getBigDecimal("price");
                    }

                    // update book: the WHERE clause refuses to take the stock below zero
                    String updateBookSQL = "update `books` set stock = stock - ? where id = ? and stock - ? >= 0";
                    PreparedStatement updateBook = connection.prepareStatement(txnComment + updateBookSQL);
                    updateBook.setInt(1, quantity);
                    updateBook.setLong(2, bookID);
                    updateBook.setInt(3, quantity);
                    int affectedRows = updateBook.executeUpdate();

                    if (affectedRows == 0) {
                        // stock not enough, rollback
                        connection.createStatement().executeUpdate(txnComment + "rollback");
                        return;
                    }

                    // insert order
                    String insertOrderSQL = "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (?, ?, ?, ?)";
                    PreparedStatement insertOrder = connection.prepareStatement(txnComment + insertOrderSQL);
                    insertOrder.setLong(1, orderID);
                    insertOrder.setLong(2, bookID);
                    insertOrder.setLong(3, userID);
                    insertOrder.setInt(4, quantity);
                    insertOrder.executeUpdate();

                    // update user
                    String updateUserSQL = "update `users` set `balance` = `balance` - ? where id = ?";
                    PreparedStatement updateUser = connection.prepareStatement(txnComment + updateUserSQL);
                    updateUser.setBigDecimal(1, price.multiply(new BigDecimal(quantity)));
                    updateUser.setLong(2, userID);
                    updateUser.executeUpdate();

                    connection.createStatement().executeUpdate(txnComment + "commit");
                } catch (Exception e) {
                    connection.createStatement().executeUpdate(txnComment + "rollback");
                    e.printStackTrace();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    First, write a helper.go file that wraps the required database operations:

    package main

    import (
        "context"
        "database/sql"
        "fmt"
        "time"

        "github.com/go-sql-driver/mysql"
        "github.com/pingcap-inc/tidb-example-golang/util"
        "github.com/shopspring/decimal"
    )

    type TxnFunc func(txn *util.TiDBSqlTx) error

    const (
        ErrWriteConflict      = 9007 // Transactions in TiKV encounter write conflicts.
        ErrInfoSchemaChanged  = 8028 // table schema changes
        ErrForUpdateCantRetry = 8002 // "SELECT FOR UPDATE" commit conflict
        ErrTxnRetryable       = 8022 // The transaction commit fails and has been rolled back
    )

    const retryTimes = 5

    var retryErrorCodeSet = map[uint16]interface{}{
        ErrWriteConflict:      nil,
        ErrInfoSchemaChanged:  nil,
        ErrForUpdateCantRetry: nil,
        ErrTxnRetryable:       nil,
    }

    // runTxn runs txnFunc in one transaction. In optimistic mode it re-runs the
    // whole transaction when a retryable error code is returned.
    func runTxn(db *sql.DB, optimistic bool, optimisticRetryTimes int, txnFunc TxnFunc) {
        txn, err := util.TiDBSqlBegin(db, !optimistic)
        if err != nil {
            panic(err)
        }

        err = txnFunc(txn)
        if err != nil {
            txn.Rollback()
            if mysqlErr, ok := err.(*mysql.MySQLError); ok && optimistic && optimisticRetryTimes != 0 {
                if _, retryableError := retryErrorCodeSet[mysqlErr.Number]; retryableError {
                    fmt.Printf("[runTxn] got a retryable error, rest time: %d\n", optimisticRetryTimes-1)
                    runTxn(db, optimistic, optimisticRetryTimes-1, txnFunc)
                    return
                }
            }

            fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
        } else {
            err = txn.Commit()
            if mysqlErr, ok := err.(*mysql.MySQLError); ok && optimistic && optimisticRetryTimes != 0 {
                if _, retryableError := retryErrorCodeSet[mysqlErr.Number]; retryableError {
                    fmt.Printf("[runTxn] got a retryable error, rest time: %d\n", optimisticRetryTimes-1)
                    runTxn(db, optimistic, optimisticRetryTimes-1, txnFunc)
                    return
                }
            }

            if err == nil {
                fmt.Println("[runTxn] commit success")
            } else {
                // report commit failures that are not retried instead of dropping them
                fmt.Printf("[runTxn] commit failed: %+v\n", err)
            }
        }
    }

    func prepareData(db *sql.DB, optimistic bool) {
        runTxn(db, optimistic, retryTimes, func(txn *util.TiDBSqlTx) error {
            publishedAt, err := time.Parse("2006-01-02 15:04:05", "2018-09-01 00:00:00")
            if err != nil {
                return err
            }

            if err = createBook(txn, 1, "Designing Data-Intensive Application",
                "Science & Technology", publishedAt, decimal.NewFromInt(100), 10); err != nil {
                return err
            }

            if err = createUser(txn, 1, "Bob", decimal.NewFromInt(10000)); err != nil {
                return err
            }

            if err = createUser(txn, 2, "Alice", decimal.NewFromInt(10000)); err != nil {
                return err
            }

            return nil
        })
    }

    func buyPessimistic(db *sql.DB, goroutineID, orderID, bookID, userID, amount int) {
        txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
        if goroutineID != 1 {
            txnComment = "\t" + txnComment
        }

        fmt.Printf("\nuser %d try to buy %d books(id: %d)\n", userID, amount, bookID)

        runTxn(db, false, retryTimes, func(txn *util.TiDBSqlTx) error {
            time.Sleep(time.Second)

            // read the price of book
            selectBookForUpdate := "select `price` from books where id = ? for update"
            bookRows, err := txn.Query(selectBookForUpdate, bookID)
            if err != nil {
                return err
            }
            fmt.Println(txnComment + selectBookForUpdate + " successful")
            defer bookRows.Close()

            price := decimal.NewFromInt(0)
            if bookRows.Next() {
                // scan the price; without this the balance would be reduced by zero
                err = bookRows.Scan(&price)
                if err != nil {
                    return err
                }
            } else {
                return fmt.Errorf("book ID not exist")
            }
            // close the result set before running more statements in this transaction
            bookRows.Close()

            // update book
            updateStock := "update `books` set stock = stock - ? where id = ? and stock - ? >= 0"
            result, err := txn.Exec(updateStock, amount, bookID, amount)
            if err != nil {
                return err
            }
            fmt.Println(txnComment + updateStock + " successful")

            affected, err := result.RowsAffected()
            if err != nil {
                return err
            }

            if affected == 0 {
                return fmt.Errorf("stock not enough, rollback")
            }

            // insert order
            insertOrder := "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (?, ?, ?, ?)"
            if _, err := txn.Exec(insertOrder,
                orderID, bookID, userID, amount); err != nil {
                return err
            }
            fmt.Println(txnComment + insertOrder + " successful")

            // update user
            updateUser := "update `users` set `balance` = `balance` - ? where id = ?"
            if _, err := txn.Exec(updateUser,
                price.Mul(decimal.NewFromInt(int64(amount))), userID); err != nil {
                return err
            }
            fmt.Println(txnComment + updateUser + " successful")

            return nil
        })
    }

    func buyOptimistic(db *sql.DB, goroutineID, orderID, bookID, userID, amount int) {
        txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
        if goroutineID != 1 {
            txnComment = "\t" + txnComment
        }

        fmt.Printf("\nuser %d try to buy %d books(id: %d)\n", userID, amount, bookID)

        runTxn(db, true, retryTimes, func(txn *util.TiDBSqlTx) error {
            time.Sleep(time.Second)

            // read the price and stock of book
            selectBookForUpdate := "select `price`, `stock` from books where id = ? for update"
            bookRows, err := txn.Query(selectBookForUpdate, bookID)
            if err != nil {
                return err
            }
            fmt.Println(txnComment + selectBookForUpdate + " successful")
            defer bookRows.Close()

            price, stock := decimal.NewFromInt(0), 0
            if bookRows.Next() {
                err = bookRows.Scan(&price, &stock)
                if err != nil {
                    return err
                }
            } else {
                return fmt.Errorf("book ID not exist")
            }
            bookRows.Close()

            if stock < amount {
                return fmt.Errorf("book not enough")
            }

            // update book
            updateStock := "update `books` set stock = stock - ? where id = ? and stock - ? >= 0"
            result, err := txn.Exec(updateStock, amount, bookID, amount)
            if err != nil {
                return err
            }
            fmt.Println(txnComment + updateStock + " successful")

            affected, err := result.RowsAffected()
            if err != nil {
                return err
            }

            if affected == 0 {
                return fmt.Errorf("stock not enough, rollback")
            }

            // insert order
            insertOrder := "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (?, ?, ?, ?)"
            if _, err := txn.Exec(insertOrder,
                orderID, bookID, userID, amount); err != nil {
                return err
            }
            fmt.Println(txnComment + insertOrder + " successful")

            // update user
            updateUser := "update `users` set `balance` = `balance` - ? where id = ?"
            if _, err := txn.Exec(updateUser,
                price.Mul(decimal.NewFromInt(int64(amount))), userID); err != nil {
                return err
            }
            fmt.Println(txnComment + updateUser + " successful")

            return nil
        })
    }

    func createBook(txn *util.TiDBSqlTx, id int, title, bookType string,
        publishedAt time.Time, price decimal.Decimal, stock int) error {
        _, err := txn.ExecContext(context.Background(),
            "INSERT INTO `books` (`id`, `title`, `type`, `published_at`, `price`, `stock`) values (?, ?, ?, ?, ?, ?)",
            id, title, bookType, publishedAt, price, stock)
        return err
    }

    func createUser(txn *util.TiDBSqlTx, id int, nickname string, balance decimal.Decimal) error {
        _, err := txn.ExecContext(context.Background(),
            "INSERT INTO `users` (`id`, `nickname`, `balance`) VALUES (?, ?, ?)",
            id, nickname, balance)
        return err
    }

    Next, write a txn.go file that contains the main function, calls helper.go, and handles the command-line arguments:

    package main

    import (
        "database/sql"
        "flag"
        "fmt"
        "sync"
    )

    func main() {
        optimistic, alice, bob := parseParams()

        openDB("mysql", "root:@tcp(127.0.0.1:4000)/bookshop?charset=utf8mb4", func(db *sql.DB) {
            prepareData(db, optimistic)
            buy(db, optimistic, alice, bob)
        })
    }

    func buy(db *sql.DB, optimistic bool, alice, bob int) {
        buyFunc := buyOptimistic
        if !optimistic {
            buyFunc = buyPessimistic
        }

        wg := sync.WaitGroup{}
        wg.Add(1)
        go func() {
            defer wg.Done()
            buyFunc(db, 1, 1000, 1, 1, bob)
        }()

        wg.Add(1)
        go func() {
            defer wg.Done()
            buyFunc(db, 2, 1001, 1, 2, alice)
        }()

        wg.Wait()
    }

    func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
        db, err := sql.Open(driverName, dataSourceName)
        if err != nil {
            panic(err)
        }
        defer db.Close()

        runnable(db)
    }

    func parseParams() (optimistic bool, alice, bob int) {
        flag.BoolVar(&optimistic, "o", false, "transaction is optimistic")
        flag.IntVar(&alice, "a", 4, "Alice bought num")
        flag.IntVar(&bob, "b", 6, "Bob bought num")

        flag.Parse()

        fmt.Println(optimistic, alice, bob)

        return optimistic, alice, bob
    }

    The Golang example already includes the optimistic transaction.

    import time

    import MySQLdb
    import os
    import datetime
    from threading import Thread

    REPEATABLE_ERROR_CODE_SET = {
        9007,  # Transactions in TiKV encounter write conflicts.
        8028,  # table schema changes
        8002,  # "SELECT FOR UPDATE" commit conflict
        8022   # The transaction commit fails and has been rolled back
    }


    def create_connection():
        return MySQLdb.connect(
            host="127.0.0.1",
            port=4000,
            user="root",
            password="",
            database="bookshop",
            autocommit=False
        )


    def prepare_data() -> None:
        connection = create_connection()
        with connection:
            with connection.cursor() as cursor:
                cursor.execute("INSERT INTO `books` (`id`, `title`, `type`, `published_at`, `price`, `stock`) "
                               "values (%s, %s, %s, %s, %s, %s)",
                               (1, "Designing Data-Intensive Application", "Science & Technology",
                                datetime.datetime(2018, 9, 1), 100, 10))

                cursor.executemany("INSERT INTO `users` (`id`, `nickname`, `balance`) VALUES (%s, %s, %s)",
                                   [(1, "Bob", 10000), (2, "Alice", 10000)])
                connection.commit()


    def buy_optimistic(thread_id: int, order_id: int, book_id: int, user_id: int, amount: int,
                       optimistic_retry_times: int = 5) -> None:
        connection = create_connection()

        txn_log_header = f"/* txn {thread_id} */"
        if thread_id != 1:
            txn_log_header = "\t" + txn_log_header

        with connection:
            with connection.cursor() as cursor:
                cursor.execute("BEGIN OPTIMISTIC")
                print(f'{txn_log_header} BEGIN OPTIMISTIC')
                time.sleep(1)

                try:
                    # read the price and stock of the book
                    select_book_for_update = "SELECT `price`, `stock` FROM books WHERE id = %s FOR UPDATE"
                    cursor.execute(select_book_for_update, (book_id,))
                    book = cursor.fetchone()
                    if book is None:
                        raise Exception("book_id not exist")
                    price, stock = book
                    print(f'{txn_log_header} {select_book_for_update} successful')

                    if stock < amount:
                        raise Exception("book not enough, rollback")

                    # update book
                    update_stock = "update `books` set stock = stock - %s where id = %s and stock - %s >= 0"
                    rows_affected = cursor.execute(update_stock, (amount, book_id, amount))
                    print(f'{txn_log_header} {update_stock} successful')

                    if rows_affected == 0:
                        raise Exception("stock not enough, rollback")

                    # insert order
                    insert_order = "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (%s, %s, %s, %s)"
                    cursor.execute(insert_order, (order_id, book_id, user_id, amount))
                    print(f'{txn_log_header} {insert_order} successful')

                    # update user
                    update_user = "update `users` set `balance` = `balance` - %s where id = %s"
                    cursor.execute(update_user, (amount * price, user_id))
                    print(f'{txn_log_header} {update_user} successful')

                except Exception as err:
                    connection.rollback()

                    print(f'something went wrong: {err}')
                else:
                    # important: the commit itself can fail in TiDB, so handle its exception here
                    try:
                        connection.commit()
                    except MySQLdb.MySQLError as db_err:
                        code, desc = db_err.args
                        if code in REPEATABLE_ERROR_CODE_SET and optimistic_retry_times > 0:
                            print(f'retry, rest {optimistic_retry_times - 1} times, for {code} {desc}')
                            buy_optimistic(thread_id, order_id, book_id, user_id, amount, optimistic_retry_times - 1)


    def buy_pessimistic(thread_id: int, order_id: int, book_id: int, user_id: int, amount: int) -> None:
        connection = create_connection()

        txn_log_header = f"/* txn {thread_id} */"
        if thread_id != 1:
            txn_log_header = "\t" + txn_log_header

        with connection:
            with connection.cursor() as cursor:
                cursor.execute("BEGIN PESSIMISTIC")
                print(f'{txn_log_header} BEGIN PESSIMISTIC')
                time.sleep(1)

                try:
                    # read the price of book
                    select_book_for_update = "SELECT `price` FROM books WHERE id = %s FOR UPDATE"
                    cursor.execute(select_book_for_update, (book_id,))
                    book = cursor.fetchone()
                    if book is None:
                        raise Exception("book_id not exist")
                    price = book[0]
                    print(f'{txn_log_header} {select_book_for_update} successful')

                    # update book
                    update_stock = "update `books` set stock = stock - %s where id = %s and stock - %s >= 0"
                    rows_affected = cursor.execute(update_stock, (amount, book_id, amount))
                    print(f'{txn_log_header} {update_stock} successful')

                    if rows_affected == 0:
                        raise Exception("stock not enough, rollback")

                    # insert order
                    insert_order = "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (%s, %s, %s, %s)"
                    cursor.execute(insert_order, (order_id, book_id, user_id, amount))
                    print(f'{txn_log_header} {insert_order} successful')

                    # update user
                    update_user = "update `users` set `balance` = `balance` - %s where id = %s"
                    cursor.execute(update_user, (amount * price, user_id))
                    print(f'{txn_log_header} {update_user} successful')

                except Exception as err:
                    connection.rollback()
                    print(f'something went wrong: {err}')
                else:
                    connection.commit()


    optimistic = os.environ.get('OPTIMISTIC')
    alice = os.environ.get('ALICE')
    bob = os.environ.get('BOB')

    if not (optimistic and alice and bob):
        raise Exception("please use \"OPTIMISTIC=<is_optimistic> ALICE=<alice_num> "
                        "BOB=<bob_num> python3 txn_example.py\" to start this script")

    prepare_data()

    # bool("False") is True for any non-empty string, so compare the text instead
    if optimistic.strip().lower() in ("true", "1"):
        buy_func = buy_optimistic
    else:
        buy_func = buy_pessimistic

    bob_thread = Thread(target=buy_func, kwargs={
        "thread_id": 1, "order_id": 1000, "book_id": 1, "user_id": 1, "amount": int(bob)})
    alice_thread = Thread(target=buy_func, kwargs={
        "thread_id": 2, "order_id": 1001, "book_id": 1, "user_id": 2, "amount": int(alice)})

    bob_thread.start()
    alice_thread.start()
    bob_thread.join(timeout=10)
    alice_thread.join(timeout=10)

    2. Run an example that does not involve overselling

    Run the sample program:

    Run the sample program in Java:

    mvn clean package
    java -jar target/plain-java-txn-0.0.1-jar-with-dependencies.jar ALICE_NUM=4 BOB_NUM=6

    Run the sample program in Golang:

    go build -o bin/txn
    ./bin/txn -a 4 -b 6

    Run the sample program in Python:

    OPTIMISTIC=False ALICE=4 BOB=6 python3 txn_example.py

    SQL log:

    Finally, check the created orders, the deducted user balances, and the reduced book stock. They all match expectations.

    mysql> SELECT * FROM `books`;
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    | id | title                                | type                 | published_at        | stock | price  |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    |  1 | Designing Data-Intensive Application | Science & Technology | 2018-09-01 00:00:00 |     0 | 100.00 |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    1 row in set (0.00 sec)

    mysql> SELECT * FROM orders;
    +------+---------+---------+---------+---------------------+
    | id   | book_id | user_id | quality | ordered_at          |
    +------+---------+---------+---------+---------------------+
    | 1000 |       1 |       1 |       6 | 2022-04-19 10:58:12 |
    | 1001 |       1 |       1 |       4 | 2022-04-19 10:58:11 |
    +------+---------+---------+---------+---------------------+
    2 rows in set (0.01 sec)

    mysql> SELECT * FROM users;
    +----+---------+----------+
    | id | balance | nickname |
    +----+---------+----------+
    |  1 | 9400.00 | Bob      |
    |  2 | 9600.00 | Alice    |
    +----+---------+----------+
    2 rows in set (0.00 sec)

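    Now make the problem harder. If 10 copies of the book remain, Bob buys 7, Alice buys 4, and both place their orders at almost the same time, what happens? The code from the previous example can be reused for this case; the only change is Bob's quantity, from 6 to 7:

    Run the sample program: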

    Run the sample program in Java:

    mvn clean package
    java -jar target/plain-java-txn-0.0.1-jar-with-dependencies.jar ALICE_NUM=4 BOB_NUM=7

    Run the sample program in Golang:

    go build -o bin/txn
    ./bin/txn -a 4 -b 7

    Run the sample program in Python:

    OPTIMISTIC=False ALICE=4 BOB=7 python3 txn_example.py

    SQL log:

    /* txn 1 */ BEGIN PESSIMISTIC
        /* txn 2 */ BEGIN PESSIMISTIC
        /* txn 2 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
        /* txn 2 */ UPDATE `books` SET `stock` = `stock` - 4 WHERE `id` = 1 AND `stock` - 4 >= 0
        /* txn 2 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) values (1001, 1, 1, 4)
        /* txn 2 */ UPDATE `users` SET `balance` = `balance` - 400.0 WHERE `id` = 2
        /* txn 2 */ COMMIT
    /* txn 1 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
    /* txn 1 */ UPDATE `books` SET `stock` = `stock` - 7 WHERE `id` = 1 AND `stock` - 7 >= 0
    /* txn 1 */ ROLLBACK

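    Because txn 2 acquired the lock first and updated stock, the UPDATE in txn 1 returned 0 for affected_rows (only 6 copies were left, fewer than the 7 requested), so txn 1 took the rollback path.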
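
    The same behaviour can be reproduced in memory. The sketch below is a simplified model, not the TiDB locking implementation: Inventory and simulate are hypothetical names, and a threading.Lock plays the role of the row lock taken by SELECT ... FOR UPDATE. Whichever buyer takes the lock first wins, and the other one sees the equivalent of affected_rows being 0 and changes nothing.

```python
import threading


class Inventory:
    """In-memory stand-in for the books row; the lock models the row lock."""

    def __init__(self, stock):
        self.stock = stock
        self.orders = []
        self._lock = threading.Lock()

    def buy(self, user, quantity):
        with self._lock:  # later buyers block here until the holder finishes
            if self.stock - quantity < 0:
                return False  # like affected rows == 0: roll back, change nothing
            self.stock -= quantity
            self.orders.append((user, quantity))
            return True


def simulate(bob_quantity, alice_quantity, stock=10):
    inventory = Inventory(stock)
    results = {}

    def worker(user, quantity):
        results[user] = inventory.buy(user, quantity)

    threads = [
        threading.Thread(target=worker, args=("Bob", bob_quantity)),
        threading.Thread(target=worker, args=("Alice", alice_quantity)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return inventory, results
```

    Calling simulate(7, 4) leaves either 6 copies (Alice went first, as in the log above) or 3 copies (Bob went first). In both cases exactly one order succeeds and the stock never goes negative.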

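    Check the created orders, the deducted user balances, and the reduced book stock again. Alice's order for 4 copies succeeded, Bob's order for 7 copies failed, and 6 copies remain in stock, as expected.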

    mysql> SELECT * FROM books;
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    | id | title                                | type                 | published_at        | stock | price  |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    |  1 | Designing Data-Intensive Application | Science & Technology | 2018-09-01 00:00:00 |     6 | 100.00 |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    1 row in set (0.00 sec)

    mysql> SELECT * FROM orders;
    +------+---------+---------+---------+---------------------+
    | id   | book_id | user_id | quality | ordered_at          |
    +------+---------+---------+---------+---------------------+
    | 1001 |       1 |       1 |       4 | 2022-04-19 11:03:03 |
    +------+---------+---------+---------+---------------------+
    1 row in set (0.00 sec)

    mysql> SELECT * FROM users;
    +----+----------+----------+
    | id | balance  | nickname |
    +----+----------+----------+
    |  1 | 10000.00 | Bob      |
    |  2 |  9600.00 | Alice    |
    +----+----------+----------+
    2 rows in set (0.01 sec)

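    Optimistic transactions

    The code below uses optimistic transactions, with two threads simulating two users buying the same book concurrently, exactly as in the pessimistic example. The bookshop has 10 copies left; Bob buys 6 and Alice buys 4. Both complete their orders at almost the same time, and the remaining stock ends at zero.

    1. Write an optimistic transaction example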

    Write the optimistic transaction example in Java:

    Code

    1. package com.pingcap.txn.optimistic;
    2. import com.zaxxer.hikari.HikariDataSource;
    3. import java.math.BigDecimal;
    4. import java.sql.*;
    5. import java.util.Arrays;
    6. import java.util.concurrent.*;
    7. public class TxnExample {
    8. public static void main(String[] args) throws SQLException, InterruptedException {
    9. System.out.println(Arrays.toString(args));
    10. int aliceQuantity = 0;
    11. int bobQuantity = 0;
    12. for (String arg: args) {
    13. if (arg.startsWith("ALICE_NUM")) {
    14. aliceQuantity = Integer.parseInt(arg.replace("ALICE_NUM=", ""));
    15. }
    16. if (arg.startsWith("BOB_NUM")) {
    17. bobQuantity = Integer.parseInt(arg.replace("BOB_NUM=", ""));
    18. }
    19. }
    20. HikariDataSource ds = new HikariDataSource();
    21. ds.setJdbcUrl("jdbc:mysql://localhost:4000/bookshop?useServerPrepStmts=true&cachePrepStmts=true");
    22. ds.setUsername("root");
    23. ds.setPassword("");
    24. // prepare data
    25. Connection connection = ds.getConnection();
    26. createBook(connection, 1L, "Designing Data-Intensive Application", "Science & Technology",
    27. Timestamp.valueOf("2018-09-01 00:00:00"), new BigDecimal(100), 10);
    28. createUser(connection, 1L, "Bob", new BigDecimal(10000));
    29. createUser(connection, 2L, "Alice", new BigDecimal(10000));
    30. CountDownLatch countDownLatch = new CountDownLatch(2);
    31. ExecutorService threadPool = Executors.newFixedThreadPool(2);
    32. final int finalBobQuantity = bobQuantity;
    33. threadPool.execute(() -> {
    34. buy(ds, 1, 1000L, 1L, 1L, finalBobQuantity, 5);
    35. countDownLatch.countDown();
    36. });
    37. final int finalAliceQuantity = aliceQuantity;
    38. threadPool.execute(() -> {
    39. buy(ds, 2, 1001L, 1L, 2L, finalAliceQuantity, 5);
    40. countDownLatch.countDown();
    41. });
    42. countDownLatch.await(5, TimeUnit.SECONDS);
    43. }

        // insert one row into the `users` table
        public static void createUser(Connection connection, Long id, String nickname, BigDecimal balance) throws SQLException {
            PreparedStatement insert = connection.prepareStatement(
                    "INSERT INTO `users` (`id`, `nickname`, `balance`) VALUES (?, ?, ?)");
            insert.setLong(1, id);
            insert.setString(2, nickname);
            insert.setBigDecimal(3, balance);
            insert.executeUpdate();
        }

        // insert one row into the `books` table
        public static void createBook(Connection connection, Long id, String title, String type, Timestamp publishedAt, BigDecimal price, Integer stock) throws SQLException {
            PreparedStatement insert = connection.prepareStatement(
                    "INSERT INTO `books` (`id`, `title`, `type`, `published_at`, `price`, `stock`) values (?, ?, ?, ?, ?, ?)");
            insert.setLong(1, id);
            insert.setString(2, title);
            insert.setString(3, type);
            insert.setTimestamp(4, publishedAt);
            insert.setBigDecimal(5, price);
            insert.setInt(6, stock);
            insert.executeUpdate();
        }

        public static void buy(HikariDataSource ds, Integer threadID, Long orderID, Long bookID,
                               Long userID, Integer quantity, Integer retryTimes) {
            String txnComment = "/* txn " + threadID + " */ ";

            try (Connection connection = ds.getConnection()) {
                try {
                    connection.setAutoCommit(false);
                    connection.createStatement().executeUpdate(txnComment + "begin optimistic");

                    // wait for the other thread to run its 'begin optimistic' statement
                    TimeUnit.SECONDS.sleep(1);

                    BigDecimal price = null;

                    // read the price and the stock of the book
                    PreparedStatement selectBook = connection.prepareStatement(txnComment + "SELECT * FROM books where id = ? for update");
                    selectBook.setLong(1, bookID);
                    ResultSet res = selectBook.executeQuery();
                    if (!res.next()) {
                        throw new RuntimeException("book not exist");
                    } else {
                        price = res.getBigDecimal("price");
                        int stock = res.getInt("stock");
                        if (stock < quantity) {
                            throw new RuntimeException("book not enough");
                        }
                    }

                    // update book: decrease the stock, never below zero
                    String updateBookSQL = "update `books` set stock = stock - ? where id = ? and stock - ? >= 0";
                    PreparedStatement updateBook = connection.prepareStatement(txnComment + updateBookSQL);
                    updateBook.setInt(1, quantity);
                    updateBook.setLong(2, bookID);
                    updateBook.setInt(3, quantity);
                    updateBook.executeUpdate();

                    // insert order
                    String insertOrderSQL = "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (?, ?, ?, ?)";
                    PreparedStatement insertOrder = connection.prepareStatement(txnComment + insertOrderSQL);
                    insertOrder.setLong(1, orderID);
                    insertOrder.setLong(2, bookID);
                    insertOrder.setLong(3, userID);
                    insertOrder.setInt(4, quantity);
                    insertOrder.executeUpdate();

                    // update user: charge price * quantity to the balance
                    String updateUserSQL = "update `users` set `balance` = `balance` - ? where id = ?";
                    PreparedStatement updateUser = connection.prepareStatement(txnComment + updateUserSQL);
                    updateUser.setBigDecimal(1, price.multiply(new BigDecimal(quantity)));
                    updateUser.setLong(2, userID);
                    updateUser.executeUpdate();

                    // in an optimistic transaction, write conflicts surface here
                    connection.createStatement().executeUpdate(txnComment + "commit");
                } catch (Exception e) {
                    connection.createStatement().executeUpdate(txnComment + "rollback");
                    System.out.println("error occurred: " + e.getMessage());

                    if (e instanceof SQLException sqlException) {
                        switch (sqlException.getErrorCode()) {
                            // You can get all error codes at https://docs.pingcap.com/tidb/stable/error-codes
                            case 9007: // Transactions in TiKV encounter write conflicts.
                            case 8028: // table schema changes
                            case 8002: // "SELECT FOR UPDATE" commit conflict
                            case 8022: // The transaction commit fails and has been rolled back
                                // retry the whole transaction while attempts remain
                                if (retryTimes != 0) {
                                    System.out.println("rest " + retryTimes + " times. retry for " + e.getMessage());
                                    buy(ds, threadID, orderID, bookID, userID, quantity, retryTimes - 1);
                                }
                        }
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
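
    The retry branch at the end of buy can also be written as a loop instead of a recursive call. The following is a minimal sketch of that idea in Python, not the code used by the example programs. The four error codes are the ones listed in the Java code above; TxnError, run_with_retry and make_flaky_txn are hypothetical names introduced only for illustration, and the transaction body is assumed to roll back on its own before raising.

```python
# Error codes copied from the Java example; all names below are hypothetical.
RETRYABLE_CODES = {9007, 8028, 8002, 8022}


class TxnError(Exception):
    """Stand-in for a driver exception that carries a TiDB error code."""

    def __init__(self, code, message):
        super().__init__(message)
        self.code = code


def run_with_retry(txn_body, max_retries):
    """Run txn_body, retrying only when it fails with a retryable code."""
    attempt = 0
    while True:
        try:
            return txn_body()
        except TxnError as err:
            # Give up on non-retryable errors or when the budget is spent.
            if err.code not in RETRYABLE_CODES or attempt >= max_retries:
                raise
            attempt += 1
            print(f"retry {attempt} of {max_retries} after error {err.code}")


def make_flaky_txn(failures):
    """Build a fake transaction that hits a write conflict `failures` times."""
    state = {"calls": 0}

    def txn_body():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise TxnError(9007, "Write conflict")
        return state["calls"]

    return txn_body


calls_needed = run_with_retry(make_flaky_txn(failures=2), max_retries=5)
print("committed on attempt", calls_needed)
```

    Unlike the Java code, which only prints a message once the retries are used up, this sketch re-raises the last error so that the caller can see the failure. Either choice can be reasonable depending on the application.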

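    Configuration change

    For Java, change the main class entry in pom.xml

    <mainClass>com.pingcap.txn.TxnExample</mainClass>

    so that it points to the optimistic transaction example.

    The Golang example in the section on writing a pessimistic transaction example already supports optimistic transactions, so it can be used without changes.

    The Python example in the section on writing a pessimistic transaction example already supports optimistic transactions, so it can be used without changes.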

    Run the example program:

    To run the Java example:

    mvn clean package
    java -jar target/plain-java-txn-0.0.1-jar-with-dependencies.jar ALICE_NUM=4 BOB_NUM=6

    To run the Golang example:

    go build -o bin/txn
    ./bin/txn -a 4 -b 6 -o true

    To run the Python example:

    OPTIMISTIC=True ALICE=4 BOB=6 python3 txn_example.py

    The SQL statements are executed in the following order:

    /* txn 2 */ BEGIN OPTIMISTIC
    /* txn 1 */ BEGIN OPTIMISTIC
    /* txn 2 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
    /* txn 2 */ UPDATE `books` SET `stock` = `stock` - 4 WHERE `id` = 1 AND `stock` - 4 >= 0
    /* txn 2 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1001, 1, 1, 4)
    /* txn 2 */ UPDATE `users` SET `balance` = `balance` - 400.0 WHERE `id` = 2
    /* txn 2 */ COMMIT
    /* txn 1 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
    /* txn 1 */ UPDATE `books` SET `stock` = `stock` - 6 WHERE `id` = 1 AND `stock` - 6 >= 0
    /* txn 1 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1000, 1, 1, 6)
    /* txn 1 */ UPDATE `users` SET `balance` = `balance` - 600.0 WHERE `id` = 1
    retry 1 times for 9007 Write conflict, txnStartTS=432618733006225412, conflictStartTS=432618733006225411, conflictCommitTS=432618733006225414, key={tableID=126, handle=1} primary={tableID=114, indexID=1, indexValues={1, 1000, }} [try again later]
    /* txn 1 */ BEGIN OPTIMISTIC
    /* txn 1 */ SELECT * FROM `books` WHERE `id` = 1 FOR UPDATE
    /* txn 1 */ UPDATE `books` SET `stock` = `stock` - 6 WHERE `id` = 1 AND `stock` - 6 >= 0
    /* txn 1 */ INSERT INTO `orders` (`id`, `book_id`, `user_id`, `quality`) VALUES (1000, 1, 1, 6)
    /* txn 1 */ UPDATE `users` SET `balance` = `balance` - 600.0 WHERE `id` = 1
    /* txn 1 */ COMMIT

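    In optimistic transaction mode, the intermediate state is not necessarily correct, so you cannot check affected_rows to decide whether a statement succeeded, as you can in pessimistic transaction mode. Treat the transaction as a whole, and use the result of the final COMMIT statement to decide whether it hit a write conflict: if COMMIT returns an exception, the transaction did not take effect.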
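
    To see why affected_rows says little in this mode, consider a toy in-memory model of optimistic concurrency control. It is only a sketch under simplifying assumptions and does not reflect how TiDB is implemented: writes are buffered inside the transaction, reads come from a snapshot taken at begin, and conflicts are checked only at commit. Store, Txn and WriteConflict are hypothetical names.

```python
class WriteConflict(Exception):
    """Raised at commit time, loosely mirroring error 9007."""


class Store:
    """Hypothetical in-memory table: key -> (value, commit_version)."""

    def __init__(self, rows):
        self.rows = {key: (value, 0) for key, value in rows.items()}
        self.clock = 0

    def begin(self):
        return Txn(self)


class Txn:
    def __init__(self, store):
        self.store = store
        self.start_version = store.clock
        # Snapshot of the committed values as of transaction start.
        self.snapshot = {key: value for key, (value, _) in store.rows.items()}
        self.writes = {}  # buffered locally until commit

    def read(self, key):
        return self.writes.get(key, self.snapshot[key])

    def update(self, key, value):
        """Buffer a write and report one 'affected row'."""
        self.writes[key] = value
        return 1

    def commit(self):
        # Conflict check: was a written key committed after this txn began?
        for key in self.writes:
            if self.store.rows[key][1] > self.start_version:
                raise WriteConflict(f"key {key!r} changed after txn start")
        self.store.clock += 1
        for key, value in self.writes.items():
            self.store.rows[key] = (value, self.store.clock)


store = Store({"stock": 10})
txn1, txn2 = store.begin(), store.begin()

affected2 = txn2.update("stock", txn2.read("stock") - 4)
txn2.commit()  # the first committer wins

# txn 1 still sees stock = 10 in its snapshot, and its update "succeeds".
affected1 = txn1.update("stock", txn1.read("stock") - 6)
try:
    txn1.commit()
    txn1_committed = True
except WriteConflict as err:
    txn1_committed = False
    print("txn 1 failed at commit:", err)

print("affected rows:", affected1, "committed:", txn1_committed,
      "stock:", store.rows["stock"][0])
```

    In this model the update in txn 1 reports one affected row even though the transaction is later rejected, which is why only the outcome of the commit is a reliable signal.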

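    The SQL log above shows that the two transactions ran concurrently and modified the same row, so txn 1 received a 9007 Write conflict exception when it issued COMMIT. A write conflict in an optimistic transaction can be retried safely on the application side. Here the transaction committed successfully after one retry, and the final result is as expected: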

    mysql> SELECT * FROM books;
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    | id | title                                | type                 | published_at        | stock | price  |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    |  1 | Designing Data-Intensive Application | Science & Technology | 2018-09-01 00:00:00 |     0 | 100.00 |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    1 row in set (0.01 sec)

    mysql> SELECT * FROM orders;
    +------+---------+---------+---------+---------------------+
    | id   | book_id | user_id | quality | ordered_at          |
    +------+---------+---------+---------+---------------------+
    | 1000 |       1 |       1 |       6 | 2022-04-19 03:18:19 |
    | 1001 |       1 |       1 |       4 | 2022-04-19 03:18:17 |
    +------+---------+---------+---------+---------------------+
    2 rows in set (0.01 sec)

    mysql> SELECT * FROM users;
    +----+---------+----------+
    | id | balance | nickname |
    +----+---------+----------+
    |  1 | 9400.00 | Bob      |
    |  2 | 9600.00 | Alice    |
    +----+---------+----------+
    2 rows in set (0.00 sec)

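    3. Run the example that prevents overselling

    Now look at how an optimistic transaction prevents overselling. Suppose 10 copies of the book remain in stock, Bob buys 7 copies and Alice buys 4, and both place their orders at almost the same time. What happens? This case reuses the code from the optimistic transaction example. The only change is that Bob's purchase quantity goes from 6 to 7.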

    Run the example program:

    To run the Java example:

    mvn clean package
    java -jar target/plain-java-txn-0.0.1-jar-with-dependencies.jar ALICE_NUM=4 BOB_NUM=7

    To run the Golang example:

    go build -o bin/txn
    ./bin/txn -a 4 -b 7 -o true

    To run the Python example:

    OPTIMISTIC=True ALICE=4 BOB=7 python3 txn_example.py

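    The SQL log for this run shows that txn 1 hit a write conflict on its first attempt and was retried on the application side. The retry read the latest snapshot, found that the remaining stock was not enough, and the application ended with an out of stock exception.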
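
    The sequence described above (conflict, retry, fresh read, stock check) can be traced with a small in-memory model. This is a hedged sketch rather than the logic of the example programs: a single row carries a version number, a commit succeeds only if the version it read is still current, and the stock is checked again on every attempt. BookRow, prepare, WriteConflict and OutOfStock are hypothetical names.

```python
class WriteConflict(Exception):
    pass


class OutOfStock(Exception):
    pass


class BookRow:
    """Hypothetical row with a version used for conflict detection."""

    def __init__(self, stock):
        self.stock = stock
        self.version = 0

    def snapshot(self):
        return self.stock, self.version

    def commit(self, read_version, new_stock):
        # Reject the write if another transaction committed in between.
        if read_version != self.version:
            raise WriteConflict("row changed since it was read")
        self.stock = new_stock
        self.version += 1


def prepare(row, quantity):
    """Read the row, check the stock, and compute the pending write."""
    stock, version = row.snapshot()
    if stock < quantity:
        raise OutOfStock(f"need {quantity}, only {stock} left")
    return version, stock - quantity


row = BookRow(stock=10)

# Both buyers read the same snapshot (stock = 10) before either commits.
bob_write = prepare(row, 7)
alice_write = prepare(row, 4)

row.commit(*alice_write)  # Alice commits first: stock becomes 6

try:
    row.commit(*bob_write)  # Bob's read is stale, so this conflicts
    outcome = "committed"
except WriteConflict:
    try:
        # The retry re-reads the row and repeats the stock check.
        row.commit(*prepare(row, 7))
        outcome = "committed after retry"
    except OutOfStock:
        outcome = "out of stock"

print(outcome, "- remaining stock:", row.stock)
```

    The point of the sketch is that the retry repeats the stock check against fresh data, so a conflicting purchase that no longer fits is rejected instead of driving the stock below zero.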

    mysql> SELECT * FROM books;
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    | id | title                                | type                 | published_at        | stock | price  |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    |  1 | Designing Data-Intensive Application | Science & Technology | 2018-09-01 00:00:00 |     6 | 100.00 |
    +----+--------------------------------------+----------------------+---------------------+-------+--------+
    1 row in set (0.00 sec)

    mysql> SELECT * FROM orders;
    +------+---------+---------+---------+---------------------+
    | id   | book_id | user_id | quality | ordered_at          |
    +------+---------+---------+---------+---------------------+
    | 1001 |       1 |       1 |       4 | 2022-04-19 03:41:16 |
    +------+---------+---------+---------+---------------------+
    1 row in set (0.00 sec)

    mysql> SELECT * FROM users;
    +----+----------+----------+
    | id | balance  | nickname |
    +----+----------+----------+
    |  1 | 10000.00 | Bob      |
    |  2 |  9600.00 | Alice    |
    +----+----------+----------+