生成延迟对象

Deferred 对象是信号,表示您调用的函数尚未提供您想要的数据。当函数返回一个 Deferred 对象时,您的调用函数会附加回调函数,以便在数据可用时处理数据。

本文档讨论了问题的另一半:编写返回 Deferred 的函数,即构造 Deferred 对象,安排它们立即返回而不阻塞直到数据可用,并在数据可用时触发它们的回调函数。

本文档假设您熟悉 Twisted 使用的异步模型,以及 使用函数返回的延迟对象

类概述

这是 Deferred 的 API 参考概述,从创建 Deferred 和触发其回调函数和错误回调函数的角度出发。它不打算替代 Deferred 类中的文档字符串,但可以提供使用指南。

使用延迟对象 中,有一个并行的概述,介绍了调用函数使用的函数,这些函数将返回延迟对象。

基本回调函数

  • callback(result)

    使用给定的结果运行成功回调函数。这只能运行一次。对该函数或 errback 的后续调用将引发 twisted.internet.defer.AlreadyCalledError 。如果在此之后添加了更多回调函数或错误回调函数,addCallbacks 将立即运行回调函数。

  • errback(failure)

    使用给定的错误运行错误回调函数。这只能运行一次。对该函数或 callback 的后续调用将引发 twisted.internet.defer.AlreadyCalledError 。如果在此之后添加了更多回调函数或错误回调函数,addCallbacks 将立即运行回调函数。

延迟对象不做什么:使您的代码异步

延迟对象不会使代码神奇地不阻塞。

以这个函数为例

from twisted.internet import defer

TARGET = 10000

def largeFibonnaciNumber():
    # create a Deferred object to return:
    d = defer.Deferred()

    # calculate the ten thousandth Fibonnaci number

    first = 0
    second = 1

    for i in range(TARGET - 1):
        new = first + second
        first = second
        second = new
        if i % 100 == 0:
            print("Progress: calculating the %dth Fibonnaci number" % i)

    # give the Deferred the answer to pass to the callbacks:
    d.callback(second)

    # return the Deferred with the answer:
    return d

import time

timeBefore = time.time()

# call the function and get our Deferred
d = largeFibonnaciNumber()

timeAfter = time.time()

print("Total time taken for largeFibonnaciNumber call: %0.3f seconds" % \
      (timeAfter - timeBefore))

# add a callback to it to print the number

def printNumber(number):
    print("The %dth Fibonacci number is %d" % (TARGET, number))

print("Adding the callback now.")

d.addCallback(printNumber)

您会注意到,尽管在 largeFibonnaciNumber 函数中创建了一个 Deferred 对象,但发生了以下情况

  • “largeFibonnaciNumber 调用所用总时间”输出显示该函数没有立即返回,而异步函数应该立即返回;并且

  • 回调函数不是在结果可用之前添加,并在结果可用之后调用,而是在计算完成之后才添加。

该函数在返回之前完成了计算,阻塞了进程直到它完成,这正是异步函数不应该做的事情。延迟对象不是一个非阻塞护身符:它们是异步函数用来将结果传递给回调函数的信号,但使用它们并不能保证您拥有一个异步函数。

高级处理链控制

  • pause()

    停止调用任何添加的方法,并且不要响应 callback ,直到调用 self.unpause()

  • unpause()

    如果 callback 已经在这个 Deferred 上被调用,则调用自 pause 被调用以来添加到这个 Deferred 的所有回调函数。

    无论它是否被调用,这都将使这个 Deferred 处于一种状态,在这种状态下,对 addCallbackscallback 的进一步调用将正常工作。

从同步函数返回延迟对象

有时您可能希望从同步函数返回一个 Deferred 对象。有几个原因,主要两个原因是:保持与返回 Deferred 对象的另一个版本的函数的 API 兼容性,或者允许将来您的函数可能需要异步执行的可能性。

使用延迟对象 参考中,我们给出了以下同步函数示例

synch-validation.py

def synchronousIsValidUser(user):
    """
    Return true if user is a valid user, false otherwise
    """
    return user in ["Alice", "Angus", "Agnes"]

虽然我们可以要求调用我们函数的人使用 maybeDeferred 将我们的同步结果包装在一个 Deferred 对象中,但为了 API 兼容性,最好使用 defer.succeed 自己返回一个 Deferred 对象。

from twisted.internet import defer

def immediateIsValidUser(user):
    '''
    Returns a Deferred resulting in true if user is a valid user, false
    otherwise
    '''

    result = user in ["Alice", "Angus", "Agnes"]

    # return a Deferred object already called back with the value of result
    return defer.succeed(result)

有一个等效的 defer.fail 方法来返回一个 Deferred 对象,其错误回调函数链已经触发。

将阻塞代码与 Twisted 集成

在某些时候,您可能需要调用一个阻塞函数:第三方库中的许多函数将具有长时间运行的阻塞函数。无法“强制”函数变为异步:它必须专门以这种方式编写。使用 Twisted 时,您自己的代码应该是异步的,但除了重写之外,没有办法使第三方函数异步。

在这种情况下,Twisted 提供了在单独的线程中运行阻塞代码的能力,而不是让它阻塞您的应用程序。 twisted.internet.threads.deferToThread() 函数将设置一个线程来运行您的阻塞函数,返回一个 Deferred 对象,并在线程完成时触发该 Deferred 对象。

假设我们上面的 largeFibonnaciNumber 函数位于第三方库中(返回计算结果,而不是 Deferred 对象),并且不容易修改为在离散块中完成。此示例显示了它在线程中被调用,与上一节不同,我们将看到该操作不会阻塞整个程序

def largeFibonnaciNumber():
    """
    Represent a long running blocking function by calculating
    the TARGETth Fibonnaci number
    """
    TARGET = 10000

    first = 0
    second = 1

    for i in range(TARGET - 1):
        new = first + second
        first = second
        second = new

    return second

from twisted.internet import threads, reactor

def fibonacciCallback(result):
    """
    Callback which manages the largeFibonnaciNumber result by
    printing it out
    """
    print("largeFibonnaciNumber result =", result)
    # make sure the reactor stops after the callback chain finishes,
    # just so that this example terminates
    reactor.stop()

def run():
    """
    Run a series of operations, deferring the largeFibonnaciNumber
    operation to a thread and performing some other operations after
    adding the callback
    """
    # get our Deferred which will be called with the largeFibonnaciNumber result
    d = threads.deferToThread(largeFibonnaciNumber)
    # add our callback to print it out
    d.addCallback(fibonacciCallback)
    print("1st line after the addition of the callback")
    print("2nd line after the addition of the callback")

if __name__ == '__main__':
    run()
    reactor.run()

可能的错误来源

延迟对象通过提供注册回调函数的标准,极大地简化了编写异步代码的过程,但如果您要使用它们,则需要遵循一些微妙且有时令人困惑的规则。这主要适用于编写使用延迟对象内部的新系统的人,而不是编写只向由其他系统生成和处理的延迟对象添加回调函数的应用程序的人。尽管如此,了解这些规则还是很有必要的。

多次触发延迟对象是不可能的

延迟对象是单次使用的。您只能调用 Deferred.callbackDeferred.errback 一次。每次向已经调用过回调函数的 Deferred 添加新的回调函数时,处理链都会继续。

同步回调函数执行

如果一个 Deferred 已经有了可用的结果,addCallback **可能** 会同步调用回调函数:也就是说,在回调函数被添加后立即调用。在回调函数修改状态的情况下,可能希望处理链在所有回调函数被添加之前停止。为此,可以在添加大量回调函数时 pauseunpause Deferred 的处理链。

使用这些方法时要小心!如果你 pause 了一个 Deferred,那么你**有责任**确保你取消暂停它。添加回调函数的函数必须取消暂停一个已暂停的 Deferred,这**绝不应该**是实际通过调用 callbackerrback 来触发回调链的代码的责任,因为这会使其失去作用!