Query Builder
We’ll use the following schema:
There are two ways we can declare Table objects for working with these tables:
from peewee import Table

# Declare columns explicitly; they become attributes, e.g. Person.first.
Person = Table('person', ('id', 'first', 'last'))
Note = Table('note', ('id', 'person_id', 'content', 'timestamp'))
# Do not declare columns; they will be accessed using the magic ".c" attribute.
Reminder = Table('reminder')
Typically we will want to bind() our tables to a database. This saves us from having to pass the database explicitly every time we wish to execute a query on the table:
db = SqliteDatabase('my_app.db')
Person = Person.bind(db)
Note = Note.bind(db)
Reminder = Reminder.bind(db)
Select queries
To select the first three notes and print their content, we can write:
query = Note.select().order_by(Note.timestamp).limit(3)
for note_dict in query:
    print(note_dict['content'])
Note
By default, rows will be returned as dictionaries. You can use the tuples(), namedtuples() or objects() methods to specify a different container for the row data, if you wish.
Because we didn’t specify any columns, all the columns we defined in the note’s Table constructor will be selected. This won’t work for Reminder, as we didn’t specify any columns at all.
To select all notes published on or after January 1, 2018, along with the name of the creator, we will use join(). We’ll also request that rows be returned as namedtuple objects:
import datetime

query = (Note
         .select(Note.content, Note.timestamp, Person.first, Person.last)
         .join(Person, on=(Note.person_id == Person.id))
         .where(Note.timestamp >= datetime.date(2018, 1, 1))
         .order_by(Note.timestamp)
         .namedtuples())
for row in query:
    print(row.timestamp, '-', row.content, '-', row.first, row.last)
Let’s query for the most prolific people, that is, get the people who have created the most notes. This introduces calling a SQL function (COUNT), which is accomplished using the fn object:
name = Person.first.concat(' ').concat(Person.last)
query = (Person
         .select(name.alias('name'), fn.COUNT(Note.id).alias('count'))
         .join(Note, JOIN.LEFT_OUTER, on=(Note.person_id == Person.id))
         .group_by(name)
         .order_by(fn.COUNT(Note.id).desc()))
for row in query:
    print(row['name'], row['count'])
There are a couple of things to note in the above query:
- We store an expression in a variable (name), then use it in the query.
- We call SQL functions using fn.<function>(...), passing arguments as if it were a normal Python function.
- The alias() method is used to specify the name used for a column or calculation.
In the join predicate for the join on the max_note subquery, we can reference columns in the subquery using the magical “.c” attribute. So, max_note.c.max_ts is translated into “the max_ts column value from the max_note subquery”.
We can also use the “.c” magic attribute to access columns on tables that do not explicitly define their columns, like we did with the Reminder table. Here’s a simple query to get all reminders for today, along with their associated note content:
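today = datetime.date.today()
tomorrow = today + datetime.timedelta(days=1)
query = (Reminder
         .select(Reminder.c.alarm, Note.content)
         # Note.content is selected, so the note table must be joined;
         # "note_id" is an assumed foreign-key column on reminder.
         .join(Note, on=(Reminder.c.note_id == Note.id))
         .where(Reminder.c.alarm.between(today, tomorrow))
         .order_by(Reminder.c.alarm))
for row in query:
    print(row['alarm'], row['content'])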
Note
The “.c” attribute will not work on tables that explicitly define their columns, to prevent confusion.
Insert queries
Inserting data is straightforward. We can specify data to insert() in two different ways (in both cases, the ID of the new row is returned):
# Using keyword arguments:
zaizee_id = Person.insert(first='zaizee', last='cat').execute()
# Using column: value mappings:
Note.insert({
    Note.person_id: zaizee_id,
    Note.content: 'meeeeowwww',
    Note.timestamp: datetime.datetime.now()}).execute()
It is easy to bulk-insert data; just pass in either:
- A list of dictionaries (all must have the same keys/columns).
- A list of tuples, if the columns are specified explicitly.
Examples:
people = [
    {'first': 'Bob', 'last': 'Foo'},
    {'first': 'Herb', 'last': 'Bar'},
    {'first': 'Nuggie', 'last': 'Bar'}]
# Inserting multiple rows returns the ID of the last-inserted row.
last_id = Person.insert(people).execute()
# We can also specify row tuples, so long as we tell Peewee which
# columns the tuple values correspond to:
people = [
    ('Bob', 'Foo'),
    ('Herb', 'Bar'),
    ('Nuggie', 'Bar')]
Person.insert(people, columns=[Person.first, Person.last]).execute()
Update queries
update() queries accept either keyword arguments or a dictionary mapping column to value, just like insert().
Examples:
# "Bob" changed his last name from "Foo" to "Baze".
nrows = (Person
         .update(last='Baze')
         .where((Person.first == 'Bob') &
                (Person.last == 'Foo'))
         .execute())
# Use dictionary mapping column to value.
nrows = (Person
         .update({Person.last: 'Baze'})
         .where((Person.first == 'Bob') &
                (Person.last == 'Foo'))
         .execute())
You can also use expressions as the value to perform an atomic update. Imagine we have a PageView table and we need to atomically increment the page-view count for some URL:
# Do an atomic update:
(PageView
 .update({PageView.count: PageView.count + 1})
 .where(PageView.url == some_url)
 .execute())
Delete queries
Because DELETE (and UPDATE) queries do not support joins, we can use subqueries to delete rows based on values in related tables. For example, here is how you would delete all notes by anyone whose last name is “Foo”:
# Get the id of all people whose last name is "Foo".
foo_people = Person.select(Person.id).where(Person.last == 'Foo')
# Delete all notes by any person whose ID is in the previous query.
Note.delete().where(Note.person_id.in_(foo_people)).execute()
Query Objects
One of the fundamental limitations of the abstractions provided by Peewee 2.x was the absence of a class that represented a structured query with no relation to a given model class.
An example of this might be computing aggregate values over a subquery. For example, the count() method, which returns the count of rows in an arbitrary query, is implemented by wrapping the query:
SELECT COUNT(1) FROM (...)
To accomplish this with Peewee, the implementation is written in this way:
def count(query):
    # Select([source1, ... sourcen], [column1, ...columnn])
    wrapped = Select(from_list=[query], columns=[fn.COUNT(SQL('1'))])
    curs = wrapped.tuples().execute(db)
    return curs[0][0]  # Return first column from first row of result.
We can actually express this more concisely using the scalar() method, which is suitable for returning values from aggregate queries:
def count(query):
    wrapped = Select(from_list=[query], columns=[fn.COUNT(SQL('1'))])
    return wrapped.scalar(db)
The Query Examples document has a more complex example, in which we write a query for the facility with the highest number of available slots booked:
The SQL we wish to express is:
SELECT facid, total FROM (
    SELECT facid, SUM(slots) AS total,
           rank() OVER (ORDER BY SUM(slots) DESC) AS rank
    FROM bookings
    GROUP BY facid
) AS ranked
WHERE rank = 1
We can express this fairly elegantly by using a plain Select for the outer query:
For another example, let’s create a recursive common table expression to calculate the first 10 fibonacci numbers:
base = Select(columns=(
    Value(1).alias('n'),
    Value(0).alias('fib_n'),
    Value(1).alias('next_fib_n'))).cte('fibonacci', recursive=True)
n = (base.c.n + 1).alias('n')
recursive_term = Select(columns=(
    n,
    base.c.next_fib_n,
    base.c.fib_n + base.c.next_fib_n)).from_(base).where(n < 10)
fibonacci = base.union_all(recursive_term)
query = fibonacci.select_from(fibonacci.c.n, fibonacci.c.fib_n)
results = list(query.execute(db))
# Generates the following result list:
[{'fib_n': 0, 'n': 1},
 {'fib_n': 1, 'n': 2},
 {'fib_n': 1, 'n': 3},
 {'fib_n': 2, 'n': 4},
 {'fib_n': 3, 'n': 5},
 {'fib_n': 5, 'n': 6},
 {'fib_n': 8, 'n': 7},
 {'fib_n': 13, 'n': 8},
 {'fib_n': 21, 'n': 9},
 {'fib_n': 34, 'n': 10}]
For a description of the various classes used to describe a SQL AST, see the query builder API documentation.