TiDB 和 Python 的简单 CRUD 应用程序
注意
推荐使用 Python 3.10 及以上版本进行 TiDB 的应用程序的编写。
本节将介绍 TiDB 集群的启动方法。
- TiDB Cloud
- 本地集群
- Gitpod
创建 TiDB Cloud Serverless Tier 集群。
你可以部署一个本地测试的 TiDB 集群或正式的 TiDB 集群。详细步骤,请参考 TiDB 文档中的“部署本地测试 TiDB 集群”和“通过 TiUP 部署正式 TiDB 集群”两节。
基于 Git 的预配置的开发环境:现在就试试
该环境会自动克隆代码,并通过 TiUP 部署测试集群。
- 使用 SQLAlchemy(推荐)
- 使用 peewee(推荐)
- 使用 mysqlclient
- 使用 PyMySQL
- 使用 mysql-connector-python
SQLAlchemy 为当前比较流行的开源 Python ORM 之一。此处将以 SQLAlchemy 1.4.44 版本进行说明。
import uuid
from typing import List

from sqlalchemy import create_engine, String, Column, Integer, select, func
from sqlalchemy.orm import declarative_base, sessionmaker
# Engine for the cluster at 127.0.0.1:4000, database "test", user root with an empty password.
engine = create_engine('mysql://root:@127.0.0.1:4000/test')
# Declarative base class that the ORM model below inherits from.
Base = declarative_base()
# NOTE(review): this runs before any model class is declared, so Base.metadata
# presumably has no tables registered yet — confirm the table is expected to be
# created by player_init.sql instead.
Base.metadata.create_all(engine)
# Session factory bound to the engine above.
Session = sessionmaker(bind=engine)
class Player(Base):
    """ORM mapping of the ``player`` table: a string id plus coin and goods counts."""

    __tablename__ = "player"

    # 36-character string primary key.
    id = Column(String(36), primary_key=True)
    coins = Column(Integer)
    goods = Column(Integer)

    def __repr__(self):
        """Show all three columns, each rendered with ``repr``."""
        return f'Player(id={self.id!r}, coins={self.coins!r}, goods={self.goods!r})'
def random_player(amount: int) -> List[Player]:
    """Build ``amount`` unsaved players, each with a random UUID as id.

    Every player starts with 10000 coins and 10000 goods. The UUID is converted
    to ``str`` so the value matches the ``String(36)`` id column instead of
    relying on the driver to stringify a ``uuid.UUID`` object.
    """
    return [
        Player(id=str(uuid.uuid4()), coins=10000, goods=10000)
        for _ in range(amount)
    ]
def simple_example() -> None:
    """Walk through basic CRUD: insert, read back, bulk insert, count, list."""
    batch_size = 114
    with Session() as session:
        # Insert one player that owns a single coin and a single goods item.
        session.add(Player(id="test", coins=1, goods=1))

        # Read that player back and print it.
        lookup = select(Player).where(Player.id == "test")
        for found in session.scalars(lookup):
            print(found)

        # Bulk-insert 1919 players with random UUID ids, 114 per batch.
        new_players = random_player(1919)
        for start in range(0, len(new_players), batch_size):
            session.bulk_save_objects(new_players[start:start + batch_size])

        # Print how many players exist now.
        count = session.query(func.count(Player.id)).scalar()
        print(f'number of players: {count}')

        # Print three players.
        for sample in session.query(Player).limit(3).all():
            print(sample)

        session.commit()
def trade_check(session: Session, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
    """Check, with ``FOR UPDATE`` reads, that a trade can go ahead.

    Reads the seller's goods and then the buyer's coins on the given session.

    Returns:
        ``False`` (after printing the reason) when the seller has fewer than
        ``amount`` goods or the buyer has fewer than ``price`` coins;
        ``True`` otherwise.
    """
    # sell player goods check
    sell_player = session.query(Player.goods).filter(Player.id == sell_id).with_for_update().one()
    if sell_player.goods < amount:
        print(f'sell player {sell_id} goods not enough')
        return False

    # buy player coins check
    buy_player = session.query(Player.coins).filter(Player.id == buy_id).with_for_update().one()
    if buy_player.coins < price:
        print(f'buy player {buy_id} coins not enough')
        return False

    # Return an explicit bool: falling off the end would yield None, which
    # contradicts the annotation and is falsy for truthiness-based callers.
    return True
def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
    """Move ``amount`` goods from seller to buyer and ``price`` coins the other way.

    Returns without changes when ``trade_check`` reports ``False``; otherwise
    updates both rows in one session, commits, and prints ``trade success``.
    """
    with Session() as session:
        check_result = trade_check(session, sell_id, buy_id, amount, price)
        if check_result is False:
            return

        # Seller: fewer goods, more coins.
        seller_rows = session.query(Player).filter(Player.id == sell_id)
        seller_rows.update({'goods': Player.goods - amount, 'coins': Player.coins + price})

        # Buyer: more goods, fewer coins.
        buyer_rows = session.query(Player).filter(Player.id == buy_id)
        buyer_rows.update({'goods': Player.goods + amount, 'coins': Player.coins - price})

        session.commit()
        print("trade success")
def trade_example() -> None:
    """Create two players, attempt one unaffordable and one affordable trade, print both."""
    with Session() as session:
        # Player "1" has only 100 coins; player "2" has 114514 coins and 20 goods.
        buyer = Player(id="1", coins=100, goods=0)
        seller = Player(id="2", coins=114514, goods=20)
        session.add(buyer)
        session.add(seller)
        session.commit()

    # Player 1 tries to buy 10 goods for 500 coins but cannot afford it,
    # so this trade fails and nobody loses coins or goods.
    trade(sell_id="2", buy_id="1", amount=10, price=500)

    # Player 1 reduces the order to 2 goods for 100 coins; this one succeeds.
    trade(sell_id="2", buy_id="1", amount=2, price=100)

    with Session() as session:
        both_ids = ("1", "2")
        for trader in session.query(Player).filter(Player.id.in_(both_ids)).all():
            print(trader)
        session.commit()
# Run both walkthroughs in order.
simple_example()
trade_example()
相较于直接使用 Driver,SQLAlchemy 屏蔽了创建数据库连接时不同数据库之间的差异细节。SQLAlchemy 还封装了大量的操作,如会话管理、基本对象的 CRUD 等,极大地减少了代码量。
Player
类为数据库表在程序内的映射。Player
的每个属性都对应着 player
表的一个字段。为了给 SQLAlchemy 提供更多的信息,Player
类使用了形如以上示例中的 id = Column(String(36), primary_key=True)
的类型定义,用来指示字段类型和其附加属性。id = Column(String(36), primary_key=True)
表示 id
字段为 String
类型,对应数据库类型为 VARCHAR
,长度为 36
,且为主键。
关于 SQLAlchemy 的更多使用方法,你可以参考 SQLAlchemy 官方文档。
peewee 为当前比较流行的开源 Python ORM 之一。此处将以 peewee 3.15.4 版本进行说明。
import os
import uuid
from typing import List
from peewee import *
from playhouse.db_url import connect
# Database handle built from a connection URL: 127.0.0.1:4000, database "test", user root.
db = connect('mysql://root:@127.0.0.1:4000/test')
class Player(Model):
    """peewee model for the ``player`` table."""

    # 36-character string primary key.
    id = CharField(max_length=36, primary_key=True)
    coins = IntegerField()
    goods = IntegerField()

    class Meta:
        # Bind the model to the module-level database and fix the table name.
        database = db
        table_name = "player"
def random_player(amount: int) -> List[Player]:
    """Build ``amount`` unsaved players, each with a random UUID as id.

    Every player starts with 10000 coins and 10000 goods. The UUID is passed
    as ``str`` so the in-memory value already has the type of the
    ``CharField`` id column.
    """
    return [
        Player(id=str(uuid.uuid4()), coins=10000, goods=10000)
        for _ in range(amount)
    ]
def simple_example() -> None:
    """Walk through basic CRUD with peewee: insert, read back, bulk insert, count, list."""
    # Insert one player that owns a single coin and a single goods item.
    Player.create(id="test", coins=1, goods=1)

    # Read that player back and print it.
    test_query = Player.select().where(Player.id == "test")
    test_player = test_query.get()
    print(f'id:{test_player.id}, coins:{test_player.coins}, goods:{test_player.goods}')

    # Bulk-insert 1919 players with random UUID ids, 114 per batch.
    Player.bulk_create(random_player(1919), 114)

    # Print how many players exist now.
    count = Player.select().count()
    print(f'number of players: {count}')

    # Print three players.
    for player in Player.select().limit(3):
        print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')
def trade_check(sell_id: str, buy_id: str, amount: int, price: int) -> bool:
    """Return whether the seller has enough goods and the buyer enough coins.

    Prints which side falls short before returning ``False``.
    """
    # NOTE(review): unlike the other examples in this document, these reads
    # take no FOR UPDATE lock — confirm that is intended.
    seller = Player.select(Player.goods).where(Player.id == sell_id).get()
    if seller.goods < amount:
        print(f'sell player {sell_id} goods not enough')
        return False

    buyer = Player.select(Player.coins).where(Player.id == buy_id).get()
    if buyer.coins < price:
        print(f'buy player {buy_id} coins not enough')
        return False

    return True
def trade(sell_id: str, buy_id: str, amount: int, price: int) -> None:
    """Run one trade inside ``db.atomic()``.

    Rolls back and returns when ``trade_check`` reports ``False``; rolls back
    and prints the error when an update raises; otherwise commits and prints
    ``trade success``.
    """
    with db.atomic() as txn:
        try:
            check_result = trade_check(sell_id, buy_id, amount, price)
            if check_result is False:
                txn.rollback()
                return

            # Seller: fewer goods, more coins.
            seller_update = Player.update(goods=Player.goods - amount, coins=Player.coins + price)
            seller_update.where(Player.id == sell_id).execute()

            # Buyer: more goods, fewer coins.
            buyer_update = Player.update(goods=Player.goods + amount, coins=Player.coins - price)
            buyer_update.where(Player.id == buy_id).execute()
        except Exception as err:
            txn.rollback()
            print(f'something went wrong: {err}')
        else:
            txn.commit()
            print("trade success")
def trade_example() -> None:
    """Create two players, attempt one unaffordable and one affordable trade, print both."""
    # Player "1" has only 100 coins; player "2" has 114514 coins and 20 goods.
    Player.create(id="1", coins=100, goods=0)
    Player.create(id="2", coins=114514, goods=20)

    # Player 1 tries to buy 10 goods for 500 coins but cannot afford it,
    # so this trade fails and nobody loses coins or goods.
    trade(sell_id="2", buy_id="1", amount=10, price=500)

    # Player 1 reduces the order to 2 goods for 100 coins; this one succeeds.
    trade(sell_id="2", buy_id="1", amount=2, price=100)

    # Show both players after the trades.
    trader_ids = ["1", "2"]
    for player in Player.select().where(Player.id.in_(trader_ids)):
        print(f'id:{player.id}, coins:{player.coins}, goods:{player.goods}')
# Open the database connection before any query runs.
db.connect()
# Recreate the player table so each run starts from an empty table.
db.drop_tables([Player])
db.create_tables([Player])
# Run both walkthroughs in order.
simple_example()
trade_example()
相较于直接使用 Driver,peewee 屏蔽了创建数据库连接时不同数据库之间的差异细节。peewee 还封装了大量的操作,如会话管理、基本对象的 CRUD 等,极大地减少了代码量。
Player
类为数据库表在程序内的映射。Player
的每个属性都对应着 player
表的一个字段。为了给 peewee 提供更多的信息,Player
类使用了形如以上示例中的 id = CharField(max_length=36, primary_key=True)
的类型定义,用来指示字段类型和其附加属性。id = CharField(max_length=36, primary_key=True)
表示 id
字段为 CharField
类型,对应数据库类型为 VARCHAR
,长度为 36
,且为主键。
关于 peewee 的更多使用方法,你可以参考 peewee 官方文档。
mysqlclient 为当前比较流行的开源 Python Driver 之一。此处将以 mysqlclient 2.1.1 版本进行说明。虽然 Python 的 Driver 相较其他语言的 Driver 使用起来已经极其方便,但因其不可屏蔽底层实现、需手动管控事务,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。
import uuid
from typing import List
import MySQLdb
from MySQLdb import Connection
from MySQLdb.cursors import Cursor
def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
    """Open a connection to the cluster at 127.0.0.1:4000, database ``test``.

    Args:
        autocommit: Autocommit mode for the new connection. It is forwarded to
            ``MySQLdb.connect``; it used to be accepted but ignored, so callers
            asking for either mode silently got the driver default.
    """
    return MySQLdb.connect(
        host="127.0.0.1",
        port=4000,
        user="root",
        password="",
        database="test",
        autocommit=autocommit,
    )
def create_player(cursor: Cursor, player: tuple) -> None:
    """Insert one ``(id, coins, goods)`` tuple into the ``player`` table."""
    insert_sql = "INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)"
    cursor.execute(insert_sql, player)
def get_player(cursor: Cursor, player_id: str) -> tuple:
    """Select ``id, coins, goods`` for ``player_id`` and return ``fetchone()``'s result."""
    select_sql = "SELECT id, coins, goods FROM player WHERE id = %s"
    params = (player_id,)
    cursor.execute(select_sql, params)
    return cursor.fetchone()
def get_players_with_limit(cursor: Cursor, limit: int) -> List[tuple]:
    """Select ``id, coins, goods`` with ``LIMIT limit`` and return ``fetchall()``'s result."""
    select_sql = "SELECT id, coins, goods FROM player LIMIT %s"
    params = (limit,)
    cursor.execute(select_sql, params)
    return cursor.fetchall()
def random_player(amount: int) -> List[tuple]:
    """Build ``amount`` ``(id, coins, goods)`` tuples with random UUID ids.

    Every player starts with 10000 coins and 10000 goods. The UUID is converted
    to ``str`` so the parameter is a plain string rather than a ``uuid.UUID``
    object the driver has to stringify.
    """
    return [(str(uuid.uuid4()), 10000, 10000) for _ in range(amount)]
def bulk_create_player(cursor: Cursor, players: List[tuple]) -> None:
    """Insert every ``(id, coins, goods)`` tuple in ``players`` with one ``executemany`` call."""
    insert_sql = "INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)"
    cursor.executemany(insert_sql, players)
def get_count(cursor: Cursor) -> int:
    """Return the number of rows in the ``player`` table.

    The return annotation used to be ``None`` even though the function returns
    the first column of the ``count(*)`` row.
    """
    cursor.execute("SELECT count(*) FROM player")
    return cursor.fetchone()[0]
def trade_check(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
    """Check, with ``FOR UPDATE`` reads, that a trade can go ahead.

    Reads the seller's row and then the buyer's row on the given cursor.

    Returns:
        ``False`` (after printing the reason) when the seller has fewer than
        ``amount`` goods or the buyer has fewer than ``price`` coins;
        ``True`` otherwise.
    """
    get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"

    # sell player goods check
    cursor.execute(get_player_with_lock_sql, (sell_id,))
    _, sell_goods = cursor.fetchone()
    if sell_goods < amount:
        print(f'sell player {sell_id} goods not enough')
        return False

    # buy player coins check
    cursor.execute(get_player_with_lock_sql, (buy_id,))
    buy_coins, _ = cursor.fetchone()
    if buy_coins < price:
        print(f'buy player {buy_id} coins not enough')
        return False

    # Return an explicit bool: falling off the end would yield None, which
    # contradicts the annotation and is falsy for truthiness-based callers.
    return True
def trade_update(cursor: Cursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
    """Apply both sides of a trade, one signed-delta UPDATE per player."""
    update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"
    # (goods delta, coins delta, player id): seller first, then buyer.
    changes = (
        (-amount, price, sell_id),
        (amount, -price, buy_id),
    )
    for params in changes:
        cursor.execute(update_player_sql, params)
def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
    """Run one trade on ``connection``.

    Rolls back and returns when ``trade_check`` reports ``False``; rolls back
    and prints the error when the update raises; otherwise commits and prints
    ``trade success``.
    """
    with connection.cursor() as cursor:
        check_result = trade_check(cursor, sell_id, buy_id, amount, price)
        if check_result is False:
            connection.rollback()
            return

        try:
            trade_update(cursor, sell_id, buy_id, amount, price)
        except Exception as err:
            connection.rollback()
            print(f'something went wrong: {err}')
        else:
            connection.commit()
            print("trade success")
def simple_example() -> None:
    """Walk through basic CRUD with raw SQL: insert, read back, bulk insert, count, list."""
    batch_size = 114
    with get_connection(autocommit=True) as conn:
        with conn.cursor() as cur:
            # Insert one player that owns a single coin and a single goods item.
            create_player(cur, ("test", 1, 1))

            # Read that player back and print it.
            test_player = get_player(cur, "test")
            print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')

            # Bulk-insert 1919 players with random UUID ids, 114 per batch.
            new_players = random_player(1919)
            for start in range(0, len(new_players), batch_size):
                bulk_create_player(cur, new_players[start:start + batch_size])

            # Print how many players exist now.
            count = get_count(cur)
            print(f'number of players: {count}')

            # Print three players.
            for player in get_players_with_limit(cur, 3):
                print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')
def trade_example() -> None:
    """Create two players, attempt one unaffordable and one affordable trade, print both."""
    with get_connection(autocommit=False) as conn:
        with conn.cursor() as cur:
            # Player "1" has only 100 coins; player "2" has 114514 coins and 20 goods.
            for new_player in (("1", 100, 0), ("2", 114514, 20)):
                create_player(cur, new_player)
            conn.commit()

        # Player 1 tries to buy 10 goods for 500 coins but cannot afford it,
        # so this trade fails and nobody loses coins or goods.
        trade(conn, sell_id="2", buy_id="1", amount=10, price=500)

        # Player 1 reduces the order to 2 goods for 100 coins; this one succeeds.
        trade(conn, sell_id="2", buy_id="1", amount=2, price=100)

        # Show both players after the trades.
        with conn.cursor() as cur:
            _, player1_coin, player1_goods = get_player(cur, "1")
            print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
            _, player2_coin, player2_goods = get_player(cur, "2")
            print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')
# Run both walkthroughs in order.
simple_example()
trade_example()
Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player
,与 ORM 不同,因为没有数据对象的存在,Player
将以元组 (tuple) 进行表示。
关于 mysqlclient 的更多使用方法,你可以参考 mysqlclient 官方文档。
PyMySQL 为当前比较流行的开源 Python Driver 之一。此处将以 PyMySQL 1.0.2 版本进行说明。虽然 Python 的 Driver 相较其他语言的 Driver 使用起来已经极其方便,但因其不可屏蔽底层实现、需手动管控事务,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。
import uuid
from typing import List
import pymysql.cursors
from pymysql import Connection
from pymysql.cursors import DictCursor
def get_connection(autocommit: bool = False) -> Connection:
    """Open a ``DictCursor`` connection to 127.0.0.1:4000, database ``test``.

    Args:
        autocommit: Autocommit mode passed through to ``pymysql.connect``.
    """
    connection = pymysql.connect(
        host='127.0.0.1',
        port=4000,
        user='root',
        password='',
        database='test',
        cursorclass=DictCursor,
        autocommit=autocommit,
    )
    return connection
def create_player(cursor: DictCursor, player: tuple) -> None:
    """Insert one ``(id, coins, goods)`` tuple into the ``player`` table."""
    insert_sql = "INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)"
    cursor.execute(insert_sql, player)
def get_player(cursor: DictCursor, player_id: str) -> dict:
    """Select ``id, coins, goods`` for ``player_id`` and return ``fetchone()``'s result."""
    select_sql = "SELECT id, coins, goods FROM player WHERE id = %s"
    params = (player_id,)
    cursor.execute(select_sql, params)
    return cursor.fetchone()
def get_players_with_limit(cursor: DictCursor, limit: int) -> tuple:
    """Select ``id, coins, goods`` with ``LIMIT limit`` and return ``fetchall()``'s result."""
    select_sql = "SELECT id, coins, goods FROM player LIMIT %s"
    params = (limit,)
    cursor.execute(select_sql, params)
    return cursor.fetchall()
def random_player(amount: int) -> List[tuple]:
    """Build ``amount`` ``(id, coins, goods)`` tuples with random UUID ids.

    Every player starts with 10000 coins and 10000 goods. The UUID is converted
    to ``str`` so the parameter is a plain string rather than a ``uuid.UUID``
    object the driver has to stringify.
    """
    return [(str(uuid.uuid4()), 10000, 10000) for _ in range(amount)]
def bulk_create_player(cursor: DictCursor, players: List[tuple]) -> None:
    """Insert every ``(id, coins, goods)`` tuple in ``players`` with one ``executemany`` call."""
    insert_sql = "INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)"
    cursor.executemany(insert_sql, players)
def get_count(cursor: DictCursor) -> int:
    """Return the row count of ``player``, read from the ``count`` column alias."""
    cursor.execute("SELECT count(*) as count FROM player")
    row = cursor.fetchone()
    return row['count']
def trade_check(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
    """Check, with ``FOR UPDATE`` reads, that a trade can go ahead.

    Reads the seller's row and then the buyer's row on the given cursor.

    Returns:
        ``False`` (after printing the reason) when the seller has fewer than
        ``amount`` goods or the buyer has fewer than ``price`` coins;
        ``True`` otherwise.
    """
    get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"

    # sell player goods check
    cursor.execute(get_player_with_lock_sql, (sell_id,))
    seller = cursor.fetchone()
    if seller['goods'] < amount:
        print(f'sell player {sell_id} goods not enough')
        return False

    # buy player coins check
    cursor.execute(get_player_with_lock_sql, (buy_id,))
    buyer = cursor.fetchone()
    if buyer['coins'] < price:
        print(f'buy player {buy_id} coins not enough')
        return False

    # Return an explicit bool: falling off the end would yield None, which
    # contradicts the annotation and is falsy for truthiness-based callers.
    return True
def trade_update(cursor: DictCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
    """Apply both sides of a trade, one signed-delta UPDATE per player."""
    update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"
    # (goods delta, coins delta, player id): seller first, then buyer.
    changes = (
        (-amount, price, sell_id),
        (amount, -price, buy_id),
    )
    for params in changes:
        cursor.execute(update_player_sql, params)
def trade(connection: Connection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
    """Run one trade on ``connection``.

    Rolls back and returns when ``trade_check`` reports ``False``; rolls back
    and prints the error when the update raises; otherwise commits and prints
    ``trade success``.
    """
    with connection.cursor() as cursor:
        check_result = trade_check(cursor, sell_id, buy_id, amount, price)
        if check_result is False:
            connection.rollback()
            return

        try:
            trade_update(cursor, sell_id, buy_id, amount, price)
        except Exception as err:
            connection.rollback()
            print(f'something went wrong: {err}')
        else:
            connection.commit()
            print("trade success")
def simple_example() -> None:
    """Walk through basic CRUD with raw SQL: insert, read back, bulk insert, count, list."""
    batch_size = 114
    with get_connection(autocommit=True) as connection:
        with connection.cursor() as cur:
            # Insert one player that owns a single coin and a single goods item.
            create_player(cur, ("test", 1, 1))

            # Read that player back and print it.
            print(get_player(cur, "test"))

            # Bulk-insert 1919 players with random UUID ids, 114 per batch.
            new_players = random_player(1919)
            for start in range(0, len(new_players), batch_size):
                bulk_create_player(cur, new_players[start:start + batch_size])

            # Print how many players exist now.
            count = get_count(cur)
            print(f'number of players: {count}')

            # Print three players.
            for sample in get_players_with_limit(cur, 3):
                print(sample)
def trade_example() -> None:
    """Create two players, attempt one unaffordable and one affordable trade, print both.

    The two ``trade`` calls described by the comments were missing, so the
    example printed the players without ever trading; they are restored here.
    """
    with get_connection(autocommit=False) as connection:
        with connection.cursor() as cur:
            # create two players
            # player 1: id is "1", has only 100 coins.
            # player 2: id is "2", has 114514 coins, and 20 goods.
            create_player(cur, ("1", 100, 0))
            create_player(cur, ("2", 114514, 20))
            connection.commit()

        # player 1 wants to buy 10 goods from player 2.
        # it will cost 500 coins, but player 1 cannot afford it.
        # so this trade will fail, and nobody will lose their coins or goods
        trade(connection, sell_id="2", buy_id="1", amount=10, price=500)

        # then player 1 has to reduce the incoming quantity to 2.
        # this trade will be successful
        trade(connection, sell_id="2", buy_id="1", amount=2, price=100)

        # let's take a look for player 1 and player 2 currently
        with connection.cursor() as cur:
            print(get_player(cur, "1"))
            print(get_player(cur, "2"))
# Run both walkthroughs in order.
simple_example()
trade_example()
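Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player 与 ORM 不同,因为没有数据对象的存在,Player 将以字典 (dict) 进行表示。
关于 PyMySQL 的更多使用方法,你可以参考 PyMySQL 官方文档。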
mysql-connector-python 为当前比较流行的开源 Python Driver 之一。此处将以 mysql-connector-python 8.0.31 版本进行说明。虽然 Python 的 Driver 相较其他语言的 Driver 使用起来已经极其方便,但因其不可屏蔽底层实现、需手动管控事务,如果没有大量必须使用 SQL 的场景,仍然推荐使用 ORM 进行程序编写。这可以降低程序的耦合性。
import uuid
from typing import List
from mysql.connector import connect, MySQLConnection
from mysql.connector.cursor import MySQLCursor
def get_connection(autocommit: bool = True) -> MySQLConnection:
    """Open a connection to 127.0.0.1:4000, database ``test``, then set its autocommit mode.

    Args:
        autocommit: Value assigned to the connection's ``autocommit`` attribute.
    """
    conn = connect(
        host='127.0.0.1',
        port=4000,
        user='root',
        password='',
        database='test',
    )
    conn.autocommit = autocommit
    return conn
def create_player(cursor: MySQLCursor, player: tuple) -> None:
    """Insert one ``(id, coins, goods)`` tuple into the ``player`` table."""
    insert_sql = "INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)"
    cursor.execute(insert_sql, player)
def get_player(cursor: MySQLCursor, player_id: str) -> tuple:
    """Select ``id, coins, goods`` for ``player_id`` and return ``fetchone()``'s result."""
    select_sql = "SELECT id, coins, goods FROM player WHERE id = %s"
    params = (player_id,)
    cursor.execute(select_sql, params)
    return cursor.fetchone()
def get_players_with_limit(cursor: MySQLCursor, limit: int) -> List[tuple]:
    """Select ``id, coins, goods`` with ``LIMIT limit`` and return ``fetchall()``'s result."""
    select_sql = "SELECT id, coins, goods FROM player LIMIT %s"
    params = (limit,)
    cursor.execute(select_sql, params)
    return cursor.fetchall()
def random_player(amount: int) -> List[tuple]:
    """Build ``amount`` ``(id, coins, goods)`` tuples with random UUID string ids.

    Every player starts with 10000 coins and 10000 goods. Built with a
    comprehension instead of an append loop.
    """
    return [(str(uuid.uuid4()), 10000, 10000) for _ in range(amount)]
def bulk_create_player(cursor: MySQLCursor, players: List[tuple]) -> None:
    """Insert every ``(id, coins, goods)`` tuple in ``players`` with one ``executemany`` call."""
    insert_sql = "INSERT INTO player (id, coins, goods) VALUES (%s, %s, %s)"
    cursor.executemany(insert_sql, players)
def get_count(cursor: MySQLCursor) -> int:
    """Return the number of rows in the ``player`` table."""
    cursor.execute("SELECT count(*) FROM player")
    row = cursor.fetchone()
    return row[0]
def trade_check(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> bool:
    """Check, with ``FOR UPDATE`` reads, that a trade can go ahead.

    Reads the seller's row and then the buyer's row on the given cursor.

    Returns:
        ``False`` (after printing the reason) when the seller has fewer than
        ``amount`` goods or the buyer has fewer than ``price`` coins;
        ``True`` otherwise.
    """
    get_player_with_lock_sql = "SELECT coins, goods FROM player WHERE id = %s FOR UPDATE"

    # sell player goods check
    cursor.execute(get_player_with_lock_sql, (sell_id,))
    _, sell_goods = cursor.fetchone()
    if sell_goods < amount:
        print(f'sell player {sell_id} goods not enough')
        return False

    # buy player coins check
    cursor.execute(get_player_with_lock_sql, (buy_id,))
    buy_coins, _ = cursor.fetchone()
    if buy_coins < price:
        print(f'buy player {buy_id} coins not enough')
        return False

    # Return an explicit bool: falling off the end would yield None, which
    # contradicts the annotation and is falsy for truthiness-based callers.
    return True
def trade_update(cursor: MySQLCursor, sell_id: str, buy_id: str, amount: int, price: int) -> None:
    """Apply both sides of a trade, one signed-delta UPDATE per player."""
    update_player_sql = "UPDATE player set goods = goods + %s, coins = coins + %s WHERE id = %s"
    # (goods delta, coins delta, player id): seller first, then buyer.
    changes = (
        (-amount, price, sell_id),
        (amount, -price, buy_id),
    )
    for params in changes:
        cursor.execute(update_player_sql, params)
def trade(connection: MySQLConnection, sell_id: str, buy_id: str, amount: int, price: int) -> None:
    """Run one trade on ``connection``.

    Rolls back and returns when ``trade_check`` reports ``False``; rolls back
    and prints the error when the update raises; otherwise commits and prints
    ``trade success``.
    """
    with connection.cursor() as cursor:
        check_result = trade_check(cursor, sell_id, buy_id, amount, price)
        if check_result is False:
            connection.rollback()
            return

        try:
            trade_update(cursor, sell_id, buy_id, amount, price)
        except Exception as err:
            connection.rollback()
            print(f'something went wrong: {err}')
        else:
            connection.commit()
            print("trade success")
def simple_example() -> None:
    """Walk through basic CRUD with raw SQL: insert, read back, bulk insert, count, list."""
    batch_size = 114
    with get_connection(autocommit=True) as connection:
        with connection.cursor() as cur:
            # Insert one player that owns a single coin and a single goods item.
            create_player(cur, ("test", 1, 1))

            # Read that player back and print it.
            test_player = get_player(cur, "test")
            print(f'id:{test_player[0]}, coins:{test_player[1]}, goods:{test_player[2]}')

            # Bulk-insert 1919 players with random UUID ids, 114 per batch.
            new_players = random_player(1919)
            for start in range(0, len(new_players), batch_size):
                bulk_create_player(cur, new_players[start:start + batch_size])

            # Print how many players exist now.
            count = get_count(cur)
            print(f'number of players: {count}')

            # Print three players.
            for player in get_players_with_limit(cur, 3):
                print(f'id:{player[0]}, coins:{player[1]}, goods:{player[2]}')
def trade_example() -> None:
    """Create two players, attempt one unaffordable and one affordable trade, print both."""
    with get_connection(autocommit=False) as conn:
        with conn.cursor() as cur:
            # Player "1" has only 100 coins; player "2" has 114514 coins and 20 goods.
            for new_player in (("1", 100, 0), ("2", 114514, 20)):
                create_player(cur, new_player)
            conn.commit()

        # Player 1 tries to buy 10 goods for 500 coins but cannot afford it,
        # so this trade fails and nobody loses coins or goods.
        trade(conn, sell_id="2", buy_id="1", amount=10, price=500)

        # Player 1 reduces the order to 2 goods for 100 coins; this one succeeds.
        trade(conn, sell_id="2", buy_id="1", amount=2, price=100)

        # Show both players after the trades.
        with conn.cursor() as cur:
            _, player1_coin, player1_goods = get_player(cur, "1")
            print(f'id:1, coins:{player1_coin}, goods:{player1_goods}')
            _, player2_coin, player2_goods = get_player(cur, "2")
            print(f'id:2, coins:{player2_coin}, goods:{player2_goods}')
# Run both walkthroughs in order.
simple_example()
trade_example()
Driver 有着更低的封装程度,因此我们可以在程序内见到大量的 SQL。程序内查询到的 Player
,与 ORM 不同,因为没有数据对象的存在,Player
将以 tuple 进行表示。
关于 mysql-connector-python 的更多使用方法,你可以参考 mysql-connector-python 官方文档。
本节将逐步介绍代码的运行方法。
小贴士
在 Gitpod Playground 中尝试 Python 与 TiDB 的连接:现在就试试
本示例需手动初始化表,若你使用本地集群,可直接运行:
- MySQL CLI
- MyCLI
mysql --host 127.0.0.1 --port 4000 -u root < player_init.sql
mycli --host 127.0.0.1 --port 4000 -u root --no-warn < player_init.sql
若不使用本地集群,或未安装命令行客户端,请用喜欢的方式(如 Navicat、DBeaver 等 GUI 工具)直接登录集群,并运行 player_init.sql
文件内的 SQL 语句。
若你使用了 TiDB Cloud Serverless Tier 集群,此处需使用系统本地的 CA 证书,并将证书路径记为 <ca_path>
以供后续指代。请参考以下系统相关的证书路径地址:
- MacOS / Alpine
- Debian / Ubuntu / Arch
- RedHat / Fedora / CentOS / Mageia
- OpenSUSE
/etc/ssl/cert.pem
/etc/ssl/certs/ca-certificates.crt
/etc/pki/tls/certs/ca-bundle.crt
/etc/ssl/ca-bundle.pem
若设置后仍有证书错误,请查阅 TiDB Cloud Serverless Tier 安全连接文档。
- 使用 SQLAlchemy(推荐)
- 使用 peewee(推荐)
- 使用 mysqlclient
- 使用 PyMySQL
- 使用 mysql-connector-python
若你使用 TiDB Cloud Serverless Tier 集群,更改 sqlalchemy_example.py
内 create_engine
函数的入参:
engine = create_engine('mysql://root:@127.0.0.1:4000/test')
若你设定的密码为 123456
,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 create_engine
更改为:
# Serverless Tier connection: VERIFY_IDENTITY TLS mode with the CA file at <ca_path>.
engine = create_engine('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test', connect_args={
"ssl_mode": "VERIFY_IDENTITY",
"ssl": {
"ca": "<ca_path>"
}
})
若你使用 TiDB Cloud Serverless Tier 集群,更改 peewee_example.py
内 connect
函数的入参:
# Default connection URL: 127.0.0.1:4000, database "test", user root with an empty password.
db = connect('mysql://root:@127.0.0.1:4000/test')
若你设定的密码为 123456
,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 connect
更改为:
peewee 将 PyMySQL 作为 Driver 时:
# PyMySQL as driver: certificate verification enabled, CA file at <ca_path>.
db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test',
ssl_verify_cert=True, ssl_ca="<ca_path>")
peewee 将 mysqlclient 作为 Driver 时:
# mysqlclient as driver: VERIFY_IDENTITY TLS mode with the CA file at <ca_path>.
db = connect('mysql://2aEp24QWEDLqRFs.root:123456@xxx.tidbcloud.com:4000/test',
ssl_mode="VERIFY_IDENTITY", ssl={"ca": "<ca_path>"})
由于 peewee 会将参数透传至 Driver 中,使用 peewee 时请注意 Driver 的使用类型。
若你使用 TiDB Cloud Serverless Tier 集群,更改 mysqlclient_example.py
内 get_connection
函数:
def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
    """Open a connection to 127.0.0.1:4000, database ``test``, in the requested autocommit mode."""
    connection = MySQLdb.connect(
        host="127.0.0.1",
        port=4000,
        user="root",
        password="",
        database="test",
        autocommit=autocommit,
    )
    return connection
若你设定的密码为 123456
,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 get_connection
更改为:
def get_connection(autocommit: bool = True) -> MySQLdb.Connection:
    """Open a Serverless Tier connection in VERIFY_IDENTITY TLS mode.

    ``<ca_path>`` stands for the path of the system CA certificate.
    """
    # CA certificate the TLS layer is pointed at.
    tls_options = {
        "ca": "<ca_path>"
    }
    return MySQLdb.connect(
        host="xxx.tidbcloud.com",
        port=4000,
        user="2aEp24QWEDLqRFs.root",
        password="123456",
        database="test",
        autocommit=autocommit,
        ssl_mode="VERIFY_IDENTITY",
        ssl=tls_options,
    )
若你使用 TiDB Cloud Serverless Tier 集群,更改 pymysql_example.py
内 get_connection
函数:
def get_connection(autocommit: bool = False) -> Connection:
    """Open a ``DictCursor`` connection to 127.0.0.1:4000, database ``test``."""
    connection = pymysql.connect(
        host='127.0.0.1',
        port=4000,
        user='root',
        password='',
        database='test',
        cursorclass=DictCursor,
        autocommit=autocommit,
    )
    return connection
若你设定的密码为 123456
,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 get_connection
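更改为:
def get_connection(autocommit: bool = False) -> Connection:
    return pymysql.connect(host='xxx.tidbcloud.com',
                           port=4000,
                           user='2aEp24QWEDLqRFs.root',
                           password='123456',
                           database='test',
                           cursorclass=DictCursor,
                           autocommit=autocommit,
                           ssl_ca='<ca_path>',
                           ssl_verify_cert=True,
                           ssl_verify_identity=True)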
若你使用 TiDB Cloud Serverless Tier 集群,更改 mysql_connector_python_example.py
内 get_connection
函数:
def get_connection(autocommit: bool = True) -> MySQLConnection:
    """Open a connection to 127.0.0.1:4000, database ``test``, then set its autocommit mode."""
    conn = connect(
        host='127.0.0.1',
        port=4000,
        user='root',
        password='',
        database='test',
    )
    conn.autocommit = autocommit
    return conn
若你设定的密码为 123456
,而且从 TiDB Cloud Serverless Tier 集群面板中得到的连接信息为:
- Endpoint:
xxx.tidbcloud.com
- Port:
4000
- User:
2aEp24QWEDLqRFs.root
那么此处应将 get_connection
更改为:
def get_connection(autocommit: bool = True) -> MySQLConnection:
    """Open a Serverless Tier connection with the CA file and identity verification set.

    ``<ca_path>`` stands for the path of the system CA certificate.
    """
    conn = connect(
        host="xxx.tidbcloud.com",
        port=4000,
        user="2aEp24QWEDLqRFs.root",
        password="123456",
        database="test",
        autocommit=autocommit,
        ssl_ca='<ca_path>',
        ssl_verify_identity=True,
    )
    # The mode is also assigned on the returned connection object.
    conn.autocommit = autocommit
    return conn
运行前请先安装依赖:
pip3 install -r requirement.txt
当以后需要多次运行脚本时,请在每次运行前先依照表初始化一节再次进行表初始化。
- 使用 SQLAlchemy(推荐)
- 使用 peewee(推荐)
- 使用 mysqlclient
- 使用 PyMySQL
- 使用 mysql-connector-python
python3 sqlalchemy_example.py
python3 peewee_example.py
python3 mysqlclient_example.py
python3 pymysql_example.py
python3 mysql_connector_python_example.py
- 使用 SQLAlchemy(推荐)
- 使用 peewee(推荐)
- 使用 mysqlclient
- 使用 PyMySQL
- 使用 mysql-connector-python