一篇入门SQLAlchemy

2019-05-06  本文已影响0人  苏尚君

changelog

背景

近期团队内部拟培训 SQLAlchemy,但我觉得组织培训太费资源了,只是入门教程的话,还是用文档+代码说话吧。所以有了这篇文章,相信工程师们一点就通,不懂的请留言/互相探讨切磋/自行根据文档尝试/google尝试等。

历史

有关关系型数据库和编程,我们知道几点:

  1. SQL 是一种编程语言,专用于数据库查询;Python 也是一种编程语言。
  2. 不同的编程语言需要由不同的后端来执行。SQL 需要有相应的数据库后端来执行,Python 则需要相应的解释器来执行。
  3. Python 解释器可以直接调用 SQL 执行后端提供的 API,但稍微了解数据库的同学就知道,涉及到的细节操作还挺多的(例如编码、网络等)。所有了若干 Python 包,用户可以直接用这些包提供的高级一点的 API 来和数据库打交道(而不用处理许多细节)。
  4. 准确地说,SQL 更像一种规范,即「官话」、「普通话」,有许多方言比如「北京话」、「东北话」等——MySQL,PostgreSQL,Oracle 等等,它们的共性也有各自的特性,如语法、如数据类型。
  5. 不同的方言需要不同的 Python 包来「翻译」和执行,比如 PostgreSQL 常用的 Python 后端是 psycopg2

由上可知,我们在服务某些客户时将遇到这种情况:(不同的)客户的数据存储在不同的数据库中,如 A 客户有 MySQL,B 客户有 Oracle,我们给他们提供的服务相同,只是数据库不同;但我们希望只写一套 Python 代码,通过少数配置项的修改(而非逐一检查所有 SQL 并修改!),就能使这套代码同时运行在两个数据库上。这时候,我们的 ORM 就出场了。

本文仅简单介绍 SQLAlchemy(以下简称 SA)

基本用法

截止文档撰写时,1.3 版本的 SA 是最新的稳定版,因此本文档主要基于 1.3 版撰写而成,主要参考材料是官方文档 https://docs.sqlalchemy.org/en/13/和项目实践

套路:4步法

对于初学者而言,最核心的文档就是这篇 https://docs.sqlalchemy.org/en/13/orm/tutorial.html,根据需要查询 API 即可。SA 使用套路主要就是以下 4 步:

  1. 照着关系型数据库的表结构,定义一个映射(mapping)关系 M
  2. 创建一个引擎(engine)和一个会话(session)
  3. 将会话(session)绑定到引擎(engine)上
  4. 使用会话(session)和映射(M)进行查询

定义映射

有 2 种定义方式:https://docs.sqlalchemy.org/en/13/orm/mapping_styles.html

除非有特殊需要,否则一般使用声明法即可,就像写一个类一样简单。

以下是一个示例:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer
from sqlalchemy.dialects.postgresql import JSONB

Base = declarative_base()
class PaginationCache(Base):
    __tablename__ = 'pagination_cache'    # 对应数据库中的表名
    token = Column('token', String(50), primary_key=True)
    page = Column('page', Integer, primary_key=True)
    content = Column('content', JSONB)

Base.metadata.create_all(sa_engine['db_user'])    # 建表

关于定义映射所需要的数据类型,请见下一小节。

数据类型

一般只要了解基本数据类型即可:https://docs.sqlalchemy.org/en/13/core/type_basics.html

from sqlalchemy.types import Date, String, Float, Numeric, Text

一些特殊的数据类型需要从方言中导入:https://docs.sqlalchemy.org/en/13/core/type_basics.html#vendor-specific-types

from sqlalchemy.dialects.postgresql import ARRAY

更复杂的内容请自行查阅文档(一般用不上):https://docs.sqlalchemy.org/en/13/core/types.html

创建引擎与会话

基本的用法教程中都有,具体参数请自行查阅文档。给出一份示例代码并简要解释若干点:

对于 engine:

  1. 第 1 个位置参数是数据库协议,具体请见 https://docs.sqlalchemy.org/en/13/core/engines.html
  2. creator 参数接受一个函数,该函数返回一个数据库连接。强调:creator 接受的是「函数」,而不是 「函数返回值」
  3. pool_size 限制了同时允许存在的连接数,设为 0 表示不限制连接数
  4. (可选)echo 为 True/"debug" 时,用于打印日志。详情看文档 https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine
  5. (可选)json_serializer 不是必要的:当连接 Postgres 且需要使用 JSON/JSONB 类型时,可以将一个自定义的函数传递给该参数,从而在数据传递进数据库前通过该函数进行 dump 操作 https://docs.sqlalchemy.org/en/13/dialects/postgresql.html#sqlalchemy.dialects.postgresql.JSON
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import psycopg2  # 如果要使用下面的 pg 连接辅助函数 connect
import json
import datetime
import decimal
import mysql


def make_sqlalchemy_handlers(db_choice):
    creator_table = {
        "postgresql://": lambda x=db_choice: pg_connect(x),
        "mysql+mysqlconnector://": lambda x=db_choice: mysql_connect(x)
    }
    kwargs = {
        "pool_size": 0,
        "json_serializer": json_stringify,
        "creator": creator_table[protocol],
        "encoding": "utf8",
        "pool_recycle": 600 # https://docs.sqlalchemy.org/en/13/core/pooling.html#setting-pool-recycle
#        "echo": True
    }
    engine = create_engine(
            protocol,
            **kwargs)
    Session = sessionmaker(autoflush=False)
    Session.configure(bind=engine)
    sa_session = Session()
    return engine, sa_session
    

# 用 psycopg2 连接中文数据库的辅助函数
def pg_connect(db_choice):
    dbname = config.get(db_choice, 'database')
    host = config.get(db_choice, 'host')
    port = config.get(db_choice, 'port')
    user = config.get(db_choice, 'user')
    password = config.get(db_choice, 'password')
    conn = psycopg2.connect(
        "dbname='%(dbname)s' user='%(user)s' password='%(password)s' host='%(host)s' port='%(port)s'"
        % {'dbname': dbname, 'user': user, 'password': password, 'host': host, 'port': port}
    )
    return conn


def mysql_connect(db_choice):
    dbname = config.get(db_choice, 'database')
    host = config.get(db_choice, 'host')
    port = config.get(db_choice, 'port')
    user = config.get(db_choice, 'user')
    password = config.get(db_choice, 'password')
    conn = mysql.connector.connect(
            user=user, password=password,
            host=host, port=port,
            database=dbname, buffered=True)
    return conn


def json_default(o):
    if isinstance(o, datetime.datetime):
        return o.strftime('%Y-%m-%d %H:%M:%S')
    elif isinstance(o, datetime.date):
        return o.isoformat()
    elif isinstance(o, decimal.Decimal):
        return float(o)


def json_stringify(some_dict):
    return json.dumps(some_dict,
                      default=json_default,
                      ensure_ascii=False,
                      indent=4
                      )

对于 session: https://docs.sqlalchemy.org/en/13/orm/session_api.html#sqlalchemy.orm.session.Session.init

  1. autoflush 保证每次查询时能够自动把新加入的对象刷入数据库
  2. bind 用来绑定会话(session)与引擎(engine)

查询

以上面的 session 和类为例,如下的方法创建了 2 个查询对象

# 同时查多个列
orm_query_obj_0 = sa_session['db_user'].query(
                PaginationCache.content, PaginationCache.page).filter(
                PaginationCache.token == token)

# 同时有多个筛选条件
orm_query_obj_1 = sa_session['db_user'].query(PaginationCache.content).filter(
                PaginationCache.token == token, PaginationCache.page > 2)

先对上述代码做点解释:

构建好查询对象 q 后:

写入

基本套路就是:

有 2 种方法:

示例代码如下:

# 批量添加对象,一次性提交
for i in range(pages):
    new_page = PaginationCache(
                    token=token,
                    page=i,
                    content=[{"a": i, "b": i*2}, {"a": i+1, "b": (i+1)*2}])
    new_pages.append(new_page)
    
sa_session['db_user'].add_all(new_pages)
sa_session['db_user'].commit()



# 逐个添加对象,逐次提交;附上事务回滚示例
for i in range(pages):
    new_page = PaginationCache(
                    token=token,
                    page=i,
                    content=[{"a": i, "b": i*2}, {"a": i+1, "b": (i+1)*2}])
    sa_session['db_user'].add(new_page)
    try:
        sa_session['db_user'].commit()
    except:
        print('Error when commit pagination cache.')
        print(traceback.format_exc())        # 打印错误栈
        sa_session['db_user'].rollback()     # 消除这次提交,回滚状态

稍微进阶

每当交互时才创建连接会话/连接上下文

在上面最初的版本中,会话在一开始就创建了,然后多次使用:

# in db.py
def make_sqlalchemy_handlers(db_choice):
    # ...blabla...
    Session = sessionmaker(autoflush=False)
    Session.configure(bind=engine)
    sa_session = Session()
    return engine, sa_session
    
engine, session = make_sqlalchemy_handlers('postgresql://')

# in a.py
from db import session
orm_query_obj_0 = session.query(*args_0).all()

# in b.py
from db import session
orm_query_obj_1 = session.query(*args_1).all()

上述这种做法在特别简单的系统可能不会出大问题;但只要系统运行时间变长,系统复杂程度提高,那么这么做就有风险了。至少有 1 点风险:

鉴于此,可以考虑做一个简单的上下文管理器:

# db.py

import contextlib
from sqlalchemy.orm import sessionmaker

# 已经定义了 make_sqlalchemy_handlers,同上 ...

sa_engine = dict()
sa_session_maker = dict()
for db_section in ['db_a', 'db_b', 'db_c']:
    engine_t, session_t = make_sqlalchemy_handlers(db_section)
    sa_engine[db_section] = engine_t
    sa_session_maker[db_section] = session_t


@contextlib.contextmanager
def db_context(choice):
    """
        Context manager for a transaction
        ref:

        https://docs.sqlalchemy.org/en/13/orm/session_basics.html
        https://docs.python.org/3.5/library/contextlib.html#contextlib.contextmanager
    """
    session = sa_session_maker[choice]()
    try:
        yield session
        session.commit()
    except Exception as e:
        session.rollback()
        raise e
    finally:
        session.close()      

这样以后每次需要和数据库打交道时,创建连接上下文,不需要的时候退出上下文、连接自然释放了:

from db import db_context

with db_context('db_a') as sa_session:
    query = sa_session.query(*view_columns).filter(*filters)
    result = query.all()

需要注意的是,建立会话也就是建立连接(大致相同,严格来说本句话可能还要斟酌。。。),较消耗资源(众所周知,在耗时上,CPU<内存<IO,建立连接就是在IO层面的操作),所以在这种为了安全性而设计的写法之下,需要仔细考虑建立会话的时机。

创建约束

参考 Table ConfigurationUNIQUE Constraint ,延续声明式的建模方式,要在建表时对某几个字段组合的唯一性进行限制时,可在名为 __table_args__ 的类变量中加入限制,例如:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Date, Integer, UniqueConstraint

Base = declarative_base()

class MyTable(Base):
    __tablename__ = 'mytable'
    
    col1 = Column('col1', String(20))
    col2 = Column('col2', Date)
    col3 = Column('col3', Integer)
    # 其他字段定义……
    
    __table_args = (
      UniqueConstraint('col1', 'col2', 'col3', name='uix_state'),
    )

这里有个额外建议注意的点:注上述表定义中,字段名字面量被重复使用。因此这里建议在表格定义前将这些字面量存为变量(常量):

# blabla

_COLNAME_MyTable_col1 = 'col1'
_COLNAME_MyTable_col2 = 'col2'
_COLNAME_MyTable_col3= 'col3'

# blabla
class MyTable(Base):
    __tablename__ = 'mytable'
    
    col1 = Column(_COLNAME_MyTable_col1, String(20))
    col2 = Column(_COLNAME_MyTable_col2, Date)
    col3 = Column(_COLNAME_MyTable_col3, Integer)
    # 其他字段定义……
    
    __table_args = (
      UniqueConstraint(
        _COLNAME_MyTable_col1, 
        _COLNAME_MyTable_col2,
        _COLNAME_MyTable_col3,
        name='uix_state'
      ),
    )

这么做是为了避免哪一天修改了某个字段名字面量,该字段正好被 UniqueConstraint 使用,但修改者又忘了去修改 UniqueConstraint 中的对应字面量,也就是下面这种超出用户预期的示例情况:修改了 col1 的命名为 col110,但在 UniqueConstraint 中,该字段仍然命名为 col1

# blabla

class MyTable(Base):
    __tablename__ = 'mytable'
    
    col1 = Column('col110', String(20))
    col2 = Column('col2', Date)
    col3 = Column('col3', Integer)
    # 其他字段定义……
    
    __table_args = (
      UniqueConstraint('col1', 'col2', 'col3', name='uix_state'),
    )
上一篇下一篇

猜你喜欢

热点阅读