Async SQL (Relational) Databases

    You can also use encode/databases with FastAPI to connect to databases using async and await.

    It is compatible with:

    • PostgreSQL
    • MySQL
    • SQLite

    In this example, we’ll use SQLite, because it uses a single file and Python has integrated support. So, you can copy this example and run it as is.

    Later, for your production application, you might want to use a database server like PostgreSQL.

    Tip

    You could adopt ideas from the section about SQLAlchemy ORM (SQL (Relational) Databases), like using utility functions to perform operations in the database, independent of your FastAPI code.

    This section doesn’t apply those ideas, to be equivalent to the counterpart in Starlette.

    Import and set up SQLAlchemy

    • Import SQLAlchemy.
    • Create a metadata object.
    • Create a table notes using the metadata object.

    Tip

    Notice that all this code is pure SQLAlchemy Core.

    databases is not doing anything here yet.
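
    To see that this step involves no database connection, one option is to print the SQL that SQLAlchemy Core renders from the table definition. This is a minimal sketch, assuming SQLAlchemy is installed; the exact whitespace of the rendered SQL may differ between versions.

```python
import sqlalchemy

metadata = sqlalchemy.MetaData()

notes = sqlalchemy.Table(
    "notes",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("text", sqlalchemy.String),
    sqlalchemy.Column("completed", sqlalchemy.Boolean),
)

# Building a query only constructs an expression object; nothing is executed.
select_query = notes.select()
insert_query = notes.insert().values(text="Some note", completed=False)

# str() renders the statement with a generic dialect, without an engine
# or an open connection.
print(select_query)
print(insert_query)
```

    The same query objects are what the path operation functions later hand to databases for execution.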

    Import and set up databases

    • Import databases.
    • Create a DATABASE_URL.
    • Create a database object.

    from typing import List

    import databases
    import sqlalchemy
    from fastapi import FastAPI
    from pydantic import BaseModel

    # SQLAlchemy specific code, as with any other app
    DATABASE_URL = "sqlite:///./test.db"
    # DATABASE_URL = "postgresql://user:password@postgresserver/db"

    database = databases.Database(DATABASE_URL)

    metadata = sqlalchemy.MetaData()

    notes = sqlalchemy.Table(
        "notes",
        metadata,
        sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
        sqlalchemy.Column("text", sqlalchemy.String),
        sqlalchemy.Column("completed", sqlalchemy.Boolean),
    )

    engine = sqlalchemy.create_engine(
        DATABASE_URL, connect_args={"check_same_thread": False}
    )
    metadata.create_all(engine)


    class NoteIn(BaseModel):
        text: str
        completed: bool


    class Note(BaseModel):
        id: int
        text: str
        completed: bool


    app = FastAPI()


    @app.on_event("startup")
    async def startup():
        await database.connect()


    @app.on_event("shutdown")
    async def shutdown():
        await database.disconnect()


    @app.get("/notes/", response_model=List[Note])
    async def read_notes():
        query = notes.select()
        return await database.fetch_all(query)


    @app.post("/notes/", response_model=Note)
    async def create_note(note: NoteIn):
        query = notes.insert().values(text=note.text, completed=note.completed)
        last_record_id = await database.execute(query)
        return {**note.dict(), "id": last_record_id}

    If you were connecting to a different database (e.g. PostgreSQL), you would need to change the DATABASE_URL.
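
    One way to switch databases without editing the code is to read the URL from the environment. The sketch below is only an illustration: the environment variable name DATABASE_URL and both helper functions are hypothetical, not part of the original example. It also reflects that check_same_thread is a SQLite-only connection argument, so it should not be passed to other database drivers.

```python
import os

# Fallback used by the tutorial example when no environment variable is set.
DEFAULT_DATABASE_URL = "sqlite:///./test.db"


def get_database_url() -> str:
    """Return the URL from the (hypothetical) DATABASE_URL variable, or the SQLite default."""
    return os.environ.get("DATABASE_URL", DEFAULT_DATABASE_URL)


def connect_args_for(url: str) -> dict:
    """Return driver arguments for create_engine; check_same_thread applies to SQLite only."""
    if url.startswith("sqlite"):
        return {"check_same_thread": False}
    return {}


url = get_database_url()
print(url, connect_args_for(url))
```

    With these helpers, the engine could be created with sqlalchemy.create_engine(url, connect_args=connect_args_for(url)).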

    Create the tables

    In this case, we are creating the tables in the same Python file, but in production, you would probably want to create them with Alembic, integrated with migrations, etc.

    Here, this section would run directly, right before starting your FastAPI application.

    • Create an engine.
    • Create all the tables from the metadata object.

    from typing import List

    import databases
    import sqlalchemy
    from fastapi import FastAPI
    from pydantic import BaseModel

    # SQLAlchemy specific code, as with any other app
    DATABASE_URL = "sqlite:///./test.db"
    # DATABASE_URL = "postgresql://user:password@postgresserver/db"

    database = databases.Database(DATABASE_URL)

    metadata = sqlalchemy.MetaData()

    notes = sqlalchemy.Table(
        "notes",
        metadata,
        sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
        sqlalchemy.Column("text", sqlalchemy.String),
        sqlalchemy.Column("completed", sqlalchemy.Boolean),
    )

    # Create an engine and use it to create all the tables in the metadata
    engine = sqlalchemy.create_engine(
        DATABASE_URL, connect_args={"check_same_thread": False}
    )
    metadata.create_all(engine)


    class NoteIn(BaseModel):
        text: str
        completed: bool


    class Note(BaseModel):
        id: int
        text: str
        completed: bool


    app = FastAPI()


    @app.on_event("startup")
    async def startup():
        await database.connect()


    @app.on_event("shutdown")
    async def shutdown():
        await database.disconnect()


    @app.get("/notes/", response_model=List[Note])
    async def read_notes():
        query = notes.select()
        return await database.fetch_all(query)


    @app.post("/notes/", response_model=Note)
    async def create_note(note: NoteIn):
        query = notes.insert().values(text=note.text, completed=note.completed)
        last_record_id = await database.execute(query)
        return {**note.dict(), "id": last_record_id}
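
    To check that the table exists after this step, the SQLite file can be inspected with the sqlite3 module from the standard library. The sketch below is self-contained, so it uses an in-memory database and a hand-written approximation of the notes schema. That CREATE TABLE statement is an assumption for illustration, not the exact DDL that SQLAlchemy emits, and list_tables is a hypothetical helper.

```python
import sqlite3


def list_tables(connection: sqlite3.Connection) -> list:
    """Return the names of all tables in a SQLite database, sorted by name."""
    rows = connection.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()
    return [name for (name,) in rows]


# Stand-in for the file created by the example.
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE notes (id INTEGER PRIMARY KEY, text VARCHAR, completed BOOLEAN)"
)

print(list_tables(connection))  # ['notes']
```

    Against the real file, sqlite3.connect("test.db") would take the place of the in-memory connection.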

    Create models

    Create Pydantic models for:

    • Notes to be created (NoteIn).
    • Notes to be returned (Note).

    By creating these Pydantic models, the input data will be validated, serialized (converted), and annotated (documented).

    So, you will be able to see it all in the interactive API docs.
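
    The validation can also be tried outside FastAPI. This is a minimal sketch, assuming Pydantic is installed; it reuses the NoteIn model from the example and shows that incomplete input is rejected. In the application, FastAPI performs this validation before calling the path operation function.

```python
from pydantic import BaseModel, ValidationError


class NoteIn(BaseModel):
    text: str
    completed: bool


# Valid input becomes a typed object.
note = NoteIn(text="Some note", completed=False)
print(note.text, note.completed)

# A missing required field raises a ValidationError.
try:
    NoteIn(text="Incomplete note")
except ValidationError as error:
    print("rejected:", len(error.errors()), "error(s)")
```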

    Connect and disconnect

    • Create your FastAPI application.
    • Create event handlers to connect and disconnect from the database.

    from typing import List

    import databases
    import sqlalchemy
    from fastapi import FastAPI
    from pydantic import BaseModel

    # SQLAlchemy specific code, as with any other app
    DATABASE_URL = "sqlite:///./test.db"
    # DATABASE_URL = "postgresql://user:password@postgresserver/db"

    database = databases.Database(DATABASE_URL)

    metadata = sqlalchemy.MetaData()

    notes = sqlalchemy.Table(
        "notes",
        metadata,
        sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
        sqlalchemy.Column("text", sqlalchemy.String),
        sqlalchemy.Column("completed", sqlalchemy.Boolean),
    )

    engine = sqlalchemy.create_engine(
        DATABASE_URL, connect_args={"check_same_thread": False}
    )
    metadata.create_all(engine)


    class NoteIn(BaseModel):
        text: str
        completed: bool


    class Note(BaseModel):
        id: int
        text: str
        completed: bool


    app = FastAPI()


    # Open the database connection when the application starts
    @app.on_event("startup")
    async def startup():
        await database.connect()


    # Close the database connection when the application shuts down
    @app.on_event("shutdown")
    async def shutdown():
        await database.disconnect()


    @app.get("/notes/", response_model=List[Note])
    async def read_notes():
        query = notes.select()
        return await database.fetch_all(query)


    @app.post("/notes/", response_model=Note)
    async def create_note(note: NoteIn):
        query = notes.insert().values(text=note.text, completed=note.completed)
        last_record_id = await database.execute(query)
        return {**note.dict(), "id": last_record_id}
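
    The order in which the handlers run relative to queries can be illustrated without FastAPI. The sketch below uses only asyncio and a hypothetical FakeDatabase class that records calls. It is a stand-in for illustration, not the databases API, and app_lifetime is likewise an invented name.

```python
import asyncio


class FakeDatabase:
    """Hypothetical stand-in that only records calls; not the databases API."""

    def __init__(self) -> None:
        self.connected = False
        self.log = []

    async def connect(self) -> None:
        self.connected = True
        self.log.append("connect")

    async def disconnect(self) -> None:
        self.connected = False
        self.log.append("disconnect")

    async def fetch_all(self, query: str) -> list:
        if not self.connected:
            raise RuntimeError("query issued before connect()")
        self.log.append("fetch_all")
        return []


async def app_lifetime(database: FakeDatabase) -> list:
    await database.connect()  # role of the "startup" handler
    try:
        # Role of a path operation handling a request while the app runs.
        return await database.fetch_all("SELECT ...")
    finally:
        await database.disconnect()  # role of the "shutdown" handler


database = FakeDatabase()
result = asyncio.run(app_lifetime(database))
print(database.log)  # ['connect', 'fetch_all', 'disconnect']
```

    In the real application the connection is opened once at startup and shared by all requests until shutdown.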

    Read notes

    Create the path operation function to read notes:

    from typing import List

    import databases
    import sqlalchemy
    from fastapi import FastAPI
    from pydantic import BaseModel

    # SQLAlchemy specific code, as with any other app
    DATABASE_URL = "sqlite:///./test.db"
    # DATABASE_URL = "postgresql://user:password@postgresserver/db"

    database = databases.Database(DATABASE_URL)

    metadata = sqlalchemy.MetaData()

    notes = sqlalchemy.Table(
        "notes",
        metadata,
        sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
        sqlalchemy.Column("text", sqlalchemy.String),
        sqlalchemy.Column("completed", sqlalchemy.Boolean),
    )

    engine = sqlalchemy.create_engine(
        DATABASE_URL, connect_args={"check_same_thread": False}
    )
    metadata.create_all(engine)


    class NoteIn(BaseModel):
        text: str
        completed: bool


    class Note(BaseModel):
        id: int
        text: str
        completed: bool


    app = FastAPI()


    @app.on_event("startup")
    async def startup():
        await database.connect()


    @app.on_event("shutdown")
    async def shutdown():
        await database.disconnect()


    @app.get("/notes/", response_model=List[Note])
    async def read_notes():
        query = notes.select()
        return await database.fetch_all(query)


    @app.post("/notes/", response_model=Note)
    async def create_note(note: NoteIn):
        query = notes.insert().values(text=note.text, completed=note.completed)
        last_record_id = await database.execute(query)
        return {**note.dict(), "id": last_record_id}

    Note

    Notice that as we communicate with the database using await, the path operation function is declared with async.

    The response_model=List[Note] declaration uses typing.List.

    That documents (and validates, serializes, filters) the output data, as a list of Notes.
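
    The filtering part can be approximated outside FastAPI by validating rows against the Note model. This is a rough sketch, assuming Pydantic is installed and its default handling of extra fields; the rows and the internal_flag key are hypothetical, and this is not FastAPI’s actual implementation of response_model.

```python
from typing import List

from pydantic import BaseModel


class Note(BaseModel):
    id: int
    text: str
    completed: bool


# Hypothetical rows, shaped like query results, with one undeclared key.
rows = [
    {"id": 1, "text": "Some note", "completed": False, "internal_flag": "x"},
]

# Validating each row against Note keeps only the declared fields.
notes_out: List[Note] = [Note(**row) for row in rows]
print([note.dict() for note in notes_out])
```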

    Create notes

    The path operation function that creates notes is create_note, declared with @app.post("/notes/") in the code above.

    As with read_notes, it is declared with async because it communicates with the database using await.

    About {**note.dict(), "id": last_record_id}

    note is a Pydantic NoteIn object.

    note.dict() returns a dict with its data, something like:

    {
        "text": "Some note",
        "completed": False,
    }

    but it doesn’t have the id field.

    So we create a new dict, that contains the key-value pairs from note.dict() with:

    {**note.dict()}

    **note.dict() “unpacks” the key-value pairs directly, so {**note.dict()} would be, more or less, a copy of note.dict().

    And then we extend that copy, adding another key-value pair, "id": last_record_id, which gives {**note.dict(), "id": last_record_id}.

    So, the final result returned would be something like:

    {
        "id": 1,
        "text": "Some note",
        "completed": False,
    }
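
    The unpacking behavior described above can be checked with plain dicts. In this sketch, note_data and last_record_id are stand-ins for note.dict() and for the value returned by database.execute().

```python
note_data = {"text": "Some note", "completed": False}  # stand-in for note.dict()
last_record_id = 1  # stand-in for the value returned by database.execute()

# Unpacking alone produces a new dict with the same key-value pairs.
copy_of_note = {**note_data}

# Unpacking plus one extra pair produces the response body.
response = {**note_data, "id": last_record_id}

print(copy_of_note == note_data, copy_of_note is note_data)  # True False
print(response)  # {'text': 'Some note', 'completed': False, 'id': 1}
```

    The original dict is left unchanged, which is why this is safe to do with the data from the request.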

    Check it

    You can copy this code as is, and see the docs at http://127.0.0.1:8000/docs.

    There you can see all your API documented and interact with it.
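
    Besides the interactive docs, the API can be exercised from a script. The sketch below uses only the standard library and assumes the application is served at http://127.0.0.1:8000. The helper names are hypothetical, and the request is only built here, not sent, so the snippet runs without a server.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"


def build_create_note_request(text: str, completed: bool) -> urllib.request.Request:
    """Build (but do not send) a POST request for the /notes/ endpoint."""
    body = json.dumps({"text": text, "completed": completed}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/notes/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request):
    """Send the request and decode the JSON response; needs the server running."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)


request = build_create_note_request("Some note", False)
print(request.get_method(), request.full_url)  # POST http://127.0.0.1:8000/notes/
```

    With the server running, send(request) should return the created note, including its id.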

    More info

    You can read more about encode/databases at its GitHub page.