python 花里胡哨的东西
2021-06-16 本文已影响0人
hehehehe
contextlib --- 为 with语句上下文提供的工具
- 可以以一种更加优雅的方式,操作(创建/获取/释放)资源,如文件操作、数据库连接;
- 可以以一种更加优雅的方式,处理异常;
class Resource():
def __enter__(self):
print('===connect to resource===')
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print('===close resource connection===')
return True
def operate(self):
1/0
with Resource() as res:
res.operate()
运行一下,惊奇地发现,居然不会报错。
这就是上下文管理协议的一个强大之处,异常可以在exit 进行捕获并由你自己决定如何处理,是抛出呢还是在这里就解决了。在exit 里return True(没有return 就默认为 return False),就相当于告诉 Python解释器,这个异常我们已经捕获了,不需要再往外抛了, return False 则会报错
在 写exit 函数时,需要注意的事,它必须要有这三个参数:
- exc_type:异常类型
- exc_val:异常值
- exc_tb:异常的错误栈信息
当主逻辑代码没有报异常时,这三个参数将都为None。
在被装饰函数里,必须是一个生成器(带有yield),而yield之前的代码,就相当于enter里的内容。yield 之后的代码,就相当于exit 里的内容。
from contextlib import contextmanager
@contextmanager
def file_open(path):
try:
f_obj = open(path,"w")
yield f_obj
except OSError:
print("We had an error!")
finally:
print("Closing file")
f_obj.close()
with file_open("test/test.txt") as fobj:
fobj.write("Testing context managers")
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
from core.session import SessionLocal
@contextmanager
def session_resource():
try:
session = SessionLocal()
yield session
print("commit...")
session.commit()
except Exception as e:
print(e)
print("rollback...")
session.rollback()
finally:
print("close...")
session.close()
def get_db() -> Generator:
try:
db = SessionLocal()
yield db
finally:
db.close()
class MyResource:
def query(self):
print('query data')
def try_finally() -> str:
try:
print("start try_finally")
yield MyResource()
except Exception as e:
print("except")
finally:
print("end try_finally")
@contextmanager
def make_myresource():
print('start contextmanager')
yield MyResource()
print('end contextmanager')
def dow():
with session_resource() as session:
rows = session.execute("select * from hn_test ")
for row in rows:
print(row)
if __name__ == '__main__':
# test()[0]
# print(list(test()))
# r = try_finally()
# r2 = next(r)
# r2.query()
# with make_myresource() as r:
# r.query()
start = datetime.now()
with ThreadPoolExecutor(max_workers=5) as pool:
for i in range(100000):
futures = pool.submit(dow)
time.sleep(1000)
from urllib.parse import quote_plus
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from_database = "xx"
from_user = "xxx"
from_pw = "xxx"
from_host = "xxx"
from_port = "xx"
pg_conn = f"postgresql://{from_user}:{quote_plus(from_pw)}@{from_host}:{from_port}/{from_database}"
engine = create_engine(pg_conn, pool_size=2, pool_recycle=3600, max_overflow=2, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)