gevent笔记与源码分析

基本概念

coroutine

Coroutine 也就是 corporate routine,中文名就是协程,从它的英文可以看出是协同 的例程的意思,
实际上这个概念和进程与线程有相似之处, 因为linux线程就是所谓的
轻量级进程,所以我们来比较一下进程与协程的异同:

  • 相同点:二者都是可以看做是一种执行流, 该执行流可以挂起,并且在将来又可以在 你挂起的地方恢复执行,
    这实际上都可以看做是continuation, 我们来看看当我们挂
    起一个执行流时我们要保存的东西
    1. , 因为如果你不保存栈,那么局部变量你就无法恢复,同时函数的调用链你也无 法恢复,
    2. 寄存器的状态: 这好理解, 比如说EIP,如果你不保存,那么你恢复执行流就不知道 到底执行哪一条指令,
      在比如说ESP,EBP, 如果你不保存,那么你即便有完整的栈 你也不知道怎么用.
      这二者实际就是所谓的上下文,也可以说是continuation. 在执行流切换时必须保存 这两个东西, 内核调度进程时也是一回事.
  • 不同点:
    1. 执行流的调度者不同, 进程是内核调度, 而协程是在用户态调度, 也就是说进程
      的上下文是在内核态保存恢复的,而协程是在用户态保存恢复的.
      很显然用户态的 代价更低
    2. 进程会被抢占,而协程不会,也就是说协程如果不主动让出CPU,那么其他的协程是不
      可能得到执行机会,这实际和早期的操作系统类似,比如DOS,
      它有一个yield原语, 一个进程调用yield,那么它就会让出CPU, 其他的进程也就有机会执行了, 如果一
      个进程进入了死循环,那么整个系统也就挂起了,永远无法运行其他的进程了, 但
      对协程而言,这不是问题
    3. 对内存的占用不同,实际上协程可以只需要4K的栈就够了, 而进程占用的内存要大 的多.
    4. 从操作系统的角度讲, 多协程的程序是单线程,单进程的

gevent背景知识

gevent用到了了libev以及greenlet还有cares,下面简单的介绍这几个库.

greenlet

实际是一个协程库(官方叫micro-thread), 它只提供协程本身,要在协程间切 换调度必须你在程序中手动来进行,直接上代码:

1
2
3
4
5
6
7
8
9
10
11
def test1(x, y):
z = gr2.switch(x+y)
print z

def test2(u):
print u
gr1.switch(42)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch("hello", " world")

gr1, gr2以及运行该代码的解释器本身都是协程, 协程对象的switch方法用 来切换,比如 gr2.switch 就是切换到gr2.

api

  1. greenlet(run=None, parent=None): 创建一个greenlet实例.
  2. greenlet.getcurrent:

实例方法与属性

  1. gr.parent:每一个协程都有一个父协程,当前协程结束后会回到父协程中执行,该 属性默认是创建该协程的协程.
  2. gr.run: 该属性是协程实际运行的代码. run方法结束了,那么该协程也就结束了.
  3. gr.switch(*args, **kwargs): 切换到gr协程.
  4. gr.throw(): 切换到gr协程,接着抛出一个异常.

libev

和libevent类似, libev是一个事件循环库,他可以监听各个file descriptor, 一旦发现 就绪就调用对应的回调函数,
gevent内部就是使用libev来监听socket的.为了后续理解 方便有必要对libev进行简单的了解.

  1. watcher: 实际上是用来封装各种类型的事件的,不同类型的事件会有不同类型的 watcher, 比如 ev_io,
    ev_timer, 该结构一般会有一个回调函数,当事件触发 使就会调用回调函数.
    watcher会有两种函数(注意TYPE代表watcher类型,可以是 io, timer,
    signal等等):
    • ev_TYPE_init: 对watcher对象进行初始化, 对IO而言该函数是 ev_io_init,
      对timer而言,该函数是 ev_timer_init.
    • ev_TYPE_set: 与init系列函数的区别是该函数一般不设置callback
    • ev_TYPE_start: 将watcher注册到事件循环中,这样就可以监听事件了.
  2. loop: 事件循环

来看看示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// a single header file is required
#include <ev.h>

#include <stdio.h> // for puts

// every watcher type has its own typedef'd struct
// with the name ev_TYPE
ev_io stdin_watcher;
ev_timer timeout_watcher;

// all watcher callbacks have a similar signature
// this callback is called when data is readable on stdin
static void
stdin_cb (EV_P_ ev_io *w, int revents)
{
puts ("stdin ready");
// for one-shot events, one must manually stop the watcher
// with its corresponding stop function.
ev_io_stop (EV_A_ w);

// this causes all nested ev_run's to stop iterating
ev_break (EV_A_ EVBREAK_ALL);
}

// another callback, this time for a time-out
static void
timeout_cb (EV_P_ ev_timer *w, int revents)
{
puts ("timeout");
// this causes the innermost ev_run to stop iterating
ev_break (EV_A_ EVBREAK_ONE);
}

int
main (void)
{
// use the default event loop unless you have special needs
struct ev_loop *loop = EV_DEFAULT;

// initialise an io watcher, then start it
// this one will watch for stdin to become readable
ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ);
ev_io_start (loop, &stdin_watcher);

// initialise a timer watcher, then start it
// simple non-repeating 5.5 second timeout
ev_timer_init (&timeout_watcher, timeout_cb, 5.5, 0.);
ev_timer_start (loop, &timeout_watcher);

// now wait for events to arrive
ev_run (loop, 0);

// break was called, so exit
return 0;
}

获得loop对象, 创建一个io watcher,一个timer watcher, 分别初始化(调用init函
数),然后调用start注册回调函数到事件循环中, 接着调用ev_run启动事件循环.

gevent源码分析

开始之前先申明: 本文分析的是gevent1.0.

core

core.ppyx文件实际上是用Cython写的代码,在Makefile中有这样一行代码:

1
2
3
4
5
6
gevent/gevent.core.c: gevent/core.ppyx gevent/libev.pxd
$(PYTHON) util/cythonpp.py -o gevent.core.c gevent/core.ppyx
echo >> gevent.core.c
echo '#include "callbacks.c"' >> gevent.core.c
mv gevent.core.* gevent/

上面的代码告诉我们core.ppyx会先转换为gevent.core.c这个C语言文件,然后在编译成 动态链接库,它的语法基本能看懂,
这个模块主要是实现loop这个类, hub对象中的
loop就是这个类的一个对象(注意名字虽然相同但一个是类,一个对象,不要弄混),这个
类将libev的事件循环机制封装了起来,我们先熟悉下这个类提供的API,熟悉这些API对
使用gevent很有帮助,我假设lp是loop类的对象(在gevent中就是get_hub().loop)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def io(self, int fd, int events, ref=True, priority=None):
return io(self, fd, events, ref, priority)

def timer(self, double after, double repeat=0.0, ref=True, priority=None):
return timer(self, after, repeat, ref, priority)

def signal(self, int signum, ref=True, priority=None):
return signal(self, signum, ref, priority)

def idle(self, ref=True, priority=None):
return idle(self, ref, priority)

def prepare(self, ref=True, priority=None):
return prepare(self, ref, priority)

def fork(self, ref=True, priority=None):
return fork(self, ref, priority)

def async(self, ref=True, priority=None):
return async(self, ref, priority)

注意上面是Cython, 上面的一系列方法实际是libev中watcher的等价物.比如你调用 lp.io(fd, 1),
就创建了一个监听fd的read事件的watcher对象,至于其它的api都是 类似,
每一个watcher对象都有一个 start 方法, 该方法接受一个回调函数以及一系 列传递给回调函数的参数,
调用该方法就会将watcher对象注册到libev的事件循环上, 看下面的示例:

1
2
read_watcher = lp.io(fd, 1)
read_watcher.start(cb, args)

运行上面的两行代码,那么当fd上读就绪时,那么就会调用cb函数,并且会把args传递给
cb函数.在gevent中回调函数一般是协程的switch方法,
这样一旦调用,那么就切换到 另一个协程中去执行.

core源码分析

这一节来分析core.ppyx的源码, 在一次提醒你注意,代码是Cython, 我对Cython也不 太熟,但是代码大致能看懂.
同时要明白该模块是对libev的封装, libev中loop,
watcher,callback在该模块中都有对应物, 所以你要注意当提到loop时,我们到底是
该模块的loop还是libev的loop.

  1. 先看callback

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    cdef public class callback [object PyGeventCallbackObject, type PyGeventCallback_Type]:
    cdef public object callback
    cdef public tuple args

    def __init__(self, callback, args):
    self.callback = callback
    self.args = args

    def stop(self):
    self.callback = None
    self.args = None

    实际上就是把回调函数以及要提供给回调函数的参数封装了起来.

  2. loop: 事件循环的封装.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
    cdef libev.ev_loop* _ptr
    cdef public object error_handler
    cdef libev.ev_prepare _prepare
    cdef public list _callbacks
    cdef libev.ev_timer _timer0

    def __init__(self, object flags=None, object default=None, size_t ptr=0):
    cdef unsigned int c_flags
    cdef object old_handler = None
    libev.ev_prepare_init(&self._prepare, <void*>gevent_run_callbacks)
    libev.ev_timer_init(&self._timer0, <void*>gevent_noop, 0.0, 0.0)
    if ptr:
    self._ptr = <libev.ev_loop*>ptr
    else:
    ......

    self._callbacks = []
    1. _ptr: libev的一个ev_loop对象.
    2. _prepare: libev中的prepare watcher,该watcher注册的回调函数会在事件循环进 入阻塞时调用,
      从代码中可以看到注册的回调函数是 gevent_run_callbacks, 该函数会运行 _callbacks
      列表中的每一个callback实例.
    3. _callbacks: 一个列表,实际上当你使用gevent的spawn创建协程时, spawn会在
      该列表中插入一个callback实例, 该实例的回调函数实际就是你创建的
      greenlet的switch方法, 这样当 _prepare watcher就绪时,新的协程就有了 启动的机会.
  3. watcher: 这是libev的watcher对象的封装,作为例子,我只分析io这一个例 子,timer,signal等等都是相似的,
    为了方便我使用cwatcher来指代libev中的 watcher.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37

    #define WATCHER_BASE(TYPE) \
    cdef public loop loop \
    cdef object _callback \
    cdef public tuple args \
    cdef readonly int _flags \
    cdef libev.ev_##TYPE _watcher \

    cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:

    WATCHER_BASE(io)

    def start(self, object callback, *args, pass_events=False):
    CHECK_LOOP2(self.loop)
    if callback is None:
    raise TypeError('callback must be callable, not None')
    self.callback = callback
    if pass_events:
    self.args = (GEVENT_CORE_EVENTS, ) + args
    else:
    self.args = args
    LIBEV_UNREF
    libev.ev_io_start(self.loop._ptr, &self._watcher)

    def __init__(self, loop loop, int fd, int events, ref=True, priority=None):
    if fd < 0:
    raise ValueError('fd must be non-negative: %r' % fd)
    if events & ~(libev.EV__IOFDSET | libev.EV_READ | libev.EV_WRITE):
    raise ValueError('illegal event mask: %r' % events)
    libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, fd, events)
    self.loop = loop
    if ref:
    self._flags = 0
    else:
    self._flags = 4
    if priority is not None:
    libev.ev_set_priority(&self._watcher, priority)
    1. WATCH_BASE, 它实际上定义了一系列的属性:

      • loop: 实际是上面分析的loop类的一个实例
      • _watcher: cwatcher对象,也就是一个libev的ev_io对象.
      • callback: 回调函数, 注意该回调函数是由上层传递进来,它不是由libev直接
        调用,而是由libev的回调函数调用,具体到本例就是被
        gevent_callback_io 调用.
      • args: 一个元组,传递给回调函数的参数
    2. init:
      该函数会设置loop属性,同时初始化libev的io watcher对象 _watcher (主要做两件事:
      指定事件类型,指定回调函数), 注意它的回调函数 是 gevent_callback_io

    3. start: 该函数中 会设置回调函数以及参数, 这里设置的回调函数是上层传入的, 不要和libev的回调函数混淆, 同时调用
      ev_io_start 将该watcher注册到 libev的事件循环中. 为了弄明白libev事件循环的过程,我接下来分析
      gevent_callback_io.

    4. gevent_callback_io

      1
      2
      3
      4
      5
      6
      7
      #define GET_OBJECT(PY_TYPE, EV_PTR, MEMBER)                             \
      ((struct PY_TYPE *)(((char *)EV_PTR) - offsetof(struct PY_TYPE, MEMBER)))

      static void gevent_callback_io(struct ev_loop *_loop, void *c_watcher, int revents) {
      struct PyGeventIOObject* watcher = GET_OBJECT(PyGeventIOObject, c_watcher, _watcher);
      gevent_callback(watcher->loop, watcher->_callback, watcher->args, (PyObject*)watcher, c_watcher, revents);
      }

      GET_OBJECT的作用是通过结构体中某一个域的指针来获得整个结构体的指针. 如果
      你熟悉linux内核就会发现它和container_of的功能很相似.
      所以这里实际就是根 据cwatcher对象_watcher来获得watcher的指针, 接着就调用
      gevent_callback.

      1
      2
      3
      4
      5
      6
      7
      static void gevent_callback(struct PyGeventLoopObject* loop, PyObject* callback,
      PyObject* args, PyObject* watcher, void *c_watcher,
      int revents) {
      ......
      result = PyObject_Call(callback, args, NULL);
      ......
      }

      所以该函数就调用了上层传入的callback.

core的api总结

假设Loop代表类, loop代表实例

  1. loop.run: 启动事件循环
  2. loop.run_callback(fun, *args): 将fun注册给loop的_prepare watcher,这样
    fun就会在事件循环要阻塞时运行, spawn以及rawlink都会使用该方法.
  3. loop.io: 创建一个IO watcher实例, 调用该实例的start方法来注册回调函数,同 时将该watcher放入事件循环.
  4. loop.timer: 创建Timer Watcher对象
  5. loop.signal: 创建signal Watcher对象
  6. loop.idle:
  7. loop.prepare:
  8. loop.fork:

注意使用io,timer, signal 等方法创建watcher对象后, 必须调用该对象start方法
才能将watcher注册到事件循环中

HUB

这实际上是greenlet的子类,所以它的每一个实例实际上就代表一个协程,这个类创建的 协程是专门用来运行事件循环的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Hub(greenlet):
...

NOT_ERROR = (GreenletExit, SystemExit)
loop_class = config('gevent.core.loop', 'GEVENT_LOOP')
...
backend = config(None, 'GEVENT_BACKEND')
...

def __init__(self, loop=None, default=None):
greenlet.__init__(self)
if hasattr(loop, 'run'):
...
else:
...
loop_class = _import(self.loop_class)
if loop is None:
loop = self.backend
self.loop = loop_class(flags=loop, default=default)
...

创建一个hub实例, 这个实例最重要的就是loop属性,这个实际就是core模块的loop类的 实例,也就是说是libev的事件循环的封装.

1
2
3
4
5
6
7
8
9
10
def run(self):
assert self is getcurrent(), 'Do not call Hub.run() directly'
while True:
loop = self.loop
loop.error_handler = self
try:
loop.run()
finally:
loop.error_handler = None # break the refcount cycle
self.parent.throw(LoopExit('This operation would block forever'))

这个方法就是协程的入口函数,它内部实际是一个循环, 这个循环就是用来启动libev的 事件循环的. 该函数一般是在调用 hub.switch
时开始运行的.

Waiter

协程间的通信机制.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Waiter(object):
def __init__(self, hub=None):
if hub is None:
self.hub = get_hub()
else:
self.hub = hub
self.greenlet = None
self.value = None
self._exception = _NONE

def get(self):
"""If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
if self._exception is not _NONE:
if self._exception is None:
return self.value
else:
getcurrent().throw(*self._exception)
else:
assert self.greenlet is None, 'This Waiter is already used by %r' % (self.greenlet, )
self.greenlet = getcurrent()
try:
return self.hub.switch()
finally:
self.greenlet = None

def switch(self, value=None):
"""Switch to the greenlet if one's available. Otherwise store the value."""
greenlet = self.greenlet
if greenlet is None:
self.value = value
self._exception = None
else:
assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
switch = greenlet.switch
try:
switch(value)
except:
self.hub.handle_error(switch, *sys.exc_info())

该类的实例有一个value属性, 一个_expception属性, 一个get方法,一个switch方法,他 们的行为是这样的:

  1. get: 当你在一个协程中调用get方法时, 它会先检查_exception的值,如果不为默 认的_NONE,
    那么它就会根据value属性的值来决定是返回value的值还是抛出异 常,
    如果_exception为默认值, 它会设置greenlet属性为当前的协程对象,接着就 会切换到hub协程.
  2. switch: 实际就是调用Waiter对象的greenlet属性的switch方法, 这样就切换到 了对应的协程.
    一般会注册到某个watcher的回调函数. 如果greenlet属性为
    None,那么意味着switch在get之前运行了,那么就简单的设置下value以
    及_exception属性.

需要等待的协程调用get方法,这样该协程就会挂起, 其他的协程调用switch方法切换 到因等待而挂起的协程,
我们来看看Waiter的一个使用例子, Hub的wait方法的代 码:

1
2
3
4
5
6
7
8
9
10
11
12
class Hub(greenlet):
...
def wait(self, watcher):
waiter = Waiter()
unique = object()
watcher.start(waiter.switch, unique)
try:
result = waiter.get()
assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)
finally:
watcher.stop()
...

wait方法的作用是挂起当前的协程,直到watcher监听的事件就绪.它创建一个
Waiter实例waiter,接着将waiter的switch方法注册到wacher上,这样当watcher监听的
事件就绪后就会调用实例的switch方法,接着就调用waiter的get方法, 根据watcher监
听的事件就绪的快慢,这里有两种可能:

  1. get在switch之前运行: get会设置waiter的greenlet属性为当前执行的协程, 接着 切换到hub,
    当将来某个时候事件就绪,那么调用waiter的switch,switch会调用
    greenlet属性的switch方法,这样就切换回了当前运行的协程.
  2. get在switch之后运行: 这种情况比较少见,可是也是存在的, 这种情况下运行
    switch时,waiter对象的greenlet属性为None, 所以switch方法只是简单的设置
    waiter的value属性, 接着调用get会直接返回value属性,而不阻塞.注意不要弄 混_NONE与None.

Greenlet

这也是一个greenlet的子类,它也是用来产生协程的, 我们先来看看我们创建协程时常 用的spawn函数的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Greenlet(greenlet):
def __init__(self, run=None, *args, **kwargs):
hub = get_hub()
greenlet.__init__(self, parent=hub)

@classmethod
def spawn(cls, *args, **kwargs):
"""Return a new :class:`Greenlet` object, scheduled to start.
The arguments are passed to :meth:`Greenlet.__init__`.
"""
g = cls(*args, **kwargs)
g.start()
return g

gevent.spawn实际就是Greenlet类的spawn方法,该方法直接创建一个Greenlet实例,注
意该实例的parent是hub,而不是默认的主协程, 这样的用处是当协程完成退出时,程序
会继续执行hub的事件循环.然后调用实例的start方法, 下面看看start方法的代码

1
2
3
4
def start(self):
"""Schedule the greenlet to run in this loop iteration"""
if self._start_event is None:
self._start_event = self.parent.loop.run_callback(self.switch)

start方法实际上就是把该实例丢到hub协程的循环当中,也就是说这个新建的协程就可 以被hub调度了.

1
2
3
4
5
6
def run_callback(self, func, *args):
CHECK_LOOP2(self)
cdef callback cb = callback(func, args)
self._callbacks.append(cb)
libev.ev_ref(self._ptr)
return cb

上面的代码先创建一个callback实例cb,接着将这个实例放进_callbacks列表中, 在core
部分我们分析了_callbacks列表的所有callback实例都会被_prepare watcher的回调
函数 gevent_run_callbacks 运行, 这样实际就是启动了协程.

socket模块

我们知道为了发挥协程的威力, 我们不能使用标准socket库,必须使用gevent实现的 socket库,
现在我们来分析一下gevent的socket模块,看看该模块是如何使用协程的,
我这里以socket的recv方法为例. 假设调用recv方法的协程为gr.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class socket(object):

def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None):
if _sock is None:
self._sock = _realsocket(family, type, proto)
self.timeout = _socket.getdefaulttimeout()
else:
if hasattr(_sock, '_sock'):
self._sock = _sock._sock
self.timeout = getattr(_sock, 'timeout', False)
if self.timeout is False:
self.timeout = _socket.getdefaulttimeout()
else:
self._sock = _sock
self.timeout = _socket.getdefaulttimeout()
self._sock.setblocking(0)
fileno = self._sock.fileno()
self.hub = get_hub()
io = self.hub.loop.io
self._read_event = io(fileno, 1)
self._write_event = io(fileno, 2)

__init__很简单,创建一个socket(self._sock),将该描述符设置为非阻塞,同时创建两个
watcher,分别监听读事件(self._read_event)以及写事件(self._write_event),下面
看看recv的代码:

1
2
3
4
5
6
7
8
9
10
11
12
def recv(self, *args):
sock = self._sock # keeping the reference so that fd is not closed during waiting
while True:
try:
return sock.recv(*args)
except error:
ex = sys.exc_info()[1]
if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
raise
# QQQ without clearing exc_info test__refcount.test_clean_exit fails
sys.exc_clear()
self._wait(self._read_event)

recv直接调用内置模块的recv方法,如果发现该调用会阻塞,那么就调用_wait方法, 该 方法也是代码的关键部分.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def _wait(self, watcher, timeout_exc=timeout('timed out')):
"""Block the current greenlet until *watcher* has pending events.

If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
By default *timeout_exc* is ``socket.timeout('timed out')``.

If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
"""
assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, )
if self.timeout is not None:
timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
else:
timeout = None
try:
self.hub.wait(watcher)
finally:
if timeout is not None:
timeout.cancel()

根据注释我们知道_wait方法会使当前的协程暂停,直到watcher监听的事件就绪. 代码的 关键部分是
self.hub.wait(watcher), 这个方法在上面已经分析过,只要明白它会阻 塞当前的协程切换到hub协程,
而如果watcher监听的事件就绪,它又会切换会当前协程,
在recv的例子中,一旦wacher监听的事件就绪也就意味着socket已经处于读就绪状态,所
以也就可以调用内置的socket模块的recv方法来获得数据了.

timeout模块

该模块实现了一个超时机制, 它先挂起当前的协程, 当指定的时间到了之后,它会切换 到该协程,并且在该协程中抛出异常. 这样就实现了挂起协程的目的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Timeout(BaseException):
def __init__(self, seconds=None, exception=None, ref=True, priority=-1):
self.seconds = seconds
self.exception = exception
self.timer = get_hub().loop.timer(seconds or 0.0, ref=ref, priority=priority)

def start(self):
"""Schedule the timeout."""
assert not self.pending, '%r is already started; to restart it, cancel it first' % self
if self.seconds is None: # "fake" timeout (never expires)
pass
elif self.exception is None or self.exception is False or isinstance(self.exception, string_types):
# timeout that raises self
self.timer.start(getcurrent().throw, self)
else: # regular timeout with user-provided exception
self.timer.start(getcurrent().throw, self.exception)

@classmethod
def start_new(cls, timeout=None, exception=None, ref=True):
if isinstance(timeout, Timeout):
if not timeout.pending:
timeout.start()
return timeout
timeout = cls(timeout, exception, ref=ref)
timeout.start()
return timeout

@property
def pending(self):
"""Return True if the timeout is scheduled to be raised."""
return self.timer.pending or self.timer.active

def cancel(self):
"""If the timeout is pending, cancel it. Otherwise, do nothing."""
self.timer.stop()

先看__init__, 它为实例创建了如下属性:

  1. seconds: 超时的秒数, 如果为None,那么永不超时
  2. exception: 超时抛出的异常,如果为None,那么就抛出self本身
  3. timer: 一个timer watcher

在来看start, 它分为三种情况:

  1. self.second为None: 那么直接pass, 这就意味者timer没有注册到时间循环中,所 以也就永远不会超时
  2. self.exception为None: 它会将 getcurrent().throw 注册为timer的回调函数,
    我们知道协程对象的throw方法和switch是相似的,都会切换到对应协程,只是throw
    在切换到对应协程后会立刻将它的参数作为异常抛出, 所以一旦超时,那么就会切 换到当前协程,然后抛出self
  3. self.exception不为None, 和2相似,只是超时会抛出self.exception而不是self本 身.

start_new是一个包装函数, 正常情况下你要先创建一个timeout实例,然后调用该实例 的start方法,
现在你只需要调用这个方法它就会把这两步一起搞定.

timeout使用指南

一般情况下timeout都是这样使用的

1
2
3
4
5
6
timeout = Timeout(seconds, exception)
timeout.start()
try:
... # exception will be raised here, after *seconds* passed since start() call
finally:
timeout.cancel()

最开始的两行可以用Timeout.start_new代替, 在try中间我们一般会切换到其它的协 程,
当超时后会自动切换回来,并且抛出异常,这样try就可以捕捉到了.来看看一个更
具体的例子,event.py中的例子:

1
2
3
4
5
6
7
8
9
10
timer = Timeout.start_new(timeout)
try:
try:
result = self.hub.switch()
assert result is self, 'Invalid switch into Event.wait(): %r' % (result, )
except Timeout as ex:
if ex is not timer:
raise
finally:
timer.cancel()

很显然的例子.

Event

该模块的Event实现了协程间的通知机制, 也就是一个协程可以唤醒监听该event的所 有协程.

Event使用指南

在一个协程中创建event对象,并调用该对象的wait方法,这样该协程就会阻塞,直到另 外一个协程调用了该event对象的set方法,代码如下:

1
2
3
4
5
6
# greenlet1
evt = Event()
evt.wait() # block until other greenlets invoke evt.set()

# greenlet2
evt.set()

AsycResult

queue

channel

channel使用指南

和go语言的channel类似,只是没有缓存也没有类型信息,如果要缓存,那么可能queue 更合适

1
2
3
4
5
6
# greenlet1
chan = Channel()
val = chan.get()

# greenlet2
chan.put(val)

使用方法也是两个协程配合, 一个读一个写,如果channel未就绪,那么相应的读或者 写就会阻塞执行该操作的的这个协程.

其它

在Greenlet类的join函数中有如下代码:

numberLines
1
2
3
4
switch = getcurrent().switch
self.rawlink(switch)

result = self.parent.switch()

rawlink的作用是注册一个函数,这个函数会在这个greenlet运行完成后调用

第三方库

gevent不像go一样是官方内置的,所以有些时候和第三方库配合会有一些问题,总的来说 python写成的库可以直接monkey
patch,C写成的库可以直接用豆瓣开源的greenify来打 patch。

greenify

这个库可以将C扩展打patch,这样可以让他们兼容gevent,它直接工作于二进制文件这一级,
不需要你修改C扩展代码,目前只支持ELF文件格式。他会自动的对网络相关的代码来patch。

PyMongo

有几个注意事项。

  1. 只初始化一个 pymongo.Connection 对象,最好把这个弄成一个模块级或者全局变量,
    库的内部由pool,所以你不用操心。
  2. 至少要monkey patch掉socket和threading模块
  3. 要调用 end_request 来将连接归还到pool中。

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!