Python使用ORM进行数据库操作&原生SQL查询

2022-09-29  本文已影响0人  乘风破浪的姐姐

sqlalchemy是python最为常用的第三方ORM模块。
1、pycharm安装sqlalchemy
命令: pip install sqlalchemy
2、sqlalchemy连接数据库
db_sqlalchemy.py模块代码:
参数都放在yaml中,此步省略...

HOST = db.get("host")
PORT = db.get("port")
USERNAME = db.get("user")
PASSWORD = db.get("passwd")
DBBASE = db.get("database")
OVERFLOW = db.get("max_overflow")
POOLSIZE = db.get("pool_size")
TIMEOUT = db.get("pool_timeout")
RECYCEL = db.get("pool_recycle")
DBMS = db.get("dbms")
DBAPI = db.get("dbapi")
CHARSET = db.get("db_charset")

DB_URI = f'{DBMS}+{DBAPI}://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBBASE}'
# 创建引擎
engine = create_engine(
    DB_URI,
    # 超过链接池大小外最多创建的链接
    max_overflow=OVERFLOW,
    # 链接池大小
    pool_size=POOLSIZE,
    # 链接池中没有可用链接则最多等待的秒数,超过该秒数后报错
    pool_timeout=TIMEOUT,
    # 多久之后对链接池中的链接进行一次回收
    pool_recycle=RECYCEL,
    # 查看原生语句(未格式化)
    echo=True,
    encoding=CHARSET

)

class MySqlalchemySession():
    def __enter__(self):
        self.session = self.getSession()

    def getSession(self):
        # 绑定引擎
        Session = sessionmaker(bind=engine, expire_on_commit=False)
        session = scoped_session(Session)

        return session
    # 释放连接池资源
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session.close()

session = MySqlalchemySession().getSession()

3、创建映射对象
customer.py模块:

import datetime
import traceback
from common_util import reda_sql_data
from db_sqlalchemy import Base, session
from sqlalchemy import (
    Column,
    Integer,
    String,
    DateTime,
)

ms = reda_sql_data("XXX", "customer_list_by_id")

class customer(Base):

    """ 必须继承Base """
    # 数据库中存储的表名
    __tablename__ = "customer"
    # 对于必须插入的字段,采用nullable=False进行约束,它相当于NOT NULL
    # 对于非必须插入的字段,不用采取nullable=False进行约束
    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
    sa_code = Column(String(100), nullable=False, comment="sa_code ")
    customer_no = Column(String(64), nullable=False, comment="编号")
    status = Column(String(64), nullable=False, comment="status ")
    create_time = Column(DateTime, default=datetime.datetime.now, comment="创建时间")
    last_modify_time = Column(DateTime, onupdate=datetime.datetime.now, comment="最后更新时间")
    deleted = Column(Integer, comment="是否删除")


    def __str__(self):
        return f"<id:{self.id} customer_no :{self.customer_no }>"
 

4、使用原生SQL查询

def query_customer_list_by_id(sql,sa_code='AA', limit=1):
        try:
            conditions = dict()
            base_sql_pre = sql
            conditions.update({"sa_code": sa_code})
            if status :
                status _sql = " and cm.status = :status "
                conditions.update({"status ": status })
            else:
                status_sql = ""

            page_sql = " order by create_time desc limit :limit_size"
            conditions.update({"limit_size": int(limit)})

            # 组合sql
            select_sql = base_sql_pre + status_sql + page_sql
            print(select_sql)
            cursor = session.execute(select_sql, conditions)
            return cursor.fetchall()

            # return res
        except Exception:
            traceback.print_exc()

5、测试

if __name__ == "__main__":
    re = query_customer_list_by_id("select cm.customer_no from customer cm where   cm.sa_code = :saas_tenant_code and ")
    print(list(re[0])[0])
上一篇下一篇

猜你喜欢

热点阅读