python多线程并发threading模块

2018-03-11  本文已影响0人  安静的码农

最近开发一个获取所有集群从读写实例的接口,运行接口发现返回数据时间长,达到了9秒多,实在是太慢了!那么慢在哪里呢?

review代码,发现是因为需要读取二十多个不同的数据库查询结果,这些数据库分布在二十多台不同的物理机,代码采用for循环依次读取这些数据库,从而产生耗时严重的情况.

模拟for循环依次读取数据库脚本如下:

#!/usr/bin/python
# encoding=utf-8

import os
import mysql.connector

ossdb_dict = {'上海A':'xxx.xxx.xxx.xxx:3306','上海B':'xxx.xxx.xxx.xxx:3306','上海C':'xxx.xxx.xxx.xxx:3306','北京A':'xxx.xxx.xxx.xxx:3306','北京B':'xxx.xxx.xxx.xxx:3306','北美A':'xxx.xxx.xxx.xxx:3306','天津A':'xxx.xxx.xxx.xxx:3306','天津B':'xxx.xxx.xxx.xxx:3306','广州A':'xxx.xxx.xxx.xxx:3306','广州B':'xxx.xxx.xxx.xxx:3306','广州C':'xxx.xxx.xxx.xxx:3306','广州D':'xxx.xxx.xxx.xxx:3306','德国A':'xxx.xxx.xxx.xxx:3306','成都A':'xxx.xxx.xxx.xxx:3306','新加坡A':'xxx.xxx.xxx.xxx:3306','深圳A':'xxx.xxx.xxx.xxx:3306','深圳B':'xxx.xxx.xxx.xxx:3306','美国A':'xxx.xxx.xxx.xxx:3306','重庆A':'xxx.xxx.xxx.xxx:3306','香港A':'xxx.xxx.xxx.xxx:3306','韩国A':'xxx.xxx.xxx.xxx:3306','广州E':'xxx.xxx.xxx.xxx:3306'}

cluster_dict = {'上海A':49,'上海B':61,'上海C':63,'北京A':204,'北京B':206,'北美A':59,'天津A':64,'天津B':70,'广州A':73,'广州B':22,'广州C':60,'广州永顺':D,'德国A':202,'成都A':201,'新加坡A':71,'深圳A':74,'深圳B':81,'美国A':79,'重庆A':205,'香港A':52,'韩国A':207,'广州E':76}

slave_rw_sql = """select tb_mysql_pair.instance_name,tb_mysql_pair.app_name,tb_mysql_pair.master_port,tb_mysql_pair.slave_port,tb_device_pair.master_ip,tb_device_pair.slave_ip from tb_mysql_pair,tb_device_pair where tb_mysql_pair.status=19 and tb_mysql_pair.device_pair_id=tb_device_pair.pair_id;"""

db_user  = 'xxx'
db_pass = 'xxx'
access_db = 'xxx'   

def execDB(ossdb_dict,cluster_dict,db_user,db_pass,access_db,slave_rw_sql):
         
    cluster_ids  = []
    for key in cluster_dict:
        id   = cluster_dict[key] 
        cluster_ids.append(id)

    ossdb_list = [] 
    for cluster_id in cluster_ids:
        cluster_name = cluster_dict.keys()[cluster_dict.values().index(cluster_id)]
        ossdb  = ossdb_dict[cluster_name]

        ossdb_list.append(ossdb)

    #for循环一个一个读取数据库,是单线程的,所以会比较慢
    for ossdb in ossdb_list:
        cluster_name = ossdb_dict.keys()[ossdb_dict.values().index(ossdb)]
        db_host  = ossdb.split(':',1)[0]
        db_port   = ossdb.split(':',1)[1]

        conn = mysql.connector.connect(
            user  = db_user,
            password = db_pass,
            host  = db_host,
            port   = db_port,
            database = access_db)

        cur = conn.cursor()
        cur.execute(slave_rw_sql)
        slave_rw_results = cur.fetchall()


if __name__=='__main__':
    execDB(ossdb_dict,cluster_dict,db_user,db_pass,access_db,slave_rw_sql)

实际执行脚本,耗时情况:
time python 3.py

real 0m9.444s
user 0m0.060s
sys 0m0.016s

跑完脚本需要0m9.444s,很显然耗时是在for循环依次查询数据库产生的耗时.

是否有办法提高读取数据库的效率,消除耗时呢?答案是肯定的!

python的标准库提供了并发执行的多线程模块thread和threading,thread是低级模块,而threading是高级模块,对thread进行了封装。在绝大多数情况下,我们只需要使用threading这个高级模块.

在python中实现多线程有两种方式,一种就是函数形式,通过将需要执行的方法传入,然后创建多线程实例;另一种就是创建一个类,并且继承threading.Thread类来实现. 这里我只讲第一种方式,比较容易理解一些.

注:
什么是多线程呢?顾名思义,多线程就是在同一个进程的情况下拉起来多个线程,进程和线程之间的关系,就好比是工厂和工人之间的关系。工厂是一个,但是工人有多个,多人干活自然就可以提高生产效率。

来看看多线程并发执行的脚本:

#!/usr/bin/python
# encoding=utf-8


import os
import threading  #引入threading多线程模块
import mysql.connector

ossdb_dict = {'上海A':'xxx.xxx.xxx.xxx:3306','上海B':'xxx.xxx.xxx.xxx:3306','上海C':'xxx.xxx.xxx.xxx:3306','北京A':'xxx.xxx.xxx.xxx:3306','北京B':'xxx.xxx.xxx.xxx:3306','北美A':'xxx.xxx.xxx.xxx:3306','天津A':'xxx.xxx.xxx.xxx:3306','天津B':'xxx.xxx.xxx.xxx:3306','广州A':'xxx.xxx.xxx.xxx:3306','广州B':'xxx.xxx.xxx.xxx:3306','广州C':'xxx.xxx.xxx.xxx:3306','广州D':'xxx.xxx.xxx.xxx:3306','德国A':'xxx.xxx.xxx.xxx:3306','成都A':'xxx.xxx.xxx.xxx:3306','新加坡A':'xxx.xxx.xxx.xxx:3306','深圳A':'xxx.xxx.xxx.xxx:3306','深圳B':'xxx.xxx.xxx.xxx:3306','美国A':'xxx.xxx.xxx.xxx:3306','重庆A':'xxx.xxx.xxx.xxx:3306','香港A':'xxx.xxx.xxx.xxx:3306','韩国A':'xxx.xxx.xxx.xxx:3306','广州E':'xxx.xxx.xxx.xxx:3306'}

cluster_dict = {'上海A':49,'上海B':61,'上海C':63,'北京A':204,'北京B':206,'北美A':59,'天津A':64,'天津B':70,'广州A':73,'广州B':22,'广州C':60,'广州永顺':D,'德国A':202,'成都A':201,'新加坡A':71,'深圳A':74,'深圳B':81,'美国A':79,'重庆A':205,'香港A':52,'韩国A':207,'广州E':76}

slave_rw_sql = """select tb_mysql_pair.instance_name,tb_mysql_pair.app_name,tb_mysql_pair.master_port,tb_mysql_pair.slave_port,tb_device_pair.master_ip,tb_device_pair.slave_ip from tb_mysql_pair,tb_device_pair where tb_mysql_pair.status=19 and tb_mysql_pair.device_pair_id=tb_device_pair.pair_id;"""

db_user  = 'xxx'
db_pass  = 'xxx'
access_db  = 'xxx'   

def execDB(ossdb,ossdb_dict,db_user,db_pass,access_db,slave_rw_sql):
    cluster_name = ossdb_dict.keys()[ossdb_dict.values().index(ossdb)]
    db_host   = ossdb.split(':',1)[0]
    db_port    = ossdb.split(':',1)[1]
    
    conn = mysql.connector.connect(
        user  = db_user,
        password = db_pass,
        host   = db_host,
        port    = db_port,
        database = access_db)
        
    cur = conn.cursor()
    cur.execute(slave_rw_sql)
    slave_rw_results = cur.fetchall()


#定义多线程执行的函数multithread  
def multithread(ossdb_dict,cluster_dict,db_user,db_pass,access_db,slave_rw_sql):
    cluster_ids  = []
    for key in cluster_dict:
        id  = cluster_dict[key]
        cluster_ids.append(id)
        
    ossdb_list = []
    for cluster_id in cluster_ids:
        cluster_name = cluster_dict.keys()[cluster_dict.values().index(cluster_id)]
        ossdb   = ossdb_dict[cluster_name]
        
        ossdb_list.append(ossdb)
    
    #定义一个列表threads,存储要启动多线程的实例
    threads = []
    
    #循环读取多个ossdb,也就是启动了多个线程去查询DB啦~
    for ossdb in ossdb_list:
           #target表示实际要执行读取数据库的函数,multithread函数调用execDB函数,往execDB函数传参.
        t   = threading.Thread(target=execDB,args=(ossdb,ossdb_dict,db_user,db_pass,access_db,slave_rw_sql,))  
        threads.append(t)   #把要启动多线程的实例,追加到列表threads
        

    #把threads列表中的实例遍历出来后,调用start()方法启动多线程,就会有多个线程并发去读取数据库
    for thr in threads:
        thr.start()
    
    for thr in threads:
        #isAlive()可以返回true或者false,用来判断此时是否还有没有执行完的线程,如果还有未执行完的线程就让主线程等待线程执行结束之后,主线程再来结束.
        if thr.isAlive():
            thr.join()
    
if __name__=='__main__':
    multithread(ossdb_dict,cluster_dict,db_user,db_pass,access_db,slave_rw_sql)

上述脚本遍历了两次threads列表,最后一次遍历的目的是为了查看还有没有没有执行完成的子线程,只要还有子线程是活的,没有退出,就通过join()方法强制程序不可以让主线程退出,只有等所有子线程执行完成退出后,才能让主线程退出.

来看看采用多线程并发之后,实际执行脚本耗时:
time python 1.py
real 0m1.927s
user 0m0.064s
sys 0m0.012s

可以看到耗时0m1.927s,效率已经提升了好几倍,这个耗时在可接受范围.

上一篇下一篇

猜你喜欢

热点阅读