使用psycopg2进行批量更新数据库及批量插入

2019-09-30  本文已影响0人  barriers

1处理数据

    def update():
        # 获取太阳光照
        local_data = pd.read_csv('./data.csv')
        # 调用数据库工具类生成对象
        db = DbHandle()
        engine = create_engine("postgresql+psycopg2://username:password@host:port/database")
        data_range = [datetime.datetime.strptime('2019-04-25 08:00:00', '%Y-%m-%d %H:%M:%S'), datetime.datetime.strptime('2019-09-27 00:00:00', '%Y-%m-%d %H:%M:%S')]
        data_day_delta = (data_range[1]-data_range[0]).days
        for delta_day in range(data_day_delta + 1):
            data_add_delta_day = data_range[0] + datetime.timedelta(days=delta_day)
            for hour in range(24):
                data_add_delta_day_hour = data_add_delta_day + datetime.timedelta(hours=hour)
                data_add_delta_day_hour = str(data_add_delta_day_hour)
                print(f'更新第{data_add_delta_day_hour}天数据')
                sql = f"select id, grid_id, data, to_char(published_at, 'YYYY-MM-DD HH24:MI:SS') as published_at from grid_weather where published_at='{data_add_delta_day_hour}'"
                # 获取空气质量
                db_data = pd.read_sql(sql, engine)
                # 如果dataframe不为空
                if not db_data.empty:
                    db_data = pd.merge(db_data, local_data, left_on=['grid_id', 'published_at'], right_on=['id', 'date'])
                    if not db_data.empty:
                        # 取特定列
                        db_data = db_data[['grid_id', 'data', 'published_at', 'ssra', 'id_x']]
                        print(db_data)
                        # 将dataframe转换为numpy格式(结果为列表套列表)
                        db_data = db_data.to_numpy()
                        insert_data = []
                        for data in db_data:
                            # 如果data中的第二个数据中的字典里有solar_radiation字段,则不作任何处理,否则将特定值赋值给该字典中的solar_radiation字段
                            if 'solar_radiation' in data[1]:
                                pass
                            else:
                                data[1]['solar_radiation'] = data[3]
                               # data = (json.dumps(data[1]), data[0], data[2])
                                data = (json.dumps(data[1]), data[4])
                                # 将数据经过处理后加入一个元组,一个元组就是一条更新信息,然后加入一个列表(此处列表中元组个数为5000个)
                                insert_data.append(data)
                        print(insert_data)
                        if insert_data:
                            # 对5000个元组组成的列表调用对象方法进行批量更新
                            db.update_db(insert_data)

2批量更新对象方法

import psycopg2
import psycopg2.extras

class DbHandle:
    def __init__(self):
        self.link_pgsql = {
               'database': 'test',
               'user': 'username',
               'password': 'password',
               'host': '127.0.0.1',
               'port': 5432
          }

        self.link = psycopg2.connect(**self.link_pgsql)
        self.corsur = self.link.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    def update_db(self, data, name):
            # sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'
        # 批量更新sql,将传入的列表当作一个临时表new_data,字段为data, id,要和元组的顺序对应,注意传入的若有字符串形式的字典要在字段后接::json将其转换为json格式
        columns = data.columns
        sql = f"""update {name} set data=new_data.data::json from (values %s) as new_data ({','.join(columns)}) where {name}.id=new_data.id;"""
        try:
            print(data)
            print(len(data))
            data = data.to_numpy()
            # 批量更新用execute_values进行,传入游标,sql,数据,和最大数据长度
            psycopg2.extras.execute_values(self.corsur, sql, data, page_size=5000)
            self.link.commit()
            print('更新成功')
            return True
        except Exception as e:
            print(e)
            print('更新失败')
            return False

批量更新要点:
1.先组装数据数组;
2.写批量更新sql(将传入的数组当作临时表,数组中的元素当作临时表中的字段,进行条件判断更新),数组中的元素一般为元组,元组中的字典元素需转换为json格式(py中其形式为字符串),然后在更新时要在字符串形式的json字段后面接::json将其转换为数据库中的json格式,否则会报错,正确形式如下图。
sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'此处条件判断时最好用唯一键进行判断,慎用时间相关的字段进行判断,否则有可能会因为两个时间格式不同而无法更新。

3批量插入的三种方法

本方法都是将pandas的dataframe的值批量插入数据库

import psycopg2
import psycopg2.extras

class DbHandle:
    def __init__(self):
        self.link_pgsql = {
                    'database': 'test',
                    'user': 'spider',
                    'password': '123456',
                    'host': '127.0.0.1',
                    'port': 5432
                }
        self.link = psycopg2.connect(**self.link_pgsql)
        self.corsur = self.link.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    def update_db(self, data, name):
        """批量更新"""
        # sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'
        columns = data.columns
        sql = f"""update {name} set data=new_data.data::json from (values %s) as new_data ({','.join(columns)}) where {name}.id=new_data.id;"""
        try:
            print(data)
            print(len(data))
            data = data.to_numpy()
            psycopg2.extras.execute_values(self.corsur, sql, data, page_size=4900)
            self.link.commit()
            print('更新成功')
            return True
        except Exception as e:
            print(e)
            print('更新失败')
            return False

    def insert_lots_of_by_many(self, df, name):
        """简单实用,属于游标的对象方法"""
        # sql = f'insert into {name}(grid_id, data, published_at) values (%s, %s, %s);'
        columns = data.columns
        sql = f"""insert into {name}({','.join(columns)}) values ({','.join(['%s'] * len(columns))});"""
        print(sql)
        data = df.to_numpy()
        print(data)
        self.corsur.executemany(sql, data)
        self.link.commit()

    def insert_lots_of_by_values(self, data, name):
        """官方推荐,要批量操作的字段的值必须相同"""
        columns = data.columns
        sql = f'insert into {name}({",".join(columns)}) values %s;'
        print(sql)
        try:
            data = data.to_numpy()
            print(data)
            print(len(data))
            psycopg2.extras.execute_values(self.corsur, sql, data, page_size=4900)
            self.link.commit()
            print('更新成功')
            return True
        except Exception as e:
            print(e)
            print('更新失败')
            return False

    def insert_lots_of_by_batch(self, data, name):
        """性能好,速度快,属于类方法"""
        # sql = f"""insert into {name}(grid_id, data, published_at) values (%s, %s, %s);"""
        columns = data.columns
        sql = f"""insert into {name}({','.join(columns)}) values ({','.join(['%s']*len(columns))});"""
        print(sql)
        try:
            data = data.to_numpy()
            psycopg2.extras.execute_batch(self.corsur, sql, data, page_size=4900)
            self.link.commit()
            print('更新成功')
            return True
        except Exception as e:
            print(e)
            print('更新失败')
            return False
上一篇下一篇

猜你喜欢

热点阅读