延迟引用

本文档是关于 twisted.internet.defer.Deferred 对象行为的指南,以及在函数返回它们时使用它们的各种方法。

本文档假设您熟悉 Twisted 框架构建的基础原理:异步、基于回调的编程,其中您不需要在程序中使用阻塞代码或使用线程来运行阻塞代码,而是使用立即返回的函数,并在数据可用时开始回调链。

阅读本文档后,读者应该能够处理 Twisted 和使用 Twisted 的代码中返回 Deferred 的大多数简单 API。

  • 当您从函数调用中获得 Deferred 时,您可以做些什么;以及

  • 如何编写代码以稳健地处理 Deferred 代码中的错误。

Deferred

Twisted 使用 Deferred 对象来管理回调序列。客户端应用程序将一系列函数附加到 Deferred,以便在异步请求的结果可用时按顺序调用这些函数(这组函数被称为一系列 **回调** 或 **回调链**),以及一系列函数,如果异步请求中出现错误,则调用这些函数(被称为一系列 **错误回调** 或 **错误回调链**)。异步库代码在结果可用时调用第一个回调,或在出现错误时调用第一个错误回调,然后 Deferred 对象将每个回调或错误回调函数的结果传递给链中的下一个函数。

回调

twisted.internet.defer.Deferred 是一个承诺,即函数将在某个时刻具有结果。我们可以将回调函数附加到 Deferred,一旦它获得结果,这些回调就会被调用。此外,Deferred 允许开发人员为错误注册回调,默认行为是记录错误。Deferred 机制标准化了应用程序程序员与各种阻塞或延迟操作的接口。

from twisted.internet import reactor, defer

def getDummyData(inputData):
    """
    This function is a dummy which simulates a delayed result and
    returns a Deferred which will fire with that result. Don't try too
    hard to understand this.
    """
    print('getDummyData called')
    deferred = defer.Deferred()
    # simulate a delayed result by asking the reactor to fire the
    # Deferred in 2 seconds time with the result inputData * 3
    reactor.callLater(2, deferred.callback, inputData * 3)
    return deferred

def cbPrintData(result):
    """
    Data handling function to be added as a callback: handles the
    data by printing the result
    """
    print('Result received: {}'.format(result))

deferred = getDummyData(3)
deferred.addCallback(cbPrintData)

# manually set up the end of the process by asking the reactor to
# stop itself in 4 seconds time
reactor.callLater(4, reactor.stop)
# start up the Twisted reactor (event loop handler) manually
print('Starting the reactor')
reactor.run()

多个回调

可以将多个回调添加到 Deferred。Deferred 的回调链中的第一个回调将使用结果调用,第二个使用第一个回调的结果调用,依此类推。为什么我们需要这样做?考虑由 twisted.enterprise.adbapi 返回的 Deferred - SQL 查询的结果。Web 小部件可能会添加一个回调,将此结果转换为 HTML,并将 Deferred 传递下去,其中回调将被 Twisted 用于将结果返回给 HTTP 客户端。如果出现错误或异常,回调链将被绕过。

from twisted.internet import reactor, defer

class Getter:
    def gotResults(self, x):
        """
        The Deferred mechanism provides a mechanism to signal error
        conditions.  In this case, odd numbers are bad.

        This function demonstrates a more complex way of starting
        the callback chain by checking for expected results and
        choosing whether to fire the callback or errback chain
        """
        if self.d is None:
            print("Nowhere to put results")
            return

        d = self.d
        self.d = None
        if x % 2 == 0:
            d.callback(x*3)
        else:
            d.errback(ValueError("You used an odd number!"))

    def _toHTML(self, r):
        """
        This function converts r to HTML.

        It is added to the callback chain by getDummyData in
        order to demonstrate how a callback passes its own result
        to the next callback
        """
        return "Result: %s" % r

    def getDummyData(self, x):
        """
        The Deferred mechanism allows for chained callbacks.
        In this example, the output of gotResults is first
        passed through _toHTML on its way to printData.

        Again this function is a dummy, simulating a delayed result
        using callLater, rather than using a real asynchronous
        setup.
        """
        self.d = defer.Deferred()
        # simulate a delayed result by asking the reactor to schedule
        # gotResults in 2 seconds time
        reactor.callLater(2, self.gotResults, x)
        self.d.addCallback(self._toHTML)
        return self.d

def cbPrintData(result):
    print(result)

def ebPrintError(failure):
    import sys
    sys.stderr.write(str(failure))

# this series of callbacks and errbacks will print an error message
g = Getter()
d = g.getDummyData(3)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)

# this series of callbacks and errbacks will print "Result: 12"
g = Getter()
d = g.getDummyData(4)
d.addCallback(cbPrintData)
d.addErrback(ebPrintError)

reactor.callLater(4, reactor.stop)
reactor.run()

注意

请特别注意 gotResults 方法中 self.d 的处理方式。在 Deferred 使用结果或错误触发之前,该属性被设置为 None,因此 Getter 实例不再引用即将触发的 Deferred。这有几个好处。首先,它避免了 Getter.gotResults 意外地多次触发同一个 Deferred 的任何可能性(这会导致 AlreadyCalledError 异常)。其次,它允许 Deferred 上的回调调用 Getter.getDummyData(它为 d 属性设置一个新值),而不会造成问题。第三,它通过消除引用循环,使 Python 垃圾收集器的工作更容易。

视觉解释

../../_images/deferred-attach.png
  1. 请求方法(数据接收器)请求数据,获得 Deferred 对象。

  2. 请求方法将回调附加到 Deferred 对象。

../../_images/deferred-process.png
  1. 当结果准备就绪时,将其传递给 Deferred 对象。如果操作成功,则使用 .callback(result),如果失败,则使用 .errback(failure)。请注意,failure 通常是 twisted.python.failure.Failure 实例。

  2. Deferred 对象使用 resultfailure 触发先前添加的(调用/错误)回调。然后,执行遵循以下规则,沿着要处理的回调链向下进行。

    • 回调的结果始终作为第一个参数传递给下一个回调,从而创建一系列处理器。

    • 如果回调引发异常,则切换到错误回调。

    • 未处理的错误会沿着错误回调链向下传递,这创建了异步模拟,类似于一系列 except: 语句。

    • 如果错误回调没有引发异常或返回 twisted.python.failure.Failure 实例,则切换到回调。

错误回调

Deferred 的错误处理是根据 Python 的异常处理模型进行的。如果没有任何错误发生,所有回调都会按顺序运行,如上所述。

如果错误回调被调用而不是回调(例如,因为 DB 查询引发了错误),则 twisted.python.failure.Failure 会被传递给第一个错误回调(您可以添加多个错误回调,就像回调一样)。您可以将错误回调视为普通 Python 代码中的 except 块。

除非你在 except 块中显式地 raise 错误,否则 Exception 将被捕获并停止传播,正常执行将继续。errback 也是一样:除非你显式地 return 一个 Failure 或(重新)引发异常,否则错误将停止传播,正常的回调将从该点继续执行(使用从 errback 返回的值)。如果 errback 返回一个 Failure 或引发异常,那么它将被传递给下一个 errback,依此类推。

注意:如果 errback 没有返回任何东西,那么它实际上返回了 None,这意味着回调将在该 errback 之后继续执行。这可能不是你期望发生的事情,所以要小心。确保你的 errback 返回一个 Failure(可能是传递给它的那个),或者为下一个回调返回一个有意义的返回值。

此外,twisted.python.failure.Failure 实例有一个有用的方法叫做 trap,它允许你有效地执行等效于

try:
    # code that may throw an exception
    cookSpamAndEggs()
except (SpamException, EggException):
    # Handle SpamExceptions and EggExceptions
    ...

你可以通过以下方式做到这一点

def errorHandler(failure):
    failure.trap(SpamException, EggException)
    # Handle SpamExceptions and EggExceptions

d.addCallback(cookSpamAndEggs)
d.addErrback(errorHandler)

如果传递给 failure.trap 的参数都不匹配封装在该 Failure 中的错误,那么它将重新引发错误。

这里还有另一个潜在的“陷阱”。有一个方法 twisted.internet.defer.Deferred.addCallbacks()addCallback 后跟 addErrback 类似,但并不完全相同。特别是,考虑以下两种情况

# Case 1
d = getDeferredFromSomewhere()
d.addCallback(callback1)       # A
d.addErrback(errback1)         # B
d.addCallback(callback2)
d.addErrback(errback2)

# Case 2
d = getDeferredFromSomewhere()
d.addCallbacks(callback1, errback1)  # C
d.addCallbacks(callback2, errback2)

如果 callback1 中发生错误,那么对于情况 1,errback1 将使用失败调用。对于情况 2,errback2 将被调用。小心你的回调和 errback。

在实际意义上,这意味着在情况 1 中,A 行的回调将处理来自 getDeferredFromSomewhere 的成功条件,而 B 行的 errback 将处理来自上游源或在 A 中发生的任何错误。在情况 2 中,C 行的 errback将只处理由getDeferredFromSomewhere 引发的错误条件,它不会处理在 callback1 中引发的任何错误。

未处理的错误

如果一个 Deferred 被垃圾回收,并且有一个未处理的错误(即,如果有下一个 errback,它将调用下一个 errback),那么 Twisted 将将错误的回溯写入日志文件。这意味着你通常可以不添加 errback 仍然可以记录错误。但要小心;如果你保留了对 Deferred 的引用,阻止它被垃圾回收,那么你可能永远不会看到错误(并且你的回调似乎从未被调用过)。如果不确定,你应该在你的回调之后显式地添加一个 errback,即使你所做的只是

# Make sure errors get logged
from twisted.python import log
d.addErrback(log.err)

处理同步或异步结果

在某些应用程序中,有些函数可能是同步的,也可能是异步的。例如,用户身份验证函数可能能够在内存中检查用户是否已通过身份验证,从而允许身份验证函数返回立即结果,或者它可能需要等待网络数据,在这种情况下,它应该返回一个 Deferred,并在数据到达时触发。但是,想要检查用户是否已通过身份验证的函数将需要接受立即结果 Deferreds。

在这个例子中,库函数 authenticateUser 使用应用程序函数 isValidUser 来验证用户

def authenticateUser(isValidUser, user):
    if isValidUser(user):
        print("User is authenticated")
    else:
        print("User is not authenticated")

但是,它假设 isValidUser 会立即返回,而 isValidUser 实际上可能异步地验证用户并返回一个 Deferred。可以将这个简单的用户身份验证代码改编为接受同步 isValidUser 或异步 isValidUser,允许库处理这两种类型的函数。但是,也可以将同步函数改编为返回 Deferreds。本节描述了两种选择:在库函数 (authenticateUser) 或应用程序代码中处理可能同步或异步的函数。

在库代码中处理可能的 Deferreds

这是一个可能传递给 authenticateUser 的同步用户身份验证函数的示例

synch-validation.py

def synchronousIsValidUser(user):
    """
    Return true if user is a valid user, false otherwise
    """
    return user in ["Alice", "Angus", "Agnes"]

但是,这是一个返回 Deferred 的 asynchronousIsValidUser 函数

from twisted.internet import reactor, defer

def asynchronousIsValidUser(user):
    d = defer.Deferred()
    reactor.callLater(2, d.callback, user in ["Alice", "Angus", "Agnes"])
    return d

我们最初的 authenticateUser 实现期望 isValidUser 是同步的,但现在我们需要改变它来处理 isValidUser 的同步和异步实现。为此,我们使用 maybeDeferred 来调用 isValidUser,确保 isValidUser 的结果是一个 Deferred,即使 isValidUser 是一个同步函数

from twisted.internet import defer

def printResult(result):
    if result:
        print("User is authenticated")
    else:
        print("User is not authenticated")

def authenticateUser(isValidUser, user):
    d = defer.maybeDeferred(isValidUser, user)
    d.addCallback(printResult)

现在 isValidUser 可以是 synchronousIsValidUserasynchronousIsValidUser

也可以修改 synchronousIsValidUser 来返回一个 Deferred,有关更多信息,请参见 生成 Deferreds

取消

动机

一个 Deferred 可能需要任何时间才能被回调;事实上,它可能永远不会被回调。你的用户可能没有那么耐心。由于 Deferred 完成时采取的所有操作都在你的应用程序或库的回调代码中,因此你始终可以选择在收到结果时简单地忽略它,如果时间太长。但是,在你忽略它的同时,Deferred 所代表的底层操作仍在后台运行,可能会消耗资源,例如 CPU 时间、内存、网络带宽,甚至可能包括磁盘空间。因此,当用户关闭窗口、点击取消按钮、断开与服务器的连接或发送“停止”网络消息时,你将希望宣布你对该操作结果的漠不关心,以便 Deferred 的发起者可以清理所有内容并释放这些资源,以便更好地利用它们。

用于消耗 Deferreds 的应用程序的取消

这是一个简单的例子。你正在使用 端点 连接到外部主机,但该主机非常慢。你希望在你的应用程序中添加一个“取消”按钮来终止连接尝试,以便用户可以尝试连接到其他主机。这是一个简单应用程序的草图,实际的用户界面留给读者练习

def startConnecting(someEndpoint):
    def connected(it):
        "Do something useful when connected."
    return someEndpoint.connect(myFactory).addCallback(connected)
# ...
connectionAttempt = startConnecting(endpoint)
def cancelClicked():
    connectionAttempt.cancel()

显然(我希望),startConnecting 应该由一些 UI 元素调用,这些元素允许用户选择要连接到的主机,然后构建一个适当的端点(可能使用 twisted.internet.endpoints.clientFromString)。然后,一个取消按钮或类似的东西被连接到 cancelClicked

connectionAttempt.cancel 被调用时,它将

  1. 导致底层连接操作被终止,如果它仍在进行中

  2. 导致 connectionAttempt Deferred 以某种方式及时完成

  3. 可能导致 connectionAttempt Deferred 被 CancelledError 错误回调

您可能会注意到,这一系列后果被非常严格地限定了。虽然取消表示调用 API *希望* 停止底层操作,但底层操作不一定能立即做出反应。即使在这个非常简单的例子中,也有一件事可能无法中断:平台原生名称解析块,因此需要在单独的线程中执行;如果连接操作卡在等待以这种方式解析名称,则无法取消它。因此,您正在取消的 Deferred 可能不会立即回调或错误回调。

Deferred 可以在其回调链中的任何点等待另一个 Deferred(参见上面的“处理...异步结果”)。回调链中的特定点无法知道是否所有操作都已完成。由于回调链的多个层可能希望取消同一个 Deferred,因此任何层都可以在任何时间调用 .cancel()。方法 .cancel() 永远不会引发任何异常或返回任何值;您可以重复调用它,即使在已经触发的 Deferred 上,或者在没有剩余回调的 Deferred 上。除了特定示例之外,所有这些限定的主要原因是,任何实例化 Deferred 的人都可以为其提供一个取消函数;该函数可以执行它想要的任何操作。理想情况下,它所做的任何事情都将用于停止您请求的操作,但无法保证所有可能被取消的 Deferred 的确切行为。Deferred 的取消是尽力而为的。这可能是由于以下几个原因:

  1. Deferred 不知道如何取消底层操作。

  2. 底层操作可能已达到不可取消的状态,因为已经执行了一些不可逆转的操作。

  3. Deferred 可能已经有了结果,因此没有可取消的内容。

调用 cancel() 始终会在没有错误的情况下成功,无论取消是否可能。在情况 1 和 2 中,Deferred 可能会用 twisted.internet.defer.CancelledError 错误回调,而底层操作继续进行。支持取消的 Deferred 应该记录它们在被取消时会做什么,如果它们在某些边缘情况下不可取消,等等。

如果被取消的 Deferred 正在等待另一个 Deferred,则取消将被转发到另一个 Deferred

默认取消行为

所有 Deferred 都支持取消。但是,默认情况下,它们支持一种非常基本的取消形式,该形式不会释放任何资源。

考虑这个对取消一无所知的 Deferred 的例子

operation = Deferred()
def x(result):
    print("Hooray, a result:" + repr(x))
operation.addCallback(x)
# ...
def operationDone():
    operation.callback("completed")

接收 operation 的 API 的调用者可能会在它上面调用 cancel。由于 operation 没有取消函数,因此会发生以下两种情况之一。

  1. 如果 operationDone 已经被调用,并且操作已经完成,那么不会有太大变化。 operation 仍然会有一个结果,并且没有更多回调,因此行为上没有明显的变化。

  2. 如果 operationDone 还没有被调用,那么 operation 将立即用 CancelledError 错误回调。

    但是,一旦它被取消,就没有办法告诉 operationDone 不要运行;它最终会在稍后调用 operation.callback。在正常操作中,在已经回调的 Deferred 上发出 callback 会导致 AlreadyCalledError,这会导致一个无法捕获的丑陋回溯。因此,.callback 可以被精确地调用一次,导致一个无操作,在一个已经被取消但没有取消者的 Deferred 上。如果您多次调用它,您仍然会收到 AlreadyCalledError 异常。

创建可取消的 Deferred:自定义取消函数

假设您正在实现一个 HTTP 客户端,它返回一个 Deferred,并用来自服务器的响应触发。取消最好通过关闭连接来实现。为了让取消做到这一点,您所要做的就是将一个函数传递给 Deferred 的构造函数(它将使用正在被取消的 Deferred 调用该函数)

class HTTPClient(Protocol):
    def request(self, method, path):
        self.resultDeferred = Deferred(
            lambda ignore: self.transport.abortConnection())
        request = b"%s %s HTTP/1.0\r\n\r\n" % (method, path)
        self.transport.write(request)
        return self.resultDeferred

    def dataReceived(self, data):
        # ... parse HTTP response ...
        # ... eventually call self.resultDeferred.callback() ...

现在,如果有人在从 HTTPClient.request() 返回的 Deferred 上调用 cancel(),HTTP 请求将被取消(假设现在取消还不算太晚)。应该注意不要在已经被取消的 Deferred 上调用 callback()

超时

超时是 取消 的一个特例。假设我们有一个 Deferred,它代表一个可能需要很长时间的任务。我们希望对该任务设置一个上限,因此我们希望 Deferred 在未来 X 秒后超时。

一个方便的 API 是 Deferred.addTimeout。默认情况下,如果 Deferredtimeout 秒内没有触发(通过错误回调或回调),它将使用 TimeoutError 失败。

import random
from twisted.internet import task

def f():
    return "Hopefully this will be called in 3 seconds or less"

def main(reactor):
    delay = random.uniform(1, 5)

    def called(result):
        print("{0} seconds later:".format(delay), result)

    d = task.deferLater(reactor, delay, f)
    d.addTimeout(3, reactor).addBoth(called)

    return d

# f() will be timed out if the random delay is greater than 3 seconds
task.react(main)

Deferred.addTimeout 在幕后使用 Deferred.cancel 函数,但可以区分用户对 Deferred.cancel 的调用和由于超时而导致的取消。默认情况下,Deferred.addTimeout 将超时产生的 CancelledError 转换为 TimeoutError

但是,如果您在创建 Deferred 时提供了自定义 取消,那么取消它可能不会产生 CancelledError。在这种情况下,Deferred.addTimeout 的默认行为是保留自定义取消函数产生的任何回调或错误回调值。这可能很有用,例如,取消或超时应该产生一个默认值而不是错误。

Deferred.addTimeout 还接受一个可选的可调用对象 onTimeoutCancel,它在延迟超时后立即被调用。如果延迟在超时之前被取消,则不会调用 onTimeoutCancel。它接受一个任意值,该值是延迟在该确切时间的返回值(可能是一个 CancelledError Failure),以及 timeout。这可能很有用,例如,取消或超时不会导致错误,但您仍然希望记录超时。它也可以用来改变返回值。

from twisted.internet import task, defer

def logTimeout(result, timeout):
    print("Got {0!r} but actually timed out after {1} seconds".format(
        result, timeout))
    return result + " (timed out)"

def main(reactor):
    # generate a deferred with a custom canceller function, and never
    # never callback or errback it to guarantee it gets timed out
    d = defer.Deferred(lambda c: c.callback("Everything's ok!"))
    d.addTimeout(2, reactor, onTimeoutCancel=logTimeout)
    d.addBoth(print)
    return d

task.react(main)

请注意,在回调链中添加 Deferred.addTimeout 的确切位置决定了回调链中应超时的时间。超时涵盖了在调用 Deferred 之前添加到 addTimeout 的所有回调和错误回调,但不包括调用之后添加的任何回调和错误回调。超时也从调用时开始倒计时。

DeferredList

有时您希望在多个不同的事件都发生后收到通知,而不是分别等待每个事件。例如,您可能希望等待列表中的所有连接关闭。 twisted.internet.defer.DeferredList 是实现此目的的方法。

要从多个 Deferred 创建 DeferredList,只需传递您希望它等待的 Deferred 列表即可

# Creates a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3])

现在您可以将 DeferredList 视为普通的 Deferred;您可以调用 addCallbacks 等。当所有 Deferred 完成时,DeferredList 将调用其回调。回调将使用包含的 Deferred 结果列表调用,如下所示

# A callback that unpacks and prints the results of a DeferredList
def printResult(result):
    for (success, value) in result:
        if success:
            print('Success:', value)
        else:
            print('Failure:', value.getErrorMessage())

# Create three deferreds.
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()

# Pack them into a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)

# Add our callback
dl.addCallback(printResult)

# Fire our three deferreds with various values.
deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')

# At this point, dl will fire its callback, printing:
#    Success: one
#    Failure: bang!
#    Success: three
# (note that defer.SUCCESS == True, and defer.FAILURE == False)

标准 DeferredList 永远不会调用错误回调,但传递给 DeferredList 的 Deferred 中的错误仍将触发错误回调,除非传递了 consumeErrors True。有关此内容和其他修改 DeferredList 行为的标志的更多详细信息,请参见下文。

注意

如果您想将回调应用于进入 DeferredList 的各个 Deferred,您应该注意添加这些回调的时间。将 Deferred 添加到 DeferredList 的操作会将回调插入该 Deferred(当该回调运行时,它会检查 DeferredList 是否已完成)。要记住的重要一点是,正是 *这个回调* 记录了传递给 DeferredList 回调的结果列表中的值。

因此,如果您在将 Deferred 添加到 DeferredList *之后* 将回调添加到 Deferred,则该回调返回的值将不会传递给 DeferredList 的回调。为了避免混淆,我们建议不要在将 Deferred 用于 DeferredList 后向其添加回调。

def printResult(result):
    print(result)

def addTen(result):
    return result + " ten"

# Deferred gets callback before DeferredList is created
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred1.addCallback(addTen)
dl = defer.DeferredList([deferred1, deferred2])
dl.addCallback(printResult)
deferred1.callback("one") # fires addTen, checks DeferredList, stores "one ten"
deferred2.callback("two")
# At this point, dl will fire its callback, printing:
#     [(1, 'one ten'), (1, 'two')]

# Deferred gets callback after DeferredList is created
deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
dl = defer.DeferredList([deferred1, deferred2])
deferred1.addCallback(addTen) # will run *after* DeferredList gets its value
dl.addCallback(printResult)
deferred1.callback("one") # checks DeferredList, stores "one", fires addTen
deferred2.callback("two")
# At this point, dl will fire its callback, printing:
#     [(1, 'one), (1, 'two')]

其他行为

DeferredList 接受三个修改其行为的关键字参数:fireOnOneCallbackfireOnOneErrbackconsumeErrors。如果设置了 fireOnOneCallback,则 DeferredList 将立即调用其回调,只要其任何 Deferred 调用其回调即可。类似地,fireOnOneErrback 将在任何 Deferred 调用其错误回调时调用错误回调。请注意,DeferredList 仍然是单次执行的,就像普通的 Deferred 一样,因此在回调或错误回调被调用后,DeferredList 将不再执行任何操作(它只会静默地忽略其 Deferred 的任何其他结果)。

当您希望在一切成功时等待所有结果,但又希望立即知道是否出现错误时,fireOnOneErrback 选项特别有用。

consumeErrors 参数将阻止 DeferredList 沿其包含的任何 Deferred 的回调链传播任何错误(通常创建 DeferredList 对其 Deferred 的回调和错误回调传递的结果没有影响)。使用此选项在 DeferredList 处停止错误将阻止 Deferred 中的“Deferred 中的未处理错误”警告,而无需添加额外的错误回调 [1] 。为 consumeErrors 参数传递真值不会改变 fireOnOneCallbackfireOnOneErrback 的行为。

gatherResults

DeferredList 的一个常见用途是“连接”多个并行异步操作,如果所有操作都成功则成功完成,或者如果任何一个操作失败则失败。在这种情况下,twisted.internet.defer.gatherResults() 是一个有用的快捷方式

from twisted.internet import defer

d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.gatherResults([d1, d2], consumeErrors=True)

def cbPrintResult(result):
    print(result)

d.addCallback(cbPrintResult)

d1.callback("one")
# nothing is printed yet; d is still awaiting completion of d2
d2.callback("two")
# printResult prints ["one", "two"]

consumeErrors 参数与 DeferredList 中的含义相同:如果为真,它会导致 gatherResults 消耗传递的 Deferred 中的任何错误。始终使用此参数,除非您正在向传递的 Deferred 添加进一步的回调或错误回调,或者除非您知道它们不会失败。否则,失败将导致 Twisted 记录未处理的错误。此参数从 Twisted 11.1.0 开始可用。

类概述

这是从使用函数返回的 Deferred 的角度来看的 Deferred 的 API 概述参考。它并非旨在替代 Deferred 类中的文档字符串,但可以提供使用指南。

生成 Deferreds 中,有一个并行的 Deferred *创建者* 使用的函数概述。

基本回调函数

  • addCallbacks(self, callback[, errback, callbackArgs, callbackKeywords, errbackArgs, errbackKeywords])

    这是您将用于与 Deferred 交互的方法。它在 Deferred 被回调时添加一对“并行”回调(参见上面的图表)。使用 addCallbacks 添加的方法的签名应为 myMethod(result, *methodAsrgs, **methodKeywords)。例如,如果您的方法在回调插槽中传递,则元组 callbackArgs 中的所有参数都将作为 *methodArgs 传递给您的方法。

    有各种便利方法是 addCallbacks 的派生方法。我不会在这里详细介绍它们,但了解它们对于创建简洁的代码非常重要。

  • addCallback(callback, *callbackArgs, **callbackKeywords)

    在处理链的下一个点添加您的回调,同时添加一个错误回调,该错误回调将重新抛出其第一个参数,不会影响错误情况下的进一步处理。

    请注意,虽然 addCallbacks(复数)要求参数在元组中传递,但 addCallback(单数)将所有剩余的参数作为要传递给回调函数的内容。原因很明显:addCallbacks(复数)无法判断参数是用于回调还是错误回调,因此必须通过将它们放入元组中来明确标记。addCallback(单数)知道所有内容都将传递给回调,因此它可以使用 Python 的“*”和“**”语法来收集剩余的参数。

  • addErrback(errback, *errbackArgs, **errbackKeywords)

    在处理链的下一个点添加您的错误回调,同时添加一个回调,该回调将返回其第一个参数,不会影响成功情况下的进一步处理。

  • addBoth(callbackOrErrback, *callbackOrErrbackArgs, **callbackOrErrbackKeywords)

    此方法将相同的回调添加到处理链的两侧的两个点。请记住,如果您使用此方法,第一个参数的类型是不确定的!将其用于 finally: 样式块。

链接 Deferreds

如果您需要一个 Deferred 来等待另一个 Deferred,您只需从添加到 addCallbacks 的方法中返回一个 Deferred。具体来说,如果您从使用 A.addCallbacks 添加到 Deferred A 的方法中返回 Deferred B,则 Deferred A 的处理链将停止,直到 Deferred B 的 .callback() 方法被调用;此时,A 中的下一个回调将被传递 Deferred B 的处理链在该时间点上的最后一个回调的结果。

注意

如果一个 Deferred 以某种方式从其自身回调(直接或间接)返回,则行为未定义。Deferred 代码将尝试检测这种情况并发出警告。将来,这将成为一个异常。

如果这看起来很混乱,现在不用担心 - 当您遇到需要此行为的情况时,您可能会立即识别出来并意识到为什么会发生这种情况。如果您想手动链接 Deferred,还有一个方便的方法可以帮助您。

  • chainDeferred(otherDeferred)

    otherDeferred 添加到此 Deferred 处理链的末尾。当 self.callback 被调用时,我的处理链到目前为止的结果将被传递给 otherDeferred.callback。对我的回调链的进一步添加不会影响 otherDeferred

    这与 self.addCallbacks(otherDeferred.callback, otherDeferred.errback) 相同。

另请参阅

  1. 生成 Deferreds,介绍编写返回 Deferred 的异步函数。

脚注