简介
Peewee 是一个简单、轻量化的 ORM,支持 sqlite, mysql, mariadb, postgresql 和 cockroachdb。
Peewee 没有自带 driver,当前使用的是 postgres 数据库,所以需要安装 psycopg2。
requirements.txt
peewee==3.17.6
psycopg2==2.9.9
数据库连接、表与 Model 的映射
连接数据库:
import peewee
db = peewee.PostgresqlDatabase('database',
host='ip',
port=5432,
user='postgres',
password='password')
- 表映射的 class 实体需要继承
peewee.Model
。 - class 中属性需要指定
Field class
,用于明确属性与数据库列的映射。 - 模型配置通过 Model 类的内部类 Meta 进行实例化(该约定借鉴于 Django),Meta 配置会传递给子类。
字段映射关系详见:
https://docs.peewee-orm.com/en/latest/peewee/models.html#fields
[!warning]
写 demo 测试的时候没过脑子,下面的定义埋了一个坑,能发现是什么吗?
如果没有发现也没有关系,涉及到的时候会提及。
[!info]
我不喜欢通过 ORM 去建表,所有的示例均为自行建表,然后编写 Model class,而非通过db.create_tables([User])
建表。
create table public."user"
(
id varchar(32) not null primary key,
name varchar(255),
age integer,
factor numeric(10, 3),
birthday date,
del_flag smallint default (0)::smallint,
create_date timestamp(6),
sort serial
);
alter table public."user"
owner to postgres;
class User(peewee.Model):
# column_name=None 映射表中的字段名,如果为None,那么采用变量名当做字段名
id = peewee.CharField(primary_key=True)
name = peewee.CharField()
age = peewee.IntegerField()
factor = peewee.DecimalField(max_digits=10, decimal_places=3)
birthday = peewee.DateField()
del_flag = peewee.IntegerField()
create_date = peewee.DateTimeField()
sort = peewee.IntegerField()
class Meta:
database = db
table_name = 'user'
# postgres 默认 schema 是 public,可以不指定
schema = 'public'
CRUD
前期工作准备好后,自然就要开始测试 CRUD 的操作了,详见文档:
https://docs.peewee-orm.com/en/latest/peewee/querying.html
insert
Model.create()
Model.insert().execute()
Model.save()
insert 的方式有很多中,比如使用:
User.create(id='test1', name='test1', age=10)
User.insert(id='test2', name='test2', age=20).execute()
User(id='test3', name='test3', age=30).save(force_insert=True)
[!note]
对于save()
来说,如果查看官方示例,是没有指定参数的,这是因为官方示例使用的是自增主键。
使用非自增主键为模型创建一个新实例时必须指定force_insert=True
http://docs.peewee-orm.com/en/latest/peewee/models.html#id4
批量新增使用 insert_many()
,接受 dict list 或者 tuple list,注意每行必须包含相同的字段:
User.insert_many([
{'id': 'test7', 'name': 'test7', 'age': 70},
{'id': 'test8', 'name': 'test8', 'age': 80},
{'id': 'test9', 'name': 'test9', 'age': 90},
{'id': 'test10', 'name': 'test10', 'age': 100},
]).execute()
虽然 insert()
也可以接受 list,但是实现上是在内部逐条插入,而非 insert_many()
那样一次提交。
transaction
事务管理详见:
https://docs.peewee-orm.com/en/latest/peewee/database.html#managing-transactions
atomic()
实现透明的事务管理,支持嵌套使用。
使用 atomic()
时,最外层的调用将封装在一个事务中,任何嵌套调用都将使用 savepoint。
with db.atomic():
User.create(id='test2', name='test2', age=10)
User.create(id='test100', name='test100', age=1000)
如果希望明确运行在事务中,可以使用 transaction()
,与 atomic()
一样,transaction()
可用作事务管理或装饰器。
with db.transaction():
User.create(id='test1', name='test1', age=10)
User.create(id='test100', name='test100', age=1000)
[!info]
如果尝试使用transaction()
管理嵌套事务,则只会使用最外层的事务。
如果在嵌套中发生异常,该事务将不会回滚,只有上升到最外层事务的异常才会触发回滚。
这可能会导致不可预测的行为,因此建议使用atomic()
。
使用事务管理时 peewee 会自动进行回滚,除此之外也可以手动回滚:
try:
User.create(id='test3', name='test3', age=10)
User.create(id='test100', name='test100', age=1000)
except Exception:
db.rollback()
delete
Model.delete_instance()
Model.delete()
在实例上调用删除:
User.get(User.id == 'test2').delete_instance()
通过条件删除:
User.delete().where(User.name.contains('test1')).execute()
User.delete().where(User.age > 30).execute()
User.delete().execute()
update
Model.save()
Model.update().where().execute()
对于有主键的实例,调用 save()
都将导致 update:
user4 = User.get(User.id == 'test4')
user4.age = 40
user4.save()
通过条件更新:
User.update(age=666).where(User.age.is_null()).execute()
User.update(factor=15.678).where(User.id == 'test3').execute()
在 peewee 中实现 postgres 的 upsert,即 insert on conflict do
:
https://docs.peewee-orm.com/en/latest/peewee/querying.html#upsert
User.insert(id='test3', name='test33', age=33).on_conflict(
conflict_target=[User.id], # Which constraint?
preserve=[User.name, User.age], # Use the value we would have inserted.
update={User.birthday: datetime.now()}
).execute()
select
Model.get()
Model.get_by_id()
Model.get_or_none()
Model.select()
User.get(User.age == 666)
User.get_by_id('test3')
User.get_or_none(User.id == 'test333')
User.select().where(User.age > 100)
查询返回的实例对象均为 Model,如果只需要遍历数据而不需要 Model 提供的 API,可以将返回转换为 dict、tuple 等:
dicts()
tuples()
namedtuples()
objects()
– 接受构造函数
for user in User.select().where(User.age > 100).dicts():
print(user)
for user in User.select().where(User.age > 100).tuples():
print(user)
where
查询操作符:
https://docs.peewee-orm.com/en/latest/peewee/query_operators.html
alias:起别名
User.select(User.name.alias('user_name')).where(User.id == 'test3')
is_null:
User.select().where(User.birthday.is_null())
is_not_null:
User.select().where(User.birthday.is_null(False))
cast:改变类型
u = User.select(User.age.cast('varchar')).where(User.age == 33)
print(type(u[0].age), u[0].age == '33')
like:
# name like '%test1%'
print([u for u in User.select().where(User.name.contains('test')).dicts()])
# name like '%test4'
print([u for u in User.select().where(User.name.startswith('test4')).dicts()])
# name like '%3'
print([u for u in User.select().where(User.name.endswith('3')).dicts()])
between:
User.select().where(User.age.between(30, 50))
in:
User.select().where(User.id.in_(['test3', 'test4']))
User.select().where(User.id << ['test3', 'test4'])
not in:
User.select().where(User.id.not_in(['test3', 'test4']))
User.select().where(~(User.id << ['test3', 'test4']))
函数调用
使用 peewee.fn
调用函数:
https://docs.peewee-orm.com/en/latest/peewee/query_operators.html#sql-functions
[!note]
通过 fn 调用的函数要大写。
fn 辅助函数将任何 SQL 函数当作方法来使用。参数可以是字段、值、子查询,甚至是嵌套函数。
实际上 peewee 中定义了 no_coerce_functions,可以使用小写,为了避免问题建议统一使用大写函数名。
CONCAT:
select_call1 = User.select(peewee.fn.CONCAT(User.id, '-', User.name))
print(select_call1)
print([u.concat for u in select_call1])
# 通过起别名优化
select_call2 = User.select(peewee.fn.CONCAT(User.id, '-', User.name).alias('name'))
print(select_call2)
print([u.name for u in select_call2])
# 通过 tuple 优化
print(list(User.select(peewee.fn.CONCAT(User.id, '-', User.name)).tuples()))
distinct:
User.select(peewee.fn.DISTINCT(User.age))
# 也可以调用 distinct()
User.select(User.age).distinct()
coalesce:
User.select(peewee.fn.coalesce(User.factor, 114.514))
returning 子句
https://docs.peewee-orm.com/en/latest/peewee/querying.html#returning-clause
PostgresqlDatabase 支持在 insert、update 和 delete 查询中使用 returning 子句。
默认情况下,执行不同查询时的返回值为:
- insert - 新记录的主键
- update - 已修改记录的数量
- delete - 删除的记录数
insert 指定返回内容:
User.insert(id='test200', name='test200', age=200).returning(User.age).execute()
update 指定返回内容:
User.update(age=66, factor=123.456).where(User.id == 'test200').returning(User.id, User.sort).execute()
delete 指定返回内容:
User.delete().where(User.age.is_null()).returning(User.id, User.sort).execute()
group by、having 及 order by
https://docs.peewee-orm.com/en/latest/peewee/querying.html#aggregating-records
tuples = (User
.select(User.age, peewee.fn.COUNT(User.id))
.group_by(User.age)
.having(User.age > 100)
.order_by(peewee.fn.COUNT(User.id).asc())
.tuples())
print(list(tuples))
分页 paginate
https://docs.peewee-orm.com/en/latest/peewee/querying.html#paginating-records
peewee 支持使用 paginate(page, paginate_by=20)
或者 offset
加上 limit
来实现分页。
两者操作等价:
paginate()
:指定 page 和 paginate_by。page 就是页码,paginate_by 就是页容量。offset
、limit
:offset = page * paginate_by
limit = paginate_by
页容量为 3,查询第 1 页:
print('paginate(1, 3)')
for user in User.select().order_by(User.sort.desc()).paginate(1, 3):
print(user.id, user.sort)
print('offset(0).limit(3) == paginate(1, 3)')
for user in User.select().order_by(User.sort.desc()).offset(0).limit(3):
print(user.id, user.sort)
页容量为 3,查询第 2 页:
print('paginate(2, 3)')
for user in User.select().order_by(User.sort.desc()).paginate(2, 3):
print(user.id, user.sort)
print('offset(1 * 3).limit(3) == paginate(2, 3)')
for user in User.select().order_by(User.sort.desc()).offset(1 * 3).limit(3):
print(user.id, user.sort)
raw SQL
https://docs.peewee-orm.com/en/latest/peewee/query_operators.html#sql-helper
https://docs.peewee-orm.com/en/latest/peewee/query_operators.html#security-and-sql-injection
原生 SQL 语句实现方式
peewee.SQL()
- 提供 SQL 片段Model.raw()
Database.execute_sql()
比如说函数 position 的调用形式 position('test' in name)
是无法通过 peewee.fn
方式来调用的,这时候就可以使用 peewee.SQL()
提供 SQL 片段:
for user in User.select(peewee.SQL("id, position('test' in name) as pos, name, age")):
print(user.id, user.pos, user.name, user.age)
# 可以混用
for user in User.select(peewee.SQL("position('test' in name) as pos"), User.name):
print(user.pos, user.name)
# 任意位置
for user in User.select().where(peewee.SQL("factor is not null") & (User.del_flag == 0)):
print(user.id, user.name, user.factor)
直接执行 SQL 查询:
for row in User.raw('select * from public.user where age = %s and del_flag = %s;', 666, 0).dicts():
print(row)
[!note]
这里就是前文提到的,没过脑子埋坑的地方。
postgres 中默认的 user 表用于存放用户,这里必须明确指定 schema,既public.user
,否则会出错。
默认情况下,peewee 将对查询进行参数化处理,因此用户传入的任何参数都将被转义。
这一规则的唯一例外情况是,用户正在编写原始 SQL 查询,或传入的 SQL 对象可能包含不受信任的数据。
为减少这种情况,应当确保任何用户定义的数据都作为查询参数传递,而不是实际 SQL 查询的一部分
# Bad! DO NOT DO THIS!
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s' % (user_data,))
# Good. `user_data` will be treated as a parameter to the query.
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data)
# Bad! DO NOT DO THIS!
query = MyModel.select().where(SQL('Some SQL expression %s' % user_data))
# Good. `user_data` will be treated as a parameter.
query = MyModel.select().where(SQL('Some SQL expression %s', user_data))
执行 SQL 查询并返回结果游标:
# fetchall 返回 tuple
cursor = db.execute_sql('select * from public.user where age = %s and del_flag = %s;', (33, 0,))
for row in cursor.fetchall():
print(row)
# insert
db.execute_sql('INSERT INTO public.user (id, name, age) VALUES (%s, %s, %s);', ('test', 'test', 70))
# 事务控制
with db:
db.execute_sql('INSERT INTO public.user (id, name) VALUES (%s, %s);', ('test300', 'test300'))
db.execute_sql('INSERT INTO public.user (id, name) VALUES (%s, %s);', ('test', 'test'))
连表查询
连表查询:
https://docs.peewee-orm.com/en/latest/peewee/relationships.html
外键定义:
https://docs.peewee-orm.com/en/latest/peewee/models.html#foreignkeyfield
https://docs.peewee-orm.com/en/latest/peewee/models.html#foreignkeyfield-back-references
[!note]
Peewee 遵循 Django 的惯例,通过在外键字段名后附加_id
来访问原始外键值。
DDL:
create table public.users
(
id varchar(32) not null primary key,
name varchar(255),
sort serial
);
alter table public.users
owner to postgres;
create table public.tweet
(
id varchar(32) not null primary key,
user_id varchar(32),
content text,
post_date timestamp(6),
sort serial
);
alter table public.tweet
owner to postgres;
create table public.favorite
(
id varchar(32) not null primary key,
user_id varchar(32),
tweet_id varchar(32),
sort serial
);
alter table public.favorite
owner to postgres;
Model 定义:
class BaseModel(peewee.Model):
id = peewee.CharField(primary_key=True)
sort = peewee.IntegerField()
class Meta:
database = db
class Users(BaseModel):
name = peewee.CharField()
class Meta:
table_name = 'users'
class Tweet(BaseModel):
# 默认属性是 user_id,反向引用 Users.tweets
user = peewee.ForeignKeyField(Users, backref='tweets')
content = peewee.TextField()
post_date = peewee.DateTimeField()
class Meta:
table_name = 'tweet'
class Favorite(BaseModel):
# 默认属性是 user_id,反向引用 Users.favorites
user = peewee.ForeignKeyField(Users, backref='favorites')
# 默认属性是 tweet_id,反向引用 Tweet.favorites
tweet = peewee.ForeignKeyField(Tweet, backref='favorites')
class Meta:
table_name = 'favorite'
初始化数据:
Users.delete().execute()
Tweet.delete().execute()
Favorite.delete().execute()
data = (
('user_1', ('content_user1_1', 'content_user1_2', 'content_user1_3')),
('user_2', ('content_user2_1', 'content_user2_2')),
('user_3', ()),
)
user_id = 0
tweet_id = 0
for username, tweets in data:
user_id += 1
user = Users.create(id=user_id, name=username)
for tweet in tweets:
tweet_id += 1
Tweet.create(id=tweet_id, user=user, content=tweet, post_date=datetime.now())
favorite_data = (
('user_1', ['content_user2_1']),
('user_2', ['content_user1_1', 'content_user1_2']),
('user_3', ['content_user1_1', 'content_user2_2']),
)
favorite_id = 0
for username, favorites in favorite_data:
user = Users.get(Users.name == username)
for content in favorites:
favorite_id += 1
tweet = Tweet.get(Tweet.content == content)
Favorite.create(id=favorite_id, user=user, tweet=tweet)
不需要显示的指定 on 子句,peewee 可以从模型中自动推断:
for tweet in Tweet.select().join(Users, on=(Tweet.user == Users.id)).where(Users.name == 'user_1'):
print(tweet.id, tweet.user, tweet.content)
for tweet in Tweet.select().join(Users).where(Users.name == 'user_1'):
print(tweet.id, tweet.user, tweet.content)
反向引用:
user1 = Users.get(Users.name == 'user_1')
print(user1.tweets.sql())
for tweet in user1.tweets:
print(tweet.id, tweet.user, tweet.content)
查询每个人最喜欢的 tweet 数量:
'''
select users.name, count(favorite.id)
from users
left join tweet on tweet.user_id = users.id
left join favorite on favorite.tweet_id = tweet.id
group by users.name
'''
query = (Users
.select(Users.name, peewee.fn.COUNT(Favorite.id).alias('count'))
.join(Tweet, peewee.JOIN.LEFT_OUTER) # user left join tweet.
.join(Favorite, peewee.JOIN.LEFT_OUTER) # tweet left join favorite.
.group_by(Users.name))
for user in query:
print(user.name, user.count)
查询指定用户 user_1 的每个 tweet 被喜欢的数量:
'''
select tweet.content, COUNT(favorite.id)
from tweet
join users on tweet.user_id = users.id
left join favorite on favorite.tweet_id = tweet.id
where users.name = 'user_1'
group by tweet.content;
'''
# 注意使用 switch() 明确 peewee 将 contex 设回 tweet# 否则就会使用 Favorite.user 去连接 users
query = (Tweet
.select(Tweet.content, peewee.fn.COUNT(Favorite.id).alias('count'))
.join(Users)
.switch(Tweet) # Move "join context" back to tweet.
.join(Favorite, peewee.JOIN.LEFT_OUTER)
.where(Users.name == 'user_1')
.group_by(Tweet.content))
for tweet in query:
print('%s favorite %d times' % (tweet.content, tweet.count))
如果想省略 contex 切换,可以使用 join_from()
:
query = (Tweet
.select(Tweet.content, peewee.fn.COUNT(Favorite.id).alias('count'))
.join_from(Tweet, Users) # tweet join users
.join_from(Tweet, Favorite, peewee.JOIN.LEFT_OUTER) # tweet left join favorite
.where(Users.name == 'user_1')
.group_by(Tweet.content))
for tweet in query:
print(tweet.content, tweet.count)
常规连表:
# 使用 dicts()
for row in Tweet.select(Tweet.content, Users.name).join(Users).dicts():
print(row)
# 如果不使用 dicts(),而是就返回 Model Tweet,Users.name 被封装在 Tweet.user.name 中
for tweet in Tweet.select(Tweet.content, Users.name).join(Users):
print(tweet.user.name, '->', tweet.content)
# 可以在 join() 中指定 attr 来控制 Users 实例是什么
for tweet in Tweet.select(Tweet.content, Users.name).join(Users, attr='author'):
print(tweet.author.name, '->', tweet.content)
# 如果希望所有属性都变为 Tweet 示例的属性,可以使用 objects()
for tweet in Tweet.select(Tweet.content, Users.name).join(Users).objects():
print(tweet.name, '->', tweet.content)
一个稍微复杂的例子:查询每个人喜欢的 tweet 及 tweet 的作者
'''
select owner.name, tweet.content, author.name as author
from favorite
join users as owner on (favorite.user_id = owner.id)
join tweet on (favorite.tweet_id = tweet.id)
join users as author on (tweet.user_id = author.id);
'''
# 模型别名
owner = Users.alias()
query = (Favorite
.select(Favorite, Tweet.content, Users.name, owner.name)
.join(owner)
.switch(Favorite)
.join(Tweet)
.join(Users))
for fav in query:
print(fav.user.name, 'liked', fav.tweet.content, 'by', fav.tweet.user.name)
当然不是必须使用 ForeignKeyField
才可以连表,很多时候我们可能更希望在 Model 上减少耦合,那么明确指定 on
子句即可。
DDL:
create table public.room
(
id varchar(32) not null primary key,
name varchar(255),
sort serial
);
alter table public.room
owner to postgres;
create table public.person
(
id varchar(32) not null primary key,
room_id varchar(32),
name varchar(255),
sort serial
);
alter table public.person
owner to postgres;
Model 定义:
class BaseModel(peewee.Model):
id = peewee.CharField(primary_key=True)
sort = peewee.IntegerField()
class Meta:
database = db
class Room(BaseModel):
name = peewee.CharField()
class Meta:
table_name = 'room'
class Person(BaseModel):
room_id = peewee.CharField()
name = peewee.CharField()
class Meta:
table_name = 'person'
初始化数据:
Room.delete().execute()
Person.delete().execute()
data = (
('room_1', ('user1', 'user2', 'user3')),
('room_2', ('user3', 'user4')),
('room_3', ()),
)
room_id = 0
person_id = 0
for room_name, persons in data:
room_id += 1
Room.create(id=room_id, name=room_name)
for person_name in persons:
person_id += 1
Person.create(id=person_id, room_id=room_id, name=person_name)
连表查询:
for person in Person.select().join(Room, on=(Person.room_id == Room.id)).where(Room.name == 'room_1'):
print(person.id, person.name, person.room_id)
query = (Room
.select(Room.name, peewee.fn.COUNT(Person.id).alias('count'))
.join(Person, peewee.JOIN.LEFT_OUTER, on=(Person.room_id == Room.id))
.group_by(Room.name))
for row in query:
print(row.name, row.count)
for person in Person.select(Person, Room.name).join(Room, on=(Person.room_id == Room.id)):
print(person.name, person.room_id, person.room.name)