数据库

Python SQLAlchemy 高级教程——使用SQLAlc

2020-05-20  本文已影响0人  诸老师

建表

连接一个数据库

>>> from sqlalchemy import create_engine
>>> engine = create_engine('sqlite:///:memory:', echo=True)
这个 Engine 是任何SQLAlchemy应用程序的起点。

Dialect 定义特定数据库的DBAPI行为,能为不同的数据库转换SQL语法。
Pool 是连接池,当生成的session实例操作数据库(或者多个实例并发签出)的时候把session放进这个池统一管理,如果session实例绑定了bind=engine,则该session实例在使用时默认放到由这个engine管理的连接池中。

把表映射到自己的类

声明ORM基类

>>> from sqlalchemy.ext.declarative import declarative_base
>>> Base = declarative_base()#这是一个关系映射基类

User 将是我们映射此表的类。在类中,我们定义了要映射到的表的详细信息,主要是表名以及列的名称和数据类型:

>>> from sqlalchemy import Column, Integer, String
>>> class User(Base):
...     __tablename__ = 'users'
...
...     id = Column(Integer, primary_key=True)
...     name = Column(String)
...     fullname = Column(String)
...     nickname = Column(String)
...
...     def __repr__(self):
...        return "<User(name='%s', fullname='%s', nickname='%s')>" % (
...                             self.name, self.fullname, self.nickname)

查看刚才创建的表结构

>>> User.__table__ 
Table('users', MetaData(bind=None),
            Column('id', Integer(), table=<users>, primary_key=True, nullable=False),
            Column('name', String(), table=<users>),
            Column('fullname', String(), table=<users>),
            Column('nickname', String(), table=<users>), schema=None)

这个表结构被登记在Base.metadata.tables中,如果有多个子类继承了这个Base,那么这个Base.metadata就记录了所有刚刚定义的子类(表结构)。

在数据库中创建表

现在用 MetaData 向数据库发出create table语句:

>>> Base.metadata.create_all(engine)

如果Base.metadata存在多个子类,则会全部创建。

向表中插入数据

每一个映射类对应一张表,我们要插入的每一条记录都是映射类的实例,要插入数据,就是向这张表中添加映射类的实例。

创建会话

会话(session)是一个与数据库对话的控制器,能向Engine申请连接池(Pool)资源。

session常用操作:

session.query():查询,从数据库中定位数据
session.flush():预提交,提交到数据库文件,还未写入数据库文件中
session.commit():提交了一个事务,把缓存的数据直接写入数据库
session.rollback():回滚
session.close():关闭

关于报错 “Can’t reconnect until invalid transaction is rolled back”

session事务失败,占用连接池资源,无法提交新的事务。需要rollback(),或者close()来打破这种状态(也可以通过设置engine的autocommit为True来取消事务提交,比较野蛮,不推荐)。

创建会话的方式一(为每个Session工厂分配engine)

>>> from sqlalchemy.orm import sessionmaker
>>> Session = sessionmaker(bind=engine)# 生成Session工厂,与数据库连接绑定
>>> session = Session()

创建会话的方式二(为每个session分配engine)

>>> Session = sessionmaker()
>>> session = Session(bind=engine)# 生成session实例,与数据库连接绑定

创建会话的方式三(延迟绑定)

>>> Session = sessionmaker()
>>> Session.configure(bind=engine)
>>> session = Session()

创建会话的方式四(不绑定)

>>> Session = sessionmaker()
>>> session = Session()

注意:如果是Session = sessionmaker(bind=engine),由这个Session实例化的session只能操作engine能访问的视图或表,关联Engine是为了确保这个session可以使用该engine的连接资源。
如果不用Session.configure(bind=engine)绑定engine,则这个会话实例可以操作任意的engine,由于我们在下面的程序中反射表的时候用了绑定到engine的metadata,使用上面这个没有绑定engine的session查询的时候会找到表对应的metadata绑定的engine来进行,但是这个session无法操作metadata没有绑定到engine的表。

创建会话的方式五(线程安全)

>>> from sqlalchemy.orm import scoped_session
>>> from sqlalchemy.orm import sessionmaker
>>> session = scoped_session(sessionmaker(bind=engine))

更多关于session的问题,请看我的另一篇文章:https://www.jianshu.com/p/2524784c89c6

添加和更新对象

向之前创建的表中添加数据:

>>> ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
>>> session.add(ed_user)

此时,这个session处于Pending状态,还没有发出SQL,对象还没有在数据库中表示为一行。这个session将使用一个刷新的方法,在需要时立即发出SQL来持久化数据。
下面,我们创建一个新的query对象,查看User表中的名为‘ed’的对象:

>>> our_user = session.query(User).filter_by(name='ed').first() 
>>> our_user
<User(name='ed', fullname='Ed Jones', nickname='edsnickname')>

这时候会话已经识别出返回的行与在其内部对象映射中表示的行相同,所以我们实际上得到了与我们刚刚添加的相同的实例。

>>> ed_user is our_user
True

这里使用的ORM概念称为标识映射,它确保会话中对特定行进行的所有操作都对同一组数据进行操作。一旦会话中出现具有特定主键的对象,该会话上的所有SQL查询将始终返回该特定主键的相同Python对象;如果试图在会话中放置具有相同主键的第二个已经持久化的对象,也会引发错误。
我们可以使用add_all()一次性添加更多的User对象:

>>> session.add_all([
...     User(name='wendy', fullname='Wendy Williams', nickname='windy'),
...     User(name='mary', fullname='Mary Contrary', nickname='mary'),
...     User(name='fred', fullname='Fred Flintstone', nickname='freddy')])

更改一个人的昵称:

>>> ed_user.nickname = 'eddie'

这个更改出现在session.dirty属性中:

>>> session.dirty
IdentitySet([<User(name='ed', fullname='Ed Jones', nickname='eddie')>])

三个新的用户对象正在等待处理的挂起(pending)态:

>>> session.new  
IdentitySet([<User(name='wendy', fullname='Wendy Williams', nickname='windy')>,
<User(name='mary', fullname='Mary Contrary', nickname='mary')>,
<User(name='fred', fullname='Fred Flintstone', nickname='freddy')>])

我们告诉session,我们希望向数据库发出所有剩余的更改,并提交正在进行中的事务。我们通过session.commit()来做这件事。会话发出针对“ed”上的昵称更改的UPDATE语句,以及我们添加的三个新用户对象的INSERT语句:

>>>  session.commit()

commit()将剩余的更改刷新到数据库,并提交事务。会话引用的连接资源现在返回到连接池。此会话的后续操作将在一个新事务中发生,该事务将在首次需要时再次重新获取连接资源。
如果我们看看Ed的id属性,之前它是None,现在它有一个值:

>>>  ed_user.id
1

User的对象在这个写入数据库的过程中经历了以下三种状态
transient, pending, persistent
一个对象在一个会话中可以拥有的状态如下:

>>> from sqlalchemy import inspect
>>> insp = inspect(ed_user)
>>> insp.persistent
True

回滚

先乱改一波数据

>>> ed_user.name = 'Edwardo'
>>> fake_user = User(name='fakeuser', fullname='Invalid', nickname='12345')
>>> session.add(fake_user)

查询会话时,可以看到它们被刷新到当前事务中:

>>> session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser'])).all()
[<User(name='Edwardo', fullname='Ed Jones', nickname='eddie')>, <User(name='fakeuser', fullname='Invalid', nickname='12345')>]

回滚一下,我们可以看到 ed_user 的name变回 ed ,fake_user 已不在会话中:

>>> session.rollback()
>>> ed_user.name
u'ed'
>>> fake_user in session
False

再次查询发现已经恢复

>>> session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all()
[<User(name='ed', fullname='Ed Jones', nickname='eddie')>]

查询

>>> for instance in session.query(User).order_by(User.id):
...     print(instance.name, instance.fullname)
ed Ed Jones
wendy Wendy Williams
mary Mary Contrary
fred Fred Flintstone

>>> for name, fullname in session.query(User.name, User.fullname):
...     print(name, fullname)
ed Ed Jones
wendy Wendy Williams
mary Mary Contrary
fred Fred Flintstone

>>> for row in session.query(User, User.name).all():
...    print(row.User, row.name)
<User(name='ed', fullname='Ed Jones', nickname='eddie')> ed
<User(name='wendy', fullname='Wendy Williams', nickname='windy')> wendy
<User(name='mary', fullname='Mary Contrary', nickname='mary')> mary
<User(name='fred', fullname='Fred Flintstone', nickname='freddy')> fred

>>> for row in session.query(User.name.label('name_label')).all():
...    print(row.name_label)
ed
wendy
mary
fred

>>> from sqlalchemy.orm import aliased
>>> user_alias = aliased(User, name='user_alias')
>>> for row in session.query(user_alias, user_alias.name).all():
...    print(row.user_alias)
<User(name='ed', fullname='Ed Jones', nickname='eddie')>
<User(name='wendy', fullname='Wendy Williams', nickname='windy')>
<User(name='mary', fullname='Mary Contrary', nickname='mary')>
<User(name='fred', fullname='Fred Flintstone', nickname='freddy')>

>>> for u in session.query(User).order_by(User.id)[1:3]:
...    print(u)
<User(name='wendy', fullname='Wendy Williams', nickname='windy')>
<User(name='mary', fullname='Mary Contrary', nickname='mary')>

>>> for name, in session.query(User.name).\
...             filter_by(fullname='Ed Jones'):
...    print(name)
ed

>>> for name, in session.query(User.name).\
...             filter(User.fullname=='Ed Jones'):
...    print(name)
ed


>>> for user in session.query(User).\
...          filter(User.name=='ed').\
...          filter(User.fullname=='Ed Jones'):
...    print(user)
<User(name='ed', fullname='Ed Jones', nickname='eddie')>

更多的filter操作:

query.filter(User.name == 'ed')
query.filter(User.name != 'ed')
query.filter(User.name.like('%ed%'))
query.filter(User.name.ilike('%ed%'))
query.filter(User.name.in_(['ed', 'wendy', 'jack']))
query.filter(User.name.in_(
    session.query(User.name).filter(User.name.like('%ed%'))
))
query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
query.filter(User.name == None)
query.filter(User.name.is_(None))
query.filter(User.name != None)
query.filter(User.name.isnot(None))

# and在filter中的三种方式
from sqlalchemy import and_
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')

from sqlalchemy import or_
query.filter(or_(User.name == 'ed', User.name == 'wendy'))

query.filter(User.name.match('wendy'))

返回列表和标量

>>> query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
>>> query.all()
[<User(name='ed', fullname='Ed Jones', nickname='eddie')>,
      <User(name='fred', fullname='Fred Flintstone', nickname='freddy')>]

>>> query.first()
<User(name='ed', fullname='Ed Jones', nickname='eddie')>

>>> user = query.one()#不是只有一行就报错(0行)
Traceback (most recent call last):
...
MultipleResultsFound: Multiple rows were found for one()

>>> user = query.filter(User.id == 99).one()#不是只有一行就报错(2行以上)
Traceback (most recent call last):
...
NoResultFound: No row was found for one()

>>> user = query.filter(User.id == 99).one_or_none()#超过一行就报错,一行或零行不报错
Traceback (most recent call last):
...
NoResultFound: No row was found for one()

>>> query = session.query(User.id).filter(User.name == 'ed').order_by(User.id)
>>> query.scalar()#scalar() 调用 one() 方法,并在成功时返回行的第一列:
1

使用文本SQL

>>> from sqlalchemy import text
>>> for user in session.query(User).\
...             filter(text("id<224")).\
...             order_by(text("id")).all():
...     print(user.name)
ed
wendy
mary
fred

使用冒号指定绑定参数,用 params() 方法指定值:

>>> session.query(User).filter(text("id<:value and name=:name")).\
...     params(value=224, name='fred').order_by(User.id).one()
<User(name='fred', fullname='Fred Flintstone', nickname='freddy')>

把表示完整语句的text()传递给 from_statement():

>>> session.query(User).from_statement(
...                     text("SELECT * FROM users where name=:name")).\
...                     params(name='ed').all()
[<User(name='ed', fullname='Ed Jones', nickname='eddie')>]

text() 允许我们将文本SQL按位置链接到ORM映射的列;具体的操作是将列作为位置参数传递给 TextClause.columns():

>>> stmt = text("SELECT name, id, fullname, nickname "
...             "FROM users where name=:name")
>>> stmt = stmt.columns(User.name, User.id, User.fullname, User.nickname)
>>> session.query(User).from_statement(stmt).params(name='ed').all()
[<User(name='ed', fullname='Ed Jones', nickname='eddie')>]

或者

>>> stmt = text("SELECT name, id FROM users where name=:name")
>>> stmt = stmt.columns(User.name, User.id)
>>> session.query(User.id, User.name).\
...          from_statement(stmt).params(name='ed').all()
[(1, u'ed')]

计数

>>> session.query(User).filter(User.name.like('%ed')).count()
2

>>> from sqlalchemy import func
>>> session.query(func.count(User.name), User.name).group_by(User.name).all()
[(1, u'ed'), (1, u'fred'), (1, u'mary'), (1, u'wendy')]


>>> session.query(func.count('*')).select_from(User).scalar()#SELECT count(*) FROM table
4

>>> session.query(func.count(User.id)).scalar()#也可以这样
4

建立关联

User表中的每一用户可以存储任意数量的与其用户名相关联的电子邮件地址。从User表到存储电子邮件地址(我们称之为Address)的新表存在一对多关联。我们定义这个表及其映射类Address:

>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy.orm import relationship

>>> class Address(Base):
...     __tablename__ = 'addresses'
...     id = Column(Integer, primary_key=True)
...     email_address = Column(String, nullable=False)
...     user_id = Column(Integer, ForeignKey('users.id'))
...
...     user = relationship("User", back_populates="addresses")
...
...     def __repr__(self):
...         return "<Address(email_address='%s')>" % self.email_address

>>> User.addresses = relationship(
...     "Address", order_by=Address.id, back_populates="user")

上面的类引入了ForeignKey,它是一个应用于列的指令,指示应该将该列中的值限制在指定的另一张表的对应列中可能出现的值的范围内。这是关系数据库的核心特性,也是将原本不连接的表集合转换为具有丰富的重叠关系的粘合剂。上面的外键表示Address中的user_id列应该被约束为User表中的id列,即它的主键。

第二个指令称为relationship(),它告诉ORM,Address类本身应该使用属性Address. user链接到User类。relationship()使用两个表之间的外键关系来确定这个链接的性质,从而确定该Address与User的关系是多对一。

附加的relationship()放在User类下新建的User.address属性上。在这两个relationship()中,back_populates指定为互相引用的属性名;通过这样做,每个relationship()可以用back_populates传递关联:一端Address.user引用一个user实例,另一端则是User.addresses引用Address实例的列表,我们可以通过User表的back_populates反向查出所有它在addresses表里的关联项。

我们需要在数据库中创建地址表,所以我们将从我们的元数据发出另一个创建,它将跳过已经创建的表:

>>> Base.metadata.create_all(engine)

现在,当我们创建一个User时,将出现一个空白addresses集合。在这里可以使用各种集合类型,比如集合和字典,但是在默认情况下,集合是一个Python列表。

>>> jack = User(name='jack', fullname='Jack Bean', nickname='gjffdd')
>>> jack.addresses
[]

我们可以自由地在User对象上添加Address对象:

>>> jack.addresses = [
...                 Address(email_address='jack@google.com'),
...                 Address(email_address='j25@yahoo.com')]

当使用双向关系时,在一个方向上添加的元素在另一个方向上自动变得可见。此行为基于on-change事件属性发生,并在Python中进行计算,不使用任何SQL:

>>> jack.addresses[1]
<Address(email_address='j25@yahoo.com')>

>>> jack.addresses[1].user
<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>

我们现在就向数据库添加并提交Jack Bean。jack和对应addresses集合中的两个Address成员都被一次性添加到会话中,这个添加关联数据的过程叫级联(cascading):

>>> session.add(jack)
>>> session.commit()

查询 Jack,只返回 Jack。没有关于Jack的addresses的SQL:

>>> jack = session.query(User).\
... filter_by(name='jack').one()
>>> jack
<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>

让我们看看addresses集合。注意观察SQL:

>>> jack.addresses
[<Address(email_address='jack@google.com')>, <Address(email_address='j25@yahoo.com')>]

当我们访问addresses集合时,SQL指令才被发出。这是一个延迟加载(lazy loading)关系的例子。现在已经加载了addresses集合,其行为与普通列表一样。我们将简要介绍优化这个集合的加载的方法。

基于外键关联的多表联合查询

要在User和Address之间构造一个简单的隐式连接,可以使用Query.filter()将它们的相关列等同起来。下面我们用这个方法加载用户和地址实体

>>> for u, a in session.query(User, Address).\
...                     filter(User.id==Address.user_id).\
...                     filter(Address.email_address=='jack@google.com').\
...                     all():
...     print(u)
...     print(a)
<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>
<Address(email_address='jack@google.com')>

另一方面,实际的SQL JOIN语法最容易使用Query.join()方法实现:

>>> session.query(User).join(Address).\
...         filter(Address.email_address=='jack@google.com').\
...         all()
[<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>]

Query.join()知道如何在用户和地址之间进行连接,因为它们之间只有一个外键。如果没有外键,或者有多个外键,那么Query.join()在使用以下一种形式时工作得更好:

query.join(Address, User.id==Address.user_id)    # 显式关系
query.join(User.addresses)                       # 右连接指定关系
query.join(Address, User.addresses)              # 显式关系指定

正如您所期望的,使用Query.outerjoin()函数对外连接使用了相同的思想:

query.outerjoin(User.addresses)   # LEFT OUTER JOIN

join()方法通常从实体列表中最左边的项进行连接,如果省略了ON子句,或者ON子句是一个普通的SQL表达式。要控制连接列表中的第一个实体,可以使用Query.select_from()方法:

query = session.query(User, Address).select_from(Address).join(User)

使用别名

在跨多个表进行查询时,如果需要多次引用同一个表,SQL通常要求该表使用另一个名称作为别名,这样就可以根据该表的其他情况对其进行区分。使用aliased()构造支持这一点。在使用aliased()连接关系时,可以使用特殊属性方法PropComparator.of_type()来更改关系连接的目标,以引用给定的aliased()对象。下面,我们两次连接到Address实体,以定位具有两个不同电子邮件地址的用户:

>>> from sqlalchemy.orm import aliased
>>> adalias1 = aliased(Address)
>>> adalias2 = aliased(Address)
>>> for username, email1, email2 in \
...     session.query(User.name, adalias1.email_address, adalias2.email_address).\
...     join(User.addresses.of_type(adalias1)).\
...     join(User.addresses.of_type(adalias2)).\
...     filter(adalias1.email_address=='jack@google.com').\
...     filter(adalias2.email_address=='j25@yahoo.com'):
...     print(username, email1, email2)
jack jack@google.com j25@yahoo.com

除了使用PropComparator.of_type()方法之外,还经常可以看到Query.join()方法通过单独指示一个特定的目标来连接:

# 等同于 query.join(User.addresses.of_type(adalias1))
q = query.join(adalias1, User.addresses)

使用子查询

Query可以用作生成子查询的语句。假设我们想加载User对象以及每个用户有多少Address记录的计数。生成这样的SQL的最佳方法是获得按用户id分组的地址计数,并连接到父地址。在这种情况下,我们使用左外连接,这样我们可以为那些没有任何地址的用户返回行,例如:

SELECT users.*, adr_count.address_count FROM users LEFT OUTER JOIN
    (SELECT user_id, count(*) AS address_count
        FROM addresses GROUP BY user_id) AS adr_count
    ON users.id=adr_count.user_id

把这段SQL换成我们的query:

>>> from sqlalchemy.sql import func
>>> stmt = session.query(Address.user_id, func.count('*').\
...         label('address_count')).\
...         group_by(Address.user_id).subquery()

func生成SQL函数,Query上的subquery()方法生成一个SQL表达式构造,表示嵌入在别名中的SELECT语句(实际上是Query.statement.alias()的简写形式)。
一旦有了语句,它的行为就像一个表结构,就像我们在本教程开始时为用户创建的表结构一样。语句的列可以通过一个名为c的属性访问:

>>> for u, count in session.query(User, stmt.c.address_count).\
...     outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id):
...     print(u, count)
<User(name='ed', fullname='Ed Jones', nickname='eddie')> None
<User(name='wendy', fullname='Wendy Williams', nickname='windy')> None
<User(name='mary', fullname='Mary Contrary', nickname='mary')> None
<User(name='fred', fullname='Fred Flintstone', nickname='freddy')> None
<User(name='jack', fullname='Jack Bean', nickname='gjffdd')> 2

从子查询中选择对象

上面,我们只是从子查询中选择了一个包含列的结果
如果我们希望我们的子查询映射到一个对象呢?
可以使用aliased()将映射类的“别名”关联到子查询

>>> stmt = session.query(Address).\
...                 filter(Address.email_address != 'j25@yahoo.com').\
...                 subquery()
>>> adalias = aliased(Address, stmt)
>>> for user, address in session.query(User, adalias).\
...         join(adalias, User.addresses):#这样就返回Address的对象,而不是列了
...     print(user)
...     print(address)
<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>
<Address(email_address='jack@google.com')>

如果不加aliased()是这样的,stmt被拆成列:

>>> stmt = session.query(Address).\
...                 filter(Address.email_address != 'j25@yahoo.com').\
...                 subquery()
>>> for user in session.query(User, stmt).\
...         join(stmt, User.addresses):
...     print(user)
(<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>, 1, 'jack@google.com', 5)

判断数据的存在性

SQL中的EXISTS关键字是一个布尔运算符,如果给定的表达式包含任何行,它将返回True。它可以在许多场景中代替join,并且对于定位相关表中没有对应行的行也很有用。
存在一个显式的结构,它看起来像这样:

>>> from sqlalchemy.sql import exists
>>> stmt = exists().where(Address.user_id==User.id)
>>> for name, in session.query(User.name).filter(stmt):
...     print(name)
jack

上面的语句换成可以沿着User.addresses的关联使用Comparator.any():

>>> for name, in session.query(User.name).\
...         filter(User.addresses.any()):
...     print(name)
jack

也可以给它个判断依据,来限制它返回的行数:

>>> for name, in session.query(User.name).\
...     filter(User.addresses.any(Address.email_address.like('%google%'))):
...     print(name)
jack

Comparator.has() 跟 Comparator.any()一样都是用于多对一的关系:

>>> session.query(Address).\
...         filter(~Address.user.has(User.name=='jack')).all()
[]

各种基于关系的操作:

#Comparator.__eq__() (多对一,等于):
query.filter(Address.user == someuser)
#Comparator.__ne__() (多对一,不等于):
query.filter(Address.user != someuser)
#IS NULL (多对一):
query.filter(Address.user == None)
#Comparator.contains() (用于一对多的集合):
query.filter(User.addresses.contains(someaddress))
#Comparator.any() (用于集合):
query.filter(User.addresses.any(Address.email_address == 'bar'))

# 也接受关键字参数
query.filter(User.addresses.any(email_address='bar'))
#Comparator.has() (Address.user.has(User.name=='ed')):
query.filter(Address.user.has(name='ed'))
#Query.with_parent() (用于任何关联):
session.query(Address).with_parent(someuser, 'addresses')

预加载

回想一下前面我们在访问User.addresses时演示的延迟加载操作,在访问User的address集合时才发出SQL。如果您想要减少查询的次数(在很多情况下是显著减少),我们可以对查询操作应用即时加载。
SQLAlchemy提供了三种即时加载类型,其中两种是自动加载的,第三种是自定义标准。这三种方法通常都是通过称为query options的函数调用的,该函数通过query.options()方法向查询提供关于我们希望如何加载各种属性的附加指令。

Selectin Load

在这种情况下,我们想要User.addresses应尽快加载。加载一组对象及其相关的集合的一个很好的选择是selectinload(),它发出第二个SELECT语句,该语句完全加载与刚刚加载的结果相关联的集合。名称“selectin”源于SELECT语句使用IN子句来同时定位多个对象的相关行:

>>> from sqlalchemy.orm import selectinload
>>> jack = session.query(User).\
...                 options(selectinload(User.addresses)).\
...                 filter_by(name='jack').one()
>>> jack
<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>

>>> jack.addresses
[<Address(email_address='jack@google.com')>, <Address(email_address='j25@yahoo.com')>]

Joined Load

这种加载方式会发出连接,默认情况下是左外连接,以便在一次操作中预先加载主对象及其相关对象(注意:这个不是JOIN)

>>> from sqlalchemy.orm import joinedload

>>> jack = session.query(User).\
...                        options(joinedload(User.addresses)).\
...                        filter_by(name='jack').one()
>>> jack
<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>

>>> jack.addresses
[<Address(email_address='jack@google.com')>, <Address(email_address='j25@yahoo.com')>]

Explicit Join + Eagerload

第三种快速加载方式是,当我们显式地构造一个JOIN以定位主行,并且希望将额外的表应用于主对象上的相关对象或集合时。该特性是通过contains_eager()函数提供的,通常用于在需要对同一对象进行筛选的查询中预加载多对一对象。下面的例子做到了预先加载每一条Address表的记录和与之关联的User对象(例子中只筛选出名为“jack”的User),使用contains_eager()将“User”表中的列预加载进Address.user。

>>> from sqlalchemy.orm import contains_eager

>>> jacks_addresses = session.query(Address).\
...                             join(Address.user).\
...                             filter(User.name=='jack').\
...                             options(contains_eager(Address.user)).\
...                             all()
>>> jacks_addresses
[<Address(email_address='jack@google.com')>, <Address(email_address='j25@yahoo.com')>]

>>> jacks_addresses[0].user
<User(name='jack', fullname='Jack Bean', nickname='gjffdd')>

删除操作

让我们试着删除jack,看看结果如何。我们会在会话中将对象标记为已删除,然后我们会发出一个计数查询用以确认。

>>> session.delete(jack)
sql
>>> session.query(User).filter_by(name='jack').count()
0

到目前为止,一切顺利。那Jack的Address对象呢?

>>> session.query(Address).filter(
...     Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])
...  ).count()
2

它还在!SQLAlchemy并没有级联删除它。

配置级联删除

我们将在User.addresses的关联上配置级联选项。虽然SQLAlchemy允许您在任何时间点向映射添加新的属性和关系,但是在这种情况下,需要删除现有的关系,所以我们需要完全删除映射并重新开始—先关闭会话:

>>> session.close()
ROLLBACK

创建一个新的declarative_base():

>>> Base = declarative_base()

接下来,我们将声明User类,添加addresses关联,包括级联配置(我们也将不使用构造函数):

>>> class User(Base):
...     __tablename__ = 'users'
...
...     id = Column(Integer, primary_key=True)
...     name = Column(String)
...     fullname = Column(String)
...     nickname = Column(String)
...
...     addresses = relationship("Address", back_populates='user',
...                     cascade="all, delete, delete-orphan")
...
...     def __repr__(self):
...        return "<User(name='%s', fullname='%s', nickname='%s')>" % (
...                                self.name, self.fullname, self.nickname)

然后我们重建Address类,注意,在本例中我们已经通过User类创建了Address.user关联:

>>> class Address(Base):
...     __tablename__ = 'addresses'
...     id = Column(Integer, primary_key=True)
...     email_address = Column(String, nullable=False)
...     user_id = Column(Integer, ForeignKey('users.id'))
...     user = relationship("User", back_populates="addresses")
...
...     def __repr__(self):
...         return "<Address(email_address='%s')>" % self.email_address

下面使用Query.get()加载user jack(它按主键加载),从对应的addresses集合中移除一个地址将会导致该地址也被删除:

# 按主键加载jack
>>> jack = session.query(User).get(5)

# 删除一个地址(延迟加载被触发)
>>> del jack.addresses[1]

# 只剩下一个地址
>>> session.query(Address).filter(
...     Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])
... ).count()
1

删除Jack将同时删除Jack和与用户关联的剩余Address:

>>> session.delete(jack)

>>> session.query(User).filter_by(name='jack').count()
0

>>> session.query(Address).filter(
...    Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])
... ).count()
0

建立多对多的关联

对于普通的多对多,我们需要创建一个未映射的表结构作为关联表。这看起来像如下:

>>> from sqlalchemy import Table, Text
>>> # 关联表
>>> post_keywords = Table('post_keywords', Base.metadata,
...     Column('post_id', ForeignKey('posts.id'), primary_key=True),
...     Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
... )

在上面,我们可以看到直接声明一个表与声明一个映射类有一点不同。Table是一个构造函数,因此每个列参数之间用逗号分隔。列对象的名称也是显式给定的,而不是从已分配的属性名称中获取的。
接下来,我们定义BlogPost和Keyword结构,互相relationship(),每个都将post_keywords表引用为一个关联表:

>>> class BlogPost(Base):
...     __tablename__ = 'posts'
...
...     id = Column(Integer, primary_key=True)
...     user_id = Column(Integer, ForeignKey('users.id'))
...     headline = Column(String(255), nullable=False)
...     body = Column(Text)
...
...     # many to many BlogPost<->Keyword
...     keywords = relationship('Keyword',
...                             secondary=post_keywords,
...                             back_populates='posts')
...
...     def __init__(self, headline, body, author):
...         self.author = author
...         self.headline = headline
...         self.body = body
...
...     def __repr__(self):
...         return "BlogPost(%r, %r, %r)" % (self.headline, self.body, self.author)


>>> class Keyword(Base):
...     __tablename__ = 'keywords'
...
...     id = Column(Integer, primary_key=True)
...     keyword = Column(String(50), nullable=False, unique=True)
...     posts = relationship('BlogPost',
...                          secondary=post_keywords,
...                          back_populates='keywords')
...
...     def __init__(self, keyword):
...         self.keyword = keyword

我们还希望我们的BlogPost类有一个author字段。我们将把它添加为另一个双向关系,因为单个用户可能有很多博客文章。当我们访问User.posts时,我们希望能够在不加载整个集合的前提下进一步filter结果。为此,我们使用了一个名为lazy='dynamic'的relationship()加载策略:

>>> BlogPost.author = relationship(User, back_populates="posts")
>>> User.posts = relationship(BlogPost, back_populates="author", lazy="dynamic")

创建新表

>>> Base.metadata.create_all(engine)

用法和我们之前做的没有太大区别。让我们给Wendy一些博客文章:

>>> wendy = session.query(User).\
...                 filter_by(name='wendy').\
...                 one()
>>> post = BlogPost("Wendy's Blog Post", "This is a test", wendy)
>>> session.add(post)

我们将关键字唯一地存储在数据库中,但我们知道我们还没有任何关键字,所以我们先创建它们

>>> post.keywords.append(Keyword('wendy'))
>>> post.keywords.append(Keyword('firstpost'))

我们现在可以用“firstpost”这个关键字查找所有的博客文章。我们将使用any操作符来定位“其中的任何关键词含有字符串' firstpost '的博客文章”:

>>> session.query(BlogPost).\
...             filter(BlogPost.keywords.any(keyword='firstpost')).\
...             all()
[BlogPost("Wendy's Blog Post", 'This is a test', <User(name='wendy', fullname='Wendy Williams', nickname='windy')>)]

如果我们想要查找用户wendy拥有的文章,我们可以告诉查询将范围缩小到User对象作为父对象:

>>> session.query(BlogPost).\
...             filter(BlogPost.author==wendy).\
...             filter(BlogPost.keywords.any(keyword='firstpost')).\
...             all()
[BlogPost("Wendy's Blog Post", 'This is a test', <User(name='wendy', fullname='Wendy Williams', nickname='windy')>)]

或者我们可以使用Wendy自己的posts关系,这是一种“动态”关系,直接从wendy.posts查询:

>>> wendy.posts.\
...         filter(BlogPost.keywords.any(keyword='firstpost')).\
...         all()
[BlogPost("Wendy's Blog Post", 'This is a test', <User(name='wendy', fullname='Wendy Williams', nickname='windy')>)]

(完)
参考资料:[SQLAlchemy 1.3 Documentation]https://docs.sqlalchemy.org/en/13/orm/tutorial.html

上一篇下一篇

猜你喜欢

热点阅读