进程,线程,协程与python的实现

2020-07-12  本文已影响0人  睡不醒的大橘

进程

进程
进程地址空间
内存区域 存储内容
局部变量,函数参数,函数的返回值
动态分配的内存
BSS段 未初始化或初值为0的全局变量和静态局部变量
数据段 已初始化且初值非0的全局变量和静态局部变量
代码段 可执行文件的操作指令,可执行程序在内存中的镜像(只读)
进程间通信
管道
消息队列
共享内存
信号量
信号
Ctrl+C 产生 SIGINT 信号,表示终止该进程;
Ctrl+Z 产生 SIGTSTP 信号,表示停止该进程,但还未结束;
kill -9 1050 ,表示给 PID 为 1050 的进程发送 SIGKILL 信号,用来立即结束该进程
Socket
Python实现
  1. multiprocessing
from multiprocessing import Process
import os, time

# 子进程要执行的代码
def run_proc(name):
    time.sleep(2)
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
  1. multiprocessing.Pool 进程池
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(2)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(2)
    for i in range(3):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
Parent process 5584.
Waiting for all subprocesses done...
Run task 0 (12836)...
Run task 1 (5916)...
Task 0 runs 2.00 seconds.
Run task 2 (12836)...
Task 1 runs 2.00 seconds.
Task 2 runs 2.00 seconds.
All subprocesses done.
  1. 进程间通信
from multiprocessing import Process, Queue
import os, time

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue.' % value)
        q.put(value)
        time.sleep(2)

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()
Process to read: 9504
Process to write: 10672
Put A to queue.
Get A from queue.
Put B to queue.
Get B from queue.
Put C to queue.
Get C from queue.

线程

线程
线程的内存模型
线程的状态
  1. 可运行 (runnable):线程被创建之后,调用Start()函数就到了这个状态。
  2. 运行 (running):start()函数之后,CPU切换到了这个线程开始执行里面的Run方法就称为运行状态。
  3. 阻塞 (blocked):阻塞状态是指线程因为某种原因放弃了cpu执行权,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得cpu 执行权 转到运行(running)状态。
  1. 计算密集型 ,程序执行时大部分时间处于 Running 状态;
  2. IO密集型 ,程序执行时大部分时间处于 IO Blocked 状态;
线程的调度
  1. 内核级线程:

内核线程建立和销毁都是由操作系统负责、通过系统调用完成,内核维护进程及线程的上下文信息以及线程切换。

程序一般不会直接使用内核线程,而是使用内核线程的一种高级接口-轻量级进程(Light-weight Process简称LWP ,是一种由内核支持的用户线程,每一个轻量级进程都与一个特定的内核线程关联。)。

优势:内核级线级能参与全局的多核处理器资源分配,充分利用多核 CPU 优势。

局限性:需要系统调度,系统调度代价大,需要在用户态和内核态来回切换;内核进程数量有限。

  1. 用户级线程

用户线程的创建、调度、同步和销毁全由用户空间的库函数完成,不需要内核的参与。内核的调度对象是进程本身,内核并不知道用户线程的存在。

优势:性能极强。用户线程的建立,同步,销毁和调度完全在用户态中完成,操作非常快,低消耗,无需切换内核态。

局限性:所有操作都需要自己实现,逻辑极其复杂。 用户线级线程只能参与竞争该进程的处理器资源,不能参与全局处理器资源的竞争。

  1. 分时调度
    所有线程轮流使用 CPU 的使用权,平均分配每个线程占用 CPU 的时间。

  2. 抢占式调度
    优先让优先级高的线程使用 CPU,如果线程的优先级相同,那么会随机选择一个(线程随机性)。

Python实现
  1. 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响。而多线程中,静态变量,实例变量被线程共享,局部变量不被线程共享
from threading import Thread

num = 0

def run_thread(name):
    global num
    num = num + 1
    print('{} num: {}'.format(name, num))

if __name__=='__main__':
    p1 = Thread(target=run_thread, args=('thread1',))
    p2 = Thread(target=run_thread, args=('thread2',))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
# thread1 num: 1
# thread2 num: 2
from multiprocessing import Process

num = 0

def run_proc(name):
    global num
    num = num + 1
    print('{} num: {}'.format(name, num))

if __name__=='__main__':
    p1 = Process(target=run_proc, args=('process1',))
    p2 = Process(target=run_proc, args=('process2',))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
# process1 num: 1
# process2 num: 1
  1. 线程之间共享数据最大的危险在于多个线程同时改一个变量:
from threading import Thread

num = 0

def change_num(name):
    global num
    num = num + 1

def run_thread(name):
    for i in range(1000000):
        change_num(name)

if __name__=='__main__':
    p1 = Thread(target=run_thread, args=('thread1',))
    p2 = Thread(target=run_thread, args=('thread2',))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)
    
# 1851956
  1. 可以使用互斥锁(Lock),同一时刻只允许一个线程执行操作
from threading import Thread, Lock

num = 0

def change_num(name, lock):
    global num
    lock.acquire()
    num = num + 1
    lock.release()

def run_thread(name, lock):
    for i in range(1000000):
        change_num(name, lock)

if __name__=='__main__':
    lock = Lock()
    p1 = Thread(target=run_thread, args=('thread1', lock))
    p2 = Thread(target=run_thread, args=('thread2', lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)
    
# 2000000
  1. 由于python GIL全局锁,python多线程并不能提升计算密集型任务的效率。这种情况下可以考虑使用多进程。
from threading import Thread
import time

def change_num(name):
    nums = [0] * 100
    for num in nums:
        for i in range(1000000):
            num = num + 1

if __name__=='__main__':
    p1 = Thread(target=change_num, args=('thread1',))
    start = time.time()
    p1.start()
    p1.join()
    end = time.time()
    print('runs %0.2f seconds.' % (end - start))
    
# runs 21.23 seconds.
from threading import Thread
import time

def change_num(name):
    nums = [0] * 100
    for num in nums:
        for i in range(1000000):
            num = num + 1

if __name__=='__main__':
    p1 = Thread(target=change_num, args=('thread1',))
    p2 = Thread(target=change_num, args=('thread2',))
    p3 = Thread(target=change_num, args=('thread2',))
    start = time.time()
    p1.start()
    p2.start()
    p3.start()
    p1.join()
    p2.join()
    p3.join()
    end = time.time()
    print('runs %0.2f seconds.' % (end - start))
    
# runs 19.49 seconds.

协程

协程
Python实现
  1. python 2.5 中引入 yield/send 表达式用于实现协程
def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.__next__()
    n = 0
    while n < 3:
        n = n + 1
        print('Producing %s...' % n)
        r = c.send(n)
        print('Consumer return: %s' % r)
    c.close()

if __name__=='__main__':
    c = consumer()
    produce(c)
Producing 1...
Consuming 1...
Consumer return: 200 OK
Producing 2...
Consuming 2...
Consumer return: 200 OK
Producing 3...
Consuming 3...
Consumer return: 200 OK
  1. gevent是一个广泛使用的协程框架,它的一个特点是可以用同步的方法写异步应用,不需要写回调函数:
from gevent import monkey; monkey.patch_all()
import gevent
import time

def f(num):
    print('start: ' + str(num))
    time.sleep(3)
    print('end: ' + str(num))


gevent.joinall([
        gevent.spawn(f, 1),
        gevent.spawn(f, 2),
        gevent.spawn(f, 3),
])
start: 1
start: 2
start: 3
end: 1
end: 2
end: 3
  1. python 3.5 之后还引入async/await。

1)

import asyncio

async def hw_async():
    print('Hello world!')

loop = asyncio.get_event_loop()
loop.run_until_complete(hw_async())
loop.close()

2)

import asyncio
import datetime
from threading import currentThread

async def hw_async(num):
    print(f"Hello world {num} begin at: {datetime.datetime.now().strftime('%S')},thread:{currentThread().name}")
    await asyncio.sleep(3)
    print(f"Hello world {num} end at: {datetime.datetime.now().strftime('%S')}")

loop = asyncio.get_event_loop()

tasks = [hw_async(1), hw_async(2),hw_async(3)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Hello world 3 begin at: 53,thread:MainThread
Hello world 1 begin at: 53,thread:MainThread
Hello world 2 begin at: 53,thread:MainThread
Hello world 3 end at: 56
Hello world 1 end at: 56
Hello world 2 end at: 56

什么时候使用进程,线程和协程

上一篇 下一篇

猜你喜欢

热点阅读