Python-线程与协程
同步编程模型
- 定义:在同步编程模型中,代码是顺序执行的。每个任务必须等待前一个任务完成之后才能开始执行。
- 优点:编程模型简单,易于理解和调试。
- 缺点:效率较低,特别是在执行IO密集型或长时间运行的任务时,整个程序会被阻塞。
线程编程模型
- 定义:线程编程模型通过创建多个线程来并行执行任务。每个线程可以独立运行,从而实现多任务同时处理。
-
优点:
- 提高了程序的并发性。
- 可以更好地利用多核处理器的计算能力。
-
缺点:
- 线程创建和管理的开销较大。
- 需要处理线程同步和死锁等问题,增加了编程的复杂性。
异步编程模型
- 定义:在异步编程模型中,任务的执行不是阻塞的。当一个任务开始执行后,程序可以继续执行其他任务,而不需要等待前一个任务完成。
-
优点:
- 提高了程序的响应性和效率。
- 特别适合IO密集型任务,如网络请求、文件读写等。
-
缺点:
- 编程模型相对复杂,不易理解和调试。
- 需要程序员手动管理任务的调度和状态。
在实际应用中,根据不同的场景和需求,会选择合适的编程模型。例如,对于需要高并发处理的服务器端应用,通常会采用线程编程模型或异步编程模型;而对于简单的脚本或批处理任务,同步编程模型可能就足够了。在选择编程模型时,还需要考虑到资源的合理利用和程序的维护性。
Reactor模式是一种在事件驱动编程中广泛使用的模式,特别是在网络编程中。以下是对您提到的概念和方法的解释:
Reactor模式
- 基本原理:Reactor模式的核心是事件循环(Event Loop),它不断等待并处理事件。当事件发生时,如数据到达、连接建立或断开等,Reactor会调用相应的处理函数来处理这些事件。
-
组件:
- 事件循环(Event Loop):循环等待并分发事件。
- 事件多路分解器(Event Demultiplexer):等待事件的发生,如文件描述符可读、可写等。
- 事件处理器(EventHandler):处理具体的事件。
Twisted框架中的Reactor
Twisted是一个流行的Python网络编程框架,它实现了Reactor模式。以下是一些Twisted中常用的方法和函数:
-
reactor.callWhenRunning(func):将函数
func
添加到事件循环中,当Reactor启动并开始运行时,这个函数会被调用。 - reactor.run():启动事件循环。这个方法会阻塞,直到事件循环停止。
-
reactor.connectTCP(host, port, factory, timeout=30, bindAddress=None):创建一个TCP连接。
host
和port
是远程服务器的地址和端口,factory
是用于创建协议实例的工厂,timeout
是连接超时时间,bindAddress
是本地绑定的地址。 -
reactor.listenTCP(port, factory, backlog=50, interface=''):在指定的端口上监听TCP连接。
port
是监听的端口,factory
是用于创建协议实例的工厂,backlog
是连接队列的大小,interface
是监听的接口。
协议中的方法
在Twisted协议实现中,以下是一些常用的方法:
- self.transport.write(data):将数据写入到传输层,通常用于发送数据到网络对端。
- self.transport.loseConnection():关闭当前的连接。
-
self.transport.getPeer().host:获取对端(客户端)的IP地址。
以下是一个简单的Twisted TCP服务器示例:
from twisted.internet import reactor, protocol
class EchoProtocol(protocol.Protocol):
def dataReceived(self, data):
print("Received:", data)
self.transport.write(data) # Echo back the data
def connectionLost(self, reason):
print("Connection lost")
class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
return EchoProtocol()
reactor.listenTCP(8000, EchoFactory())
reactor.run()
在这个例子中,我们创建了一个监听8000端口的TCP服务器,它将收到的数据原样返回给客户端。当有数据到达时,dataReceived
方法会被调用,并且我们使用self.transport.write
来发送数据回客户端。如果连接断开,connectionLost
方法会被调用。
下面是对 _thread
模块的一些详细说明和使用示例。
_thread
模块概述
在 Python 中,_thread
模块提供了基本的线程支持。这个模块相对较低级,通常建议使用更高级别的 threading
模块,因为它提供了更全面和方便的线程管理功能。不过,在某些轻量级或者特定场景下,使用 _thread
可能更为合适。
使用 _thread
创建线程
要使用 _thread
创建新线程,可以使用 start_new_thread
函数。以下是如何使用它的基本步骤:
- 定义一个线程函数,它将被线程执行。
- 调用
_thread.start_new_thread
并传递线程函数和其参数。
示例代码
下面是一个简单的示例,演示如何使用 _thread
模块来创建一个新线程:
import _thread
# 定义一个线程函数
def print_numbers():
for i in range(5):
print(i)
# 使用 _thread.start_new_thread 创建新线程
_thread.start_new_thread(print_numbers, ())
# 为了让主线程等待子线程完成,我们可以使用一个简单的循环
# 注意:这不是线程同步的好方法,仅用于演示
import time
time.sleep(1)
锁(Locks)
在多线程程序中,锁是用于同步线程,防止多个线程同时访问共享资源的一种机制。
创建锁
使用 _thread.allocate_lock()
创建一个锁对象。
获取锁
使用锁对象的 acquire()
方法获取锁。如果锁已经被其他线程获取,则当前线程会阻塞直到锁被释放。
释放锁
使用锁对象的 release()
方法释放锁。
检查锁状态
使用锁对象的 locked()
方法检查锁是否被获取。
锁的示例代码
下面是如何在 _thread
模块中使用锁的示例:
import _thread
# 创建一个锁对象
lock = _thread.allocate_lock()
# 定义一个共享资源
counter = 0
# 定义一个线程函数
def increment_counter():
global counter
for i in range(100000):
lock.acquire() # 获取锁
try:
counter += 1
finally:
lock.release() # 释放锁
# 创建两个线程
_thread.start_new_thread(increment_counter, ())
_thread.start_new_thread(increment_counter, ())
# 等待线程完成
import time
time.sleep(1)
print("Counter should be 200000, and it is:", counter)
在上面的代码中,我们使用锁来确保 counter
变量的增加是原子的,防止在增加操作过程中出现竞态条件。通过 try...finally
结构,我们确保即使发生异常,锁也会被释放。
threading
模块是 Python 中用于线程操作的高级模块,提供了更易于使用的接口和更丰富的功能。以下是关于 threading.Thread
类的一些详细信息和使用方法。
threading.Thread
类
threading.Thread
类是 threading
模块的核心,它允许用户创建并控制线程。
创建线程
创建 Thread
对象时,可以使用以下参数:
-
target
: 指定线程要执行的函数。 -
args
: 传递给线程函数的参数元组。 -
kwargs
: 传递给线程函数的关键字参数字典。
以下是如何创建一个线程的示例:
import threading
# 定义一个线程要执行的函数
def fun(x, y):
print(f"Sum: {x + y}")
# 创建一个线程对象
thread1 = threading.Thread(target=fun, args=(10, 4))
启动线程
要启动线程,可以调用线程对象的 start()
方法:
# 启动线程
thread1.start()
等待线程执行完成
要等待线程执行完成,可以调用线程对象的 join()
方法:
# 等待线程执行完成
thread1.join()
使用对象作为 target
target
参数不仅可以是一个函数,也可以是一个具有 __call__
方法的对象。在这种情况下,当线程启动时,会调用对象的 __call__
方法。
以下是如何使用对象作为线程的 target
的示例:
import threading
class MyThreadTarget:
def __init__(self, x, y):
self.x = x
self.y = y
def __call__(self):
print(f"Product: {self.x * self.y}")
# 创建一个对象,这个对象将作为线程的 target
target_obj = MyThreadTarget(10, 4)
# 创建一个线程对象,target 是上面创建的对象
thread2 = threading.Thread(target=target_obj)
# 启动线程
thread2.start()
# 等待线程执行完成
thread2.join()
在这个例子中,MyThreadTarget
类的实例 target_obj
被用作 Thread
的 target
。当线程启动时,会调用 target_obj
的 __call__
方法。这种方式使得可以将线程的行为封装在一个对象中,从而提供更高级别的抽象。
创建自定义线程类
以下是如何从 threading.Thread
类派生一个子类的示例:
import threading
class MyCustomThread(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
super().__init__(group=group, target=target, name=name, daemon=daemon)
self.args = args
self.kwargs = kwargs
def run(self):
# 这里重写父类的 run 方法,实现自定义的线程行为
# 使用 self.args 和 self.kwargs 访问传递给线程的参数
print(f"Running thread with arguments: {self.args}, keyword arguments: {self.kwargs}")
# 使用自定义线程类创建线程
custom_thread = MyCustomThread(args=(10, 20), kwargs={'x': 1, 'y': 2})
custom_thread.start() # 启动线程
custom_thread.join() # 等待线程执行完成
在这个例子中,我们创建了一个 MyCustomThread
类,它继承自 threading.Thread
。我们重写了 __init__
方法来接受额外的参数,并重写了 run
方法来定义线程启动时执行的操作。
线程锁(Lock)
线程锁用于确保同一时间只有一个线程可以执行一段代码,防止多个线程同时修改同一数据。
import threading
lock = threading.Lock()
# 获取锁
lock.acquire()
# 执行需要同步的代码块
try:
# ... 代码 ...
finally:
# 释放锁
lock.release()
有界信号量(BoundedSemaphore)
有界信号量是一个更高级的同步原语,它可以被用来限制对资源的访问数量。
import threading
MAX = 5 # 最大资源数量
semaphore = threading.BoundedSemaphore(value=MAX)
# 获取资源
semaphore.acquire()
# 使用资源
try:
# ... 代码 ...
finally:
# 释放资源
semaphore.release()
# 查看当前信号量的值
print(f"Current semaphore value: {semaphore._value}")
在这个例子中,BoundedSemaphore
初始化为 MAX
值,表示有 MAX
个资源可用。每次调用 acquire()
方法时,信号量的值减一,当信号量的值为零时,后续的 acquire()
调用将阻塞,直到某个线程调用 release()
方法释放资源。_value
属性用于查看当前信号量的值,但通常不建议直接访问内部属性,因为它可能在不同版本的 Python 中有所变化。正确的做法是使用 BoundedSemaphore
提供的公共方法来管理信号量。
协程是单线程的,每个协程是一个函数,这些函数在执行时可以不断切换,从宏观上看才体现出并发的效果。
协程函数前面要加async关键字修饰,如果要等待一个协程执行完,再执行其他代码,需要用await语句执行协程函数。
运行协程:asyncio.run(func())
import asyncio
async def my_coroutine():
print("Coroutine is running")
await asyncio.sleep(1)
print("Coroutine has finished")
# 运行协程
asyncio.run(my_coroutine())
判断是否是协程函数:asyncio.iscoroutinefunction(func)
async def another_coroutine():
pass
print(asyncio.iscoroutinefunction(my_coroutine)) # 输出: True
print(asyncio.iscoroutinefunction(another_coroutine)) # 输出: True
print(asyncio.iscoroutinefunction(print)) # 输出: False
异步执行:asyncio.create_task(func())
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'world'))
print('started at', datetime.datetime.now())
# 等待任务完成
await task1
await task2
print('finished at', datetime.datetime.now())
asyncio.run(main())