TiDB 和 Python 的简单 CRUD 应用程序

    注意

    推荐使用 Python 3.10 及以上版本进行 TiDB 的应用程序的编写。

    本节将介绍 TiDB 集群的启动方法。

    • TiDB Cloud
    • 本地集群
    • Gitpod

    你可以部署一个本地测试的 TiDB 集群或正式的 TiDB 集群。详细步骤,请参考:

    基于 Git 的预配置的开发环境:现在就试试

    该环境会自动克隆代码,并通过 TiUP 部署测试集群。

    • 使用 SQLAlchemy(推荐)
    • 使用 peewee(推荐)
    • 使用 mysqlclient
    • 使用 PyMySQL
    • 使用 mysql-connector-python

    SQLAlchemy 为当前比较流行的开源 Python ORM 之一。此处将以 SQLAlchemy 1.4.44 版本进行说明。

    import uuid
    from typing import List

    from sqlalchemy import create_engine, String, Column, Integer, select, func
    from sqlalchemy.orm import declarative_base, sessionmaker

    engine = create_engine('mysql://root:@127.0.0.1:4000/test')
    Base = declarative_base()
    # NOTE: no model has been declared on Base yet when this runs, so there is
    # no table for it to create here; the player table is initialized separately.
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)


    class Player(Base):
        """ORM mapping of the ``player`` table."""

        __tablename__ = "player"

        id = Column(String(36), primary_key=True)
        coins = Column(Integer)
        goods = Column(Integer)

        def __repr__(self):
            return f'Player(id={self.id!r}, coins={self.coins!r}, goods={self.goods!r})'


    def random_player(amount: int) -> List[Player]:
        """Build ``amount`` unsaved players, each with a random UUID as id."""
        players = []
        for _ in range(amount):
            players.append(Player(id=uuid.uuid4(), coins=10000, goods=10000))

        return players


    def simple_example() -> None:
        """Insert, query, bulk-insert and count players in a single session."""
        with Session() as session:
            # create a player, who has a coin and a goods.
            session.add(Player(id="test", coins=1, goods=1))

            # get this player, and print it.
            get_test_stmt = select(Player).where(Player.id == "test")
            for player in session.scalars(get_test_stmt):
                print(player)

            # create players with bulk inserts.
            # insert 1919 players totally, with 114 players per batch.
            # each player has a random UUID
            player_list = random_player(1919)
            for idx in range(0, len(player_list), 114):
                session.bulk_save_objects(player_list[idx:idx + 114])

            # print the number of players
            count = session.query(func.count(Player.id)).scalar()
            print(f'number of players: {count}')

            # print 3 players.
            three_players = session.query(Player).limit(3).all()
            for player in three_players:
                print(player)

            session.commit()


    def trade_check(session: Session, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        """Return True when the seller has enough goods and the buyer enough coins.

        Both rows are read with ``with_for_update()`` so they stay locked for
        the rest of the caller's transaction.
        """
        # sell player goods check
        sell_player = session.query(Player.goods).filter(Player.id == sell_id).with_for_update().one()
        if sell_player.goods < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        # buy player coins check
        buy_player = session.query(Player.coins).filter(Player.id == buy_id).with_for_update().one()
        if buy_player.coins < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        # explicit success value so the declared ``bool`` return type holds
        return True


    def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
        """Move ``amount`` goods from seller to buyer for ``price`` coins."""
        with Session() as session:
            if trade_check(session, sell_id, buy_id, amount, price) is False:
                return

            # deduct the goods of seller, and raise his/her the coins
            session.query(Player).filter(Player.id == sell_id). \
                update({'goods': Player.goods - amount, 'coins': Player.coins + price})
            # deduct the coins of buyer, and raise his/her the goods
            session.query(Player).filter(Player.id == buy_id). \
                update({'goods': Player.goods + amount, 'coins': Player.coins - price})

            session.commit()
            print("trade success")


    def trade_example() -> None:
        """Create two players, run one failing and one successful trade, print both."""
        with Session() as session:
            # create two players
            # player 1: id is "1", has only 100 coins.
            # player 2: id is "2", has 114514 coins, and 20 goods.
            session.add(Player(id="1", coins=100, goods=0))
            session.add(Player(id="2", coins=114514, goods=20))
            session.commit()

        # player 1 wants to buy 10 goods from player 2.
        # it will cost 500 coins, but player 1 cannot afford it.
        # so this trade will fail, and nobody will lose their coins or goods
        trade(sell_id="2", buy_id="1", amount=10, price=500)

        # then player 1 has to reduce the incoming quantity to 2.
        # this trade will be successful
        trade(sell_id="2", buy_id="1", amount=2, price=100)

        with Session() as session:
            traders = session.query(Player).filter(Player.id.in_(("1", "2"))).all()
            for player in traders:
                print(player)
            session.commit()


    simple_example()
    trade_example()

    相较于直接使用 Driver,SQLAlchemy 屏蔽了创建数据库连接时,不同数据库差异的细节。SQLAlchemy 还封装了大量的操作,如会话管理、基本对象的 CRUD 等,极大地简化了代码量。

    Player 类为数据库表在程序内的映射。Player 的每个属性都对应着 player 表的一个字段。为了给 SQLAlchemy 提供更多的信息,Player 类使用了形如以上示例中的 id = Column(String(36), primary_key=True) 的类型定义,用来指示字段类型和其附加属性。例如 id = Column(String(36), primary_key=True) 表示 id 字段为 String 类型,对应数据库类型为 VARCHAR,长度为 36,且为主键。

    关于 SQLAlchemy 的更多使用方法,你可以参考 SQLAlchemy 官方文档。

    peewee 为当前比较流行的开源 Python ORM 之一。此处将以 peewee 3.15.4 版本进行说明。

    import os
    import uuid
    from typing import List

    from peewee import CharField, IntegerField, Model
    from playhouse.db_url import connect

    db = connect('mysql://root:@127.0.0.1:4000/test')


    class Player(Model):
        """peewee model mapped to the ``player`` table."""

        id = CharField(max_length=36, primary_key=True)
        coins = IntegerField()
        goods = IntegerField()

        class Meta:
            database = db
            table_name = "player"


    def random_player(amount: int) -> List[Player]:
        """Build ``amount`` unsaved players, each with a random UUID as id."""
        players = []
        for _ in range(amount):
            players.append(Player(id=uuid.uuid4(), coins=10000, goods=10000))

        return players


    def simple_example() -> None:
        """Insert, query, bulk-insert and count players."""
        # create a player, who has a coin and a goods.
        Player.create(id="test", coins=1, goods=1)

        # get this player, and print it.
        test_player = Player.select().where(Player.id == "test").get()
        print(f'id:{test_player.id}, coins:{test_player.coins}, goods:{test_player.goods}')

        # create players with bulk inserts.
        # insert 1919 players totally, with 114 players per batch.
        # each player has a random UUID
        player_list = random_player(1919)
        Player.bulk_create(player_list, 114)

        # print the number of players
        count = Player.select().count()
        print(f'number of players: {count}')

        # print 3 players.
        three_players = Player.select().limit(3)
        for player in three_players:
            print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')


    def trade_check(sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        """Return True when the seller has enough goods and the buyer enough coins."""
        sell_goods = Player.select(Player.goods).where(Player.id == sell_id).get().goods
        if sell_goods < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        buy_coins = Player.select(Player.coins).where(Player.id == buy_id).get().coins
        if buy_coins < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        return True


    def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
        """Move ``amount`` goods from seller to buyer for ``price`` coins.

        Both updates run inside one ``db.atomic()`` block; a failed check or
        any exception rolls the block back.
        """
        with db.atomic() as txn:
            try:
                if trade_check(sell_id, buy_id, amount, price) is False:
                    txn.rollback()
                    return

                # deduct the goods of seller, and raise his/her the coins
                Player.update(goods=Player.goods - amount, coins=Player.coins + price).where(Player.id == sell_id).execute()
                # deduct the coins of buyer, and raise his/her the goods
                Player.update(goods=Player.goods + amount, coins=Player.coins - price).where(Player.id == buy_id).execute()

            except Exception as err:
                txn.rollback()
                print(f'something went wrong: {err}')
            else:
                txn.commit()
                print("trade success")


    def trade_example() -> None:
        """Create two players, run one failing and one successful trade, print both."""
        # create two players
        # player 1: id is "1", has only 100 coins.
        # player 2: id is "2", has 114514 coins, and 20 goods.
        Player.create(id="1", coins=100, goods=0)
        Player.create(id="2", coins=114514, goods=20)

        # player 1 wants to buy 10 goods from player 2.
        # it will cost 500 coins, but player 1 cannot afford it.
        # so this trade will fail, and nobody will lose their coins or goods
        trade(sell_id="2", buy_id="1", amount=10, price=500)

        # then player 1 has to reduce the incoming quantity to 2.
        # this trade will be successful
        trade(sell_id="2", buy_id="1", amount=2, price=100)

        # let's take a look for player 1 and player 2 currently
        after_trade_players = Player.select().where(Player.id.in_(["1", "2"]))
        for player in after_trade_players:
            print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')


    db.connect()

    # recreate the player table
    db.drop_tables([Player])
    db.create_tables([Player])

    simple_example()
    trade_example()

    相较于直接使用 Driver,peewee 屏蔽了创建数据库连接时,不同数据库差异的细节。peewee 还封装了大量的操作,如会话管理、基本对象的 CRUD 等,极大地简化了代码量。

    Player 类为数据库表在程序内的映射。Player 的每个属性都对应着 player 表的一个字段。为了给 peewee 提供更多的信息,Player 类使用了形如以上示例中的 id = CharField(max_length=36, primary_key=True) 的类型定义,用来指示字段类型和其附加属性。例如 id = CharField(max_length=36, primary_key=True) 表示 id 字段为 CharField 类型,对应数据库类型为 VARCHAR,长度为 36,且为主键。

    关于 peewee 的更多使用方法,你可以参考 peewee 官方文档。

    mysqlclient 为当前比较流行的开源 Python Driver 之一。此处将以 mysqlclient 2.1.1 版本进行说明。虽然 Python 的 Driver 相较其他语言,使用也极其方便。但因其不可屏蔽底层实现,需手动管控事务的特性,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。

    import uuid
    from typing import List

    import MySQLdb
    from MySQLdb import Connection
    from MySQLdb.cursors import Cursor


    def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
        """Open a connection to the local test cluster.

        ``autocommit`` is forwarded to the driver so callers can choose between
        auto-committed statements and manually managed transactions.
        """
        return MySQLdb.connect(
            host="127.0.0.1",
            port=4000,
            user="root",
            password="",
            database="test",
            autocommit=autocommit
        )


    def create_player(cursor: Cursor, player: tuple) -> None:
        """Insert one ``(id, coins, goods)`` row."""
        cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)


    def get_player(cursor: Cursor, player_id: str) -> tuple:
        """Fetch the ``(id, coins, goods)`` row for ``player_id``."""
        cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
        return cursor.fetchone()


    def get_players_with_limit(cursor: Cursor, limit: int) -> List[tuple]:
        """Fetch at most ``limit`` player rows."""
        cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
        return cursor.fetchall()


    def random_player(amount: int) -> List[tuple]:
        """Build ``amount`` player tuples, each with a random UUID as id."""
        players = []
        for _ in range(amount):
            players.append((uuid.uuid4(), 10000, 10000))

        return players


    def bulk_create_player(cursor: Cursor, players: List[tuple]) -> None:
        """Insert many player rows with a single ``executemany`` call."""
        cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)


    def get_count(cursor: Cursor) -> int:
        """Return the number of rows in the ``player`` table."""
        cursor.execute("SELECT count(*) FROM player")
        return cursor.fetchone()[0]


    def trade_check(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        """Return True when the seller has enough goods and the buyer enough coins.

        Both rows are selected ``FOR UPDATE`` so they stay locked until the
        caller commits or rolls back.
        """
        get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"

        # sell player goods check
        cursor.execute(get_player_with_lock_sql, (sell_id,))
        _, sell_goods = cursor.fetchone()
        if sell_goods < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        # buy player coins check
        cursor.execute(get_player_with_lock_sql, (buy_id,))
        buy_coins, _ = cursor.fetchone()
        if buy_coins < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        # explicit success value so the declared ``bool`` return type holds
        return True


    def trade_update(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        """Apply the goods/coins transfer to both players."""
        update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"

        # deduct the goods of seller, and raise his/her the coins
        cursor.execute(update_player_sql, (-amount, price, sell_id))
        # deduct the coins of buyer, and raise his/her the goods
        cursor.execute(update_player_sql, (amount, -price, buy_id))


    def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        """Run one trade as a transaction on ``connection``."""
        with connection.cursor() as cursor:
            if trade_check(cursor, sell_id, buy_id, amount, price) is False:
                # end the transaction so the FOR UPDATE locks are released
                connection.rollback()
                return

            try:
                trade_update(cursor, sell_id, buy_id, amount, price)
            except Exception as err:
                connection.rollback()
                print(f'something went wrong: {err}')
            else:
                connection.commit()
                print("trade success")


    def simple_example() -> None:
        """Insert, query, bulk-insert and count players on an autocommit connection."""
        with get_connection(autocommit=True) as conn:
            with conn.cursor() as cur:
                # create a player, who has a coin and a goods.
                create_player(cur, ("test", 1, 1))

                # get this player, and print it.
                test_player = get_player(cur, "test")
                print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')

                # create players with bulk inserts.
                # insert 1919 players totally, with 114 players per batch.
                # each player has a random UUID
                player_list = random_player(1919)
                for idx in range(0, len(player_list), 114):
                    bulk_create_player(cur, player_list[idx:idx + 114])

                # print the number of players
                count = get_count(cur)
                print(f'number of players: {count}')

                # print 3 players.
                three_players = get_players_with_limit(cur, 3)
                for player in three_players:
                    print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')


    def trade_example() -> None:
        """Create two players, run one failing and one successful trade, print both."""
        with get_connection(autocommit=False) as conn:
            with conn.cursor() as cur:
                # create two players
                # player 1: id is "1", has only 100 coins.
                # player 2: id is "2", has 114514 coins, and 20 goods.
                create_player(cur, ("1", 100, 0))
                create_player(cur, ("2", 114514, 20))
                conn.commit()

            # player 1 wants to buy 10 goods from player 2.
            # it will cost 500 coins, but player 1 cannot afford it.
            # so this trade will fail, and nobody will lose their coins or goods
            trade(conn, sell_id="2", buy_id="1", amount=10, price=500)

            # then player 1 has to reduce the incoming quantity to 2.
            # this trade will be successful
            trade(conn, sell_id="2", buy_id="1", amount=2, price=100)

            # let's take a look for player 1 and player 2 currently
            with conn.cursor() as cur:
                _, player1_coin, player1_goods = get_player(cur, "1")
                print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
                _, player2_coin, player2_goods = get_player(cur, "2")
                print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')


    simple_example()
    trade_example()

    Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player,与 ORM 不同,因为没有数据对象的存在,Player 将以元组 (tuple) 进行表示。

    关于 mysqlclient 的更多使用方法,你可以参考 mysqlclient 官方文档。

    PyMySQL 为当前比较流行的开源 Python Driver 之一。此处将以 PyMySQL 1.0.2 版本进行说明。虽然 Python 的 Driver 相较其他语言,使用也极其方便。但因其不可屏蔽底层实现,需手动管控事务的特性,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。

    import uuid
    from typing import List

    import pymysql.cursors
    from pymysql import Connection
    from pymysql.cursors import DictCursor


    def get_connection(autocommit: bool = False) -> Connection:
        """Open a connection to the local test cluster; rows come back as dicts."""
        return pymysql.connect(host='127.0.0.1',
                               port=4000,
                               user='root',
                               password='',
                               database='test',
                               cursorclass=DictCursor,
                               autocommit=autocommit)


    def create_player(cursor: DictCursor, player: tuple) -> None:
        """Insert one ``(id, coins, goods)`` row."""
        cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)


    def get_player(cursor: DictCursor, player_id: str) -> dict:
        """Fetch the row for ``player_id`` as a dict."""
        cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
        return cursor.fetchone()


    def get_players_with_limit(cursor: DictCursor, limit: int) -> tuple:
        """Fetch at most ``limit`` player rows."""
        cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
        return cursor.fetchall()


    def random_player(amount: int) -> List[tuple]:
        """Build ``amount`` player tuples, each with a random UUID as id."""
        players = []
        for _ in range(amount):
            players.append((uuid.uuid4(), 10000, 10000))

        return players


    def bulk_create_player(cursor: DictCursor, players: List[tuple]) -> None:
        """Insert many player rows with a single ``executemany`` call."""
        cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)


    def get_count(cursor: DictCursor) -> int:
        """Return the number of rows in the ``player`` table."""
        cursor.execute("SELECT count(*) as count FROM player")
        return cursor.fetchone()['count']


    def trade_check(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        """Return True when the seller has enough goods and the buyer enough coins.

        Both rows are selected ``FOR UPDATE`` so they stay locked until the
        caller commits or rolls back.
        """
        get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"

        # sell player goods check
        cursor.execute(get_player_with_lock_sql, (sell_id,))
        seller = cursor.fetchone()
        if seller['goods'] < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        # buy player coins check
        cursor.execute(get_player_with_lock_sql, (buy_id,))
        buyer = cursor.fetchone()
        if buyer['coins'] < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        # explicit success value so the declared ``bool`` return type holds
        return True


    def trade_update(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        """Apply the goods/coins transfer to both players."""
        update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"

        # deduct the goods of seller, and raise his/her the coins
        cursor.execute(update_player_sql, (-amount, price, sell_id))
        # deduct the coins of buyer, and raise his/her the goods
        cursor.execute(update_player_sql, (amount, -price, buy_id))


    def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        """Run one trade as a transaction on ``connection``."""
        with connection.cursor() as cursor:
            if trade_check(cursor, sell_id, buy_id, amount, price) is False:
                # end the transaction so the FOR UPDATE locks are released
                connection.rollback()
                return

            try:
                trade_update(cursor, sell_id, buy_id, amount, price)
            except Exception as err:
                connection.rollback()
                print(f'something went wrong: {err}')
            else:
                connection.commit()
                print("trade success")


    def simple_example() -> None:
        """Insert, query, bulk-insert and count players on an autocommit connection."""
        with get_connection(autocommit=True) as connection:
            with connection.cursor() as cur:
                # create a player, who has a coin and a goods.
                create_player(cur, ("test", 1, 1))

                # get this player, and print it.
                test_player = get_player(cur, "test")
                print(test_player)

                # create players with bulk inserts.
                # insert 1919 players totally, with 114 players per batch.
                # each player has a random UUID
                player_list = random_player(1919)
                for idx in range(0, len(player_list), 114):
                    bulk_create_player(cur, player_list[idx:idx + 114])

                # print the number of players
                count = get_count(cur)
                print(f'number of players: {count}')

                # print 3 players.
                three_players = get_players_with_limit(cur, 3)
                for player in three_players:
                    print(player)


    def trade_example() -> None:
        """Create two players, run one failing and one successful trade, print both."""
        with get_connection(autocommit=False) as connection:
            with connection.cursor() as cur:
                # create two players
                # player 1: id is "1", has only 100 coins.
                # player 2: id is "2", has 114514 coins, and 20 goods.
                create_player(cur, ("1", 100, 0))
                create_player(cur, ("2", 114514, 20))
                connection.commit()

            # player 1 wants to buy 10 goods from player 2.
            # it will cost 500 coins, but player 1 cannot afford it.
            # so this trade will fail, and nobody will lose their coins or goods
            trade(connection, sell_id="2", buy_id="1", amount=10, price=500)

            # then player 1 has to reduce the incoming quantity to 2.
            # this trade will be successful
            trade(connection, sell_id="2", buy_id="1", amount=2, price=100)

            # let's take a look for player 1 and player 2 currently
            with connection.cursor() as cur:
                print(get_player(cur, "1"))
                print(get_player(cur, "2"))


    simple_example()
    trade_example()

    关于 PyMySQL 的更多使用方法,你可以参考 PyMySQL 官方文档。

    mysql-connector-python 为当前比较流行的开源 Python Driver 之一。此处将以 mysql-connector-python 8.0.31 版本进行说明。虽然 Python 的 Driver 相较其他语言,使用也极其方便。但因其不可屏蔽底层实现,需手动管控事务的特性,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。

    import uuid
    from typing import List

    from mysql.connector import connect, MySQLConnection
    from mysql.connector.cursor import MySQLCursor


    def get_connection(autocommit: bool = True) -> MySQLConnection:
        """Open a connection to the local test cluster with the given autocommit mode."""
        connection = connect(host='127.0.0.1',
                             port=4000,
                             user='root',
                             password='',
                             database='test')

        connection.autocommit = autocommit
        return connection


    def create_player(cursor: MySQLCursor, player: tuple) -> None:
        """Insert one ``(id, coins, goods)`` row."""
        cursor.execute("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", player)


    def get_player(cursor: MySQLCursor, player_id: str) -> tuple:
        """Fetch the ``(id, coins, goods)`` row for ``player_id``."""
        cursor.execute("SELECT id, coins, goods FROM player WHERE id = %s", (player_id,))
        return cursor.fetchone()


    def get_players_with_limit(cursor: MySQLCursor, limit: int) -> List[tuple]:
        """Fetch at most ``limit`` player rows."""
        cursor.execute("SELECT id, coins, goods FROM player LIMIT %s", (limit,))
        return cursor.fetchall()


    def random_player(amount: int) -> List[tuple]:
        """Build ``amount`` player tuples, each with a random UUID string as id."""
        players = []
        for _ in range(amount):
            players.append((str(uuid.uuid4()), 10000, 10000))

        return players


    def bulk_create_player(cursor: MySQLCursor, players: List[tuple]) -> None:
        """Insert many player rows with a single ``executemany`` call."""
        cursor.executemany("INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)", players)


    def get_count(cursor: MySQLCursor) -> int:
        """Return the number of rows in the ``player`` table."""
        cursor.execute("SELECT count(*) FROM player")
        return cursor.fetchone()[0]


    def trade_check(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
        """Return True when the seller has enough goods and the buyer enough coins.

        Both rows are selected ``FOR UPDATE`` so they stay locked until the
        caller commits or rolls back.
        """
        get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"

        # sell player goods check
        cursor.execute(get_player_with_lock_sql, (sell_id,))
        _, sell_goods = cursor.fetchone()
        if sell_goods < amount:
            print(f'sell player {sell_id} goods not enough')
            return False

        # buy player coins check
        cursor.execute(get_player_with_lock_sql, (buy_id,))
        buy_coins, _ = cursor.fetchone()
        if buy_coins < price:
            print(f'buy player {buy_id} coins not enough')
            return False

        # explicit success value so the declared ``bool`` return type holds
        return True


    def trade_update(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        """Apply the goods/coins transfer to both players."""
        update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"

        # deduct the goods of seller, and raise his/her the coins
        cursor.execute(update_player_sql, (-amount, price, sell_id))
        # deduct the coins of buyer, and raise his/her the goods
        cursor.execute(update_player_sql, (amount, -price, buy_id))


    def trade(connection: MySQLConnection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
        """Run one trade as a transaction on ``connection``."""
        with connection.cursor() as cursor:
            if trade_check(cursor, sell_id, buy_id, amount, price) is False:
                # end the transaction so the FOR UPDATE locks are released
                connection.rollback()
                return

            try:
                trade_update(cursor, sell_id, buy_id, amount, price)
            except Exception as err:
                connection.rollback()
                print(f'something went wrong: {err}')
            else:
                connection.commit()
                print("trade success")


    def simple_example() -> None:
        """Insert, query, bulk-insert and count players on an autocommit connection."""
        with get_connection(autocommit=True) as connection:
            with connection.cursor() as cur:
                # create a player, who has a coin and a goods.
                create_player(cur, ("test", 1, 1))

                # get this player, and print it.
                test_player = get_player(cur, "test")
                print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')

                # create players with bulk inserts.
                # insert 1919 players totally, with 114 players per batch.
                # each player has a random UUID
                player_list = random_player(1919)
                for idx in range(0, len(player_list), 114):
                    bulk_create_player(cur, player_list[idx:idx + 114])

                # print the number of players
                count = get_count(cur)
                print(f'number of players: {count}')

                # print 3 players.
                three_players = get_players_with_limit(cur, 3)
                for player in three_players:
                    print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')


    def trade_example() -> None:
        """Create two players, run one failing and one successful trade, print both."""
        with get_connection(autocommit=False) as conn:
            with conn.cursor() as cur:
                # create two players
                # player 1: id is "1", has only 100 coins.
                # player 2: id is "2", has 114514 coins, and 20 goods.
                create_player(cur, ("1", 100, 0))
                create_player(cur, ("2", 114514, 20))
                conn.commit()

            # player 1 wants to buy 10 goods from player 2.
            # it will cost 500 coins, but player 1 cannot afford it.
            # so this trade will fail, and nobody will lose their coins or goods
            trade(conn, sell_id="2", buy_id="1", amount=10, price=500)

            # then player 1 has to reduce the incoming quantity to 2.
            # this trade will be successful
            trade(conn, sell_id="2", buy_id="1", amount=2, price=100)

            # let's take a look for player 1 and player 2 currently
            with conn.cursor() as cur:
                _, player1_coin, player1_goods = get_player(cur, "1")
                print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
                _, player2_coin, player2_goods = get_player(cur, "2")
                print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')


    simple_example()
    trade_example()

    Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player,与 ORM 不同,因为没有数据对象的存在,Player 将以 tuple 进行表示。

    关于 mysql-connector-python 的更多使用方法,你可以参考 mysql-connector-python 官方文档。

    本节将逐步介绍代码的运行方法。

    Python - 图2

    小贴士

    在 Gitpod Playground 中尝试 Python 与 TiDB 的连接:现在就试试

    本示例需手动初始化表,若你使用本地集群,可直接运行:

    • MySQL CLI
    • MyCLI
    1. mysql --host 127.0.0.1 --port 4000 -u root < player_init.sql
    1. mycli --host 127.0.0.1 --port 4000 -u root --no-warn < player_init.sql

    若不使用本地集群,或未安装命令行客户端,请用喜欢的方式(如 Navicat、DBeaver 等 GUI 工具)直接登录集群,并运行 player_init.sql 文件内的 SQL 语句。

    若你使用了 TiDB Cloud Serverless Tier 集群,此处需使用系统本地的 CA 证书,并将证书路径记为 <ca_path> 以供后续指代。请参考以下系统相关的证书路径地址:

    • MacOS / Alpine
    • Debian / Ubuntu / Arch
    • RedHat / Fedora / CentOS / Mageia
    • OpenSUSE

    /etc/ssl/cert.pem

    /etc/ssl/certs/ca-certificates.crt

    /etc/pki/tls/certs/ca-bundle.crt

    /etc/ssl/ca-bundle.pem

    若设置后仍有证书错误,请查阅 TiDB Cloud Serverless Tier 安全连接文档。

    • 使用 SQLAlchemy(推荐)
    • 使用 peewee(推荐)
    • 使用 mysqlclient
    • 使用 PyMySQL
    • 使用 mysql-connector-python

    若你使用 TiDB Cloud Serverless Tier 集群,更改 sqlalchemy_example.py 内 create_engine 函数的入参:

    若你设定的密码为 123456,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:

    • Endpoint: xxx.tidbcloud.com
    • Port: 4000
    • User: 2aEp24QWEDLqRFs.root

    那么此处应将 create_engine 更改为:

    # Connect over TLS, verifying the server identity against the CA file.
    engine = create_engine('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test', connect_args={
        "ssl_mode": "VERIFY_IDENTITY",
        "ssl": {
            "ca": "<ca_path>"
        }
    })

    若你使用 TiDB Cloud Serverless Tier 集群,更改 peewee_example.py 内 connect 函数的入参:

    # Default connection to the local test cluster.
    db = connect('mysql://root:@127.0.0.1:4000/test')

    若你设定的密码为 123456,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:

    • Endpoint: xxx.tidbcloud.com
    • Port: 4000
    • User: 2aEp24QWEDLqRFs.root

    那么此处应将 connect 更改为:

    • peewee 将 PyMySQL 作为 Driver 时:

      # TLS options in the form the PyMySQL driver expects.
      db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test',
                   ssl_verify_cert=True, ssl_ca="<ca_path>")
    • peewee 将 mysqlclient 作为 Driver 时:

      # TLS options in the form the mysqlclient driver expects.
      db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test',
                   ssl_mode="VERIFY_IDENTITY", ssl={"ca": "<ca_path>"})

    由于 peewee 会将参数透传至 Driver 中,使用 peewee 时请注意 Driver 的使用类型。

    若你使用 TiDB Cloud Serverless Tier 集群,更改 mysqlclient_example.py 内 get_connection 函数:

    def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
        """Open a connection to the local test cluster with the given autocommit mode."""
        return MySQLdb.connect(
            host="127.0.0.1",
            port=4000,
            user="root",
            password="",
            database="test",
            autocommit=autocommit
        )

    若你设定的密码为 123456,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:

    • Endpoint: xxx.tidbcloud.com
    • Port: 4000
    • User: 2aEp24QWEDLqRFs.root

    那么此处应将 get_connection 更改为:

    def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
        """Open a TLS connection to the cluster, verifying the server identity."""
        return MySQLdb.connect(
            host="xxx.tidbcloud.com",
            port=4000,
            user="2aEp24QWEDLqRFs.root",
            password="123456",
            database="test",
            autocommit=autocommit,
            ssl_mode="VERIFY_IDENTITY",
            ssl={
                "ca": "<ca_path>"
            }
        )

    若你使用 TiDB Cloud Serverless Tier 集群,更改 pymysql_example.py 内 get_connection 函数:

    def get_connection(autocommit: bool = False) -> Connection:
        """Open a connection to the local test cluster; rows come back as dicts."""
        return pymysql.connect(host='127.0.0.1',
                               port=4000,
                               user='root',
                               password='',
                               database='test',
                               cursorclass=DictCursor,
                               autocommit=autocommit)

    若你设定的密码为 123456,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:

    • Endpoint: xxx.tidbcloud.com
    • Port: 4000
    • User: 2aEp24QWEDLqRFs.root

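    那么此处应将 get_connection 更改为:

    def get_connection(autocommit: bool = False) -> Connection:
        return pymysql.connect(host='xxx.tidbcloud.com',
                               port=4000,
                               user='2aEp24QWEDLqRFs.root',
                               password='123456',
                               database='test',
                               cursorclass=DictCursor,
                               autocommit=autocommit,
                               ssl_ca='<ca_path>',
                               ssl_verify_cert=True,
                               ssl_verify_identity=True)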

    若你使用 TiDB Cloud Serverless Tier 集群,更改 mysql_connector_python_example.py 内 get_connection 函数:

    def get_connection(autocommit: bool = True) -> MySQLConnection:
        """Open a connection to the local test cluster with the given autocommit mode."""
        connection = connect(host='127.0.0.1',
                             port=4000,
                             user='root',
                             password='',
                             database='test')

        connection.autocommit = autocommit
        return connection

    若你设定的密码为 123456,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:

    • Endpoint: xxx.tidbcloud.com
    • Port: 4000
    • User: 2aEp24QWEDLqRFs.root

    那么此处应将 get_connection 更改为:

    def get_connection(autocommit: bool = True) -> MySQLConnection:
        """Open a TLS connection to the cluster, verifying the server identity."""
        connection = connect(
            host="xxx.tidbcloud.com",
            port=4000,
            user="2aEp24QWEDLqRFs.root",
            password="123456",
            database="test",
            autocommit=autocommit,
            ssl_ca='<ca_path>',
            ssl_verify_identity=True
        )

        connection.autocommit = autocommit
        return connection

    运行前请先安装依赖:

    1. pip3 install -r requirement.txt

    当以后需要多次运行脚本时,请在每次运行前先依照表初始化一节再次进行表初始化。

    • 使用 SQLAlchemy(推荐)
    • 使用 peewee(推荐)
    • 使用 mysqlclient
    • 使用 PyMySQL
    • 使用 mysql-connector-python
    1. python3 sqlalchemy_example.py
    1. python3 peewee_example.py
    1. python3 mysqlclient_example.py
    • 使用 SQLAlchemy(推荐)
    • 使用 peewee(推荐)
    • 使用 mysqlclient
    • 使用 mysql-connector-python

    SQLAlchemy 预期输出

    mysqlclient 预期输出

    mysql-connector-python 预期输出