下面是一段Tornado启动代码

handlers = [(r"/request",RequestHandler,{})]
app = web.Application(handlers=handlers)
http_server = httpserver.HTTPServer(app, max_body_size=524288000)
http_server.listen(9000)
try:
    ioloop.IOLoop.current().start()
except BaseException as e:
    print(e)

上面代码创建了一个HTTPServer,在9000端口上监听,当然HTTPServer的核心是TCPServer,毕竟HTTP也是一个基于TCP/IP通信协议来传递数据的协议,所以TCP还是关键。

TCPServer

TCPServer有两个关键属性self.io_loopself._sockets.
Tornado的解释中给出,在该方法之后必须启动IOLoop,那么我们看一下该方法到底做了什么,首先我们先看到listen()的主体:

def listen(self, port, address=""):
        sockets = bind_sockets(port, address=address)
        self.add_sockets(sockets)

首先是创建并套接字(文件描述符),将其设置值非阻塞,这也是IOLoop的关键,并在指定地址端口上监听。

def bind_sockets(port, address=None, family=socket.AF_UNSPEC,
                 backlog=_DEFAULT_BACKLOG, flags=None):
			.......
			try:
            sock = socket.socket(af, socktype, proto)
      except socket.error as e:
            if errno_from_exception(e) == errno.EAFNOSUPPORT:
                continue
            raise
			......
			sock.setblocking(0)
			sock.bind(sockaddr)
			bound_port = sock.getsockname()[1]
			sock.listen(backlog)

建立监听之后,需要做就是让服务端能够处理该套接字上接受的连接,看一下add_sockets,先是获取io_loop实例,然后逐个为套接字添加处理器(handler),当新的连接建立并有数据可读时,会触发回调函数(callback)。

def add_sockets(self, sockets):
        if self.io_loop is None:
            self.io_loop = IOLoop.current()

        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            add_accept_handler(sock, self._handle_connection,
                               io_loop=self.io_loop)
															 
def add_accept_handler(sock, callback, io_loop=None):
			def accept_handler(fd, events):
						......
						callback(connection, address)
			io_loop.add_handler(sock, accept_handler, IOLoop.READ)

Tornado中IOLoop的实现首选就是EPollIOLoop,虽然是继承至PollIOLoop,但是最大的不同在于EPollIOLoop使用的是linux系统的epoll()函数实现IO的多路复用。
回到io_loop.add_handler()函数:

def add_handler(self, fd, handler, events):
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

self._impl实际上就是epoll()函数的句柄,向系统注册对该套接字(文件描述符)感兴趣的事件为IOLoop.READ或者是self.ERROR,如果后面感兴趣的事件到来,则会取回该套接字上绑定的handler,即TCPServer._handle_connection()函数。

IOloop.start()

既然我们创建了套接字,也就是向系统注册了我们感兴趣的事件,那么系统怎么通知我们,通知后我们下一步该怎么处理,这就是IOloop.start()中的主要逻辑。start()会单线程不断该循环,通过linux的epoll机制返回已经有事件到来的套接字,event_pairs就是(fd,events)的键值对,遍历这些事件,然后调用handler进行处理。

def start(self):
			.......
			while True:
						.........
						event_pairs = self._impl.poll(poll_timeout)
						.........
						self._events.update(event_pairs)
						while self._events:
										fd, events = self._events.popitem()
										try:
													fd_obj, handler_func = self._handlers[fd]
													handler_func(fd_obj, events)
										except (OSError, IOError) as e:
													.......
			 ........

因为Tornado是一个WEB框架,那么消息来了之后,会映射到具体的RequestHandler上,这其中还会涉及到IO流的读取,HTTP协议的解析,具体就不说了,我们直接跟入RequestHandler.execute()

   @gen.coroutine
    def _execute(self, transforms, *args, **kwargs):
        """Executes this request with the given output transforms."""
        self._transforms = transforms
				.......
				if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
                    self.application.settings.get("xsrf_cookies"):
                self.check_xsrf_cookie()
				result = self.prepare()
				if is_future(result):
						result = yield result
			  method = getattr(self, self.request.method.lower())
        result = method(*self.path_args, **self.path_kwargs)
				if is_future(result):
                result = yield result
				if result is not None:
						raise TypeError("Expected None, got %r" % result)
				if self._auto_finish and not self._finished:
						self.finish()
			  .......

到这里,已经很接近业务层面了,也就是get()/post()方法的,那么Tornado是如何做到它宣称的高效呢,这就涉及到IO多路复用、协程的配合了。

@gen.coroutine装饰器

上面的代码我们发现IOloop是单线程不断循环处理的,那么怎么同时处理多个IO请求,充分利用CPU资源呢?
在大部分的WEB应用中,一个HTTP请求到来之后,会涉及到很多的IO处理,例如读文件、读数据库等等,我们能不能在读数据库时候转出去处理其它到来的HTTP请求呢,等读数据库的数据到来时再回到之前的HTTP请求上继续处理,这样就可以充分利用CPU资源。
那么怎么才能做到在同一个进程中,不同子程序的异步调度呢,这就涉及到Python中的协程,想要了解协程,我们借助yield给出一个简单实现:

def a():
    print("start a")
    while True:
        x = yield 1+2
        print('end a,and recive x:', x)
def b(r):
    print("start b")
    yielded = r.send(None)
    print('return yield:', yielded)
    r.send(100)
    r.close()
    
if __name__ == "__main__":
    ret = a()
    b(ret)

这一段程序的执行结果如下:

start b
start a
return yield: 3
end a,and recive x: 100
  • 因为a()函数中有yield的存在,所以ret = a()返回一个生成器对象(GeneratorType):
  • 当我们在b中调用 r.send(None)时,启动生成器,也就是a开始执行,一直到执行完yield后的语句1+2,将结果3返回(如果该操作是异步语句那么会立即返回)
  • 此时a方法就会被挂起,回到b()方法继续执行,直到再一次向生成器发送消息1000,此时会将1000赋值方法a中的x,并执行yield后的语句。

那么Tornado中的子程序调度室怎么实现的呢,我们结合下面一个get请求进行分析:

@gen.coroutine
def get(self):
		http_client = AsyncHTTPClient()
		response = yield http_client.fetch(url)
		do_something_with_response(response)

接下来,我们看一下@gen.coroutine装饰器的代码:

def _make_coroutine_wrapper(func, replace_callback):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()

        if replace_callback and 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = getattr(e, 'value', None)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, types.GeneratorType):
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future.set_result(getattr(e, 'value', None))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper
  • 执行完result = func(*args, **kwargs)后返回一个生成器对象(GeneratorType);
  • 调用next(),启动生成器,异步执行并返回http_client.fetch(url)结果future;
  • 调用Runner,在该逻辑中future对象得到result的时候会调用future.add_done_callback添加的callback,再将其转至ioloop执行;

对比测试

在我们的实际场景中,也不完全是IO密集的应用,也有很多无法利用Tornado提供的异步接口,那么怎么办呢,我们可以借助ThreadPoolExecutor进行异步,我们看一下如下两端代码:
(1)基于ThreadPoolExecutor

import time
from tornado import web, gen, httpclient, ioloop
from concurrent.futures import ThreadPoolExecutor

class RequestHandler(web.RequestHandler):
    executor = ThreadPoolExecutor(30)

    def initialize(self):
        self.set_header('Content-Type', 'text/html; charset=UTF-8')
    @gen.coroutine
    def get(self):
        res = yield self.sleep()
        self.write("sleep %f s" % res)
        self.finish()
	  @run_on_executor
		 def sleep(self):
				 time.sleep(1)
				 return 1

(2)没有基于ThreadPoolExecutor

class RequestHandler(web.RequestHandler):
    def initialize(self):
        self.set_header('Content-Type', 'text/html; charset=UTF-8')
    @gen.coroutine
    def get(self):
        time.sleep(1)
        self.write("sleep %f s" % 1)
        self.finish()

我们使用20个并发进行压测时,结果如下,第一张图是基于ThreadPoolExecutor,第二张图是没有基于ThreadPoolExecutor的结果:
Base On ThreadPoolExecutor
Sleep

因为tornado是单进程处理,所以没有基于ThreadPoolExecutor的处理方式,是逐个请求进行处理,吞吐基本上是1s一个请求,而基于ThreadPoolExecutor的处理充分利用了IO多路复用的特性,虽然也是单进程的处理,但是他将耗时处理做了线程异步,所以吞吐能够比较接近我们的并发数。

另外也有一些repo给tornado开发提供了一些封装:tornado-threadpool,使得我们在我们我们的应用时,不必太过关注这个方法是同步怎么处理异步怎么处理,只需要在方法上加对应的装饰器即可。

建议

通常,在开发tornado应用时我们需要按照如下顺序考虑IO策略:

  • Use an async library if available (e.g. AsyncHTTPClient instead of requests).

  • Make it so fast you don't mind doing it synchronously and blocking the IOLoop. This is most appropriate for things like memcache and database queries that are under your control and should always be fast. If it's not fast, make it fast by adding the appropriate indexes to the database, etc.

  • Do the work in a ThreadPoolExecutor. Remember that worker threads cannot access the IOLoop (even indirectly) so you must return to the main thread before writing any responses.

  • Move the work out of the tornado process. If you're sending email, for example, just write it to the database and let another process (whose latency doesn't matter) read from the queue and do the actual sending.

  • Block the IOLoop anyway. This is the lazy way out but may be acceptable in some cases.

FROM:Threading and concurrency