python

Python连接MYSQL数据库类

2021-01-29  本文已影响0人  yichen_china

pymysql.connect()参数说明

字段 说明
host(str): MySQL服务器地址
port(int): MySQL服务器端口号
user(str): 用户名
passwd(str): 密码
db(str): 数据库名称
charset(str): 连接编码、

connection对象支持的方法

字段 说明
cursor() 使用该连接创建并返回游标
commit() 提交当前事务
rollback() 回滚当前事务
close() 关闭连接

cursor对象支持的方法

字段 说明
execute(op) 执行一个数据库的查询命令
fetchone() 取得结果集的下一行
fetchmany(size) 获取结果集的下几行
fetchall() 获取结果集中的所有行
rowcount() 返回数据条数或影响行数
close() 关闭游标对象

DBHelper.py:

#!D:\Programs\Python\Python37
# -*- coding:utf-8 -*-
# @Author : Echo
# @Time :2021/1/29 10:12
import pymysql
import logging
import sys
from config.database import db_config

# 加入日志
# 获取logger实例
logger = logging.getLogger("baseSpider")
# 指定输出格式
formatter = logging.Formatter('%(asctime)s\
              %(levelname)-8s:%(message)s')
# 文件日志
file_handler = logging.FileHandler("operation_database.log")
file_handler.setFormatter(formatter)
# 控制台日志
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)

# 为logge添加具体的日志处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)

logger.setLevel(logging.INFO)


class DBHelper():
    # 构造函数,初始化数据库连接
    def __init__(self,sql,params=None):
        self.sql = sql
        self.params = params
        self.conn = None
        self.cur = None

    def connectiondatabase(self):
        print(db_config['host'],db_config['username'],db_config['password'],db_config['database'],db_config['charset'])
        try:
            self.conn = pymysql.connect(host=db_config['host'],user=db_config['username'],
                                    passwd=db_config['password'],db=db_config['database'],charset=db_config['charset'])
        except:
            logger.error("connectDatabase failed")
            return False
        self.cur = self.conn.cursor()
        return True



    # 关闭数据库
    def closedatabase(self):
        # 如果数据打开,则关闭;否则没有操作
        if self.conn and self.cur:
            self.cur.close()
            self.conn.close()
        return True

    # 执行数据库的sq语句,主要用来做插入操作
    def execute(self):
        self.connectiondatabase()
        try:
            if self.conn and self.cur:
                # 正常逻辑,执行sql,提交操作
                self.cur.execute(self.sql,self.params)
                self.conn.commit()
        except:
            logger.error("execute failed: " + self.sql)
            logger.error("params: " + self.params)
            self.closedatabase()
            return False
        return True

    # 用来查询表数据
    def select(self):
        self.connectiondatabase()
        # 执行SQL语句
        b=self.cur.execute(self.sql,self.params)
        # print(self.cur.description)  # 获取字段列表
        result = self.cur.fetchall()

     #返回值和数据表字段组成json格式
        lists = []
        t = 0
        for x in result:
            i = 0
            onelist = {}
            for field in self.cur.description:
                onelist[field[0]] = x[i]
                i = i + 1
            lists.append(onelist)
        # for q in lists:
        #     print(q['update_time'])
        #     # print(q['data'])
        # # print(result)
        return lists

配置信息 db_config.py

db_config = {
    'host':'127.0.0.1',
    'port':3306,
    'username':'root',
    'password':'admin',
    'database':'DataBase',
    'charset':'utf8'
}

测试代码:

from mySQL import  DBHelper as mdb


def main():
    # 连接 mysql,获取连接的对象
    SQL=r'SELECT * FROM TUsers'
    db=mdb.DBHelper(SQL)
    rows=db.select()
    # 依次遍历结果集,发现每个元素,就是表中的一条记录,用一个元组来显示
    for row in rows:
        print(row)


if __name__ == '__main__':
    main()
上一篇下一篇

猜你喜欢

热点阅读