Deferred 简介

本文档介绍了 Deferred,Twisted 用于控制异步代码流的首选机制。如果您还不了解这意味着什么,请不要担心 - 您来对地方了!

它面向 Twisted 的新手,特别是为了帮助人们阅读和理解已经使用 Deferred 的代码而编写。

本文档假设您精通 Python。它不假设您了解 Twisted。

在阅读完本文档后,您应该了解 Deferred 是什么以及如何使用它们来协调异步代码。特别是,您应该能够

  • 阅读和理解使用 Deferred 的代码

  • 在同步代码和异步代码之间进行转换

  • 实现您想要的任何类型的异步代码错误处理

顺序的乐趣

当您编写 Python 代码时,一个普遍的、深刻的、不可动摇的假设是,代码块中的一行代码只有在前面的代码执行完毕后才会执行。

pod_bay_doors.open()
pod.launch()

舱门打开,然后才发射飞船。这很好。逐行执行是语言中内置的机制,用于编码执行顺序。它清晰、简洁且明确。

异常使事情变得更加复杂。如果 pod_bay_doors.open() 抛出异常,那么我们无法确定它是否已完成,因此贸然执行下一行代码是错误的。因此,Python 为我们提供了 tryexceptfinallyelse,它们共同模拟了处理抛出异常的几乎所有可能的方式,并且效果很好。

函数应用是编码执行顺序的另一种方式

pprint(sorted(x.get_names()))

首先调用 x.get_names(),然后使用其返回值调用 sorted,然后使用 sorted 的返回值调用 pprint

它也可以写成

names = x.get_names()
sorted_names = sorted(names)
pprint(sorted_names)

有时它会导致我们在不需要的时候编码顺序,例如在这个例子中

from __future__ import print_function

total = 0
for account in accounts:
    total += account.get_balance()
print("Total balance ${}".format(total))

但这通常不是什么大问题。

总而言之,情况还不错,上面所有的解释都是对熟悉且明显的事实的阐述。一行代码接一行代码,一件事接一件事,这两个事实是密不可分的。

但如果我们必须以不同的方式来做呢?

一个假设的问题

如果我们不能再依赖于前面的代码行在开始解释和执行下一行代码之前完成(无论这意味着什么)呢?如果 pod_bay_doors.open() 立即返回,并在其他地方触发一些东西,最终打开舱门,使 Python 解释器不顾一切地进入 pod.launch() 呢?

也就是说,如果执行顺序与 Python 代码行的顺序不匹配怎么办?如果“返回”不再意味着“完成”呢?

异步操作?

我们如何防止飞船撞向仍然关闭的舱门?我们如何应对舱门可能无法打开的情况?如果打开舱门会提供一些我们在发射飞船时需要的关键信息怎么办?我们如何获取这些信息?

而且,至关重要的是,由于我们正在编写代码,我们如何编写代码,以便能够在其基础上构建其他代码?

解决方案的组成部分

我们仍然需要一种方法来表达“只有在完成之后才做”。

我们需要一种方法来区分成功完成和中断处理,通常使用 tryexceptelsefinally 来模拟。

我们需要一种机制来从刚刚执行的代码中获取返回失败和异常信息,并将其传递给需要执行的下一段代码。

我们需要以某种方式能够对我们还没有的结果进行操作。我们不需要采取行动,而是需要制定和编码计划,以便在我们可以采取行动时采取行动。

除非我们以某种方式修改解释器,否则我们需要使用提供的 Python 语言结构来构建它:方法、函数、对象等等。

也许我们想要一些看起来像这样的东西

placeholder = pod_bay_doors.open()
placeholder.when_done(pod.launch)

一个解决方案:Deferred

Twisted 使用 Deferred 来解决这个问题,这是一种专门用于执行一项操作的对象,并且只执行一项操作:将执行顺序与 Python 源代码中的行顺序分开编码。

它不处理线程、并行、信号或子进程。它不知道事件循环、greenlets 或调度。它只知道执行顺序。它是怎么知道的?因为我们明确地告诉它我们想要的顺序。

因此,我们不写

pod_bay_doors.open()
pod.launch()

我们写

d = pod_bay_doors.open()
d.addCallback(lambda ignored: pod.launch())

这在几行代码中引入了十几个新概念,所以让我们分解一下。如果你认为你已经理解了,你可能想跳到下一节。

这里,pod_bay_doors.open() 返回一个 Deferred,我们将其分配给 d。我们可以将 d 视为一个占位符,代表 open() 最终完成时将返回的值。

为了“接下来做这个”,我们在 d 上添加一个回调。回调是一个函数,它将使用 open() 最终返回的任何值来调用。在本例中,我们并不关心,所以我们创建一个只有一个忽略参数的函数,它只调用 pod.launch()

因此,我们用一个在 Python 中明确编码的执行顺序替换了“行顺序即执行顺序”,其中 d 代表特定的流程,而 d.addCallback 替换了“新行”。

当然,程序通常包含不止两行,我们仍然不知道如何处理失败。

做对:失败情况

在接下来的内容中,我们将采用在普通 Python 中表达操作顺序的每种方式(使用代码行和 try/except),并将它们转换为使用 Deferred 对象构建的等效代码。

这将有点费力,但如果你真的想了解如何使用 Deferred 并维护使用它们的代码,那么理解下面的每个示例都是值得的。

一件事,然后另一件事,然后又一件事

回想一下我们之前的示例

pprint(sorted(x.get_names()))

也可以写成

names = x.get_names()
sorted_names = sorted(names)
pprint(sorted_names)

如果 get_namessorted 都无法保证在返回之前完成呢?也就是说,如果两者都是异步操作呢?

好吧,用 Twisted 的说法,它们将返回 Deferred,所以我们会写

d = x.get_names()
d.addCallback(sorted)
d.addCallback(pprint)

最终,sorted 将使用 get_names 最终提供的任何内容来调用。当 sorted 完成时,pprint 将使用它提供的任何内容来调用。

我们也可以写成

x.get_names().addCallback(sorted).addCallback(pprint)

因为 d.addCallback 返回 d

简单的错误处理

我们经常想写等效于此代码的代码

try:
    x.get_names()
except Exception as e:
    report_error(e)

我们如何用 Deferred 来写?

d = x.get_names()
d.addErrback(report_error)

errback 是 Twisted 中对在收到错误时调用的回调的称呼。

这忽略了一个重要的细节。report_error 不会得到异常对象 e,而是会得到一个 Failure 对象,它包含 e 中所有有用的信息,但针对 Deferred 进行了优化。

我们将在处理完所有其他异常组合之后再深入研究它。

处理错误,但在成功时做其他事情

如果我们想在 try 块成功后做一些事情呢?放弃我们人为的例子,使用通用的变量名,我们得到

try:
    y = f()
except Exception as e:
    g(e)
else:
    h(y)

好吧,我们会用 Deferred 这样写

d = f()
d.addCallbacks(h, g)

其中 addCallbacks 表示“同时添加回调和 errback”。h 是回调,g 是 errback。

现在我们有了 addCallbacks 以及 addErrbackaddCallback,我们可以通过改变调用它们的顺序来匹配 tryexceptelsefinally 的任何可能组合。解释它的确切工作原理很棘手(尽管 Deferred 参考 做得相当不错),但一旦我们完成了所有示例,它应该会更清楚。

处理错误,然后继续执行

如果我们想在 try/except 块之后做一些事情,无论是否发生异常呢?也就是说,如果我们想做等效于此通用代码的事情呢

try:
    y = f()
except Exception as e:
    y = g(e)
h(y)

Deferred,它看起来像这样

d = f()
d.addErrback(g)
d.addCallback(h)

或者,更简洁地说

f().addErrback(g).addCallback(h)

因为 addErrback 返回 d,所以我们可以像这样链接调用

addErrbackaddCallback 的顺序很重要。在下一节中,我们可以看到交换它们会发生什么。

为整个操作处理错误

try:
    y = f()
    z = h(y)
except Exception as e:
    g(e)

如果我们想在一个异常处理程序中包装一个多步骤操作呢?

d = f()
d.addCallback(h)
d.addErrback(g)

使用 Deferred,它看起来像这样

d = f().addCallback(h).addErrback(g)

或者,更简洁地说

无论如何都要做点什么

try:
    y = f()
finally:
    g()

那么 finally 呢?我们如何无论是否发生异常都做一些事情?我们如何翻译这个

d = f()
d.addBoth(g)

好吧,大致上我们这样做

d.addCallbacks(g, g)

这将 g 添加为回调和 errback。它等效于

为什么是“大致上”?因为如果 f 抛出异常,g 将传递一个 Failure 对象,代表该异常。否则,g 将传递 f() 的返回值的异步等效项(即 y)。

使用 async/await 的协程

注意

版本 16.4 中的新增功能。

使用 async/await 的协程

Python 3.5 引入了 PEP 492(“使用 async 和 await 语法的协程”)和原生协程。 Deferred.fromCoroutine 允许你使用 async def 语法和 await 在 Deferred 上编写协程,类似于 inlineCallbacks。你不需要像使用 inlineCallbacksyield Deferred 的函数那样,对每个可能 await Deferred 的函数进行装饰(因为你将使用 inlineCallbacks 对每个可能 yield Deferred 的函数进行装饰),你只需要使用最外层的协程对象调用 fromCoroutine 来安排它执行。协程可以在运行时 await 其他协程,而无需使用此函数本身。

ensureDeferred 函数还提供了一种将协程转换为 Deferred 的方法,但它的接口类型更模糊;Deferred.fromCoroutine 旨在取代它。

import json
from twisted.internet.defer import Deferred
from twisted.logger import Logger
log = Logger()

async def getUsers():
    try:
        return json.loads(await makeRequest("GET", "/users"))
    except ConnectionError:
        log.failure("makeRequest failed due to connection error")
        return []

def do():
    d = Deferred.fromCoroutine(getUsers())
    d.addCallback(print)
    return d

在编写协程时,如果你的协程调用了其他协程,而这些协程等待 Deferred,你不需要使用 Deferred.fromCoroutine;你可以直接使用 await。例如

async def foo():
    res = await someFunctionThatReturnsADeferred()
    return res

async def bar():
    baz = await someOtherDeferredFunction()
    fooResult = await foo()
    return baz + fooResult

def myDeferredReturningFunction():
    coro = bar()
    return Deferred.fromCoroutine(coro)

尽管两个协程都使用了 Deferred,但只有 bar 需要用 Deferred.fromCoroutine 包装才能返回一个 Deferred。

内联回调 - 使用 ‘yield’

使用 async/await 的协程

除非你的代码支持 Python 2(因此需要与旧版本的 Twisted 兼容),否则使用“使用 async/await 的协程”中描述的功能编写协程比使用 inlineCallbacks 更可取。协程由专门的 Python 语法支持,与 asyncio 兼容,并提供更高的性能。

Twisted 提供了一个名为 inlineCallbacks 的装饰器,它允许你使用 Deferred 而不必编写回调函数。

这是通过将你的代码编写为生成器来实现的,生成器会 *yield* Deferred,而不是附加回调。

考虑以下用传统的 Deferred 样式编写的函数

def getUsers():
   d = makeRequest("GET", "/users")
   d.addCallback(json.loads)
   return d

使用 inlineCallbacks,我们可以这样编写:

from twisted.internet.defer import inlineCallbacks, returnValue

@inlineCallbacks
def getUsers(self):
    responseBody = yield makeRequest("GET", "/users")
    returnValue(json.loads(responseBody))

这里发生了一些事情

  1. 我们没有在 makeRequest 返回的 Deferred 上调用 addCallback,而是 *yield* 了它。这会导致 Twisted 将 Deferred 的结果返回给我们。

  2. 我们使用 returnValue 来传播函数的最终结果。因为这个函数是一个生成器,我们不能使用 return 语句;那样会是语法错误。

使用 async/await 的协程

版本 15.0 中的新功能。

在 Python 3 中,你可以使用 return json.loads(responseBody) 来代替 returnValue(json.loads(responseBody))。这在可读性方面有很大的优势,但不幸的是,如果你需要与 Python 2 兼容,这将不可行。

两个版本的 getUsers 对调用者呈现完全相同的 API:两者都返回一个 Deferred,该 Deferred 会触发请求的解析后的 JSON 主体。虽然 inlineCallbacks 版本看起来像同步代码,它在等待请求完成时会阻塞,但每个 yield 语句都允许其他代码在等待被 yield 的 Deferred 触发时运行。

inlineCallbacks 在处理复杂的控制流和错误处理时变得更加强大。例如,如果 makeRequest 由于连接错误而失败怎么办?为了这个例子,假设我们想要记录异常并返回一个空列表。

def getUsers():
   d = makeRequest("GET", "/users")

   def connectionError(failure):
       failure.trap(ConnectionError)
       log.failure("makeRequest failed due to connection error",
                   failure)
       return []

   d.addCallbacks(json.loads, connectionError)
   return d

使用 inlineCallbacks,我们可以这样重写:

@inlineCallbacks
def getUsers(self):
    try:
        responseBody = yield makeRequest("GET", "/users")
    except ConnectionError:
       log.failure("makeRequest failed due to connection error")
       returnValue([])

    returnValue(json.loads(responseBody))

我们的异常处理得到了简化,因为我们可以使用 Python 熟悉的 try / except 语法来处理 ConnectionError

结论

你已经了解了异步代码,并且已经看到了如何使用 Deferred

  • 在异步操作成功完成之后执行某些操作

  • 使用成功异步操作的结果

  • 捕获异步操作中的错误

  • 如果操作成功,执行一项操作,如果失败,执行另一项操作

  • 在错误成功处理后执行某些操作

  • 用一个错误处理程序包装多个异步操作

  • 在异步操作之后执行某些操作,无论它成功还是失败

  • 使用 inlineCallbacks 编写不使用回调的代码

  • 使用 Deferred.fromCoroutine 编写与 Deferred 交互的协程

这些只是 Deferred 的一些基本用法。有关它们如何工作、如何组合多个 Deferred 以及如何编写混合同步和异步 API 的代码的详细信息,请参阅 Deferred 参考。或者,阅读有关如何编写生成 Deferred 的函数 的信息。