sqlalchemy使用及序列化
2019-10-01 本文已影响0人
barriers
1创建model
config.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 设置最大连接数为5个
engine = create_engine("postgresql+psycopg2://username:password@host:port/database", max_overflow=5, encoding='utf-8')
Base = declarative_base()
session_maker = sessionmaker(bind=engine)
# 获取数据库会话
session = session_maker()
model.py
import datetime
from sqlalchemy import Column, Integer, ForeignKey, UniqueConstraint, Index, String, JSON, DateTime
from sqlalchemy.orm import relationship
from tools.config import Base
class GridAIrQuality(Base):
# 表名
__tablename__ = 'grid_air_quality'
# 表格参数设置,此处设置唯一键
__table_args__ = (
UniqueConstraint(
'published_at',
'grid_id',
name='grid_weather_unique_index'
),
)
# 表格的字段
id = Column(Integer, primary_key=True, autoincrement=True)
# 设置外键
grid_id = Column(Integer, ForeignKey('grid.id'))
data = Column(JSON)
published_at = Column(DateTime)
# 设置表格的手动序列化实现,不推荐
def to_dict(self):
return {
'id': self.id,
'data': self.data,
'grid_id': self.grid_id,
# 'grid': [i.to_json() for i in self.grid],
# 获取它关联的其他表的信息并调用对象方法序列化
'grid': self.grid.to_json(),
'published_at': datetime.datetime.strftime(self.published_at, '%Y-%m-%d %H:%M:%S')
}
class Grid(Base):
__tablename__ = 'grid'
id = Column(Integer, primary_key=True, autoincrement=True)
size = Column(Integer)
area = Column(Integer)
location = Column(String(512))
district = Column(String(32))
remark = Column(String(32))
row = Column(Integer)
column = Column(Integer)
bottom_left_coord = Column(JSON)
top_right_coord = Column(JSON)
center_coord = Column(JSON)
created_at = Column(DateTime)
updated_at = Column(DateTime)
tags = Column(String(128))
operator = Column(String(32))
name = Column(String(32))
area_ids = Column(String(32))
grid_code = Column(String(32))
# 设置关联键和反向引用描述
grid_air_quality = relationship('GridAIrQuality', backref='grid')
def to_json(self):
return {
'id': self.id,
'size': self.size,
'area': self.area,
'location': self.location,
'district': self.district,
'remark': self.remark,
'row': self.row,
'column': self.column,
'bottom_left_coord': self.bottom_left_coord,
'top_right_coord': self.top_right_coord,
'center_coord': self.center_coord,
'created_at': datetime.datetime.strftime(self.created_at, '%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.strftime(self.updated_at, '%Y-%m-%d %H:%M:%S'),
'tags': self.tags,
'operator': self.operator,
'name': self.name,
'area_ids': self.area_ids,
'grid_code': self.grid_code,
}
2使用model进行查询
查询中,对查询结果用all(),表示取所有,用one()或者first()表示取第一个;
联合条件查询and_,or_需要使用filter进行筛选,而单个查询既能使用filter,也可以使用filter_by进行筛选,用filter筛选需要使用类.字段==的形式进行筛选,而filter_by直接使用字段=进行筛选即可
use.py
from config import session
from model import GridAIrQuality, Grid
gridHourAQData = session.query(GridAIrQuality).filter(GridAIrQuality.published_at<='2019-05-05', GridAIrQuality.published_at>='2019-05-01').all()
# 调用自定义对象方法进行序列化
gridHourAQData[0].to_dict()
3使用marshmallow进行序列化
form.py
from datetime import datetime
from marshmallow import Schema, fields, pprint
class GridSchema(Schema):
id = fields.Integer()
size = fields.Integer()
area = fields.Integer()
location = fields.String()
district = fields.String()
remark = fields.String()
row = fields.Integer()
column = fields.Integer()
bottom_left_coord = fields.Dict()
top_right_coord = fields.Dict()
center_coord = fields.Dict()
created_at = fields.Function(lambda obj: datetime.strftime(obj.created_at, '%Y-%m-%d %H:%M:%S'))
updated_at = fields.Function(lambda obj: datetime.strftime(obj.updated_at, '%Y-%m-%d %H:%M:%S'))
# created_at = fields.DateTime()
# updated_at = fields.DateTime()
tags = fields.String()
operator = fields.String()
name = fields.String()
area_ids = fields.String()
grid_code = fields.String()
class GridAirQualitySchema(Schema):
id = fields.Integer()
grid_id = fields.Integer()
# grid = fields.Nested(GridSchema, many=True)
# 一对一时不用many=True,一对多或者多对多时需要many=True参数
grid = fields.Nested(GridSchema)
data = fields.Dict()
# published_at = fields.DateTime()
# 自定义时间序列化规则,源数据直接序列化后不符合要求
published_at = fields.Function(lambda obj: datetime.strftime(obj.published_at, '%Y-%m-%d %H:%M:%S'))
4使用定义的序列化与反序列化器序列化
use.py
from config import session
from model import GridAIrQuality, Grid
from marshmallow import pprint
from form import GridAirQualitySchema
gridHourAQData = session.query(GridAIrQuality).filter(GridAIrQuality.published_at<='2019-05-05', GridAIrQuality.published_at>='2019-05-01').all()
# 生成序列化器对象,并传入many=True参数表示是对多个对象进行序列化(即数组中的所有对象)
schema = GridAirQualitySchema(many=True)
# 调用dump序列化为python格式的数据,若用dumps,则序列化为json格式
data = schema.dump(gridHourAQData)
print(data)
5使用未定义好的外键和关系描述进行联表查询
要联结超过 2 张以上的表,可以直接在 join 得到的结果之后链式调用 join 。也可以在 filter 的结果后面链式调用 join 。join 和 filter 返回的都是 query 对象,因此可以无限链式调用下去。
写完查询后,应该打印生成的 SQL 语句查看一下有没有性能问题。
from config import session
ret=session.query(Person,Favor).filter(Person.favor_id==Favor.nid).all()
ret1=session.query(Person).join(Favor).all()
data=session.query(Account, Bind).join(Bind,Account.gameuid==Bind.fromid).filter(Bind.toid==1000).all()
6查询数据库所有表格及其字段信息
import os
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
import psycopg2.extras
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import session_maker
engine = create_engine("postgresql+psycopg2://glzt:123456@127.0.0.1:5432/spider", max_overflow=5, encoding='utf-8')
Base = declarative_base()
session_maker = sessionmaker(bind=engine)
session = session_maker()
Base.metadata.reflect(engine)
# 查看表结构,(返回一个字典,键为表名,值为字段信息)
tables = Base.metadata.tables
# 获取所有表格,字典为键为表名,值为字段属性组成的
tables = list(tables.keys())
data = pd.read_sql(sql, engine)
sql = f"""select setval('{table}_id_seq', (select max(id) from {table}));"""
dayAQData_o3 = [gridHourAQDataGroupById['o3', 'published_at'].get_group(x) for x in index]