基于Tornado方便业务编写的封装

2017-10-05  本文已影响0人  an_owl

Tornado是异步非阻塞Web框架,能抗住每秒可以处理数以千计的连接。背后使用了Epoll,是一个高性能Web框架。

但是在使用Tornado作为框架来做业务逻辑编写,就会发现,虽然能抗住数以千计的连接,Tornado只有一个主线程。而编写业务肯定需要有耗时的过程逻辑,比如数据库的操作。这个时候,单线程的Tornado就特别容易阻塞在耗时逻辑上。

要想很好的适用业务逻辑,我的设计是基于Tornado引入线程池,并且能方便操作数据库,对Sqlalchemy包装一下。目前这种设计经历了1年多的项目运行,稳定运行。特别适合高并发不是特别要求高的业务情况。

多线程装饰器

建立线程池很简单,编写装饰器 @thread_executor,这样就可以很轻易的挂载装饰器来把业务逻辑压入线程池中运行。

executor = ThreadPoolExecutor(8)

def thread_executor(fn):
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        future = executor.submit(fn, self, *args, **kwargs)
        return future
    return wrapper

多线程带Session数据库操作装饰器


sqlalchemy_session = sessionmaker(bind=engine)

def thread_db_session_executor(fn):
    """
    fn 是一个方法, @thread_db_session_executor 装饰后,将自动转入线程池里面运行。
    fn 的参数会自动追加一个session,为sqlalchemy的DB数据操作。 例如 原为fn(a,b) 会变成fn(a,b,session)
    fn 运行完成后 session自动关闭,请不要再fn内关闭。
    如果 fn 运行异常,数据库会自动回滚,fn返回值变成异常对象。
    """
    @thread_executor
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        start_time = time.time()
        try:
            session = sqlalchemy_session()
            new_fn = functools.partial(fn, session=session)
            result = new_fn(self, *args, **kwargs)
        except Exception as e:
            try:
               logging.exception(
                    "\n%s \n--function-->>\n%s -args: %s \n%s -kwargs: %s\n<<--function--\n"
                    % (str(e), fn.__name__, str(args), fn.__name__, str(kwargs))
                )
                session.rollback()
            except Exception as e2:
                pass
            result = e
        finally:
            session.close()
        return result
    return wrapper

封装Tornado的RequestHandler(json版本)

class SimpleThreadDBSessionHandler(tornado.web.RequestHandler):
    """
    SimpleThreadDBSessionHandler 实例化后,会在线程池里面获得一个线程,并且获得DB的一个session(操作数据库)。
    重载 on_post, on_get 来实现业务,方法执行线程安全。执行完成后,线程自动回收,session自动关闭。

    post request body 是json格式
    post get response body 是json
    返回结果 也是 json
    """

    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def post(self):
        try:
            post_body_data = self.measure_post_body()
            result = yield self._on_post(post_body_data)
        except Exception as e:
            result = e
            print e
        if isinstance(result, Exception) or result is None:
            exception_response_result = self.exception_response(result)
            if exception_response_result is None:
                raise result
            else:
                result = exception_response_result
        self.write(self.measure_response_body(result))
        self.finish()

    @thread_db_session_executor
    def _on_post(self, post_body_data, session):
        return self.on_post(post_body_data, session)

    def on_post(self, post_body_data, session):
        """
        请重载此函数处理
        """
        raise tornado.web.HTTPError(405)

    def measure_post_body(self):
        """
        处理 request body的数据 转成json格式字典
        :return:
        """
        return utils_json_string_to_dict(self.request.body.decode('utf-8'))

通过以上的编写,在编写业务逻辑的时候,使用就特别方便。

class TestHandler(SimpleThreadDBSessionHandler):

    def on_post(self, post_body_data, session):
        user = session.query(User).filter(User.id == 1).first()
        return {
            'user_name': user.nickname
        }

业务逻辑(POST) 只要在def on_post(self, post_body_data, session)里面实现就可以了,
on_post 带有一个操作数据库的session,并且通过使用@thread_db_session_executor的,on_post的逻辑就放入线程池里面运行,完成以后再返回到主线程里面返回。

就算中间发生异常,异常也会被捕捉,数据库回滚。

上一篇下一篇

猜你喜欢

热点阅读