使用psycopg2进行批量更新数据库及批量插入
2019-09-30 本文已影响0人
barriers
1处理数据
def update():
# 获取太阳光照
local_data = pd.read_csv('./data.csv')
# 调用数据库工具类生成对象
db = DbHandle()
engine = create_engine("postgresql+psycopg2://username:password@host:port/database")
data_range = [datetime.datetime.strptime('2019-04-25 08:00:00', '%Y-%m-%d %H:%M:%S'), datetime.datetime.strptime('2019-09-27 00:00:00', '%Y-%m-%d %H:%M:%S')]
data_day_delta = (data_range[1]-data_range[0]).days
for delta_day in range(data_day_delta + 1):
data_add_delta_day = data_range[0] + datetime.timedelta(days=delta_day)
for hour in range(24):
data_add_delta_day_hour = data_add_delta_day + datetime.timedelta(hours=hour)
data_add_delta_day_hour = str(data_add_delta_day_hour)
print(f'更新第{data_add_delta_day_hour}天数据')
sql = f"select id, grid_id, data, to_char(published_at, 'YYYY-MM-DD HH24:MI:SS') as published_at from grid_weather where published_at='{data_add_delta_day_hour}'"
# 获取空气质量
db_data = pd.read_sql(sql, engine)
# 如果dataframe不为空
if not db_data.empty:
db_data = pd.merge(db_data, local_data, left_on=['grid_id', 'published_at'], right_on=['id', 'date'])
if not db_data.empty:
# 取特定列
db_data = db_data[['grid_id', 'data', 'published_at', 'ssra', 'id_x']]
print(db_data)
# 将dataframe转换为numpy格式(结果为列表套列表)
db_data = db_data.to_numpy()
insert_data = []
for data in db_data:
# 如果data中的第二个数据中的字典里有solar_radiation字段,则不作任何处理,否则将特定值赋值给该字典中的solar_radiation字段
if 'solar_radiation' in data[1]:
pass
else:
data[1]['solar_radiation'] = data[3]
# data = (json.dumps(data[1]), data[0], data[2])
data = (json.dumps(data[1]), data[4])
# 将数据经过处理后加入一个元组,一个元组就是一条更新信息,然后加入一个列表(此处列表中元组个数为5000个)
insert_data.append(data)
print(insert_data)
if insert_data:
# 对5000个元组组成的列表调用对象方法进行批量更新
db.update_db(insert_data)
2批量更新对象方法
import psycopg2
import psycopg2.extras
class DbHandle:
def __init__(self):
self.link_pgsql = {
'database': 'test',
'user': 'username',
'password': 'password',
'host': '127.0.0.1',
'port': 5432
}
self.link = psycopg2.connect(**self.link_pgsql)
self.corsur = self.link.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
def update_db(self, data, name):
# sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'
# 批量更新sql,将传入的列表当作一个临时表new_data,字段为data, id,要和元组的顺序对应,注意传入的若有字符串形式的字典要在字段后接::json将其转换为json格式
columns = data.columns
sql = f"""update {name} set data=new_data.data::json from (values %s) as new_data ({','.join(columns)}) where {name}.id=new_data.id;"""
try:
print(data)
print(len(data))
data = data.to_numpy()
# 批量更新用execute_values进行,传入游标,sql,数据,和最大数据长度
psycopg2.extras.execute_values(self.corsur, sql, data, page_size=5000)
self.link.commit()
print('更新成功')
return True
except Exception as e:
print(e)
print('更新失败')
return False
批量更新要点:
1.先组装数据数组;
2.写批量更新sql(将传入的数组当作临时表,数组中的元素当作临时表中的字段,进行条件判断更新),数组中的元素一般为元组,元组中的字典元素需转换为json格式(py中其形式为字符串),然后在更新时要在字符串形式的json字段后面接::json将其转换为数据库中的json格式,否则会报错,正确形式如下图。
sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'此处条件判断时最好用唯一键进行判断,慎用时间相关的字段进行判断,否则有可能会因为两个时间格式不同而无法更新。
3批量插入的三种方法
本方法都是将pandas的dataframe的值批量插入数据库
import psycopg2
import psycopg2.extras
class DbHandle:
def __init__(self):
self.link_pgsql = {
'database': 'test',
'user': 'spider',
'password': '123456',
'host': '127.0.0.1',
'port': 5432
}
self.link = psycopg2.connect(**self.link_pgsql)
self.corsur = self.link.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
def update_db(self, data, name):
"""批量更新"""
# sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'
columns = data.columns
sql = f"""update {name} set data=new_data.data::json from (values %s) as new_data ({','.join(columns)}) where {name}.id=new_data.id;"""
try:
print(data)
print(len(data))
data = data.to_numpy()
psycopg2.extras.execute_values(self.corsur, sql, data, page_size=4900)
self.link.commit()
print('更新成功')
return True
except Exception as e:
print(e)
print('更新失败')
return False
def insert_lots_of_by_many(self, df, name):
"""简单实用,属于游标的对象方法"""
# sql = f'insert into {name}(grid_id, data, published_at) values (%s, %s, %s);'
columns = data.columns
sql = f"""insert into {name}({','.join(columns)}) values ({','.join(['%s'] * len(columns))});"""
print(sql)
data = df.to_numpy()
print(data)
self.corsur.executemany(sql, data)
self.link.commit()
def insert_lots_of_by_values(self, data, name):
"""官方推荐,要批量操作的字段的值必须相同"""
columns = data.columns
sql = f'insert into {name}({",".join(columns)}) values %s;'
print(sql)
try:
data = data.to_numpy()
print(data)
print(len(data))
psycopg2.extras.execute_values(self.corsur, sql, data, page_size=4900)
self.link.commit()
print('更新成功')
return True
except Exception as e:
print(e)
print('更新失败')
return False
def insert_lots_of_by_batch(self, data, name):
"""性能好,速度快,属于类方法"""
# sql = f"""insert into {name}(grid_id, data, published_at) values (%s, %s, %s);"""
columns = data.columns
sql = f"""insert into {name}({','.join(columns)}) values ({','.join(['%s']*len(columns))});"""
print(sql)
try:
data = data.to_numpy()
psycopg2.extras.execute_batch(self.corsur, sql, data, page_size=4900)
self.link.commit()
print('更新成功')
return True
except Exception as e:
print(e)
print('更新失败')
return False