psycopg2 conn线程安全吗,会自动关闭吗

2021-08-05  本文已影响0人  hehehehe

https://www.psycopg.org/docs/

connect

连接是线程安全的,可以在多个线程之间共享。有关详细信息,请参阅 线程和进程安全
连接可以用作上下文管理器。请注意,上下文包装事务:如果上下文成功退出,则提交事务,如果退出并出现异常,则回滚事务。请注意,连接不会被上下文关闭,它可以用于多个上下文。

conn = psycopg2.connect(DSN)

with conn:
    with conn.cursor() as curs:
        curs.execute(SQL1)

with conn:
    with conn.cursor() as curs:
        curs.execute(SQL2)

# leaving contexts doesn't close the connection
conn.close()
cursor

从同一连接创建的游标不是孤立的,即一个游标对数据库所做的任何更改都可以立即被其他游标看到。从不同连接创建的游标可以或不能隔离,具体取决于连接的隔离级别。另见rollback()commit()方法。
游标不是线程安全的:多线程应用程序可以从同一个连接创建多个游标,并且应该使用来自单个线程的每个游标。有关详细信息,请参阅线程和进程安全
游标可以用作上下文管理器:离开上下文将关闭游标。

with conn.cursor() as curs:
    curs.execute(SQL)

# the cursor is now closed
事务

从 version 2.5 开始,psycopg2 的连接和游标是上下文管理器,可以与with语句一起使用:

with psycopg2.connect(DSN) as conn:
    with conn.cursor() as curs:
        curs.execute(SQL)

当连接退出with块时,如果块没有引发异常,则提交事务。在异常的情况下,事务被回滚。
当游标退出with块时,它被关闭,释放最终与其关联的任何资源。事务状态不受影响。
一个连接可以在多个with语句中使用,并且每个with块都有效地包装在一个单独的事务中:

conn = psycopg2.connect(DSN)

with conn:
    with conn.cursor() as curs:
        curs.execute(SQL1)

with conn:
    with conn.cursor() as curs:
        curs.execute(SQL2)

conn.close()

警告 与文件对象或其他资源不同,退出连接的 with块不会关闭连接,而只会关闭与其关联的事务。如果你想确保连接在某个时间点后关闭,你仍然应该使用 try-catch 块:

from pathlib import Path

import psycopg2
from psycopg2.extras import execute_batch, execute_values
from tool.logger_config import logger
import yaml

st_geomfromtext = "st_geomfromtext('point(%s %s)',4326)"
st_geomfromtext2 = "st_geomfromtext(%s,4326)"


def list_join_in(items, sep):
    return sep.join("'" + str(item) + "'" for item in items)


def list_join(items, sep):
    return sep.join(str(item) for item in items)


def get_conn(host, port, user, database, password):
    conn = psycopg2.connect(database=database, user=user, password=password,
                            host=host, port=port)
    logger.info("connected success")
    return conn


def close_conn(conn):
    try:
        conn.close()
    except:
        pass


def batch_execute(cursor, sql, val_list, page_size=1000):
    if val_list:
        execute_batch(cursor, sql, val_list, page_size)


def values_execute(cursor, sql, val_list, page_size=1000):
    if val_list:
        execute_values(cursor, sql, val_list, template=None, page_size=page_size)


def many_execute(cursor, sql, val_list):
    if val_list:
        cursor.executemany(sql, val_list)


def get_insert_sql(table_name, insert_fields: list, has_geom: bool) -> str:
    if has_geom:
        sql = f"""
            insert into {table_name} ({list_join(insert_fields, ',')})
            values({"%s," * (len(insert_fields) - 1) + "st_geomfromtext(%s,4326)"})
        """
    else:
        sql = f"""
            insert into {table_name} ({list_join(insert_fields, ',')}) 
            values({"%s," * (len(insert_fields) - 1) + "%s"})
        """
    return sql


def obj2list(fields, obj_list):
    val_list = []
    for obj in obj_list:
        val_list.append([getattr(obj, field) for field in fields])
    return val_list


def get_yaml_db(db_name, cfg_file: Path) -> tuple:
    cfg = yaml.load(cfg_file.read_text(), Loader=yaml.FullLoader)
    database = cfg[db_name]['database']
    user = cfg[db_name]['user']
    password = cfg[db_name]['password']
    host = cfg[db_name]['host']
    port = cfg[db_name]['port']
    return host, port, database, user, password,

上一篇下一篇

猜你喜欢

热点阅读