机器学习python全栈开发Python语言与信息数据获取和机器学习

线程、进程与协程

2017-03-07  本文已影响252人  凉茶半盏

众所周知, 计算机是由软件和硬件组成. 硬件中的CPU主要用于解释指令和处理数据, 软件中的操作系统负责资源的管理和分配以及任务的调度. 而程序则是运行在操作系统上具有特定功能的软件. 每当程序执行完成特定功能的时候, 为了保证程序的独立运行不受影响往往需要进程控制块(专门管理和控制程序执行的数据结构)的作用.
说了以上这么多基本理论知识, 接下来我们谈谈进程. 进程本质上就是一个程序在一个数据集上的动态执行过程. 进程通常由程序, 数据集和进程控制块三部分组成.

而线程在现在的多处理器电子设备中是最小的处理单元. 一个进程可以有多个线程, 这些线程之间彼此共享该进程的资源. 但是进程之间默认是相互独立的, 若数据共享则需要另外特定的操作. 这里做一个比喻. 现在有一个大型工厂, 该工厂负责生产汽车. 同时这个工厂又有多个车间, 每个车间负责不同的功能, 有的生产轮胎, 有的生产方向盘等等. 每个车间又有多个车间工人, 这些工人相互合作, 彼此共享资源来共同生产轮胎方向盘等等. 这里的工厂就相当于一个应用程序, 而每个车间相当于一个进程, 每个车间工人就相当于线程.

普通多线程创建使用

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

def showThreading(arg):
    time.sleep(1)
    print("current thread is : ",arg)

if __name__ == '__main__':
    for tmp in range(10):
        t=threading.Thread(target=showThreading,args=(tmp,))
        t.start()
    print('main thread has been stopped')

执行结果如下:

简单多线程运行结果 Daemon为True时的执行结果

自定义线程类

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time

class MyThread(threading.Thread):
    def __init__(self,target,arg=()):
        super(MyThread, self).__init__()
        self.target=target
        self.arg=arg

    def run(self):
        self.target(self.arg)

def test(arg):
    time.sleep(1)
    print("current thread is : ",arg)

if __name__ == '__main__':
    for tmp in range(10):
        mt=MyThread(target=test,arg=(tmp,))
        mt.start()
    print("main thread has been stopped")

由于CPU对线程是随机调度执行, 并且往往会在当前线程执行一小段代码之后便直接换为其他线程执行, 如此往复循环直到所有的线程执行结束. 因此在一个共享资源和数据的进程中, 多个线程对同一资源操或者同一数据操作容易造成资源抢夺和产生脏数据. 此时我们引入锁的概念, 对这种资源和数据进行加锁, 直到当前线程操作完毕再释放锁让其他线程操作.

我们先看看不加锁时候对数据的操作情况:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time

NUM = 0


def add():
    global NUM
    NUM += 1
    name=t.getName()
    time.sleep(1)
    print('current thread is: ',name ,' current NUM is: ',NUM )


if __name__ == '__main__':
    for tmp in range(10):
        t=threading.Thread(target=add)
        t.start()
    print("main thread has been stopped !")

不加锁执行结果

我们再来看看加锁的情况:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time

NUM = 0


def add():
    global NUM
    lock.acquire()
    NUM += 1
    name=t.getName()
    print('current thread is: ',name ,' current NUM is: ',NUM )
    time.sleep(1)
    lock.release()

if __name__ == '__main__':
    lock=threading.Lock()
    for tmp in range(10):
        t=threading.Thread(target=add)
        t.start()
    print("main thread has been stopped !")
加锁后的执行结果

python中在threading模块中定义了一下几种锁:

Semaphore 信号量锁使用:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time


def test():
    semaphore.acquire()
    print("current thread is: ", t.getName())
    time.sleep(1)
    semaphore.release()

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)
    for tmp in range(20):
        t = threading.Thread(target=test)
        t.start()

event 事件机制锁使用

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time

def test():
    print(t.getName())
    event.wait()


if __name__ == '__main__':
    event=threading.Event()
    for tmp in range(10):
        t=threading.Thread(target=test)
        t.start()
    print("zhe middle of main thread")
    if input("input your flag: ")=='1':
        event.set()
    print("main thread has been stopped")

condition 条件锁使用

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading


def condition():
    inp = input("input your condition: ")
    print(inp)
    if inp == "yes":
        return True
    return False


def test():
    cd.acquire()
    # cd.wait(1)
    cd.wait_for(condition)
    # cd.notify(2)
    print(t.getName())
    cd.release()


if __name__ == '__main__':
    cd = threading.Condition()
    for tmp in range(10):
        t = threading.Thread(target=test)
        t.start()
        t.join()
    print("\nmain thread has been stopped")
运行结果

在python的queue模块中内置了一种特殊的数据结构, 即队列. 这里我们可以把队列简单的看作是规定顺序执行的一组线程.

**Queue 先进先出队列的使用: **

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue

q=queue.Queue(10)

for tmp in range(10):
    q.put(tmp)

for tmp in range(10):
    print(q.get(),q.qsize())

利用Queue实现生产者消费者模型

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time, threading, queue


def productor(i):
    while True:
        q.put(i)
        time.sleep(1)


def consumer(i):
    while True:
        print("consumer-%s ate %s" % (i, q.get()))


if __name__ == '__main__':
    q = queue.Queue(10)
    for tmp in range(8):
        t = threading.Thread(target=productor, args=(tmp,))
        t.start()

    for tmp in range(5):
        t = threading.Thread(target=consumer, args=(tmp,))
        t.start()

    print("main has been stopped")
运行结果

不断的创建和销毁线程是非常消耗CPU的, 因此我们会采取维护一个线程池来实现多线程. 但是python中并未提供线程池的模块, 这里就需要我们自己来写.

**简单版本的线程池实现: **

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue, threading


class ThreadPool(object):
    def __init__(self, max_num=5):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        return self.queue.get()

    def add_thread(self):
        self.queue.put(threading.Thread)


def test(pool, i):
    tm = __import__("time")
    tm.sleep(1)
    print("current thread is: ", i)
    pool.add_thread()


if __name__ == '__main__':
    p = ThreadPool()
    for tmp in range(20):
        thread = p.get_thread()
        t = thread(target=test, args=(p, tmp))
        t.start()
    print("main thread has been stopped")
运行结果

**健壮版本的线程池: **

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue, threading, contextlib

stopFlag = object()


class ThreadPool(object):
    def __init__(self, max_num):
        self.queue = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.queue_real_list_list = []
        self.queue_free_list = []

    def run(self, target, args, callback):
        task_tuple = (target, args, callback)
        self.queue.put(task_tuple)
        if len(self.queue_free_list) == 0 and len(self.queue_real_list_list) < self.max_num:
            self.add_thread()

    def add_thread(self):
        t = threading.Thread(target=self.fetch)
        t.start()

    def fetch(self):
        current_thread = threading.currentThread
        self.queue_real_list_list.append(current_thread)
        task_tuple = self.queue.get()
        while task_tuple != stopFlag:
            func, args, callback = task_tuple
            result_status = True
            try:
                result = func(*args)
            except Exception as e:
                result_status = False
                result = e
            if callback is not None:
                try:
                    callback(result_status, result)
                except Exception as e:
                    pass
            if not self.terminal:
                # self.queue_free_list.append(current_thread)
                # task_tuple = self.queue.get()
                # self.queue_free_list.remove(current_thread)
                with ThreadPool.queue_operate(self.queue_free_list,current_thread):
                    task_tuple = self.queue.get()
            else:
                task_tuple = stopFlag

        else:
            self.queue_real_list_list.remove(current_thread)

    def close(self):
        num = len(self.queue_real_list_list)
        while num:
            self.queue.put(stopFlag)
            num -= 1

    def terminate(self):
        self.terminal = True
        max_num = len(self.queue_real_list_list)
        while max_num:
            self.queue.put(stopFlag)
            max_num -= 1

    def terminate_clean_queue(self):
        self.terminal = True
        while self.queue_real_list_list:
            self.queue.put(stopFlag)
        self.queue.empty()

    @staticmethod
    @contextlib.contextmanager
    def queue_operate(ls, ct):
        ls.append(ct)
        try:
            yield
        finally:
            ls.remove(ct)


def callback_func(result_status, result):
    print(result_status, result)


def test(i):
    tm = __import__("time")
    tm.sleep(1)
    return "current thread is: {}".format(i)


if __name__ == '__main__':
    pool = ThreadPool(5)
    for tmp in range(20):
        pool.run(target=test, args=(tmp,), callback=callback_func)
    # pool.close()
    pool.terminate()

if len(self.queue_free_list) == 0 and len(self.queue_real_list_list) < self.max_num:
self.add_thread()

判断空闲列表是否为空且真实的线程列表数目是否小于最大线程数目, 若是则执行`add_thread()`函数添加线程
 - `add_thread(self)` 添加并启动线程, 并将线程要执行的功能交给`fetch(self)`函数
 - `current_thread = threading.currentThread` 获取当前线程, `self.queue_real_list_list.append(current_thread)` 将当前线程加入到真实线程列表中
 - `task_tuple = self.queue.get()` 从任务队列中获取任务元组
 - `while task_tuple != stopFlag` 该循环语句内容表示任务元组对象不是`stopFlag`结束标志的时候执行其具体的功能和回调函数
 - `if not self.terminal` 判断是否立即终止当前线程(等待当前线程执行完任何立即结束)
- `pool.close()` 根据当前真实线程列表添加对应的`stopFlag`终止符
- `pool.terminate()` 此为不清空任务队列的立即终止线程方法
- `terminate_clean_queue(self)` 清空任务队列的立即终止线程方法

在python中由multiprocess模块提供的Process类来实现进程相关功能(process与Process是不同的)

**Process的使用: **
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process


def test(pro):
    print("current process is: ",pro)


if __name__ == '__main__':
    for tmp in range(10):
        p = Process(target=test,args=(tmp,))
        p.start()
运行结果

**普通的数据共享在进程中的实现: **

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process

ls = []


def test(i):
    ls.append(i)
    print("current process is: ", i, " and list is: ", ls)


if __name__ == '__main__':
    for tmp in range(10):
        p = Process(target=test, args=(tmp,))
        p.start()
        p.join()
    print("The final list is: ", ls)
运行结果

用Array共享数据

# -*- coding:utf-8 -*-
from multiprocessing import Process, Array


def test(i, ay):
    ay[i] += 10
    print('current process is: ', i)
    for tmp in ay:
        print(tmp)


if __name__ == '__main__':
    ay = Array('i', [1, 2, 3, 4, 5, 6])
    for tmp in range(5):
        p = Process(target=test, args=(tmp, ay))
        p.start()
运行结果

**使用Manager共享数据: **

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Manager, Process


def test(i, dic):
    dic[i] = i + 10
    print('current process is: ', i)
    for k, v in dic.items():
        print(k, v)


if __name__ == '__main__':
    mg = Manager()
    dic = mg.dict()
    for tmp in range(10):
        p = Process(target=test, args=(tmp, dic))
        p.start()
        p.join()
运行结果

**使用queue共享数据: **

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process,queues

import multiprocessing


def test(i,qu):
    qu.put(i+10)
    print("current process is: ",i," and zhe size of zhe queue is: ",qu.qsize())

if __name__ == '__main__':
    qu=queues.Queue(10,ctx=multiprocessing)
    for tmp in range(10):
        p=Process(target=test,args=(tmp,qu))
        p.start()
运行结果

在进程中共享数据也会出现脏数据的问题, 比如用multiprocessing模块中的queue或者Queue共享数据时候就会出现脏数据. 此时我们往往需要设置进程锁. 进程锁的使用和线程锁使用完全相同(Rlock, Lock, Semaphore, Event, Condition, 这些锁均在multiprocess中)

在实际开发中我们并不会采取直接创建多进程来实现某些功能, 而是主动维护一个指定进程数的进程池来实现多进程. 因为不断的创建进程和销毁进程对CPU的开销太大. python中内置了了进程池Pool 模块

**进程池Pool的使用: **

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time


def test(arg):
    time.sleep(1)
    return arg + 10


def call_end(arg):
    print(arg)


if __name__ == '__main__':
    p = Pool(5)
    for tmp in range(10):
        p.apply_async(func=test, args=(tmp,), callback=call_end)
    p.close()
    # p.terminate()
    p.join()
运行结果

协成是python中特有的一个概念, 它是人为的利用单线程在操作某任务等待空闲的时间内, 通过yield来保存当时状态, 进而用该线程做其他的操作. 由此实现的并发操作, 本质上跟IO多路复用类似.

**基础版本协成的使用: **

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import greenlet


def f1():
    print('1111')
    gr2.switch()
    print('2222')
    gr2.switch()


def f2():
    print('3333')
    gr1.switch()
    print('4444')


if __name__ == '__main__':
    gr1 = greenlet.greenlet(f1)
    gr2 = greenlet.greenlet(f2)
    gr1.switch()

**封装后的协成模块使用: **

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import gevent


def f1():
    print('this is f1 !!!')
    gevent.sleep(0)
    print('f1 after sleep')


def f2():
    print("this is f2 !!!")
    gevent.sleep(0)
    print('f2 after sleep')


if __name__ == '__main__':
    gevent.joinall([
        gevent.spawn(f1),
        gevent.spawn(f2),
    ])

gevent.joinall([
gevent.spawn(f1),
gevent.spawn(f2),
])

- 等待`f1`和`f2`执行完成再结束当前线程, 类似线程中的`join()`方法
- `gevent.sleep(0)` 设置等待时间
- 往往实际开发中并不需要设置从哪里需要切换代码执行或者等待的

**用协成访问网页简单例子: **
```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from gevent import monkey

monkey.patch_all()
import gevent, requests


def fetch(url):
    print('current url %s' % url)
    rp = requests.get(url)
    data = rp.text
    print(url, len(data))


if __name__ == '__main__':
    gevent.joinall([
        gevent.spawn(fetch, 'https://www.baidu.com'),
        gevent.spawn(fetch, 'https://www.sogou.com/'),
        gevent.spawn(fetch, 'http://www.jianshu.com'),
    ])
运行结果
上一篇 下一篇

猜你喜欢

热点阅读