Python

Python-线程与协程

2024-09-12  本文已影响0人  阿凡提说AI

同步编程模型

线程编程模型

异步编程模型

Reactor模式是一种在事件驱动编程中广泛使用的模式,特别是在网络编程中。以下是对您提到的概念和方法的解释:

Reactor模式

Twisted框架中的Reactor

Twisted是一个流行的Python网络编程框架,它实现了Reactor模式。以下是一些Twisted中常用的方法和函数:

协议中的方法

在Twisted协议实现中,以下是一些常用的方法:

from twisted.internet import reactor, protocol
class EchoProtocol(protocol.Protocol):
    def dataReceived(self, data):
        print("Received:", data)
        self.transport.write(data)  # Echo back the data
    def connectionLost(self, reason):
        print("Connection lost")
class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return EchoProtocol()
reactor.listenTCP(8000, EchoFactory())
reactor.run()

在这个例子中,我们创建了一个监听8000端口的TCP服务器,它将收到的数据原样返回给客户端。当有数据到达时,dataReceived方法会被调用,并且我们使用self.transport.write来发送数据回客户端。如果连接断开,connectionLost方法会被调用。

下面是对 _thread 模块的一些详细说明和使用示例。

_thread 模块概述

在 Python 中,_thread 模块提供了基本的线程支持。这个模块相对较低级,通常建议使用更高级别的 threading 模块,因为它提供了更全面和方便的线程管理功能。不过,在某些轻量级或者特定场景下,使用 _thread 可能更为合适。

使用 _thread 创建线程

要使用 _thread 创建新线程,可以使用 start_new_thread 函数。以下是如何使用它的基本步骤:

  1. 定义一个线程函数,它将被线程执行。
  2. 调用 _thread.start_new_thread 并传递线程函数和其参数。

示例代码

下面是一个简单的示例,演示如何使用 _thread 模块来创建一个新线程:

import _thread
# 定义一个线程函数
def print_numbers():
    for i in range(5):
        print(i)
# 使用 _thread.start_new_thread 创建新线程
_thread.start_new_thread(print_numbers, ())
# 为了让主线程等待子线程完成,我们可以使用一个简单的循环
# 注意:这不是线程同步的好方法,仅用于演示
import time
time.sleep(1)

锁(Locks)

在多线程程序中,锁是用于同步线程,防止多个线程同时访问共享资源的一种机制。

创建锁

使用 _thread.allocate_lock() 创建一个锁对象。

获取锁

使用锁对象的 acquire() 方法获取锁。如果锁已经被其他线程获取,则当前线程会阻塞直到锁被释放。

释放锁

使用锁对象的 release() 方法释放锁。

检查锁状态

使用锁对象的 locked() 方法检查锁是否被获取。

锁的示例代码

下面是如何在 _thread 模块中使用锁的示例:

import _thread
# 创建一个锁对象
lock = _thread.allocate_lock()
# 定义一个共享资源
counter = 0
# 定义一个线程函数
def increment_counter():
    global counter
    for i in range(100000):
        lock.acquire()  # 获取锁
        try:
            counter += 1
        finally:
            lock.release()  # 释放锁
# 创建两个线程
_thread.start_new_thread(increment_counter, ())
_thread.start_new_thread(increment_counter, ())
# 等待线程完成
import time
time.sleep(1)
print("Counter should be 200000, and it is:", counter)

在上面的代码中,我们使用锁来确保 counter 变量的增加是原子的,防止在增加操作过程中出现竞态条件。通过 try...finally 结构,我们确保即使发生异常,锁也会被释放。

threading 模块是 Python 中用于线程操作的高级模块,提供了更易于使用的接口和更丰富的功能。以下是关于 threading.Thread 类的一些详细信息和使用方法。

threading.Thread

threading.Thread 类是 threading 模块的核心,它允许用户创建并控制线程。

创建线程

创建 Thread 对象时,可以使用以下参数:

import threading
# 定义一个线程要执行的函数
def fun(x, y):
    print(f"Sum: {x + y}")
# 创建一个线程对象
thread1 = threading.Thread(target=fun, args=(10, 4))

启动线程

要启动线程,可以调用线程对象的 start() 方法:

# 启动线程
thread1.start()

等待线程执行完成

要等待线程执行完成,可以调用线程对象的 join() 方法:

# 等待线程执行完成
thread1.join()

使用对象作为 target

target 参数不仅可以是一个函数,也可以是一个具有 __call__ 方法的对象。在这种情况下,当线程启动时,会调用对象的 __call__ 方法。
以下是如何使用对象作为线程的 target 的示例:

import threading
class MyThreadTarget:
    def __init__(self, x, y):
        self.x = x
        self.y = y
    
    def __call__(self):
        print(f"Product: {self.x * self.y}")
# 创建一个对象,这个对象将作为线程的 target
target_obj = MyThreadTarget(10, 4)
# 创建一个线程对象,target 是上面创建的对象
thread2 = threading.Thread(target=target_obj)
# 启动线程
thread2.start()
# 等待线程执行完成
thread2.join()

在这个例子中,MyThreadTarget 类的实例 target_obj 被用作 Threadtarget。当线程启动时,会调用 target_obj__call__ 方法。这种方式使得可以将线程的行为封装在一个对象中,从而提供更高级别的抽象。

创建自定义线程类

以下是如何从 threading.Thread 类派生一个子类的示例:

import threading
class MyCustomThread(threading.Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name, daemon=daemon)
        self.args = args
        self.kwargs = kwargs
    def run(self):
        # 这里重写父类的 run 方法,实现自定义的线程行为
        # 使用 self.args 和 self.kwargs 访问传递给线程的参数
        print(f"Running thread with arguments: {self.args}, keyword arguments: {self.kwargs}")
# 使用自定义线程类创建线程
custom_thread = MyCustomThread(args=(10, 20), kwargs={'x': 1, 'y': 2})
custom_thread.start()  # 启动线程
custom_thread.join()   # 等待线程执行完成

在这个例子中,我们创建了一个 MyCustomThread 类,它继承自 threading.Thread。我们重写了 __init__ 方法来接受额外的参数,并重写了 run 方法来定义线程启动时执行的操作。

线程锁(Lock)

线程锁用于确保同一时间只有一个线程可以执行一段代码,防止多个线程同时修改同一数据。

import threading
lock = threading.Lock()
# 获取锁
lock.acquire()
# 执行需要同步的代码块
try:
    # ... 代码 ...
finally:
    # 释放锁
    lock.release()

有界信号量(BoundedSemaphore)

有界信号量是一个更高级的同步原语,它可以被用来限制对资源的访问数量。

import threading
MAX = 5  # 最大资源数量
semaphore = threading.BoundedSemaphore(value=MAX)
# 获取资源
semaphore.acquire()
# 使用资源
try:
    # ... 代码 ...
finally:
    # 释放资源
    semaphore.release()
# 查看当前信号量的值
print(f"Current semaphore value: {semaphore._value}")

在这个例子中,BoundedSemaphore 初始化为 MAX 值,表示有 MAX 个资源可用。每次调用 acquire() 方法时,信号量的值减一,当信号量的值为零时,后续的 acquire() 调用将阻塞,直到某个线程调用 release() 方法释放资源。_value 属性用于查看当前信号量的值,但通常不建议直接访问内部属性,因为它可能在不同版本的 Python 中有所变化。正确的做法是使用 BoundedSemaphore 提供的公共方法来管理信号量。

协程是单线程的,每个协程是一个函数,这些函数在执行时可以不断切换,从宏观上看才体现出并发的效果。
协程函数前面要加async关键字修饰,如果要等待一个协程执行完,再执行其他代码,需要用await语句执行协程函数。

运行协程:asyncio.run(func())

import asyncio

async def my_coroutine():
    print("Coroutine is running")
    await asyncio.sleep(1)
    print("Coroutine has finished")

# 运行协程
asyncio.run(my_coroutine())

判断是否是协程函数:asyncio.iscoroutinefunction(func)

async def another_coroutine():
    pass

print(asyncio.iscoroutinefunction(my_coroutine))  # 输出: True
print(asyncio.iscoroutinefunction(another_coroutine))  # 输出: True
print(asyncio.iscoroutinefunction(print))  # 输出: False

异步执行:asyncio.create_task(func())

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    print('started at', datetime.datetime.now())

    # 等待任务完成
    await task1
    await task2

    print('finished at', datetime.datetime.now())

asyncio.run(main())

上一篇下一篇

猜你喜欢

热点阅读