Python 系列学习十九:Twisted 之 Deferred

前言

打算写一系列文章来记录自己学习 Python 3 的点滴;

本文作为 Twisted 的先导篇,来了解 Twisted 的 Deferred 机制;

Twisted 算是最早支持 Linux poll 和 epoll 特性的异步编程框架,比 Java NIO 早了好些年推出,其核心是通过 Linux IO 的异步事件机制,通过 Selector 和 Event 的方式通过单线程来同时处理多个 I/O 并发请求;通过 Twisted 我们可以构建基于 NIO 后者 AIO 支持海量并发请求的服务器;异步请求虽然高效,但难点在于如何优雅的处理回调,于是 Twisted 退出了 Deferred 机制,该机制就是通过事件的方式,通知 Application 某个异步请求执行完毕,可以接着继续执行下一个异步请求了,它的作用非常类似于 Javascript 用来处理异步回调的 Promise 框架,不过 Javascript Promise 的实现应该是借鉴了 Twisted Deferred 的思想,所以两者大同小异,如出一辙;

本文不打算对 Twisted 的内部机制进行探讨,仅仅对 Deferred 在功能上进行梳理;

定义

Twisted 利用 Deferred 对象来处理它的异步回调,使用 Deferred 来封装异步 I/O 事件,当某个异步 I/O 事件有响应以后,便会通过 Deferred 对象中所绑定的回调事件来依次触发其相应的回调方法,不过要注意的是,一个 Deferred 对象通常会依次绑定多个回调方法构成一个回调方法链,回调过程中会依次的挨个调用这些回调方法,同时回调方法本身还可以构建并返回另一个 Deferred 对象,进而进行另一次的异步回调链;

上述便是 Twisted 定义 Deferred 的初衷,但是 Deferred 作为一个独立的框架,它其实是可以独立于 Twisted 使用的,它甚至可以 Defered 一个同步的调用;所以,在开始深入 Defered 以前,要明白 Defered 本身只是处理异步方法回调之后如何优雅的处理其返回结果的一个框架,它本身并不是一个异步执行程序,也不具备任何异步的特性,所以,如果想把一个本身就是同步的代码块作为 callback 注册到 Deferred 对象中,执行过程中,该同步代码块依然是同步代码块,这点和 Javascript 的 Promise 框架是一摸一样的;

回调

callback 是 Deferred 的核心,无论是异步执行成功还是失败,都会都会通过注册到 Deferred 对象中的 callback 方法进行回调处理;Deferred 包含两种回调方式,一种是成功的回调方式,通过 Deferred.callback(result) 触发执行,一种是失败的回调,通过 Deferred.errback(err) 触发执行;下面笔者分别就这两种方式进行描述;

callback

官方 API: https://twistedmatrix.com/documents/18.4.0/api/twisted.internet.defer.Deferred.html#callback

1
2
def callback(self, result):
pass
  • 参数 result
    异步调用成功以后,将其结果作为参数调用 callback 方法,一旦触发 callback 方法,将会依次调用回调方法链(processing chain)中的回调方法(通过 addCallback 方法注入的),并且在调用过程中将 result 做为第一个回调方法的参数传入;在回调方法的调用过程中,如果某个方法返回了 twisted.python.failure.Failure 或者抛出了一个 Exception,那么会进入 errback 调用链的过程中执行;
  • 异常 AlreadyCalledError
    表示该方法已经调用过了;

该方法的作用就是,接收异步调用的结果,并触发依次回调调用链中的方法;看一个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from twisted.internet import reactor, defer

def getDummyData(inputData):
"""
This function is a dummy which simulates a delayed result and
returns a Deferred which will fire with that result. Don't try too
hard to understand this.
"""
print('getDummyData called')
deferred = defer.Deferred()
# simulate a delayed result by asking the reactor to fire the
# Deferred in 2 seconds time with the result inputData * 3
reactor.callLater(2, deferred.callback, inputData * 3)
return deferred

def cbPrintData(result):
"""
Data handling function to be added as a callback: handles the
data by printing the result
"""
print('Result received: {}'.format(result))

deferred = getDummyData(3)
deferred.addCallback(cbPrintData)

# manually set up the end of the process by asking the reactor to
# stop itself in 4 seconds time
reactor.callLater(4, reactor.stop)
# start up the Twisted reactor (event loop handler) manually
print('Starting the reactor')
reactor.run()
  1. 代码第 10 行,初始化一个 Deferred 对象 deferred;
  2. 代码第 13 行,这里通过 reactor.callLater 方法模拟异步调用,

    1
    reactor.callLater(2, deferred.callback, inputData * 3)

    其含义是,当 reactor 启动后两秒,将 inputData * 3 作为调用参数调用方法 deferred.callback 方法,就相当于执行

    1
    deferred.callback(inputData * 3)

    一旦开始执行,便会调用 processing chain 中的回调方法,调用过程中,参数 inputData * 3 将始终作为每个回调方法的第一个参数进行回调

  3. 代码第 24 行,往 processing chain 中添加第一个回调方法,cbPrintData
  4. 代码第 28 行,当 reactor 开始执行的 4 秒钟以后,关闭 reactor;记住,当 processing chain 中的所有回调都结束以后,记得一定要关闭 reactor,否则,reactor 将会占用系统的 socket 资源;
  5. 代码第 31 行,开始执行;

errback

Deferred.errback(self, fail=None) 方法表示异步调用过程失败或者有异常发生,通过 errback 触发失败调用链的回调方法,回调过程中,参数 fail 将会作为第一个回调方法的第一个参数进行调用;来看一个例子,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from twisted.internet import reactor, defer

class Getter:
def gotResults(self, x):
"""
The Deferred mechanism provides a mechanism to signal error
conditions. In this case, odd numbers are bad.

This function demonstrates a more complex way of starting
the callback chain by checking for expected results and
choosing whether to fire the callback or errback chain
"""
if self.d is None:
print("Nowhere to put results")
return

d = self.d
self.d = None
if x % 2 == 0:
d.callback(x*3)
else:
d.errback(ValueError("You used an odd number!"))

def _toHTML(self, r):
"""
This function converts r to HTML.

It is added to the callback chain by getDummyData in
order to demonstrate how a callback passes its own result
to the next callback
"""
return "Result: %s" % r

def getDummyData(self, x):
"""
The Deferred mechanism allows for chained callbacks.
In this example, the output of gotResults is first
passed through _toHTML on its way to printData.

Again this function is a dummy, simulating a delayed result
using callLater, rather than using a real asynchronous
setup.
"""
self.d = defer.Deferred()
# simulate a delayed result by asking the reactor to schedule
# gotResults in 2 seconds time
reactor.callLater(2, self.gotResults, x)
self.d.addCallback(self._toHTML)
return self.d

def cbPrintData(result):
print(result)

def ebPrintError(failure):
import sys
sys.stderr.write(str(failure))

# this series of callbacks and errbacks will print an error message
g = Getter()
d = g.getDummyData(3)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)

# this series of callbacks and errbacks will print "Result: 12"
g = Getter()
d = g.getDummyData(4)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)

reactor.callLater(4, reactor.stop)
reactor.run()
  1. 代码第 19 - 22 行,如果是偶数,则调用 callback 触发正常回调链中的方法并依次执行,如果是奇数,则调用 errback 方法触发异常回调链中的方法

  2. 代码第 17 - 18 行,这里告诉我们一个变成的良好习惯(Best Practice),就是当得到异步执行结果以后,将实例对象 self.deferred 赋值给一个局部变量 d 以后,再将 self.deferred 设置为 None,这样做的目的有二,一是,避免 deferred 被重复触发两次,因为有时候当某个 socket 或者 I/O 结束以后,epoll 或者 poll 有可能会被触发两次,二是,可以告诉 Python 垃圾回收器该 deferred 对象可以回收了;

  3. 代码第 59 - 68 行,
    首先,回答自己的一个疑惑,两个不同的 deferred 对象在 reactor 开始执行之前都使用同一个变量 d 来注册 callback 和 errback 回调方法,那后面的变量 d 所代表的 deferred 对象岂不是就覆盖了之前的变量 d 所代表的 deferred 对象以及它的相关配置?咋一看,应该是会覆盖的,但是其实,代码第 60 行的变量 d 引用的其实是 reactor 对象的 deferred 队列中的某个 deferred 对象,建设叫做 deferred_123,而同理,代码第 66 行的变量 d 引用的同样是 reactor deferred 队列中的某一个 deferred 对象,假设为 deferred_456,由此,虽然都是用同一个变量 d 来表示,但其实引用的是两个不同 deferred 对象,因此互不影响。
    其次,这里不仅仅注册了一个 cbPrintData 的回调方法,从 getDummyData 方法中可以看到,首先,注册了一个 _toHTML 回调方法,见代码第 48 行,然后才是 cbPrintData 方法;
    最后,要注意的是这里的异常处理回调方法的注册,使用 addErrback 方法注入用于失败处理的回调方法 ebPrintError

    1
    2
    3
    def ebPrintError(failure):
    import sys
    sys.stderr.write(str(failure))

    代码 22 行通过 errback 注入的 ValueError 异常参数将作为此方法的参数进行回调;

异常处理

特性

  1. 当异步执行结束,如果调用的是 Deferred.errback 方法,那么开始调用异常调用链中的回调方法;可以理解为这种调用的方式等价于执行的就是 except;
  2. 除非你显式的在异常回调链的方法中返回一个 twisted.python.failure.Failure 对象或者抛出一个新的异常,否则异常回调链中的(后续)方法将会停止(stops propagating),并且,注意了,将会跳转到正常的回调方法链中并继续被执行;其实该逻辑和 except 是一样的,如果你不再在 except 代码块中抛出一个异常,那么就会认为该异常已经被捕获并被成功处理了,所以,会继续执行后续的正常代码块;所以,这里需要特别注意的是,如果你期望后续的异常回调方法继续被执行,当前的异常回调一定要返回一个 Failure 或者新抛出一个异常啥的;不过要注意一个特别的场景,当在 except 代码块中返回一个 None,那么会终止后续的正常代码块的执行,但是在 Deferred 的异常回调方法可不这样,它依然会认为你已经处理了该异常并且没有其它任何问题了,依然会进入正常的回调链中开始执行;

例子

假设,在正常的 Python 代码中,我们有如下的处理异常的代码块,

1
2
3
4
5
6
try:
# code that may throw an exception
cookSpamAndEggs()
except (SpamException, EggException):
# Handle SpamExceptions and EggExceptions
...

那么上述的代码块在 Deferred 中该如何处理呢?

1
2
3
4
5
6
def errorHandler(failure):
failure.trap(SpamException, EggException)
# Handle SpamExceptions and EggExceptions

d.addCallback(cookSpamAndEggs)
d.addErrback(errorHandler)

在正常的回调方法 cookSpamAndEggs 注册完成以后,立刻注册一个异常处理的回调方法,这样,当 cookSpamAndEggs 中抛出的任何异常,都将会被 errorHandler 回调方法执行,要注意的是,errorHandler 中的 failure 是一个 twisted.python.failure.Failure 对象实例,可以通过该实例的 trap 方法捕获到想要的异常,不过上面的方法没有补全,如果写全了,大概会是下面这个样子,

1
2
3
4
5
def errorHandler(failure):
err = failure.trap(SpamException, EggException)
# Handle SpamExceptions and EggExceptions
log.err(str(err))
# save the error into database..

最佳实践

虽然有时候,当你确信自己的回调方法不会抛出任何异常,也没有任何的失败结果需要处理;但是你并不能确保 Deferred 自己抛出异常,所以最好的实践方式是,无论你自己的代码是否有异常或者需要处理失败结果与否,最好都添加上下面这段异常捕获的代码,

1
2
3
# Make sure errors get logged
from twisted.python import log
d.addErrback(log.err)

这样,无论何种情况,任何原因导致的异常,都会被捕获并被记录;

处理异常的其它方式

除了使用上述标准的方式处理异常以外,Deferred 还提供了 callbacks 方法能够注入异常回调方法,比如,

1
2
3
d = getDeferredFromSomewhere()
d.addCallbacks(callback1, errback1) # C
d.addCallbacks(callback2, errback2)

这样,当 getDeferredFromSomewhere() 方法中遇到了异常的情况会调用 errback1 和 errback2;那么,它和下面的这种常规的调用方式有什么异同呢?

1
2
3
4
5
d = getDeferredFromSomewhere()
d.addCallback(callback1) # A
d.addErrback(errback1) # B
d.addCallback(callback2)
d.addErrback(errback2)

区别是,errback1 不仅会处理 getDeferredFromSomewhere() 所抛出的异常,同时会处理 callback1 中所抛出的异常;而如果使用 callbacks 的方式,那么 errback1 则只会处理 getDeferredFromSomewhere() 中所抛出的异常;

将同步代码块封装为异步执行

很多应用场景需要我们使用到多线程的方式,来将同步的代码块利用多线程的方式改为并发执行;这个时候,我们可以利用 Deferred 来优雅的帮我们处理多线程的异步回调过程;

  1. 首先我们来看一个使用 Deferred 封装同步代码块的例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    def largeFibonnaciNumber():
    # create a Deferred object to return:
    d = defer.Deferred()

    # calculate the ten thousandth Fibonnaci number

    first = 0
    second = 1

    for i in range(TARGET - 1):
    new = first + second
    first = second
    second = new
    if i % 100 == 0:
    print("Progress: calculating the %dth Fibonnaci number" % i)

    # give the Deferred the answer to pass to the callbacks:
    d.callback(second)

    # return the Deferred with the answer:
    return d

    import time

    timeBefore = time.time()

    # call the function and get our Deferred
    d = largeFibonnaciNumber()

    timeAfter = time.time()

    print("Total time taken for largeFibonnaciNumber call: %0.3f seconds" % \
    (timeAfter - timeBefore))

    # add a callback to it to print the number

    def printNumber(number):
    print("The %dth Fibonacci number is %d" % (TARGET, number))

    print("Adding the callback now.")

    d.addCallback(printNumber)

    虽然在方法 largeFibonnaciNumber() 中我们返回的是一个 Deferred 对象,但是要注意的是,largeFibonnaciNumber() 方法中用来计算斐波那奇数字的那段比较耗时的代码依然是同步的,既代码 3 - 15 行的代码依然是同步的;这就是一个使用 Deferred 代码封装同步代码块的典型例子;但是一般情况下,如果不是为了兼容一些旧的接口,我们不推荐使用 Deferred 去封装一些同步的调用,于是,我们可以通过下面的方式将上面的代码块封装为多线程已达到异步执行的目的;

  2. 将上面的同步代码封装为异步执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    from twisted.internet import threads, reactor

    def largeFibonnaciNumber():
    """
    Represent a long running blocking function by calculating
    the TARGETth Fibonnaci number
    """
    TARGET = 10000

    first = 0
    second = 1

    for i in range(TARGET - 1):
    new = first + second
    first = second
    second = new

    return second

    def fibonacciCallback(result):
    """
    Callback which manages the largeFibonnaciNumber result by
    printing it out
    """
    print("largeFibonnaciNumber result =", result)
    # make sure the reactor stops after the callback chain finishes,
    # just so that this example terminates
    reactor.stop()

    def run():
    """
    Run a series of operations, deferring the largeFibonnaciNumber
    operation to a thread and performing some other operations after
    adding the callback
    """
    # get our Deferred which will be called with the largeFibonnaciNumber result
    d = threads.deferToThread(largeFibonnaciNumber)
    # add our callback to print it out
    d.addCallback(fibonacciCallback)
    print("1st line after the addition of the callback")
    print("2nd line after the addition of the callback")

    if __name__ == '__main__':
    run()
    reactor.run()

    上面代码的核心在第 37 行,利用 threads.deferToThread 将同步方法 largeFibonnaciNumber() 方法封装为了一个线程进行异步执行,并且直接返回一个 Deferred 对象,largeFibonnaciNumber() 方法返回的值 second 既作为该 Deferred 对象的第一个 callback,如果该 callback 直接是一个值的话,那么 Deferred 将会把该值直接赋值给回调链中的下一个回调方法,既是 fibonacciCallback() 方法;这样,我们便完成了将一个同步代码块变为异步执行,并通过 Deferred 框架优雅的处理其异步回调结果的方法;注意上述代码第 28 行,如果所有的回调事件都已经结束,需要显式的关闭 reactor 以释放系统的 socket 资源占用;

想想,我们经常使用到 requests 来同步的处理 HTTP 请求,但 requests 的操作都是同步执行,试想,如果我们需要同时并发的下载多个大文件呢?难道一个接一个的下载吗?这就是上面的代码可以派上用场的地方了,将同步变为异步,并通过 Deferred 来优雅的处理其异步的回调结果;

Reference

历史:http://blog.csdn.net/hanhuili/article/details/9389433

http://krondo.com/an-introduction-to-asynchronous-programming-and-twisted/

官网,Deferred object 简介: https://twistedmatrix.com/documents/current/core/howto/defer.html
官网,generate deferer object: https://twistedmatrix.com/documents/current/core/howto/gendefer.html
官网: https://twistedmatrix.com/documents/14.0.2/core/howto/python3.html