Python 如何优雅的操作 PyMySQL
一、PyMysql
在使用Python操作MySQL数据过的过程中,基本的增删改查操作如何更加高效优雅的执行。这里将以PyMySQL为例,介绍一下如何使用Python操作数据库。 Python对MySQL数据库进行操作,基本思路是先连接数据库 Connection 对象,建立游标 Cursor 对象,然后执行SQL语句对数据库进行操作,获取执行结果,最终断开连接。大致过程是这样,在对其进行介绍之前,先介绍一些基本的概念。
Connection
Connection 对象即为数据库连接对象,在python中可以使用pymysql.connect()方法创建Connection对象,该方法的常用参数如下:
host:IP地址,字符串类型
user:用户名, 字符串类型
passwd:无默认值;字符串类
db:数据库名称,无默认值;字符串类型(可以不传但是SQL中必须体现)
port:端口, 默认为3306, 整型
charset:设置utf8, 字符串类型
close:关闭当前连接对象
Cursor
Cursor对象即为游标对象,用于执行查询和获取结果,在python中可以使用connect.cursor()创建
execute():执行数据库单个查询或命令,将结果从数据库获取
executemany(): 对一个查询运行多个数据,其返回是:受影响的行数(如果有的话)
close():关闭当前游标对象
Transaction
1.事务是数据库理论中一个比较重要的概念,指访问和更新数据库的一个程序执行单元,具有ACID特性:
原子性(Atomic):事务中的各项操作要么全都做,要么全都不做,任何一项操作的失败都会导致整个事务的失败
一致性(Consistent):事务必须使数据库从一个一致性状态变到另一个一致性状态
隔离性(Isolated):并发执行的事务彼此无法看到对方的中间状态,一个事务的执行不能被其他事务干扰
持久性(Durable):事务一旦提交,它对数据库的改变就是永久性的,可以通过日志和同步备份在故障发生后重建数据。
2.常用事务方法
Connection.commit():正常事务提交
Connection.rollback():事务异常回滚
Connection.autocommit():事务自动提交机制,默认TRUE,设置FALSE则关闭。
二、Python操作MySQL
1.安装
$ pip3 install PyMySQL
2.数据库连接
import pymysql
# 打开数据库连接
db = pymysql.connect(host='127.0.0.1',
user='user',
password='123456',
database='demo',
port=3306,
charset='utf8')
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = db.cursor()
# 使用 execute() 方法执行 SQL 查询
cursor.execute("SELECT * FROM USER;")
# 使用 fetchone() 方法获取单条数据.
data = cursor.fetchone()
print ("Database data : %s " % data)
# 关闭数据库连接
cursor.close()
db.close()
3.数据库DML操作
事务执行过程 import pymysql
# 打开数据库连接
db = pymysql.connect(**config) # 省略连接信息
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 关闭当前游标对象
cursor.close()
# 关闭数据库连接
db.close()
# 执行传入的SQL也可以更加的灵活,可以使用另外一种%s 占位符,后续的参数依次传入。
sql2 = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('%s', '%s', %s, '%s', %s)"""
cursor.execute(sql2, 'Mac', 'Mohan', 20, 'M', 2000)
4.数据库DQL操作
Python查询Mysql使用常用几个方法。
fetchone(): 该方法获取下一个查询结果集。结果集是一个对象.
fetchmany():获取结果集的指定几行.
fetchall(): 接收全部的返回结果行.
rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数。
import pymysql
# 打开数据库连接
db = pymysql.connect(**config)
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 查询语句
sql = "SELECT * FROM EMPLOYEE WHERE INCOME > %s" % (1000)
try:
# 执行SQL语句
cursor.execute(sql)
# 获取所有记录列表
result1=cursor.fetchone()
result2=cursor.fetchmany(2)
results = cursor.fetchall()
print(result1)
print(result2)
print(results)
except:
print ("Error: unable to fetch data")
# 关闭数据库连接
cursor.close()
db.close()
三、工具类封装
通过封装常用方法将会大大降低对数据库操作的成本。接下来分为几步进行操作:
1.可以通过env文件来存储数据库的连接信息
2.将env文件数据加载进系统环境变量
3.从系统环境变量中获取对应连接数据
4.连接数据库,操作增删改查
# .env
DB_INFO={"host": "127.0.0.1","port":3306,"user": "user","passwd": "123456","charset": "utf8"}
import io
import os
class EnvironmentVarUtils(object):
def __init__(self, fileName=None):
self.file_name = fileName
self._load_dot_env_file(self._get_environment_path())
def _get_environment_path(self):
"""
:return: project_path
"""
return os.path.join(os.path.dirname(os.getcwd()), '.env') if self.file_name is None\
else os.path.join(os.path.dirname(os.getcwd()), self.file_name)
def _load_dot_env_file(self, dot_env_path):
""" load .env file.
Args:
dot_env_path (str): .env file path
"""
if not os.path.isfile(dot_env_path):
raise FileNotFoundError(".env file not found Error.")
print("Loading environment variables from 【{}】".format(dot_env_path))
env_variables_mapping = {}
with io.open(dot_env_path, 'r', encoding='utf-8') as fp:
for line in fp:
if "=" in line:
variable, value = line.split("=", 1)
else:
raise Exception(".env format error")
env_variables_mapping[variable.strip()] = value.strip()
self._set_os_environ(env_variables_mapping)
@staticmethod
def _set_os_environ(variables_mapping):
""" set variables mapping to os.environ """
for variable in variables_mapping:
os.environ[variable] = variables_mapping[variable]
print("Set OS environment variable: {}".format(variable))
@staticmethod
def get_os_environ(variable_name):
""" get value of environment variable.
"""
try:
return os.environ[variable_name]
except Exception as e:
raise e
import pymysql
class SqlHelper(object):
def __init__(self, config):
self.connect = pymysql.connect(**eval(config))
self.connect.autocommit(True)
# default return tuple, DictCursor return Json .
self.cursor = self.connect.cursor()
def __enter__(self):
# DictCursor return Json .
self.cursor = self.connect.cursor(cursor=pymysql.cursors.DictCursor)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.connect.close()
def queryAll(self, sql, params=None):
"""
:param sql:
:param params:
:return:
"""
self.cursor.execute(sql, params)
return self.cursor.fetchall()
def queryMany(self, sql, num, params=None):
"""
:param sql:
:param num:
:param params:
:return:
"""
self.cursor.execute(sql, params)
return self.cursor.fetchmany(num)
def queryOne(self, sql, params=None):
"""
:param sql:
:param params:
:return:
"""
self.cursor.execute(sql, params)
return self.cursor.fetchone()
def operation(self, sql, params=None, DML=True):
"""
DML: insert / update / delete
DDL: CREATE TABLE/VIEW/INDEX/SYN/CLUSTER
:param DML:
:param sql:
:param params:
:return:
"""
try:
self.cursor.execute(sql, params)
except Exception as e:
if DML:
self.connect.rollback()
raise e
def batch_operation(self, sql_list, params_list=None, DML=True):
"""
Process multiple SQL files in batches .
:param DML:
:param sql_list:
:param params_list:
:return:
"""
for i in range(len(sql_list)):
try:
if params_list is not None:
self.operation(sql_list[i], params_list[i], DML)
else:
self.operation(sql_list[i], params_list, DML)
except Exception as e:
raise e
def batch_processing(self, sql, params_list, DML=True):
"""
The same SQL is executed multiple times in batches.
:param DML:
:param sql:
:param params_list:
:return:
"""
try:
self.cursor.executemany(sql, params_list)
except Exception as e:
if DML:
self.connect.rollback()
raise e
def __del__(self):
"""
Automatic disconnection
:return:
"""
if self.connect.open: # 解决连接重复关闭的
self.cursor.close()
self.connect.close()
1.正常方式执行
if __name__ == '__main__':
sql = "select age from `demo`.`user` where name= 'Amy';"
env = EnvironmentVarUtils() # 初始化对象加载env文件数据
config = env.get_os_environ("DB_INFO") # 获取指定key数据
# 1.正常方式执行
db = SqlHelper(config)
result = db.queryOne(sql)
print(result)
-------------------------------------------------------------------------------
控制台输出:
Loading environment variables from 【C:\Users\lenovo\PycharmProjects\job\httpRunner_demo\.env】
Set OS environment variable: USERNAME
Set OS environment variable: PASSWORD
Set OS environment variable: BASE_URL
Set OS environment variable: db_info
(18,)
Process finished with exit code 0
2.通过上下文管理的方式执行
if __name__ == '__main__':
sql = "select age from `demo`.`user` where name= 'Amy';"
env = EnvironmentVarUtils() # 初始化对象加载env文件数据
config = env.get_os_environ("DB_INFO") # 获取指定key数据
# 2.上下文管理方式执行
with SqlHelper(config) as db:
result = db.queryOne(sql)
print(result)
-------------------------------------------------------------------------------
控制台输出:
Loading environment variables from 【C:\Users\lenovo\PycharmProjects\job\httpRunner_demo\.env】
Set OS environment variable: USERNAME
Set OS environment variable: PASSWORD
Set OS environment variable: BASE_URL
Set OS environment variable: db_info
{'age': 18}
Process finished with exit code 0
以上亲测可用,可以尝试在自动化项目实践中操作。有问题欢迎留言讨论。