在 Twisted 中使用线程

Twisted 如何使用线程

所有注册到 reactor 的回调函数 — 例如,dataReceivedconnectionLost,或来自这些回调函数的任何更高级别的函数,例如 twisted.web 中的 render_GET,或添加到 Deferred 的回调函数 — 都是从 reactor.run 中调用的。关于此的术语是,我们说这些回调函数是在“主线程”或“reactor 线程”或“I/O 线程”中运行的。

因此,在内部,Twisted 很少使用线程。这并不是说它完全不使用线程;有很多 API 没有非阻塞等效项,所以当 Twisted 需要调用这些 API 时,它会在线程中调用它们。一个突出的例子是系统主机名解析:除非你已将 Twisted 配置为在 twisted.names 中使用自己的 DNS 客户端,否则它将不得不使用你的操作系统的阻塞 API 来将主机名映射到 IP 地址,在 reactor 的线程池中。但是,这只是你需要了解的资源调整目的,例如设置要使用的线程数;否则,这是一个你可以忽略的实现细节。

人们常犯的一个错误是,由于 Twisted 可以同时管理多个连接,因此事情是在多个线程中发生的,所以你需要仔细管理锁。幸运的是,Twisted 在一个线程中完成了大部分工作!本文档解释了如何与需要在自己的线程中运行的现有 API 交互,因为它们会阻塞。如果你只是使用 Twisted 自己的 API,那么线程的规则很简单:“不要使用它们”。

从其他线程调用 Twisted

Twisted 中的函数只能从 reactor 线程中调用,除非另有说明。Twisted 中很少有东西是线程安全的。例如,从协议写入数据到传输不是线程安全的。这意味着,如果你启动一个线程并调用 Twisted 函数,你可能会得到正确的结果……或者你可能会遇到挂起、崩溃或数据损坏。所以不要这样做。

从另一个线程调用 reactor 上的函数的正确方法,以及因此可能调用 reactor 上的函数的任何对象,是向 reactor 提供一个函数,以便在它自己的线程中执行。这可以使用 callFromThread 函数来完成。

from twisted.internet import reactor
def notThreadSafe(someProtocol, message):
    someProtocol.transport.write(b"a message: " + message)
def callFromWhateverThreadYouWant():
    reactor.callFromThread(notThreadSafe, b"hello")

在这个例子中,callFromWhateverThreadYouWant 是线程安全的,可以被任何线程调用,但是 notThreadSafe 只能被运行在 reactor.run 运行的线程中的代码调用。

注意

Twisted 中有很多对象代表值 — 例如,FilePathURLPath — 你可以自己构建这些对象。只要这些对象不与其他线程共享,就可以在非 reactor 线程中安全地构建和使用它们。但是,你应该确保这些对象不共享任何状态,尤其是与 reactor 不共享状态。一个好的经验法则是,任何其函数返回 Deferred 的对象几乎肯定会在某个时刻触及 reactor,并且永远不应该从非 reactor 线程中访问。

在线程中运行代码

有时我们可能希望在非 reactor 线程中运行代码,以避免阻塞 reactor。Twisted 提供了一个低级 API 来执行此操作,即 reactor 上的 callInThread 函数。

例如,要在非 reactor 线程中运行一个函数,我们可以这样做

from twisted.internet import reactor

def aSillyBlockingMethod(x):
    import time
    time.sleep(2)
    print(x)

reactor.callInThread(aSillyBlockingMethod, "2 seconds have passed")
reactor.run()

callInThread 会将你的代码放入一个队列中,以便由 reactor 线程池中的下一个可用线程运行。这意味着,根据提交到池中的其他工作,你的函数可能不会立即运行。

注意

请记住,callInThread 只能并发运行固定数量的任务,并且所有 reactor 用户都在共享这个限制。因此,你不应该提交依赖于其他任务才能完成的任务来由 callInThread 执行。这样的任务的一个例子可能是这样的

q = Queue()

def blocker():
    print(q.get() + q.get())

def unblocker(a, b):
    q.put(a)
    q.put(b)

在这种情况下,blocker 将永远阻塞,除非 unblocker 可以成功运行以提供输入;类似地,unblocker 可能会永远阻塞,如果 blocker 没有运行来消耗它的输出。因此,如果你有一个最大大小为 X 的线程池,并且你运行了 for each in range(X): reactor.callInThread(blocker),那么 reactor 线程池将永远卡住,无法处理更多工作,甚至无法关闭。

请参阅下面的“管理 Reactor 线程池”以调整这些限制。

获取结果

callInThreadcallFromThread 允许你分别将代码的执行从 reactor 线程中移出和移入,但这并不总是足够的。

当我们运行一些代码时,我们通常想知道它的结果是什么。为此,Twisted 提供了两种函数:deferToThreadblockingCallFromThread,它们定义在 twisted.internet.threads 模块中。

为了将阻塞代码的结果返回到反应器线程,我们可以使用 deferToThread 来执行它,而不是使用 callFromThread

from twisted.internet import reactor, threads

def doLongCalculation():
    # .... do long calculation here ...
    return 3

def printResult(x):
    print(x)

# run method in thread and get result as defer.Deferred
d = threads.deferToThread(doLongCalculation)
d.addCallback(printResult)
reactor.run()

类似地,如果您希望在非反应器线程中运行的代码调用反应器线程中的代码并获取其结果,您可以使用 blockingCallFromThread

from twisted.internet import threads, reactor, defer
from twisted.web.client import Agent
from twisted.web.error import Error

def inThread():
    agent = Agent(reactor)
    try:
        result = threads.blockingCallFromThread(
            reactor,
            agent.request,
            b"GET",
            b"https://twistedmatrix.com/",
        )
    except Exception as exc:
        print(exc)
    else:
        print(result)
    reactor.callFromThread(reactor.stop)

reactor.callInThread(inThread)
reactor.run()

blockingCallFromThread 将返回传递给它的函数返回的对象或抛出的异常。如果传递给它的函数返回一个 Deferred,它将返回延迟对象被触发时的值或抛出它被失败时的异常。

管理反应器线程池

我们可能希望修改线程池的大小,增加或减少正在使用的线程数量。我们可以这样做

from twisted.internet import reactor

reactor.suggestThreadPoolSize(30)

线程池的默认大小取决于正在使用的反应器;默认反应器使用最小大小为 0,最大大小为 10。

反应器线程池由 ThreadPool 实现。要访问此对象上的方法以进行更高级的调整和监控(有关详细信息,请参阅 API 文档),您可以使用 getThreadPool 获取线程池。