python | pymysql模块

2019-03-01  本文已影响12人  Root_123

1、基本增删改查操作
2、python调用存储过程
3、多线程实现mysql存取操作

一、基础操作

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pymysql 

# 连接数据库
#conn = pymysql.connect('localhost', 'root', 'root')

# 也可以使用关键字参数
#conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='root', db='', charset='utf8')

# 也可以使用字典进行连接参数的管理
config = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': '',
    'charset': 'utf8'
}
conn = pymysql.connect(**config)

# 如果使用事务引擎,可以设置自动提交事务,或者在每次操作完成后手动提交事务conn.commit()
#conn.autocommit(1)    # conn.autocommit(True) 

# 使用cursor()方法获取操作游标
cursor = conn.cursor()
# 因该模块底层其实是调用CAPI的,所以,需要先得到当前指向数据库的指针。

try:
    # 创建数据库
    DB_NAME = 'test'
    cursor.execute('DROP DATABASE IF EXISTS %s' %DB_NAME)
    cursor.execute('CREATE DATABASE IF NOT EXISTS %s' %DB_NAME)
    conn.select_db(DB_NAME)

     #创建表
    TABLE_NAME = 'user'
    cursor.execute('CREATE TABLE %s(id int primary key,name varchar(30))' %TABLE_NAME)

    # 插入单条数据
    sql = 'INSERT INTO user values("%d","%s")' %(1,"jack")
    
    # 批量插入数据
    values = []
    for i in range(3, 20):
        values.append((i,'kk'+str(i)))
    cursor.executemany('INSERT INTO user values(%s,%s)',values)
   
   # 查询数据条目
    count = cursor.execute('SELECT * FROM %s' %TABLE_NAME)
    print('total records: %d' %count)
    print('total records:', cursor.rowcount)

    # 获取表名信息
    desc = cursor.description
    print("%s %3s" % (desc[0][0], desc[1][0]))

    # 查询一条记录
    print 'fetch one record:'
    result = cursor.fetchone()
    print result
    print ('id: %s,name: %s' %(result[0],result[1]))

    # 查询多条记录
    print 'fetch five record:'
    results = cursor.fetchmany(5)
    for r in results:
        print (r)

    print('===============')

     # 查询所有记录
    # 重置游标位置,偏移量:大于0向后移动;小于0向前移动,mode默认是relative
    # relative:表示从当前所在的行开始移动; absolute:表示从第一行开始移动
    cursor.scroll(0,mode='absolute')
    results = cursor.fetchall()
    for r in results:
        print r
    print('--------------')
    cursor.scroll(-2)
    results = cursor.fetchall()
    for r in results:
        print r

    # 更新记录
    cursor.execute('UPDATE %s SET name = "%s" WHERE id = %s' %(TABLE_NAME,'Jack',1))



    # 如果没有设置自动提交事务,则这里需要手动提交一次
    conn.commit()
except:
    import traceback
    traceback.print_exc()
    # 发生错误时会滚
    conn.rollback()
finally:
    # 关闭游标连接
    cursor.close()
    # 关闭数据库连接
    conn.close()

每次都连接关闭很麻烦,使用上下文管理,简化连接过程

import pymysql
import contextlib
#定义上下文管理器,连接后自动关闭连接
@contextlib.contextmanager
def mysql(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1',charset='utf8'):
  conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset)
  cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
  try:
    yield cursor
  finally:
    conn.commit()
    cursor.close()
    conn.close()
 
# 执行sql
with mysql() as cursor:
  print(cursor)
  row_count = cursor.execute("select * from tb7")
  row_1 = cursor.fetchone()
  print row_count, row_1

二、如何python调用 callproc 进行调用储存过程

1.创建完整的Mysql数据库连接
2.使用cursor()初始化数据库游标
3.使用游标来调用callproc函数 里面添加 需要传入的变量 例如 callproc(name,args) name="proc_user",args=['21',syh];
4.cursor可以传递出一系列的结果集,使用storeresult来获取一系列的iterator指向结果集
5.用fetchall方法获取结果

callproc 无法直接获得out和INOUT变量 ,但是变量存在server中,可以通过@_procname_n 来获取变量值,可以按照传入参数的位置获取,如第1个 SELECT @_procname_0

调用无参存储过程

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
#游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
#无参数存储过程
cursor.callproc('p2')    #等价于cursor.execute("call p2()")

row_1 = cursor.fetchone()
print row_1

conn.commit()
cursor.close()
conn.close()

调用有参存储过程

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

cursor.callproc('p1', args=(1, 22, 3, 4))
#获取执行完存储的参数,参数@开头
cursor.execute("select @p1,@_p1_1,@_p1_2,@_p1_3")   #{u'@_p1_1': 22, u'@p1': None, u'@_p1_2': 103, u'@_p1_3': 24}
row_1 = cursor.fetchone()
print row_1


conn.commit()
cursor.close()
conn.close()

三、多线程、多进程操作mysql

1、多线程连接:

起初使用threading.Thread模块,先建立一个MySQL连接,然后由多个线程来执行具体的SQL。但发现在执行的时候,不是报MySQL连接被关闭,就是出现其他异常错误。上网查询,是因为多个线程无法共享一个数据库连接,会出现不可预测的情况。

官方建议使用连接池模块,参照了他人的做法,创建线程连接池,一次性创建多个连接。

2、创建线程池的三种方法
1)过去:
使用threadpool模块,这是个python的第三方模块,支持python2和python3。threadpool是一个比较老的模块了,现在虽然还有一些人在用,但已经不再是主流了,关于python多线程,现在已经开始步入未来(future模块)了具体使用方式如下:

import threadpool
import time

def sayhello (a):
    print("hello: "+a)
    time.sleep(2)

def main():
    global result
    seed=["a","b","c"]
    start=time.time()
    task_pool=threadpool.ThreadPool(5)
    requests=threadpool.makeRequests(sayhello,seed)
    for req in requests:
        task_pool.putRequest(req)
    task_pool.wait()
    end=time.time()
    time_m = end-start
    print("time: "+str(time_m))
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))

if __name__ == '__main__':
    main()

运行结果如下:


2)未来:
使用concurrent.futures模块,这个模块是python3中自带的模块,但是,python2.7以上版本也可以安装使用,具体使用方式如下:

from concurrent.futures import ThreadPoolExecutor
import time

def sayhello(a):
    print("hello: "+a)
    time.sleep(2)

def main():
    seed=["a","b","c"]
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))
    start2=time.time()
    with ThreadPoolExecutor(3) as executor:
        for each in seed:
            executor.submit(sayhello,each)
    end2=time.time()
    print("time2: "+str(end2-start2))
    start3=time.time()
    with ThreadPoolExecutor(3) as executor1:
        executor1.map(sayhello,seed)
    end3=time.time()
    print("time3: "+str(end3-start3))

if __name__ == '__main__':
    main()

运行结果如下:


【注意】
concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:

3)现在?
这里要考虑一个问题,以上两种线程池的实现都是封装好的,任务只能在线程池初始化的时候添加一次,那么,假设我现在有这样一个需求,需要在线程池运行时,再往里面添加新的任务(注意,是新任务,不是新线程),那么要怎么办?
其实有两种方式:

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import Queue
import hashlib
import logging
from utils.progress import PrintProgress
from utils.save import SaveToSqlite


class ThreadPool(object):
    def __init__(self, thread_num, args):

        self.args = args
        self.work_queue = Queue.Queue()
        self.save_queue = Queue.Queue()
        self.threads = []
        self.running = 0
        self.failure = 0
        self.success = 0
        self.tasks = {}
        self.thread_name = threading.current_thread().getName()
        self.__init_thread_pool(thread_num)

    # 线程池初始化
    def __init_thread_pool(self, thread_num):
        # 下载线程
        for i in range(thread_num):
            self.threads.append(WorkThread(self))
        # 打印进度信息线程
        self.threads.append(PrintProgress(self))
        # 保存线程
        self.threads.append(SaveToSqlite(self, self.args.dbfile))

    # 添加下载任务
    def add_task(self, func, url, deep):
        # 记录任务,判断是否已经下载过
        url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest()
        if not url_hash in self.tasks:
            self.tasks[url_hash] = url
            self.work_queue.put((func, url, deep))
            logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))

    # 获取下载任务
    def get_task(self):
        # 从队列里取元素,如果block=True,则一直阻塞到有可用元素为止。
        task = self.work_queue.get(block=False)

        return task

    def task_done(self):
        # 表示队列中的某个元素已经执行完毕。
        self.work_queue.task_done()

    # 开始任务
    def start_task(self):
        for item in self.threads:
            item.start()

        logging.debug("Work start")

    def increase_success(self):
        self.success += 1

    def increase_failure(self):
        self.failure += 1

    def increase_running(self):
        self.running += 1

    def decrease_running(self):
        self.running -= 1

    def get_running(self):
        return self.running

    # 打印执行信息
    def get_progress_info(self):
        progress_info = {}
        progress_info['work_queue_number'] = self.work_queue.qsize()
        progress_info['tasks_number'] = len(self.tasks)
        progress_info['save_queue_number'] = self.save_queue.qsize()
        progress_info['success'] = self.success
        progress_info['failure'] = self.failure

        return progress_info

    def add_save_task(self, url, html):
        self.save_queue.put((url, html))

    def get_save_task(self):
        save_task = self.save_queue.get(block=False)

        return save_task

    def wait_all_complete(self):
        for item in self.threads:
            if item.isAlive():
                # join函数的意义,只有当前执行join函数的线程结束,程序才能接着执行下去
                item.join()

# WorkThread 继承自threading.Thread
class WorkThread(threading.Thread):
    # 这里的thread_pool就是上面的ThreadPool类
    def __init__(self, thread_pool):
        threading.Thread.__init__(self)
        self.thread_pool = thread_pool

    #定义线程功能方法,即,当thread_1,...,thread_n,调用start()之后,执行的操作。
    def run(self):
        print (threading.current_thread().getName())
        while True:
            try:
                # get_task()获取从工作队列里获取当前正在下载的线程,格式为func,url,deep
                do, url, deep = self.thread_pool.get_task()
                self.thread_pool.increase_running()

                # 判断deep,是否获取新的链接
                flag_get_new_link = True
                if deep >= self.thread_pool.args.deep:
                    flag_get_new_link = False

                # 此处do为工作队列传过来的func,返回值为一个页面内容和这个页面上所有的新链接
                html, new_link = do(url, self.thread_pool.args, flag_get_new_link)

                if html == '':
                    self.thread_pool.increase_failure()
                else:
                    self.thread_pool.increase_success()
                    # html添加到待保存队列
                    self.thread_pool.add_save_task(url, html)

                # 添加新任务,即,将新页面上的不重复的链接加入工作队列。
                if new_link:
                    for url in new_link:
                        self.thread_pool.add_task(do, url, deep + 1)

                self.thread_pool.decrease_running()
                # self.thread_pool.task_done()
            except Queue.Empty:
                if self.thread_pool.get_running() <= 0:
                    break
            except Exception, e:
                self.thread_pool.decrease_running()
                # print str(e)
                break

3、多进程连接
多进程(multiprocessing模块),具体使用看这里multiprocessing模块的使用
可以和MySQL建立多个连接并发执行SQL。对比执行耗时,整体性能比单个进程快,但其中单个SQL的执行效率,多进程没有单进程执行的快。
demo:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import MySQLdb
from multiprocessing import Process,Pool

class mysqlopr():
    '''省略'''

pool = Pool(5)      ####设置进程数
for i in range(10):
    pool.apply_async(func=run_sql_func, args=(i,))      ####异步执行
    #pool.apply(func=run_sql_func, args=(arg,))                 ####同步执行,官方不建议使用,python3.+版本已无该方法
pool.close()
pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
上一篇下一篇

猜你喜欢

热点阅读