Python多进程 多线程

Python 进程,线程,协程及IO模型

2018-08-22  本文已影响218人  月亮是我踢弯得

一. 操作系统概念

操作系统位于底层硬件与应用软件之间的一层.工作方式: 向下管理硬件,向上提供接口.
操作系统进行进程切换: 1. 出现IO操作;2. 固定时间.
固定时间很短,人感受不到.每个应用层运行起来的程序都是进程.

二. 进程与线程的概念

1. 进程

程序仅仅只是一堆代码而已,而进程指的是程序的运行过程.需要强调的是:同一个程序执行两次,那也是两个进程.
进程: 资源管理单位(容器).
线程: 最小执行单位,管理线程的是进程.
进程的定义:
进程就是一个程序在一个数据集上的一次动态执行过程.进程一般由程序,数据集,进程控制块三部分组成.我们编写的程序 用来描述进程完成那些功能以及如何完成;数据集 则是程序在执行过程中多需要使用的资源;进程控制块 用来记录进程外部特征,描述进程的执行变化过程,系统可以用它来控制和管理进程,它是系统感知进程存在的唯一标志.

举一例说明进程:
想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。他有做生日蛋糕的食谱,厨房里有所需的原料:面粉、鸡蛋、糖、香草汁等。在这个比喻中,做蛋糕的食谱就是程序(即用适当形式描述的算法)计算机科学家就是处理器(cpu),而做蛋糕的各种原料就是输入数据。进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。现在假设计算机科学家的儿子哭着跑了进来,说他的头被一只蜜蜂蛰了。计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理蛰伤。这里,我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治),每个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完后,这位计算机科学家又回来做蛋糕,从他离开时的那一步继续做下去。

2. 线程

线程的出现是为了降低上下文切换的消耗,提高系统的并发性,并突破一个进程只能干一样事的缺陷,使得进程内并发成为可能. 假设,一个文本程序,需要接受键盘输入,将内容显示在屏幕上,还需要保存信息到硬盘中。若只有一个进程,势必造成同一时间只能干一样事的尴尬(当保存时,就不能通过键盘输入内容)。若有多个进程,每个进程负责一个任务,进程A负责接收键盘输入的任务,进程B负责将内容显示在屏幕上的任务,进程C负责保存内容到硬盘中的任务。这里进A,B,C间的协作涉及到了进程通信问题,而且有共同都需要拥有的东西——-文本内容,不停的切换造成性能上的损失。若有一种机制,可以使任务A,B,C共享资源,这样上下文切换所需要保存和恢复的内容就少了,同时又可以减少通信所带来的性能损耗,那就好了。是的,这种机制就是线程。
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由 线程ID,程序计数器,寄存器集合,和堆栈, 共同组成.线程的引入减小了进程并发执行的开销,提高了操作系统的并发性能.线程没有自己的系统资源.

3. 进程与线程的关系

在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制进程.
多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,控制该进程的地址空间.
进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位.

进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。或者说进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。

线程则是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。

进程与线程的关系.png

进程和线程的关系:

4. 并行与并发

无论是并行还是并发,在用户看来都是'同事'运行的,而一个CPU同一时刻只能执行一个任务.
并行: 同时运行,只有具备多个cpu才能实现并行.
并发: 是伪并行,即看起来是同时运行,单个cpu+多道技术.

所有现代计算机经常会在同一时间做很多件事,一个用户的PC(无论是单cpu还是cpu),都可以同时运行多个任务(一个任务可以理解为一个进程)。当启动系统时,会秘密启动许多进程:
    启动一个进程来杀毒(360软件)
    启动一个进程来看电影(暴风影音)
    启动一个进程来聊天(腾讯QQ)
所有的这些进程都需被管理,于是一个支持多进程的多道程序系统是至关重要的。

多道技术: 内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)。

.串行并行 并发.png

5. 同步和异步

举个例子,打电话时就是同步通信,发短息时就是异步通信。

6. 进程的创建

但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。

而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程

无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的:

关于创建的子进程,UNIX和windows

7. 进程的终止

8. 进程的层次结构

无论UNIX还是windows,进程只有一个父进程,不同的是:

9. 进程的状态

ail -f access.log |grep '404'
  执行程序tail,开启一个子进程,执行程序grep,开启另外一个子进程,两个进程之间基于管道'|'通讯,将tail的结果作为grep的输入。
  进程grep在等待输入(即I/O)时的状态称为阻塞,此时grep命令都无法运行

其实在两种情况下会导致一个进程在逻辑上不能运行,

因而一个进程有三个状态


进程的三种状态.png

10. 进程并发的实现

进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,为此,操作系统维护一张表格,即进程表(process table),每个进程占用一个进程表项(这些表项也称为进程控制块)


进程表项.png

该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配状况、所有打开文件的状态、帐号和调度信息,以及其他在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过一样。

三. threading模块

python的多线程:由于GIL,导致同一进程中只能有一个线程运运行在cpu上,而不能有多个线程同时在一个cpu上运行.
实现多线程的并发需要使用threading模块.

线程对象的创建

1. Thread类直接创建

# 多线程的并发,只能是交给一个cpu执行,不能多个cpu执行.即多个线程不能实现并行.

# 多线程并发方式一:
import threading
import time
def tingge():
    print("听歌")
    time.sleep(3)
    print("听歌结束")
def xieboke():
    print("写博客")
    time.sleep(5)
    print("写博客结束")
    print(time.time()-s)  # 计算整个程序运行时间,不能放在函数外,不然要和另外三个进程竞争,导致其输出的时间不准确。

s = time.time()
t1 = threading.Thread(target=tingge)  # 创建听歌线程,多线程的主进程.
t2 = threading.Thread(target=xieboke) # 创建写博客线程,多线程的主进程
t1.strat()  # 运行听歌线程,多线程的子线程
t2.start()  #运行写博客线程,多线程的子进程
print("ending")  # 多线程的主进程
=========================
听歌
ending
写博客
听歌结束
写博客结束
5.0137574672698975

因为三个线程t1.start(),t2.start() 和print("ending")之间竞争的原因,print("ending")竞争成功,所以先运行print("ending")这个进程。
当tingge函数中睡眠更改为8s的话,不考虑小数点的情况下,真实运行时间为8s

2. Thread类继承式创建

# 调用多线程方式二
import threading
import time

class MyThread(threading.Thread):
     def __init__(self,num):
         threading.Thread.__init__(self)
         self.num = num

     def run(self):
         print("running on  number:%s" %self.num)
         time.sleep(3)

t1 = MyThread(56)
t2 = MyThread(78)

t1.start()   # 该进程运行run函数原因,请查看源码,一系列的调用最终是调用run函数

t2.start()   # 该进程运行run函数原因,请查看源码,一系列的调用最终是调用run函数

print("ending")

3. Thread类的实例方法

join()

import threading
from time import ctime,sleep
import time
def Music(name):
    print("Begin listenning to {name}.{time}".format(name=name,time=ctime()))
    sleep(3)
    print("end listening {time}".format(time=ctime()))

def Blog(title):
    print("Begin recording the {title}.{time}".format(title=title,time=ctime()))
    sleep(5)
    print("end recording {time}".format(time=ctime()))

threads = []

t1 = threading.Thread(target=Music,args=("FILL ME",))
t2 = threading.Thread(target=Blog,args=("",))

threads.append(t1)
threads.append(t2)

if __name__ == "__main__":
      for t in threads:
          t.start()

      t.join()
      print("all over %s "%ctime())
==============================
Begin listenning to FILL ME.Tue Aug 21 17:15:29 2018
Begin recording the python.Tue Aug 21 17:15:29 2018    #Music函数和Blog函数同时运行
end listening Tue Aug 21 17:15:32 2018
end recording Tue Aug 21 17:15:34 2018
all over Tue Aug 21 17:15:34 2018
#添加t1.join()                       
                                   
import threading                   
from time import ctime,sleep       
import time                        
def Music(name):                   
    print("Begin listenning to {nam
    sleep(3)                       
    print("end listening {time}".fo
                                   
def Blog(title):                   
    print("Begin recording the {tit
    sleep(5)                       
    print("end recording {time}".fo
                                   
threads = []                       
                                   
t1 = threading.Thread(target=Music,
t2 = threading.Thread(target=Blog,a
                                   
threads.append(t1)                 
threads.append(t2)                 
                                   
if __name__ == "__main__":         
    for t in threads:              
        t.start()    #子进程
                                   
    t1.join()    #添加堵塞            
    print("all over %s" %ctime())  
=====================
Begin listenning to FILL ME.Tue Aug 21 17:30:36 2018
Begin recording the python.Tue Aug 21 17:30:36 2018
end listening Tue Aug 21 17:30:39 2018  #但只有当子进程Music函数运行完才能运行主进程,所以这里打印结果与上一个程序顺序不同
all over Tue Aug 21 17:30:39 2018
end recording Tue Aug 21 17:30:41 2018   #整个程序执行时间为5s
#t1.join()改成t2.join()查看运行结果          
                                     
import threading                     
from time import ctime,sleep         
import time                          
def Music(name):                     
    print("Begin listenning to {name}
    sleep(3)                         
    print("end listening {time}".form
                                     
def Blog(title):                     
    print("Begin recording the {title
    sleep(5)                         
    print("end recording {time}".form
                                     
threads = []                         
                                     
t1 = threading.Thread(target=Music,ar
t2 = threading.Thread(target=Blog,arg
                                     
threads.append(t1)                   
threads.append(t2)                   
                                     
if __name__ == "__main__":           
    for t in threads:                
        t.start()                    
                                     
    t2.join()                        
    print("all over %s" %ctime())    
========================
Begin listenning to FILL ME.Tue Aug 21 17:32:44 2018
Begin recording the python.Tue Aug 21 17:32:44 2018
end listening Tue Aug 21 17:32:47 2018
end recording Tue Aug 21 17:32:49 2018
all over Tue Aug 21 17:32:49 2018  #整个程序执行时间为5s
import threading                      
from time import ctime,sleep          
import time                           
def Music(name):                      
    print("Begin listenning to {name}.
    sleep(3)                          
    print("end listening {time}".forma
                                      
def Blog(title):                      
    print("Begin recording the {title}
    sleep(5)                          
    print("end recording {time}".forma
                                      
threads = []                          
                                      
t1 = threading.Thread(target=Music,arg
t2 = threading.Thread(target=Blog,args
                                      
threads.append(t1)                    
threads.append(t2)                    
                                      
if __name__ == "__main__":            
    for t in threads:                 
        t.start()                     
                                      
        t.join()                      
    print("all over %s" %ctime())     
===============================
Begin listenning to FILL ME.Tue Aug 21 17:34:43 2018
end listening Tue Aug 21 17:34:46 2018
Begin recording the python.Tue Aug 21 17:34:46 2018  #函数Music和函数Blog不能同时进行
end recording Tue Aug 21 17:34:51 2018
all over Tue Aug 21 17:34:51 2018    #运行时间8s

4. setDaemon()设置守护线程

将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。
当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,分别运行,那么当主线程完
成。想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。
但是有时候我们需要的是只要主线程 完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法了。

# 主进程结束但子进程未结束,整个程序同样结束。
import threading
from time import ctime,sleep
 
def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))
 
def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))
 
threads = []
 
t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))
 
threads.append(t1)
threads.append(t2)
 
if __name__ == '__main__':
    for t in threads:
        t.setDaemon(True)   # 注意:一定在start之前设置
        t.start()
 
    print ("all over %s" %ctime())
============================
Begin listening to FILL ME. Tue Aug 21 19:02:20 2018
all over Tue Aug 21 19:02:20 2018
Begin recording the python. Tue Aug 21 19:02:20 2018

只设置t1为守护线程

import threading
from time import ctime,sleep
 
def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))
 
def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))
 
threads = []
 
t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))
 
threads.append(t1)
threads.append(t2)
 
if __name__ == '__main__':
    t1.setDaemon(True)  # 注意:一定在start之前设置
    t1.start()
    t2.start()
 
    print ("all over %s" %ctime())
=============================
Begin listening to FILL ME. Tue Aug 21 19:26:54 2018
all over Tue Aug 21 19:26:54 2018
Begin recording the python. Tue Aug 21 19:26:54 2018
end listening Tue Aug 21 19:26:57 2018
end recording Tue Aug 21 19:26:59 2018  #因为t1运行时间比较长,所以t1运行完其他线程也都运行完毕。

只设置t2为守护线程

import threading
from time import ctime,sleep
 
def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))
 
def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))
 
threads = []
 
t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))
 
threads.append(t1)
threads.append(t2)
 
if __name__ == '__main__':
    t2.setDaemon(True)  # 注意:一定在start之前设置
    t1.start()
    t2.start()
 
    print ("all over %s" %ctime())
==============================
Begin listening to FILL ME. Mon May  8 17:54:44 2017
Begin recording the python. Mon May  8 17:54:44 2017
all over Mon May  8 17:54:44 2017
end listening Mon May  8 17:54:47 2017   #因为t2进程运行只有3s,而t1进程运行需要5s,所以当t2进程和主进程运行完毕,整个程序就结束,不管t1是否运行完毕。

5. 其它方法

Thread实例对象的方法
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
t.isDaemon() : 判断是否为守护线程
t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run():线程被cpu调度后自动执行线程对象的run方法

threading模块提供的一些方法:
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

import os
import threading
from time import ctime, sleep


def Music(name):
    print("Begin listening to {name}. {time}".format(name=name, time=ctime()))
    sleep(3)
    print("线程数:", threading.activeCount())  # threading.activeCount()线程数:3
    print("正在运行的线程:", threading.enumerate())  # 正在运行的线程
    print("正在运行Music线程的id:",os.getpid(),"正在运行Music线程的上一级进程id:",os.getppid())
    print("end listening {time}".format(time=ctime()))


def Blog(title):
    print("Begin recording the {title}. {time}".format(title=title, time=ctime()))
    sleep(5)
    print('end recording {time}'.format(time=ctime()))
   
threads = []

t1 = threading.Thread(target=Music, args=('FILL ME',), name="sub_thread")  # name="sub_thread"定义线程名
t2 = threading.Thread(target=Blog, args=('python',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    t2.setDaemon(True)  # 注意:一定在start之前设置
    t1.start()
    t2.start()

    print("all over %s" % ctime())
================================
Begin listening to FILL ME. Tue Aug 21 19:49:05 2018
all over Tue Aug 21 19:49:05 2018
Begin recording the python. Tue Aug 21 19:49:05 2018
线程数: 3
正在运行的线程: [<_MainThread(MainThread, stopped 140104305641216)>, <Thread(sub_thread, started 140104279365376)>, <Thread(Thread-1, started daemon 140104270972672)>]
正在运行Music线程的id: 124750 正在运行Music线程的上一级进程id: 52971
end listening Tue Aug 21 19:49:08 2018

四. GIL(全局解释器锁)

1. GIL定义

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)

GIL加在cpython解释器中,其他的python解释器不会有GIL.
Python中的线程是操作系统的原生线程,Python虚拟机使用一个全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。
GIL : 在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。

在调用任何Python C API之前,要先获得GIL

GIL优缺点:

2. GIL的早期设计

Python支持多线程,而解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁。 于是有了GIL这把超级大锁,而当越来越多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操作)。 慢慢的这种实现方式被发现是蛋疼且低效的。但当大家试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而非常难以去除了。有多难?做个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分成各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,并且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更何况Python这样核心开发和代码贡献者高度社区化的团队呢?

3. GIL的影响

无论你启多少个线程,你有多少个cpu, Python在执行一个进程的时候会淡定的在同一时刻只允许一个线程运行。 所以,python是无法利用多核CPU实现多线程的。 这样,python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。

GIL.png

计算密集型:一直在使用CPU.
IO密集型:存在 大量的IO操作.
对于IO密集型任务,python的多线程能够节省时间.
对于计算密集型任务,Python的多线程并没有用处.

#单线程即cpu串行情况下,查看运行时间
import time
def cal(n):
    sum=0
    for i in range(n):
       sum+=i
 
s=time.time()
 
cal(50000000)
cal(50000000)

print("time",time.time()-s)
=========================
time 7.650153636932373   #python3串行运行结果
('time', 12.600000143051147)    #python2串行运行结果
import time
import threading 
def cal(n):
    sum=0
    for i in range(n):
       sum+=i
s=time.time()

t1=threading.Thread(target=cal,args=(50000000,))
t2=threading.Thread(target=cal,args=(50000000,))
 
t1.start()
t2.start()
t1.join()
t2.join()
 
print("time",time.time()-s)
==========================
time 7.961973667144775   #python3中多线程运行时间
('time', 20.12600016593933)        #python2中多线程运行时间

从上述单线程和多线程运行结果来看,不管在python2或者3中运行结果均显示多线程比单线程运行时间更长。
因为GIL锁限制你只有一个线程执行,切换进程浪费时间,导致多线程话费时间更多。

python3中时间差不明显的原因是因为python3改进了GIL锁,但根本没有解决问题。

4. 解决方案

1. python使用多核,即开多个进程。
2. 终极思路:

换C模块实现多线程,即换一个python解释器,或者换门编程语言避免GIL锁。

5. 多进程

multiprocessing替代Thread multiprocessing库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。

# coding:utf8
from multiprocessing import Process
import time

def counter():
    i = 0
    for _ in range(40000000):
        
        i = i + 1

    return True

def main():
    l = []
    start_time = time.time()

    for _ in range(2):
        t = Process(target=counter)
        t.start()
        l.append(t)
        # t.join()

    for t in l:
        t.join()

    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))

if __name__ == '__main__':
    main()
==============================
py2.7:
     串行:6.1565990448 s
     并行:3.1639978885 s
py3.5:
     串行:6.556925058364868 s
     并发:3.5378448963165283 s

当然multiprocessing也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住三行就搞定了。而multiprocessing由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share memory的方法。
这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。

总结: 因为GIL的存在,只有IO Bound场景下得多线程会得到较好的性能 - 如果对并行计算性能较高的程序可以考虑把核心部分也成C模块,或者索性用其他语言实现 - GIL在较长一段时间内将会继续存在,但是会不断对其进行改进所以对于GIL,既然不能反抗,那就学会去享受它吧!

五. 同步锁(Lock)

先看代码:

import time
import threading

def addNum():
    global num  # 在每个线程中都获取这个全局变量
    num -= 1

num = 100  # 设定一个共享变量

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # 等待所有线程执行完毕
    t.join()
print('Result: ', num)
==================================
Result:  0

修改addNum中的代码:

import time
import threading

def addNum():
    global num  # 在每个线程中都获取这个全局变量
    temp = num
    time.sleep(1)
    num = temp - 1  # 对此公共变量进行-1操作


num = 100  # 设定一个共享变量
thread_list = []
for i in range(100):  # 循环进行了100次addNum函数
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list:  # 等待所有线程执行完毕
    t.join()
print('Result: ', num)
============================
Result:  99  

100个线程同时竞争运行函数,睡眠1s肯定够100个进程运行到同时处于睡眠的状态,第一个竞争到的肯定率先醒来速度极快计算完,num=99,线程2醒来从上面携带的global num=100同样计算num=99

睡眠时间较短时

import time
import threading
 
def addNum():
    global num #在每个线程中都获取这个全局变量
    temp=num
    time.sleep(0.001)
    num =temp-1  # 对此公共变量进行-1操作
 
num = 100  #设定一个共享变量
thread_list = []
 
for i in range(100):            #循环进行了100次addNum函数
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
 
for t in thread_list: #等待所有线程执行完毕
    t.join()
 
print('Result: ', num)
==============================
93或91或89...

每次执行程序结果都不同。100个线程因为GIL大锁的原因竞争运行函数,for循环第一次时线程1率先运行函数,线程1最快运行到time.sleep(0.001)睡眠时,GIL释放,线程1还未运行完addNum函数。for循环了2,3...次,线程2,3...竞争到运行函数,假设线程1 醒来时不知道有多少个线程在同时运行函数,当线程1计算num值,num值改变了,改变后的num值对在函数中的线程2,3...计算时num已经不是100的初始值了,num值由于一直不停的有线程进入一直在改变。而且线程1睡眠时不知道有多少个线程同时在睡眠,最后的结果肯定也不同。

上述就是线程安全问题,数据不可控,不安全,解决方法就是再创建一把锁。

锁通常被用来实现对共享资源的同步访问。 为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:

#注意获取锁和释放锁的位置
import time
import threading
 
def addNum():
    global num
    lock.acquire()      #获取这把锁
    temp=num
    time.sleep(0.01)
    num =temp-1
    lock.release()      #释放这把锁
 
num = 100
thread_list = []
 
lock=threading.Lock()       #创建一把锁
 
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list:
    t.join()
print('Result: ', num)
==============================
Result:  0

上锁的作用是这个线程未结束其他线程无法竞争,只能等,是一个串行,运行时间为0.001s*100次。
但与join()不同的是:join()是整个程序是串行的,上锁的话只有公共数据部分加锁,是串行的,程序其他内容还是并行的。
但上锁后的程序可能会出现死锁的情况.

六. 死锁与递归锁

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

import threading
import time

mutexA = threading.Lock()   #创建一把锁
mutexB = threading.Lock()


class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        mutexA.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放

        print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))

        mutexB.acquire()
        print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
        mutexB.release()
        mutexA.release()   #释放这把锁

    def fun2(self):
        mutexB.acquire()
        print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
        time.sleep(0.2)

        mutexA.acquire()  #获取这把锁
        print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start-----------%s" % time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

出现死锁不停竞争,程序卡住。

解决方法:
在python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

rl = threading.RLock() # 递归锁
rl.acquire() # 上锁 计数+1 counter=1
rl.acquire() # 上锁 计数+1 counter=2
...
rl.release() # 解锁 计数-1 counter=1
rl.release() # 解锁 计数-1 counter=0

counter记录了acquire的次数,直到一个线程所有的acquire都被release,即count为0时,其他线程才可以访问该资源。

import threading
import time

Rlock = threading.RLock()  # 创建一个递归锁


class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        Rlock.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放 counter = 1
        print('I am %s ,get res: %s --- %s ' % (self.name, 'ResA', time.time()))

        Rlock.acquire()  # counter = 2
        print('I am %s ,get res: %s --- %s ' % (self.name, 'ResB', time.time()))
        Rlock.release()  # counter = 1

        Rlock.release()  # counter = 0

    def func2(self):
        Rlock.acquire()  # counter = 1
        print('I am %s ,get res: %s --- %s ' % (self.name, 'ResB', time.time()))
        time.sleep(0.2)

        Rlock.acquire()  # counter = 2
        print('I am %s ,get res: %s --- %s ' % (self.name, 'ResA', time.time()))
        Rlock.release()  # counter = 1   #释放这把锁

        Rlock.release()  # counter = 0

if __name__ == '__main__':
    print('start ----------- %s' % time.time())

    for i in range(0, 10):
        mt = MyThread()
        mt.start()

七. 同步条件 Event对象

线程之间的通信作用
线程的一个关键特性是每个线程都是独立运行且状态不可预测。 如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就 会变得非常棘手。 为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。 如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

event.isSet():返回event的状态值,False或True;
event.wait():如果event.isSet()==False将阻塞线程,可以加参数,表示等待秒数;
event.set(): 设置event的状态值将为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

Event.png

可以考虑一种应用场景(仅仅作为说明),例如,我们有多个线程从Redis队列中读取数据来处理,这些线程都要尝试去连接Redis的服务,一般情况下,如果Redis连接不成功,在各个线程的代码中,都会去尝试重新连接。如果我们想要在启动时确保Redis服务正常,才让那些工作线程去连接Redis服务器,那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作:主线程中会去尝试连接Redis服务,如果正常的话,触发事件,各工作线程会尝试连接Redis服务。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', )

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3)  # simulate the check progress
    readis_ready.set()

if __name__ == "__main__":
    main()
=======================================
(t1        ) Waiting for redis ready...
(t2        ) Waiting for redis ready...
(MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
(t1        ) redis ready, and connect to redis server and do some work [Wed Aug 22 00:43:20 2018]
(t2        ) redis ready, and connect to redis server and do some work [Wed Aug 22 00:43:20 2018]

threading.Eventwait方法还接受一个超时参数,默认情况下如果事件一致没有发生,wait方法会一直阻塞下去,而加入这个超时参数之后,如果阻塞时间超过这个参数设定的值之后,wait方法会返回。对应于上面的应用场景,如果Redis服务器一致没有启动,我们希望子线程能够打印一些日志来不断地提醒我们当前没有一个可以连接的Redis服务,我们就可以通过设置这个超时参数来达成这样的目的:

import logging
import time
import threading

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', )

def worker(event):
    logging.debug('等待redis准备…')

    while not event.isSet():
        logging.debug('等待连接...')
        event.wait(3)  # if flag = False阻塞,等待flag = True 继续执行

    logging.debug('redis准备好,并连接到redis服务器和做一些工作 %s', time.ctime())
    time.sleep(1)

def main():
    r = threading.Event()  # flag = False

    t1 = threading.Thread(target=worker, args=(r,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(r,), name='t2')
    t2.start()

    logging.debug('首先,检查redis服务器,确保它是OK,然后触发复述事件做好准备')

    time.sleep(6)
    r.set()  # flag = True

============================
(t1        ) 等待redis准备…
(t1        ) 等待连接...
(t2        ) 等待redis准备…
(MainThread) 首先,检查redis服务器,确保它是OK,然后触发复述事件做好准备
(t2        ) 等待连接...
(t1        ) 等待连接...
(t2        ) 等待连接...
(t1        ) 等待连接...
(t2        ) 等待连接...
(t1        ) redis准备好,并连接到redis服务器和做一些工作 Wed Aug 22 00:50:30 2018
(t2        ) redis准备好,并连接到redis服务器和做一些工作 Wed Aug 22 00:50:30 2018

这样,我们就可以在等待Redis服务启动的同时,看到工作线程里正在等待的情况。

八. Semaphore(信号量)

同时只有n个线程可以获得semaphore,即可以限制最大连接数为n)

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,
acquire()
将阻塞线程直到其他线程调用release()。

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

import threading
import time

semaphore = threading.Semaphore(5)
 
def func():
    if semaphore.acquire():
        print(threading.currentThread().getName() + ' get semaphore',time.ctime())
        time.sleep(2)
        semaphore.release()

for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()
========================================
Thread-1 get semaphore Mon Oct 23 18:44:40 2017
Thread-2 get semaphore Mon Oct 23 18:44:40 2017
Thread-3 get semaphore Mon Oct 23 18:44:40 2017
Thread-4 get semaphore Mon Oct 23 18:44:40 2017
Thread-5 get semaphore Mon Oct 23 18:44:40 2017
Thread-8 get semaphore Mon Oct 23 18:44:42 2017
Thread-9 get semaphore Mon Oct 23 18:44:42 2017
Thread-7 get semaphore Mon Oct 23 18:44:42 2017
Thread-6 get semaphore Mon Oct 23 18:44:42 2017
Thread-10 get semaphore Mon Oct 23 18:44:42 2017
Thread-13 get semaphore Mon Oct 23 18:44:44 2017
Thread-11 get semaphore Mon Oct 23 18:44:44 2017
Thread-12 get semaphore Mon Oct 23 18:44:44 2017
Thread-15 get semaphore Mon Oct 23 18:44:44 2017
Thread-14 get semaphore Mon Oct 23 18:44:44 2017
Thread-16 get semaphore Mon Oct 23 18:44:46 2017
Thread-18 get semaphore Mon Oct 23 18:44:46 2017
Thread-17 get semaphore Mon Oct 23 18:44:46 2017
Thread-19 get semaphore Mon Oct 23 18:44:46 2017
Thread-20 get semaphore Mon Oct 23 18:44:46 2017

20个线程同时获取,但每次只能运行5个线程,所以运行程序显示的结果是5个5个的打印出来。

九. multiprocessing模块

由于GIL的存在,Python不存在多线程,要充分利用多核资源,就需要使用多进程。
multiprocessing模块是Python中的多进程管理包。

通过multiprocessing.Process对象来创建一个进程,Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。

multiprocessing与threading一样,调用同一套API。

1. python的进程调用

from multiprocessing import Process
import time
  
def f(name):
    print('hello', name, time.ctime())
    time.sleep(1)
  
if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p = Process(target=f, args=('xuyaping:%s' % i,))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')
===================================
hello xuyaping:0 Mon Oct 23 18:57:11 2017
hello xuyaping:2 Mon Oct 23 18:57:11 2017
hello xuyaping:1 Mon Oct 23 18:57:11 2017
end
from multiprocessing import Process
import time
 
class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        # self.name = name
 
    def run(self):
        print('hello', self.name, time.ctime())
        time.sleep(1)
 
if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)
 
    for p in p_list:
        p.join()
    print('end')
================================================
hello MyProcess-1 Mon Oct 23 18:59:08 2017
hello MyProcess-2 Mon Oct 23 18:59:08 2017
hello MyProcess-3 Mon Oct 23 18:59:08 2017
end

2 . process类

Process([group [, target [, name [, args [, kwargs]]]]])
  group: 线程组,目前还没有实现,库引用中提示必须是None;
  target:要执行的方法;
  name:进程名;
  args/kwargs:要传入方法的参数。

is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程

daemon:和线程的setDeamon功能一样
name:进程名字。
pid:进程号。

from multiprocessing import Process
import os
import time
def info(name):
    print("name:",name)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("------------------")
    time.sleep(1)
 
def foo(name):
    info(name)

if __name__ == '__main__':
 
    info('main process line')

    p1 = Process(target=info, args=('xuyaping',))
    p2 = Process(target=foo, args=('egon',))
    p1.start()
    p2.start()
 
    p1.join()
    p2.join()
 
    print("ending")
========================================
name: main process line
parent process: 9900
process id: 13264
------------------
name: xuyaping
name: egon
parent process: 13264
process id: 13720
------------------
parent process: 13264
process id: 20128
------------------
ending

通过tasklist(Win)或者ps -elf |grep(linux)命令检测每一个进程号(PID)对应的进程名

十. 进程间通讯

1. 进程队列Queue

from multiprocessing import Process, Queue
import queue


def f(q, n):
    # q.put([123, 456, 'hello'])
    q.put(n * n + 1)
    print("son process", id(q))


if __name__ == '__main__':
    q = Queue()  # try: q=queue.Queue()
    print("main process", id(q))

    for i in range(3):
        p = Process(target=f, args=(q, i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())
============================================
main process 140231804541808
son process 140231804541808
1
son process 140231804541808
2
son process 140231804541808
5

2. 管道(pipe)

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe
 
def f(conn):
    conn.send([12, {"name":"xyp"}, 'hello'])
    response=conn.recv()
    print("response",response)
    conn.close()
    print("q_ID2:",id(child_conn))
 
if __name__ == '__main__':
 
    parent_conn, child_conn = Pipe()
    print("q_ID1:",id(child_conn))
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    parent_conn.send("儿子你好!")
    p.join()
=====================================
q_ID1: 140077283723752
[12, {'name': 'xyp'}, 'hello']
response 儿子你好!
q_ID2: 140077283723752

Pipe()返回的两个连接对象代表管道的两端。 每个连接对象都有send()和recv()方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。

3. manager

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

from multiprocessing import Process, Manager
 
def f(d, l,n):
 
    d[n] = n
    d["name"] ="tom"
    l.append(n)
 
    #print("l",l)
 
if __name__ == '__main__':
 
    with Manager() as manager:
 
        d = manager.dict()
 
        l = manager.list(range(5))
        p_list = []
 
        for i in range(10):
            p = Process(target=f, args=(d,l,i))
            p.start()
            p_list.append(p)
 
        for res in p_list:
            res.join()
 
        print(d)
        print(l)
================================
{4: 4, 'name': 'tom', 2: 2, 1: 1, 3: 3, 0: 0, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9}
[0, 1, 2, 3, 4, 4, 2, 1, 3, 0, 5, 6, 7, 8, 9]

十一. 进程池(Using a pool of workers)

Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

我们可以用Pool类创建一个进程池, 展开提交的任务给进程池。 例:

from multiprocessing import Pool
import time
 
def foo(args):
 time.sleep(1)
 print(args)
 
if __name__ == '__main__':
 p = Pool(5)
 for i in range(30):
     p.apply_async(func=foo, args= (i,))
 
 p.close()   # 等子进程执行完毕后关闭线程池
 # time.sleep(2)
 # p.terminate()  # 立刻关闭线程池
 p.join()

一个进程池对象可以控制工作进程池的哪些工作可以被提交,它支持超时和回调的异步结果,有一个类似map的实现。

processes :使用的工作进程的数量,如果processes是None那么使用os.cpu_count()返回的数量。
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context:用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

注意:Pool对象的方法只可以被创建pool的进程所调用。

apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。
apply_async(func[,args[,kwds[,callback[,error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。
close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。
map(func, iterable[, chunksize])
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
imap(func, iterable[, chunksize])
imap_unordered(func, iterable[, chunksize])
starmap(func, iterable[, chunksize])
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

自己做个线程池
简单往队列中传输线程数

import threading
import time
import queue

class Threadingpool():
    def __init__(self,max_num = 10):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def getthreading(self):
        return self.queue.get()

    def addthreading(self):
        self.queue.put(threading.Thread)

def func(p,i):
    time.sleep(1)
    print(i)
    p.addthreading()

if __name__ == "__main__":
    p = Threadingpool()
    for i in range(20):
        thread = p.getthreading()
        t = thread(target = func, args = (p,i))
        t.start()

往队列中无限添加任务

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num):
        self.q = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()  # 获取线程
        while event != StopEvent:   # 判断获取的线程数不等于全局变量

            func, arguments, callback = event   # 拆分元祖,获得执行函数,参数,回调函数
            try:
                result = func(*arguments)   # 执行函数
                status = True
            except Exception as e:    # 函数执行失败
                status = False
                result = e

            if callback is not None:
                try:
                    callback(status, result)
                except Exception as e:
                    pass

            # self.free_list.append(current_thread)
            # event = self.q.get()
            # self.free_list.remove(current_thread)
            with self.work_state():
                event = self.q.get()
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        """
        关闭线程,给传输全局非元祖的变量来进行关闭
        :return:
        """
        for i in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """
        突然关闭线程
        :return:
        """
        self.terminal = True
        while self.generate_list:
            self.q.put(StopEvent)
        self.q.empty()

    @contextlib.contextmanager
    def work_state(self):
        self.free_list.append(threading.currentThread)
        try:
            yield
        finally:
            self.free_list.remove(threading.currentThread)

def work(i):
    print(i)
    return i +1 # 返回给回调函数

def callback(ret):
    print(ret)

pool = ThreadPool(10)
for item in range(50):
    pool.run(func=work, args=(item,),callback=callback)

pool.terminate()
# pool.close()

十二. yield和协程(Python协程)

线程和进程的操作是由程序触发系统接口,最后的执行者是系统; 协程的操作则是程序员。

import asyncio

async def cor1():
    print("COR1 start")
    await cor2()
    print("COR1 end")

async def cor2():
    print("COR2")

loop = asyncio.get_event_loop()
loop.run_until_complete(cor1())
loop.close()
==========================================
COR1 start
COR2
COR1 end

最后三行是重点
asyncio.get_event_loop() : asyncio启动默认的event loop
*run_until_complete() : * 这个函数是阻塞执行的,知道所有的异步函数执行完成,
close() : 关闭event loop。

1. greenlet

方便手动切换
greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greenlet是python中实现我们所谓的"Coroutine(协程)"的一个基础库.

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
=================================
COR1 start
COR2
COR1 end

2.gevent

自动切换

import gevent
import time
 
def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")
 
def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")
 
start=time.time()

gevent.joinall(
    [gevent.spawn(foo),
    gevent.spawn(bar)]
)
 
print(time.time()-start)
============================================
running in foo
switch to bar
switch to foo again
switch to bar again
5.010286569595337

由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time
 
def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))
 
start=time.time()
 
gevent.joinall([
        gevent.spawn(f, 'https://itk.org/'),
        gevent.spawn(f, 'https://www.github.com/'),
        gevent.spawn(f, 'https://zhihu.com/'),
])
 
# f('https://itk.org/')
# f('https://www.github.com/')
# f('https://zhihu.com/')
 
print(time.time()-start)
===============================================

GET: https://itk.org/
GET: https://www.github.com/
GET: https://zhihu.com/
11785 bytes received from https://zhihu.com/.
12221 bytes received from https://itk.org/.
51166 bytes received from https://www.github.com/.
4.193239688873291

自己做个线程池
简单往队列中传输线程数

import threading
import time
import queue

class Threadingpool():
    def __init__(self,max_num = 10):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def getthreading(self):
        return self.queue.get()

    def addthreading(self):
        self.queue.put(threading.Thread)


def func(p,i):
    time.sleep(1)
    print(i)
    p.addthreading()

if __name__ == "__main__":
    p = Threadingpool()
    for i in range(20):
        thread = p.getthreading()
        t = thread(target = func, args = (p,i))
        t.start()

往队列中无限添加任务

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num):
        self.q = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()  # 获取线程
        while event != StopEvent:   # 判断获取的线程数不等于全局变量

            func, arguments, callback = event   # 拆分元祖,获得执行函数,参数,回调函数
            try:
                result = func(*arguments)   # 执行函数
                status = True
            except Exception as e:    # 函数执行失败
                status = False
                result = e

            if callback is not None:
                try:
                    callback(status, result)
                except Exception as e:
                    pass

            # self.free_list.append(current_thread)
            # event = self.q.get()
            # self.free_list.remove(current_thread)
            with self.work_state():
                event = self.q.get()
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        """
        关闭线程,给传输全局非元祖的变量来进行关闭
        :return:
        """
        for i in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """
        突然关闭线程
        :return:
        """
        self.terminal = True
        while self.generate_list:
            self.q.put(StopEvent)
        self.q.empty()

    @contextlib.contextmanager
    def work_state(self):
        self.free_list.append(threading.currentThread)
        try:
            yield
        finally:
            self.free_list.remove(threading.currentThread)

def work(i):
    print(i)
    return i +1 # 返回给回调函数

def callback(ret):
    print(ret)

pool = ThreadPool(10)
for item in range(50):
    pool.run(func=work, args=(item,),callback=callback)

pool.terminate()
# pool.close()

十三. IO模型

1 定义:

I/O输入/输出(Input/Output),分为IO设备和IO接口两个部分。

2 . IO模型前戏准备

在进行解释之前,首先要说明几个概念:

  • 用户空间和内核空间
  • 进程切换
  • 进程的阻塞
  • 文件描述符
  • 缓存 I/O

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。
操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。
为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。
针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间
,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间**

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。*这种行为被称为进程切换 这种切换是由操作系统来完成的。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。
从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
1.保存处理机上下文,包括程序计数器和其他寄存器。
2. 更新PCB信息。
3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。
注:总而言之就是很耗资源的

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

import socket
print(socket.socket())
<socket.socket fd=172, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。用户空间没法直接访问内核空间的,内核态到用户态的数据拷贝
缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

3.常用IO模型

常用的IO模型:

不常用:

1).阻塞IO( blocking IO )

在linux中,默认情况下所有的socket都是blocking,全程阻塞,不管是等待数据或者是从内核态拷贝数据到用户态,一个典型的读操作流程大概是这样:

blocking IO.gif
系统调用两个阶段:

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

示例:
阻塞IO,就是简单的客户端和服务端进行交流,但是只能有一个客户端,再开一个客户端就不行了,来看代码的实现:
服务端:

import socket

sk = socket.socket()
print(sk)
sk.bind(("127.0.0.1",8080))
sk.listen(3)
while 1:
    conn,addr = sk.accept()
    while 1:
        data = conn.recv(1024)
        print(data.decode("utf8"))
        conn.sendall(data)

客户端:

import socket
sk = socket.socket()

sk.connect(("127.0.0.1",8080))
while 1:
    inp = input(">>>")
    sk.sendall(inp.encode("utf8"))
    data = sk.recv(1024)
    print(data.decode("utf8"))

2). 非阻塞IO(non-blocking )

linux下,可以通过设置socket使其变为non-blocking。setblocking(False) 设置阻塞状态为非阻塞固定时间循环发起系统调用,请求不到做自己的事情,等待下次请求,内核态拷贝数据到用户态需要等待。当对一个non-blocking socket执行读操作时,流程是这个样子:

non-blocking.gif
系统调用两个阶段:

从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。
所以,用户进程其实是需要不断的主动询问kernel数据好了没有。

注意:
  在网络IO时候,非阻塞IO也会进行recvform系统调用,检查数据是否准备好,与阻塞IO不一样,”非阻塞将大的整片时间的阻塞分成N多的小的阻塞, 所以进程不断地有机会 ‘被’ CPU光顾”。即每次recvform系统调用之间,cpu的权限还在进程手中,这段时间是可以做其他事情的,也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

示例:
非阻塞IO 这里所举的例子都是以客户端和服务端为模板来写的,来看一个代码实现的例子,其实这个例子和上面的差不多就是在其中加了一些简单的代码,只有几行,可以看看差别就 改变了阻塞的方式:
server端

import socket
sk = socket.socket()
#print(sk)
sk.bind(("127.0.0.1",8080))
sk.listen(3)
sk.setblocking(False)
import time
while 1:
    try:
        conn,addr = sk.accept()
        print(addr)
        # while 1:
        data = conn.recv(1024)
        print(data.decode("utf8"))
        #conn.sendall(data)
        conn.close()
    except Exception as e:
        print("error:",e)
        time.sleep(2)

client端

import socket
sk = socket.socket()

sk.connect(("127.0.0.1",8080))
while 1:
    #inp = input(">>>")
    sk.sendall("hello".encode("utf8"))
    data = sk.recv(1024)
    print(data.decode("utf8"))
3). IO多路复用( IO multiplexing)

IO multiplexing这个词可能有点陌生,但是如果我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为event driven IO。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。 (全程阻塞,监听多个链接系统调用select完成wait for data工作)
它的流程如图:

IO multiplexing.gif
系统调用两个阶段:

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

示例:

# import socket
# import select
#
# sk1=socket.socket()
# sk1.bind(('127.0.0.1',8080))
# sk1.listen(3)
#
# sk2=socket.socket()
# sk2.bind(('127.0.0.1',8081))
# sk2.listen(3)
#
# while 1:
#     r,w,e=select.select([sk1,sk2],[],[])
#     print('rrr')
#     for obj in r:#[sk1,]
#         conn,addr=obj.accept()
#         print(addr)
#         conn.send('i am server'.encode('utf8'))

select-client端

# import socket
# sk=socket.socket()
# sk.connect(('127.0.0.1', 8080))
#
# while 1:
#     data = sk.recv(1024)
#     print(data.decode('utf8'))
#     inp=input('>>>')
#     sk.sendall(inp.encode('utf8'))
#

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

while True:
    sk.connect(('127.0.0.1',8800))
    print("hello")
    sk.sendall(bytes("hello","utf8"))
    time.sleep(2)
    break

IO多路复用来实现一个简单的聊天
talk-server端:

import socket
import select

sk = socket.socket()
sk.bind(("127.0.0.1",8800))
sk.listen(5)
inp = [sk,]

while 1:
    inputs,outputs,errors = select.select(inp,[],[],)
    for obj in inputs:          #conn
        if obj == sk:
            conn,addr = sk.accept()
            print(conn)
            inp.append(conn)
        else:
            data = obj.recv(1024)
            print(data.decode("utf8"))
            Inputs = input("回答%s>>>>"%inp.index(obj))
            obj.sendall(Inputs.encode("utf8"))

talk-client端:

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.connect(("127.0.0.1",8800))
while True:
    # sk.connect(("127.0.0.1",8800))
    #print("hello")
    inp = input(">>>:")
    sk.sendall(bytes(inp,"utf8"))
    data = sk.recv(1024)
    print(data.decode("utf8"))
    # time.sleep(2)
    # break
4). 异步IO(Asynchronous I/O)

linux下的asynchronous IO其实用得很少。全程无阻塞,实现复杂。先看一下它的流程:


Asynchronous IO.gif

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

5). 五种IO模型比较:
五种IO模型比较.png

4. IO多路复用实现机制

IO多路复用机制:就是单个process可以同时处理多个网络连接的IO,基本原理就是通过select / epoll函数不断轮询所负责的所有socket,当某个socket有数据到达,就通知用户进程。

不同的操作系统提供的函数不同:

1). 简单介绍select、poll、epoll三者的特点:
2). select 介绍

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
  select目前几乎在所有的平台上支持。
  select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
  另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

3). selectors模块

selectors基于select模块实现IO多路复用,调用语句selectors.DefaultSelector(),特点是根据平台自动选择最佳IO多路复用机制,调用顺序:epoll > poll > select

import selectors
import socket
 
 
def accept(sock, mask):
    conn, addr = sock.accept()
    sel.register(conn, selectors.EVENT_READ, read)  # 将conn和read函数注册到一起,当conn有变化时执行read函数
 
 
def read(conn, mask):
    try:
        data = conn.recv(1000)
        print(data.decode('utf8'))
        inputs = input('>>:').strip()
        conn.send(inputs.encode('utf8'))
    except Exception:
        sel.unregister(conn)
        conn.close()
 
 
sock = socket.socket()
sock.bind(('127.0.0.1', 8080))
sock.listen(100)
sock.setblocking(False)  # 设置为非阻塞IO
 
sel = selectors.DefaultSelector()  # 根据平台自动选择最佳IO多路复用机制
sel.register(sock, selectors.EVENT_READ, accept)  # 将sock和accept函数注册到一起,当sock有变化时执行accept函数
 
while True:
    events = sel.select()  # 监听  [(key1,mask1),(key2),(mask2)]
    for key, mask in events:
        func = key.data  # 1 key.data就是accept   # 2 key.data就是read
        obj = key.fileobj  # 1 key.fileobj就是sock   # 2 key.fileobj就是conn
 
        func(obj, mask)  # 1 accept(sock,mask)   # 2read(conn,mask)
4). 队列queue

队列与线程(和进程)有关,保证多线程信息交换的安全。
队列是一种数据类型(数据结构),可用于存放数据创建队列语法queue.Queue(),默认是先进先出(FIFO)。
队列的优点:保证线程安全

import queue
 
q = queue.Queue() #创建队列对象q
 
q.put(123) #将123放入队列中
q.put('hello')
 
q.get() #将第一个值从队列中取出
import queue
 
q = queue.Queue(5)
q.put(10)
q.put(20)
print(q.get())
q.task_done()
print(q.get())
q.task_done()
 
q.join()
 
print("ending!")

q.put([1,‘123’]) #1为有限等级,越小越优先

十四. 生产者消费者模型

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
  生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
  这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。

import time,random
import queue,threading
 
q = queue.Queue()
 
def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1
 
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()

总结:

进程:最小的资源管理单位(盛放线程的容器)

线程:最小的执行单位

串行、并行、并发

cpython因为存在GIL导致,同一时刻,同一进程只能有一个线程执行

关于daemon:程序直到不存在非守护线程时退出

同步锁:由于多线程处理公共数据

递归锁

event:一个对象,让多个进程间通信
查看原文>>>>

上一篇下一篇

猜你喜欢

热点阅读