python自学

Python3 多线程执行顺序的可行方案

2020-04-05  本文已影响0人  哦小小树
保持多线程执行顺序的可行方案有以下几种:

锁,Condition,信号量,Event,Barrier,队列

示例代码
import threading
import asyncio
import queue

# 需要重写来实现多线程调用
class Foo:
    def __init__(self):
        self.l = threading.Lock()
        self.l1 = threading.Lock()

        self.l.acquire()
        self.l1.acquire()
        pass

    def printA(self):
        self.l.acquire()
        print("A", threading.currentThread().getName())
        self.l1.release()


    def printB(self):
        print("B",threading.currentThread().getName())
        self.l.release()

    def printC(self):
        self.l1.acquire()
        print("C", threading.currentThread().getName())


if __name__ == '__main__':

    foo = Foo()

    async def test1():
        threading.Thread(target=foo.printA).start()

    async def test2():
        threading.Thread(target=foo.printB).start()

    async def test3():
        threading.Thread(target=foo.printC).start()

# 异步测试打印
    asyncio.run(test1())
    asyncio.run(test2())
    asyncio.run(test3())



  1. 初始化一个锁
self.l = threading.Lock()
  1. 获取一个锁,获取后除非释放,否则代码会在下次获取之前阻塞
self.l.acquire()        # 如果不release,如果再次调用acquire就会被阻塞,直至调用release
exec...
  1. 执行以下代码后锁可以被其他线程获取
self.l.release()

注意:
一个锁,只能保证一组数据按顺序,如果有3个资源需要保证顺序就需要2把锁,4个就需要3把锁


Condition

  1. 初始化:
self.c = threading.Condition()
  1. 使用with语法,使用这个条件
with self.c:
  1. 设置等待条件
# 一个bool表达式,如果条件达到会继续执行,否则
self.c.wait_for(lambda: a == 10) 
exec... # 执行被阻塞的代码
  1. 通知其他线程,我执行完了,谁需要执行就执行
self.c.notify_all() # 通知所有线程开始抢占资源
# 或
self.c.notify(n)    # 激活几个线程

注意:
一个条件可以多处使用,用来保证顺序可以一个条件做到

示例
class Foo:
    def __init__(self):
        self.c = threading.Condition()
        self.t = 0;
        pass

    def printA(self):
        with self.c:
            self.c.wait_for(lambda : self.t == 1)
            print("A", threading.currentThread().getName())
            self.t += 1
            self.c.notify_all()


    def printB(self):
        with self.c:
            print("B",threading.currentThread().getName())
            self.t += 1
            self.c.notify_all()

    def printC(self):
        with self.c:
            self.c.wait_for(lambda : self.t == 2)
            print("C", threading.currentThread().getName())


信号量

  1. 初始化一个信号量
self.s1 = threading.Semaphore(0)
# 使用说明,如果信号量当前为0,当需要获取信号量时就会出现阻塞,
# 直到有其他线程释放信号量
  1. 获取信号量开始执行
self.ls.acquire()
exec... # 执行代码
  1. 释放信号量,执行上面被阻塞的代码
self.s1.release()

注意:使用0的信号量,效果和锁是一样的。
区别:
* 锁先调用了一次acquire,让下一次调用acquire处于阻塞状态。
* 信号量为0表示当前需要调用acquire的线程已经会处于阻塞状态。

示例
class Foo:
    def __init__(self):
        self.s = threading.Semaphore(0)
        self.s1 = threading.Semaphore(0)
        pass

    def printA(self):
        self.s.acquire()
        print("A", threading.currentThread().getName())
        self.s1.release()
        
    def printB(self):
        print("B",threading.currentThread().getName())
        self.s.release()

    def printC(self):
        self.s1.acquire()
        print("C", threading.currentThread().getName())


Event对象

wait 方法作为阻塞;set方法来释放线程【默认类赋值就是阻塞】

  1. 创建一个Event对象
self.e = threading.Event()
  1. 在需要等待的时候调用
self.e.wait()
exec ... # 执行被阻塞的代码
  1. 执行完前期条件后释放阻塞
self.e.set()
示例
class Foo:
    def __init__(self):
        self.e = threading.Event()
        self.e1 = threading.Event()
        pass

    def printA(self):
        self.e.wait()
        print("A", threading.currentThread().getName())
        self.e1.set()

    def printB(self):
        print("B",threading.currentThread().getName())
        self.e.set()

    def printC(self):
        self.e1.wait()
        print("C", threading.currentThread().getName())


Barrier对象

  1. 创建一个barrier对象
self.b = threading.Barrier(parties)
# 如果parties为2 ,则说明,需要调用两次wait方法,
# 才可以让被阻塞的线程执行
  1. 让线程开始执行
self.b.wait()      # 执行次数与parties保持一致
self.b.wait()
exec ...
示例
class Foo:
    def __init__(self):
        self.b = threading.Barrier(2)
        self.b1 = threading.Barrier(2)
        pass

    def printA(self):
        self.b.wait()
        print("A", threading.currentThread().getName())
        self.b1.wait()

    def printB(self):
        print("B",threading.currentThread().getName())
        self.b.wait()

    def printC(self):
        self.b1.wait()
        print("C", threading.currentThread().getName())


空Queue

阻塞队列使用;对于队列为空时,get方法就会自动阻塞,直到put使之非空才会释放进程

  1. 定义一个空队列
self.q = queue.Queue(0)
  1. 如果想阻塞列
self.q.get()        # 直接调用会因为队列中没有数据而阻塞,需要等到向队列put后,才会让get
exec ...
  1. 解除阻塞队列
self.q.put(0)
示例
class Foo:
    def __init__(self):
        self.q = queue.Queue(0)
        self.q1 = queue.Queue(0)
        pass

    def printA(self):
        self.q.get()
        print("A", threading.currentThread().getName())
        self.q1.put(0)

    def printB(self):
        print("B",threading.currentThread().getName())
        self.q.put(0)

    def printC(self):
        self.q1.get()
        print("C", threading.currentThread().getName())


满Queue

定容队列方式;如果队列满了,put方法会被阻塞

  1. 创建一个队列,并将其填充满
self.q = queue.Queue(1) # 队列中最多只能放1个元素
self.q.put(0)
  1. 此时由于存储的队列已经满了,此时以下操作会被阻塞
self.q.put(0)
exec....
  1. 执行以下代码可以解除上面的阻塞
self.q.get()
示例代码
class Foo:
    def __init__(self):
        self.q = queue.Queue(1)
        self.q1 = queue.Queue(1)
        self.q.put(0)
        self.q1.put(0)
        pass

    def printA(self):
        self.q.put(0)
        print("A", threading.currentThread().getName())
        self.q1.get()

    def printB(self):
        print("B",threading.currentThread().getName())
        self.q.get()

    def printC(self):
        self.q1.put(0)
        print("C", threading.currentThread().getName())


上一篇 下一篇

猜你喜欢

热点阅读