数据库数据库mysql

Python 如何优雅的操作 PyMySQL

2021-11-18  本文已影响0人  虐心笔记

一、PyMysql

在使用Python操作MySQL数据过的过程中,基本的增删改查操作如何更加高效优雅的执行。这里将以PyMySQL为例,介绍一下如何使用Python操作数据库。 Python对MySQL数据库进行操作,基本思路是先连接数据库 Connection 对象,建立游标 Cursor 对象,然后执行SQL语句对数据库进行操作,获取执行结果,最终断开连接。大致过程是这样,在对其进行介绍之前,先介绍一些基本的概念。

Connection

Connection 对象即为数据库连接对象,在python中可以使用pymysql.connect()方法创建Connection对象,该方法的常用参数如下:

host:IP地址,字符串类型
user:用户名, 字符串类型
passwd:无默认值;字符串类
db:数据库名称,无默认值;字符串类型(可以不传但是SQL中必须体现)
port:端口, 默认为3306, 整型
charset:设置utf8, 字符串类型
close:关闭当前连接对象

Cursor

Cursor对象即为游标对象,用于执行查询和获取结果,在python中可以使用connect.cursor()创建

execute():执行数据库单个查询或命令,将结果从数据库获取
executemany(): 对一个查询运行多个数据,其返回是:受影响的行数(如果有的话)
close():关闭当前游标对象

Transaction

1.事务是数据库理论中一个比较重要的概念,指访问和更新数据库的一个程序执行单元,具有ACID特性:

原子性(Atomic):事务中的各项操作要么全都做,要么全都不做,任何一项操作的失败都会导致整个事务的失败
一致性(Consistent):事务必须使数据库从一个一致性状态变到另一个一致性状态
隔离性(Isolated):并发执行的事务彼此无法看到对方的中间状态,一个事务的执行不能被其他事务干扰
持久性(Durable):事务一旦提交,它对数据库的改变就是永久性的,可以通过日志和同步备份在故障发生后重建数据。

2.常用事务方法

Connection.commit():正常事务提交
Connection.rollback():事务异常回滚
Connection.autocommit():事务自动提交机制,默认TRUE,设置FALSE则关闭。

二、Python操作MySQL

1.安装

$ pip3 install PyMySQL

2.数据库连接
        import pymysql
 
        # 打开数据库连接
        db = pymysql.connect(host='127.0.0.1',
                             user='user',
                             password='123456',
                             database='demo',
                             port=3306,
                             charset='utf8')
         
        # 使用 cursor() 方法创建一个游标对象 cursor
        cursor = db.cursor()
         
        # 使用 execute()  方法执行 SQL 查询 
        cursor.execute("SELECT * FROM USER;")
         
        # 使用 fetchone() 方法获取单条数据.
        data = cursor.fetchone()
         
        print ("Database data : %s " % data)
         
        # 关闭数据库连接
        cursor.close()
        db.close()
    
3.数据库DML操作
事务执行过程
        import pymysql
 
        # 打开数据库连接
        db = pymysql.connect(**config) # 省略连接信息
         
        # 使用cursor()方法获取操作游标 
        cursor = db.cursor()
         
        # SQL插入语句
        sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
                 LAST_NAME, AGE, SEX, INCOME)
                 VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
        try:
           # 执行sql语句
           cursor.execute(sql)
           
           # 提交到数据库执行
           db.commit()
        except:
           # 如果发生错误则回滚
           db.rollback()
         
        # 关闭当前游标对象
        cursor.close()
        # 关闭数据库连接
        db.close()
        
        # 执行传入的SQL也可以更加的灵活,可以使用另外一种%s 占位符,后续的参数依次传入。
        sql2 = """INSERT INTO EMPLOYEE(FIRST_NAME,
                 LAST_NAME, AGE, SEX, INCOME)
                 VALUES ('%s', '%s', %s, '%s', %s)"""
        cursor.execute(sql2, 'Mac', 'Mohan', 20, 'M', 2000)
4.数据库DQL操作

Python查询Mysql使用常用几个方法。

fetchone(): 该方法获取下一个查询结果集。结果集是一个对象.
fetchmany():获取结果集的指定几行.
fetchall(): 接收全部的返回结果行.
rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数。

        import pymysql
         
        # 打开数据库连接
        db = pymysql.connect(**config)
         
        # 使用cursor()方法获取操作游标 
        cursor = db.cursor()
         
        # SQL 查询语句
        sql = "SELECT * FROM EMPLOYEE WHERE INCOME > %s" % (1000)
        try:
           # 执行SQL语句
           cursor.execute(sql)
           # 获取所有记录列表
           result1=cursor.fetchone()
           result2=cursor.fetchmany(2)
           results = cursor.fetchall()
           print(result1)
           print(result2)
           print(results)
        except:
           print ("Error: unable to fetch data")
         
        # 关闭数据库连接
        cursor.close()
        db.close()

三、工具类封装

通过封装常用方法将会大大降低对数据库操作的成本。接下来分为几步进行操作:
1.可以通过env文件来存储数据库的连接信息
2.将env文件数据加载进系统环境变量
3.从系统环境变量中获取对应连接数据
4.连接数据库,操作增删改查

# .env
DB_INFO={"host": "127.0.0.1","port":3306,"user": "user","passwd": "123456","charset": "utf8"}
import io
import os


class EnvironmentVarUtils(object):
    def __init__(self, fileName=None):
        self.file_name = fileName
        self._load_dot_env_file(self._get_environment_path())

    def _get_environment_path(self):
        """
        :return: project_path
        """
        return os.path.join(os.path.dirname(os.getcwd()), '.env') if self.file_name is None\
            else os.path.join(os.path.dirname(os.getcwd()), self.file_name)

    def _load_dot_env_file(self, dot_env_path):
        """ load .env file.
        Args:
            dot_env_path (str): .env file path
        """
        if not os.path.isfile(dot_env_path):
            raise FileNotFoundError(".env file not found Error.")

        print("Loading environment variables from 【{}】".format(dot_env_path))
        env_variables_mapping = {}

        with io.open(dot_env_path, 'r', encoding='utf-8') as fp:
            for line in fp:
                if "=" in line:
                    variable, value = line.split("=", 1)
                else:
                    raise Exception(".env format error")

                env_variables_mapping[variable.strip()] = value.strip()
        self._set_os_environ(env_variables_mapping)

    @staticmethod
    def _set_os_environ(variables_mapping):
        """ set variables mapping to os.environ """
        for variable in variables_mapping:
            os.environ[variable] = variables_mapping[variable]
            print("Set OS environment variable: {}".format(variable))

    @staticmethod
    def get_os_environ(variable_name):
        """ get value of environment variable.
        """
        try:
            return os.environ[variable_name]
        except Exception as e:
            raise e
import pymysql


class SqlHelper(object):
    def __init__(self, config):
        self.connect = pymysql.connect(**eval(config))
        self.connect.autocommit(True)
        # default return tuple, DictCursor return Json .
        self.cursor = self.connect.cursor()

    def __enter__(self):
        # DictCursor return Json .
        self.cursor = self.connect.cursor(cursor=pymysql.cursors.DictCursor)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.connect.close()

    def queryAll(self, sql, params=None):
        """
        :param sql:
        :param params:
        :return:
        """
        self.cursor.execute(sql, params)
        return self.cursor.fetchall()

    def queryMany(self, sql, num, params=None):
        """
        :param sql:
        :param num:
        :param params:
        :return:
        """
        self.cursor.execute(sql, params)
        return self.cursor.fetchmany(num)

    def queryOne(self, sql, params=None):
        """
        :param sql:
        :param params:
        :return:
        """
        self.cursor.execute(sql, params)
        return self.cursor.fetchone()

    def operation(self, sql, params=None, DML=True):
        """
        DML: insert / update / delete
        DDL: CREATE TABLE/VIEW/INDEX/SYN/CLUSTER
        :param DML:
        :param sql:
        :param params:
        :return:
        """
        try:
            self.cursor.execute(sql, params)
        except Exception as e:
            if DML:
                self.connect.rollback()
            raise e

    def batch_operation(self, sql_list, params_list=None, DML=True):
        """
            Process multiple SQL files in batches .
        :param DML:
        :param sql_list:
        :param params_list:
        :return:
        """
        for i in range(len(sql_list)):
            try:
                if params_list is not None:
                    self.operation(sql_list[i], params_list[i], DML)
                else:
                    self.operation(sql_list[i], params_list, DML)
            except Exception as e:
                raise e


    def batch_processing(self, sql, params_list, DML=True):
        """
         The same SQL is executed multiple times in batches.
        :param DML:
        :param sql:
        :param params_list:
        :return:
        """
        try:
            self.cursor.executemany(sql, params_list)
        except Exception as e:
            if DML:
                self.connect.rollback()
            raise e

    def __del__(self):
        """
            Automatic disconnection
        :return:
        """
        if self.connect.open:  # 解决连接重复关闭的
            self.cursor.close()
            self.connect.close()

1.正常方式执行

if __name__ == '__main__':
    sql = "select age from `demo`.`user` where name= 'Amy';"
    env = EnvironmentVarUtils()  # 初始化对象加载env文件数据
    config = env.get_os_environ("DB_INFO")  # 获取指定key数据

    # 1.正常方式执行
    db = SqlHelper(config)
    result = db.queryOne(sql)
    print(result)

-------------------------------------------------------------------------------

控制台输出:

Loading environment variables from 【C:\Users\lenovo\PycharmProjects\job\httpRunner_demo\.env】
Set OS environment variable: USERNAME
Set OS environment variable: PASSWORD
Set OS environment variable: BASE_URL
Set OS environment variable: db_info
(18,)

Process finished with exit code 0

2.通过上下文管理的方式执行

if __name__ == '__main__':
    sql = "select age from `demo`.`user` where name= 'Amy';"
    env = EnvironmentVarUtils()  # 初始化对象加载env文件数据
    config = env.get_os_environ("DB_INFO")  # 获取指定key数据

    # 2.上下文管理方式执行
    with SqlHelper(config) as db:
        result = db.queryOne(sql)
        print(result)

-------------------------------------------------------------------------------

控制台输出:

Loading environment variables from 【C:\Users\lenovo\PycharmProjects\job\httpRunner_demo\.env】
Set OS environment variable: USERNAME
Set OS environment variable: PASSWORD
Set OS environment variable: BASE_URL
Set OS environment variable: db_info
{'age': 18}

Process finished with exit code 0

以上亲测可用,可以尝试在自动化项目实践中操作。有问题欢迎留言讨论。

上一篇下一篇

猜你喜欢

热点阅读