Osmanthus

空想具現化


  • 首页
  • 归档
  • 分类
  • 标签
  • 关于
  •   

© 2024 Homurax

UV: | PV:

Theme Typography by Makito

Proudly published with Hexo

Peewee:简单、轻量化的 ORM

发布于 2024-08-14 Python  Peewee 

简介

Peewee 是一个简单、轻量化的 ORM,支持 sqlite, mysql, mariadb, postgresql 和 cockroachdb。
Peewee 没有自带 driver,当前使用的是 postgres 数据库,所以需要安装 psycopg2。

requirements.txt

peewee==3.17.6
psycopg2==2.9.9

数据库连接、表与 Model 的映射

连接数据库:

import peewee

db = peewee.PostgresqlDatabase('database',  
                               host='ip',  
                               port=5432,  
                               user='postgres',  
                               password='password')
  • 表映射的 class 实体需要继承 peewee.Model。
  • class 中属性需要指定 Field class,用于明确属性与数据库列的映射。
  • 模型配置通过 Model 类的内部类 Meta 进行实例化(该约定借鉴于 Django),Meta 配置会传递给子类。

字段映射关系详见:
https://docs.peewee-orm.com/en/latest/peewee/models.html#fields

[!warning]
写 demo 测试的时候没过脑子,下面的定义埋了一个坑,能发现是什么吗?
如果没有发现也没有关系,涉及到的时候会提及。

[!info]
我不喜欢通过 ORM 去建表,所有的示例均为自行建表,然后编写 Model class,而非通过 db.create_tables([User]) 建表。

create table public."user"
(
    id          varchar(32) not null primary key,
    name        varchar(255),
    age         integer,
    factor      numeric(10, 3),
    birthday    date,
    del_flag    smallint default (0)::smallint,
    create_date timestamp(6),
    sort        serial
);

alter table public."user"
    owner to postgres;
class User(peewee.Model):  
    # column_name=None 映射表中的字段名,如果为None,那么采用变量名当做字段名   
    id = peewee.CharField(primary_key=True)  
    name = peewee.CharField()  
    age = peewee.IntegerField()  
    factor = peewee.DecimalField(max_digits=10, decimal_places=3)  
    birthday = peewee.DateField()  
    del_flag = peewee.IntegerField()  
    create_date = peewee.DateTimeField()  
    sort = peewee.IntegerField()  

    class Meta:  
        database = db  
        table_name = 'user'
        # postgres 默认 schema 是 public,可以不指定
        schema = 'public'

CRUD

前期工作准备好后,自然就要开始测试 CRUD 的操作了,详见文档:
https://docs.peewee-orm.com/en/latest/peewee/querying.html

insert

  • Model.create()
  • Model.insert().execute()
  • Model.save()

insert 的方式有很多中,比如使用:

User.create(id='test1', name='test1', age=10)

User.insert(id='test2', name='test2', age=20).execute()

User(id='test3', name='test3', age=30).save(force_insert=True)

[!note]
对于 save() 来说,如果查看官方示例,是没有指定参数的,这是因为官方示例使用的是自增主键。
使用非自增主键为模型创建一个新实例时必须指定 force_insert=True
http://docs.peewee-orm.com/en/latest/peewee/models.html#id4

批量新增使用 insert_many(),接受 dict list 或者 tuple list,注意每行必须包含相同的字段:

User.insert_many([  
    {'id': 'test7', 'name': 'test7', 'age': 70},  
    {'id': 'test8', 'name': 'test8', 'age': 80},  
    {'id': 'test9', 'name': 'test9', 'age': 90},  
    {'id': 'test10', 'name': 'test10', 'age': 100},  
]).execute()

虽然 insert() 也可以接受 list,但是实现上是在内部逐条插入,而非 insert_many() 那样一次提交。

transaction

事务管理详见:
https://docs.peewee-orm.com/en/latest/peewee/database.html#managing-transactions

atomic() 实现透明的事务管理,支持嵌套使用。
使用 atomic() 时,最外层的调用将封装在一个事务中,任何嵌套调用都将使用 savepoint。

with db.atomic():  
    User.create(id='test2', name='test2', age=10)  
    User.create(id='test100', name='test100', age=1000)

如果希望明确运行在事务中,可以使用 transaction(),与 atomic() 一样,transaction() 可用作事务管理或装饰器。

with db.transaction():  
    User.create(id='test1', name='test1', age=10)  
    User.create(id='test100', name='test100', age=1000)

[!info]
如果尝试使用 transaction() 管理嵌套事务,则只会使用最外层的事务。
如果在嵌套中发生异常,该事务将不会回滚,只有上升到最外层事务的异常才会触发回滚。
这可能会导致不可预测的行为,因此建议使用 atomic()。

使用事务管理时 peewee 会自动进行回滚,除此之外也可以手动回滚:

try:  
    User.create(id='test3', name='test3', age=10)  
    User.create(id='test100', name='test100', age=1000)  
except Exception:  
    db.rollback()

delete

  • Model.delete_instance()
  • Model.delete()

在实例上调用删除:

User.get(User.id == 'test2').delete_instance()

通过条件删除:

User.delete().where(User.name.contains('test1')).execute()

User.delete().where(User.age > 30).execute()

User.delete().execute()

update

  • Model.save()
  • Model.update().where().execute()

对于有主键的实例,调用 save() 都将导致 update:

user4 = User.get(User.id == 'test4')  
user4.age = 40  
user4.save()

通过条件更新:

User.update(age=666).where(User.age.is_null()).execute()

User.update(factor=15.678).where(User.id == 'test3').execute()

在 peewee 中实现 postgres 的 upsert,即 insert on conflict do:
https://docs.peewee-orm.com/en/latest/peewee/querying.html#upsert

User.insert(id='test3', name='test33', age=33).on_conflict(  
    conflict_target=[User.id],  # Which constraint?
    preserve=[User.name, User.age],  # Use the value we would have inserted.
    update={User.birthday: datetime.now()}  
).execute()

select

  • Model.get()
  • Model.get_by_id()
  • Model.get_or_none()
  • Model.select()
User.get(User.age == 666)

User.get_by_id('test3')

User.get_or_none(User.id == 'test333')

User.select().where(User.age > 100)

查询返回的实例对象均为 Model,如果只需要遍历数据而不需要 Model 提供的 API,可以将返回转换为 dict、tuple 等:

  • dicts()
  • tuples()
  • namedtuples()
  • objects() – 接受构造函数
for user in User.select().where(User.age > 100).dicts():  
    print(user)

for user in User.select().where(User.age > 100).tuples():  
    print(user)

where

查询操作符:
https://docs.peewee-orm.com/en/latest/peewee/query_operators.html

alias:起别名

User.select(User.name.alias('user_name')).where(User.id == 'test3')

is_null:

User.select().where(User.birthday.is_null())

is_not_null:

User.select().where(User.birthday.is_null(False))

cast:改变类型

u = User.select(User.age.cast('varchar')).where(User.age == 33)  
print(type(u[0].age), u[0].age == '33')

like:

# name like '%test1%'  
print([u for u in User.select().where(User.name.contains('test')).dicts()])

# name like '%test4'  
print([u for u in User.select().where(User.name.startswith('test4')).dicts()])  

# name like '%3'  
print([u for u in User.select().where(User.name.endswith('3')).dicts()])

between:

User.select().where(User.age.between(30, 50))

in:

User.select().where(User.id.in_(['test3', 'test4']))

User.select().where(User.id << ['test3', 'test4'])

not in:

User.select().where(User.id.not_in(['test3', 'test4']))

User.select().where(~(User.id << ['test3', 'test4']))

函数调用

使用 peewee.fn 调用函数:
https://docs.peewee-orm.com/en/latest/peewee/query_operators.html#sql-functions

[!note]
通过 fn 调用的函数要大写。
fn 辅助函数将任何 SQL 函数当作方法来使用。参数可以是字段、值、子查询,甚至是嵌套函数。
实际上 peewee 中定义了 no_coerce_functions,可以使用小写,为了避免问题建议统一使用大写函数名。

CONCAT:

select_call1 = User.select(peewee.fn.CONCAT(User.id, '-', User.name))  
print(select_call1)  
print([u.concat for u in select_call1])

# 通过起别名优化  
select_call2 = User.select(peewee.fn.CONCAT(User.id, '-', User.name).alias('name'))  
print(select_call2)  
print([u.name for u in select_call2])  

# 通过 tuple 优化  
print(list(User.select(peewee.fn.CONCAT(User.id, '-', User.name)).tuples()))

distinct:

User.select(peewee.fn.DISTINCT(User.age))

# 也可以调用 distinct()
User.select(User.age).distinct()

coalesce:

User.select(peewee.fn.coalesce(User.factor, 114.514))

returning 子句

https://docs.peewee-orm.com/en/latest/peewee/querying.html#returning-clause

PostgresqlDatabase 支持在 insert、update 和 delete 查询中使用 returning 子句。
默认情况下,执行不同查询时的返回值为:

  • insert - 新记录的主键
  • update - 已修改记录的数量
  • delete - 删除的记录数

insert 指定返回内容:

User.insert(id='test200', name='test200', age=200).returning(User.age).execute()

update 指定返回内容:

User.update(age=66, factor=123.456).where(User.id == 'test200').returning(User.id, User.sort).execute()

delete 指定返回内容:

User.delete().where(User.age.is_null()).returning(User.id, User.sort).execute()

group by、having 及 order by

https://docs.peewee-orm.com/en/latest/peewee/querying.html#aggregating-records

tuples = (User  
          .select(User.age, peewee.fn.COUNT(User.id))  
          .group_by(User.age)  
          .having(User.age > 100)  
          .order_by(peewee.fn.COUNT(User.id).asc())  
          .tuples())  
print(list(tuples))

分页 paginate

https://docs.peewee-orm.com/en/latest/peewee/querying.html#paginating-records

peewee 支持使用 paginate(page, paginate_by=20) 或者 offset 加上 limit 来实现分页。

两者操作等价:

  • paginate():指定 page 和 paginate_by。page 就是页码,paginate_by 就是页容量。
  • offset、limit:
    • offset = page * paginate_by
    • limit = paginate_by

页容量为 3,查询第 1 页:

print('paginate(1, 3)')
for user in User.select().order_by(User.sort.desc()).paginate(1, 3):  
    print(user.id, user.sort)

print('offset(0).limit(3) == paginate(1, 3)')  
for user in User.select().order_by(User.sort.desc()).offset(0).limit(3):  
    print(user.id, user.sort)

页容量为 3,查询第 2 页:

print('paginate(2, 3)')  
for user in User.select().order_by(User.sort.desc()).paginate(2, 3):  
    print(user.id, user.sort)

print('offset(1 * 3).limit(3) == paginate(2, 3)')  
for user in User.select().order_by(User.sort.desc()).offset(1 * 3).limit(3):  
    print(user.id, user.sort)

raw SQL

https://docs.peewee-orm.com/en/latest/peewee/query_operators.html#sql-helper
https://docs.peewee-orm.com/en/latest/peewee/query_operators.html#security-and-sql-injection

原生 SQL 语句实现方式

  • peewee.SQL() - 提供 SQL 片段
  • Model.raw()
  • Database.execute_sql()

比如说函数 position 的调用形式 position('test' in name) 是无法通过 peewee.fn 方式来调用的,这时候就可以使用 peewee.SQL() 提供 SQL 片段:

for user in User.select(peewee.SQL("id, position('test' in name) as pos, name, age")):  
    print(user.id, user.pos, user.name, user.age)

# 可以混用
for user in User.select(peewee.SQL("position('test' in name) as pos"), User.name):  
    print(user.pos, user.name)

# 任意位置
for user in User.select().where(peewee.SQL("factor is not null") & (User.del_flag == 0)):  
    print(user.id, user.name, user.factor)

直接执行 SQL 查询:

for row in User.raw('select * from public.user where age = %s and del_flag = %s;', 666, 0).dicts():  
    print(row)

[!note]
这里就是前文提到的,没过脑子埋坑的地方。
postgres 中默认的 user 表用于存放用户,这里必须明确指定 schema,既 public.user,否则会出错。

默认情况下,peewee 将对查询进行参数化处理,因此用户传入的任何参数都将被转义。
这一规则的唯一例外情况是,用户正在编写原始 SQL 查询,或传入的 SQL 对象可能包含不受信任的数据。
为减少这种情况,应当确保任何用户定义的数据都作为查询参数传递,而不是实际 SQL 查询的一部分

# Bad! DO NOT DO THIS!
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s' % (user_data,))

# Good. `user_data` will be treated as a parameter to the query.
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data)

# Bad! DO NOT DO THIS!
query = MyModel.select().where(SQL('Some SQL expression %s' % user_data))

# Good. `user_data` will be treated as a parameter.
query = MyModel.select().where(SQL('Some SQL expression %s', user_data))

执行 SQL 查询并返回结果游标:

# fetchall 返回 tuple
cursor = db.execute_sql('select * from public.user where age = %s and del_flag = %s;', (33, 0,))  
for row in cursor.fetchall():  
    print(row)  

# insert
db.execute_sql('INSERT INTO public.user (id, name, age) VALUES (%s, %s, %s);', ('test', 'test', 70))  

# 事务控制  
with db:  
    db.execute_sql('INSERT INTO public.user (id, name) VALUES (%s, %s);', ('test300', 'test300'))  
    db.execute_sql('INSERT INTO public.user (id, name) VALUES (%s, %s);', ('test', 'test'))

连表查询

连表查询:
https://docs.peewee-orm.com/en/latest/peewee/relationships.html

外键定义:
https://docs.peewee-orm.com/en/latest/peewee/models.html#foreignkeyfield
https://docs.peewee-orm.com/en/latest/peewee/models.html#foreignkeyfield-back-references

[!note]
Peewee 遵循 Django 的惯例,通过在外键字段名后附加 _id 来访问原始外键值。

DDL:

create table public.users
(
    id   varchar(32) not null primary key,
    name varchar(255),
    sort serial
);

alter table public.users
    owner to postgres;


create table public.tweet
(
    id        varchar(32) not null primary key,
    user_id   varchar(32),
    content   text,
    post_date timestamp(6),
    sort      serial
);

alter table public.tweet
    owner to postgres;


create table public.favorite
(
    id       varchar(32) not null primary key,
    user_id  varchar(32),
    tweet_id varchar(32),
    sort     serial
);

alter table public.favorite
    owner to postgres;

Model 定义:

class BaseModel(peewee.Model):  
    id = peewee.CharField(primary_key=True)  
    sort = peewee.IntegerField()  

    class Meta:  
        database = db  


class Users(BaseModel):  
    name = peewee.CharField()  

    class Meta:  
        table_name = 'users'  


class Tweet(BaseModel):  
    # 默认属性是 user_id,反向引用 Users.tweets
    user = peewee.ForeignKeyField(Users, backref='tweets')  
    content = peewee.TextField()  
    post_date = peewee.DateTimeField()  

    class Meta:  
        table_name = 'tweet'  


class Favorite(BaseModel):  
    # 默认属性是 user_id,反向引用 Users.favorites
    user = peewee.ForeignKeyField(Users, backref='favorites')  
    # 默认属性是 tweet_id,反向引用 Tweet.favorites
    tweet = peewee.ForeignKeyField(Tweet, backref='favorites')  

    class Meta:  
        table_name = 'favorite'

初始化数据:

Users.delete().execute()  
Tweet.delete().execute()  
Favorite.delete().execute()  

data = (  
    ('user_1', ('content_user1_1', 'content_user1_2', 'content_user1_3')),  
    ('user_2', ('content_user2_1', 'content_user2_2')),  
    ('user_3', ()),  
)  
user_id = 0  
tweet_id = 0  
for username, tweets in data:  
    user_id += 1  
    user = Users.create(id=user_id, name=username)  
    for tweet in tweets:  
        tweet_id += 1  
        Tweet.create(id=tweet_id, user=user, content=tweet, post_date=datetime.now())  

favorite_data = (  
    ('user_1', ['content_user2_1']),  
    ('user_2', ['content_user1_1', 'content_user1_2']),  
    ('user_3', ['content_user1_1', 'content_user2_2']),  
)  
favorite_id = 0  
for username, favorites in favorite_data:  
    user = Users.get(Users.name == username)  
    for content in favorites:  
        favorite_id += 1  
        tweet = Tweet.get(Tweet.content == content)  
        Favorite.create(id=favorite_id, user=user, tweet=tweet)

不需要显示的指定 on 子句,peewee 可以从模型中自动推断:

for tweet in Tweet.select().join(Users, on=(Tweet.user == Users.id)).where(Users.name == 'user_1'):  
    print(tweet.id, tweet.user, tweet.content)  

for tweet in Tweet.select().join(Users).where(Users.name == 'user_1'):  
    print(tweet.id, tweet.user, tweet.content)

反向引用:

user1 = Users.get(Users.name == 'user_1')  
print(user1.tweets.sql())  

for tweet in user1.tweets:  
    print(tweet.id, tweet.user, tweet.content)

查询每个人最喜欢的 tweet 数量:

'''  
select users.name, count(favorite.id)  
from users  
left join tweet on tweet.user_id = users.id  
left join favorite on favorite.tweet_id = tweet.id  
group by users.name  
'''  
query = (Users  
         .select(Users.name, peewee.fn.COUNT(Favorite.id).alias('count'))  
         .join(Tweet, peewee.JOIN.LEFT_OUTER)  # user left join tweet.  
         .join(Favorite, peewee.JOIN.LEFT_OUTER)  # tweet left join favorite.  
         .group_by(Users.name))  

for user in query:  
    print(user.name, user.count)

查询指定用户 user_1 的每个 tweet 被喜欢的数量:

'''  
select tweet.content, COUNT(favorite.id)  
from tweet  
join users on tweet.user_id = users.id  
left join favorite on favorite.tweet_id = tweet.id  
where users.name = 'user_1'  
group by tweet.content;  
'''  
# 注意使用 switch() 明确 peewee 将 contex 设回 tweet# 否则就会使用 Favorite.user 去连接 users
query = (Tweet  
         .select(Tweet.content, peewee.fn.COUNT(Favorite.id).alias('count'))  
         .join(Users)  
         .switch(Tweet)  # Move "join context" back to tweet.  
         .join(Favorite, peewee.JOIN.LEFT_OUTER)  
         .where(Users.name == 'user_1')  
         .group_by(Tweet.content))  

for tweet in query:  
    print('%s favorite %d times' % (tweet.content, tweet.count))

如果想省略 contex 切换,可以使用 join_from():

query = (Tweet  
         .select(Tweet.content, peewee.fn.COUNT(Favorite.id).alias('count'))  
         .join_from(Tweet, Users)  # tweet join users  
         .join_from(Tweet, Favorite, peewee.JOIN.LEFT_OUTER)  # tweet left join favorite  
         .where(Users.name == 'user_1')  
         .group_by(Tweet.content))  

for tweet in query:  
    print(tweet.content, tweet.count)

常规连表:

# 使用 dicts()
for row in Tweet.select(Tweet.content, Users.name).join(Users).dicts():  
    print(row)

# 如果不使用 dicts(),而是就返回 Model Tweet,Users.name 被封装在 Tweet.user.name 中
for tweet in Tweet.select(Tweet.content, Users.name).join(Users):  
    print(tweet.user.name, '->', tweet.content)

# 可以在 join() 中指定 attr 来控制 Users 实例是什么
for tweet in Tweet.select(Tweet.content, Users.name).join(Users, attr='author'):  
    print(tweet.author.name, '->', tweet.content)  

# 如果希望所有属性都变为 Tweet 示例的属性,可以使用 objects()
for tweet in Tweet.select(Tweet.content, Users.name).join(Users).objects():  
    print(tweet.name, '->', tweet.content)

一个稍微复杂的例子:查询每个人喜欢的 tweet 及 tweet 的作者

'''  
select owner.name, tweet.content, author.name as author  
from favorite  
join users as owner on (favorite.user_id = owner.id)  
join tweet on (favorite.tweet_id = tweet.id)  
join users as author on (tweet.user_id = author.id);  
'''  
# 模型别名  
owner = Users.alias()  

query = (Favorite  
         .select(Favorite, Tweet.content, Users.name, owner.name)  
         .join(owner)  
         .switch(Favorite)  
         .join(Tweet)  
         .join(Users))  

for fav in query:  
    print(fav.user.name, 'liked', fav.tweet.content, 'by', fav.tweet.user.name)

当然不是必须使用 ForeignKeyField 才可以连表,很多时候我们可能更希望在 Model 上减少耦合,那么明确指定 on 子句即可。

DDL:

create table public.room
(
    id   varchar(32) not null primary key,
    name varchar(255),
    sort serial
);

alter table public.room
    owner to postgres;


create table public.person
(
    id      varchar(32) not null primary key,
    room_id varchar(32),
    name    varchar(255),
    sort    serial
);

alter table public.person
    owner to postgres;

Model 定义:

class BaseModel(peewee.Model):  
    id = peewee.CharField(primary_key=True)  
    sort = peewee.IntegerField()  

    class Meta:  
        database = db


class Room(BaseModel):  
    name = peewee.CharField()  

    class Meta:  
        table_name = 'room'  


class Person(BaseModel):  
    room_id = peewee.CharField()  
    name = peewee.CharField()  

    class Meta:  
        table_name = 'person'

初始化数据:

Room.delete().execute()  
Person.delete().execute()  
data = (  
    ('room_1', ('user1', 'user2', 'user3')),  
    ('room_2', ('user3', 'user4')),  
    ('room_3', ()),  
)  
room_id = 0  
person_id = 0  
for room_name, persons in data:  
    room_id += 1  
    Room.create(id=room_id, name=room_name)  
    for person_name in persons:  
        person_id += 1  
        Person.create(id=person_id, room_id=room_id, name=person_name)

连表查询:

for person in Person.select().join(Room, on=(Person.room_id == Room.id)).where(Room.name == 'room_1'):  
    print(person.id, person.name, person.room_id)  

query = (Room  
         .select(Room.name, peewee.fn.COUNT(Person.id).alias('count'))  
         .join(Person, peewee.JOIN.LEFT_OUTER, on=(Person.room_id == Room.id))  
         .group_by(Room.name))  
for row in query:  
    print(row.name, row.count)  

for person in Person.select(Person, Room.name).join(Room, on=(Person.room_id == Room.id)):  
    print(person.name, person.room_id, person.room.name)

 上一篇: Docker 安装(rpm 仓库、rpm 软件包、二进制文件、银河麒麟环境) 下一篇: Java 21 新特性(18 ~ 21) 

© 2024 Homurax

UV: | PV:

Theme Typography by Makito

Proudly published with Hexo