无标题文章

2014-05-09  本文已影响0人  河自清

1. Stackless Python简介 

  

Stackless Python是CPython的增强版本,它实现了一个合作式多任务系统。和抢占式多任 

务系统不同,Stackless中的每个任务(tasklet)必须主动放弃对处理器的控制,才能实现 

多任务。 

  

严格地说,合作式多任务系统都可以通过简单的循环实现。但是使用Stackless可以让代码 

更清楚,同时也让程序员不必考虑复杂的同步控制,因为实际上实现合作式多任务系统的循 

环并不简单。另一方面,这种合作式多任务系统的资源消耗比传统线程小得多,加上 

Python有伟大的GIL,所以Stackless的效率还是很不错的。 

  

Stackless Python的主页是

http://www.stackless.com

,上面可以下载源代码和预编译的版 

本。同时也有不少文档——多是介绍性质的,缺少手册。幸好,使用Stackless Python不需 

要厚厚的手册,看了主页上的入门加上help()就足够了。Python的self-documenting就是好 

!下面有关Stackless的介绍可能尚不敷用,所以如果对Stackless Python感兴趣最好还是 

啃啃主页上的英文。 

  

1.1. tasklet & run 

  

Stackless中一个任务被称作一个tasklet,可以这样创建一个tasklet: 

stackless.tasklet(a_callable)(args)。之后处理这个任务就是运行a_callable(args) 

  

创建了一个或多个tasklet之后,可以用stackless.run()来运行这些任务。实际上可以认为 

run()是进入了多任务系统,如果还有可运行的任务(没有运行完或者阻塞在某个channel上 

),那么run()就会持续运行。 

  

1.2. schedule 

  

当在一个tasklet中运行stackless.schedule(),当前的任务就被挂起,并放在运行循环的 

最后。下面一个程序是一个死循环: 

  

import stackless 

def idle(): 

     while True: 

         print 'idle' 

         stackless.schedule() 

stackless.tasklet(idle)() 

stackless.run() 

  

1.3. channel: send & receive 

  

channel是tasklet之间的同步方式,使用stackless.channel()就能建立一个通信用的 

channel。channel可以传送任何Python对象。下面假设ch = stackless.channel()。 

  

ch.receive()返回从ch中收到的对象,如果ch中没有对象,则该tasklet被阻塞。 

  

ch.send(obj)把obj送入ch,如果已经有其他tasklet执行了ch.receive(),则当前tasklet 

停止运行并成为最后一个tasklet,而阻塞在ch.receive()的tasklet继续运行。如果没有, 

则当前tasklet阻塞在ch.send(obj)。 

  

这又是一个死循环: 

  

import stackless 

ch = stackless.channel() 

def ping(): 

     while True: 

         print ch.receive() 

         ch.send('ping') 

def pong(): 

     while True: 

         print ch.receive() 

         ch.send('pong') 

stackless.tasklet(ping)() 

stackless.tasklet(pong)() 

ch.send('start') 

stackless.run() 

  

2. 基于Stackless的多任务:三种行为模式 

  

概念基本上来自Introduction to Concurrent Programming with Stackless Python,分别 

是: 

  

Daemon——每个周期都要运行。 

class Daemon(object): 

     def __init__(self): 

         stackless.tasklet(self)() 

  

     def __call__(self): 

         while True: 

             self.run() 

             stackless.schedule() 

  

Task——只运行一次。 

class Task(object): 

     def __init__(self): 

         stackless.tasklet(self)() 

  

     def __call__(self): 

         self.run() 

  

Handler——通过channel处理请求,没有请求就阻塞。如果收到一个nil请求,则结束运行。 

class Handler(object): 

     def __init__(self): 

         self.channel = stackless.channel() 

         stackless.tasklet(self)() 

  

     def __call__(self): 

         is_alive = True 

         while is_alive: 

             message = self.channel.receive() 

             if message: 

                 is_alive = self.run(message) 

             else: 

                 is_alive = False 

  

在游戏中,Daemon实现一些管理功能,每个Object可能都要有一个Handler与之对应(反正 

大多数Handler都是阻塞的),但因为Handler模型需要分析收到的message,所以有的时候 

也可以用Task。 

  

3. 网络连接 

  

网络连接当然要使用异步socket,我从Python In A Nutshell抄了一个asyncore+asynchat 

的例子,放在子线程里运行asyncore.loop()。我没有用www.stackless.com提供的一个 

stacklesssocket,因为大概看了一下stacklesssocket.py,觉得好像有些问题。 

  

4. PyMud01 

  

代码:实现了一个echo server,利用了Daemon-Handler-Task三级模型。 

network_thread通过conn_manager.queue和主线程通信, 

EchoDaemon给每个不同的连接建立一个EchoHandler, 

收到信息时发给相应的EchoHandler,EchoHandler建立一个EchoTask。 

最后由EchoTask完成回显。 

  

import stackless 

import asyncore, asynchat, socket 

import threading, Queue 

  

class Monitor(asyncore.dispatcher): 

     def __init__(self, conn_manager, addr='', port=9000): 

         asyncore.dispatcher.__init__(self) 

         self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 

         self.bind((addr, port)) 

         self.listen(5) 

         self.conn_manager = conn_manager 

          

     def handle_accept(self): 

         conn, addr_port = self.accept() 

         self.conn_manager.Create(conn, addr_port) 

  

class Connection(asynchat.async_chat): 

     def __init__(self, conn, addr_port, conn_manager): 

         asynchat.async_chat.__init__(self, conn) 

         self.set_terminator('\r\n') 

         self.addr_port = addr_port 

         self.data_pieces = [] 

         self.conn_manager = conn_manager 

          

     def collect_incoming_data(self, data): 

         self.data_pieces.append(data) 

  

     def found_terminator(self): 

         self.conn_manager.Receive(self.addr_port, ''.join(self.data_pieces)) 

         self.data_pieces = [] 

  

     def handle_close(self): 

         self.conn_manager.Close(self.addr_port) 

         self.close() 

          

class ConnectionManager(object): 

     def __init__(self): 

         self.queue = Queue.Queue() 

         self.conn_table = {} 

          

     def Create(self, conn, addr_port): 

         self.conn_table[addr_port] = Connection(conn, addr_port, self) 

  

     def Receive(self, addr_port, data): 

         self.queue.put_nowait((addr_port, data)) 

  

     def Send(self, addr_port, data): 

         self.conn_table[addr_port].push(data) 

  

     def Close(self, addr_port): 

         self.conn_table[addr_port] = None 

  

     def IsConnect(self, addr_port): 

         if self.conn_table.has_key(addr_port): 

             return bool(self.conn_table[addr_port]) 

         else: 

             return False 

          

conn_manager = ConnectionManager() 

  

class NetworkThread(threading.Thread): 

     def run(self): 

         Monitor(conn_manager) 

         asyncore.loop() 

      

network_thread = NetworkThread() 

network_thread.start() 

  

class Daemon(object): 

     def __init__(self): 

         stackless.tasklet(self)() 

  

     def __call__(self): 

         while True: 

             self.run() 

             stackless.schedule() 

  

class Task(object): 

     def __init__(self): 

         stackless.tasklet(self)() 

  

     def __call__(self): 

         self.run() 

  

class Handler(object): 

     def __init__(self): 

         self.channel = stackless.channel() 

         stackless.tasklet(self)() 

  

     def __call__(self): 

         is_alive = True 

         while is_alive: 

             message = self.channel.receive() 

             if message: 

                 is_alive = self.run(message) 

             else: 

                 is_alive = False 

  

class EchoTask(Task): 

     def __init__(self, addr_port, data): 

         Task.__init__(self) 

         self.addr_port = addr_port 

         self.data = data 

  

     def run(self): 

         print 'task:', stackless.getruncount(), self.addr_port, self.data 

         conn_manager.Send(self.addr_port, self.data+'\r\n') 

  

class EchoHandler(Handler): 

     def run(self, message): 

         print 'handler:', stackless.getruncount(), message 

         EchoTask(message[0], message[1]) 

         return True 

      

class EchoDaemon(Daemon): 

     def __init__(self): 

         Daemon.__init__(self) 

         self.handler_table = {} 

          

     def run(self): 

         for key in self.handler_table.keys(): 

             if not conn_manager.IsConnect(key): 

                 self.handler_table[key].channel.send(None) 

                 del self.handler_table[key] 

         try: 

             message = conn_manager.queue.get_nowait() 

         except Queue.Empty: 

             return 

         print 'daemon:', message 

          

         if not self.handler_table.has_key(message[0]): 

             self.handler_table[message[0]] = EchoHandler() 

             print 'daemon:', len(self.handler_table) 

  

         self.handler_table[message[0]].channel.send(message) 

  

EchoDaemon() 

  

stackless.run() 

上一篇下一篇

猜你喜欢

热点阅读