在 Twisted 中使用线程¶
Twisted 如何使用线程¶
所有注册到 reactor 的回调函数 — 例如,dataReceived
、connectionLost
,或来自这些回调函数的任何更高级别的函数,例如 twisted.web 中的 render_GET
,或添加到 Deferred
的回调函数 — 都是从 reactor.run
中调用的。关于此的术语是,我们说这些回调函数是在“主线程”或“reactor 线程”或“I/O 线程”中运行的。
因此,在内部,Twisted 很少使用线程。这并不是说它完全不使用线程;有很多 API 没有非阻塞等效项,所以当 Twisted 需要调用这些 API 时,它会在线程中调用它们。一个突出的例子是系统主机名解析:除非你已将 Twisted 配置为在 twisted.names
中使用自己的 DNS 客户端,否则它将不得不使用你的操作系统的阻塞 API 来将主机名映射到 IP 地址,在 reactor 的线程池中。但是,这只是你需要了解的资源调整目的,例如设置要使用的线程数;否则,这是一个你可以忽略的实现细节。
人们常犯的一个错误是,由于 Twisted 可以同时管理多个连接,因此事情是在多个线程中发生的,所以你需要仔细管理锁。幸运的是,Twisted 在一个线程中完成了大部分工作!本文档解释了如何与需要在自己的线程中运行的现有 API 交互,因为它们会阻塞。如果你只是使用 Twisted 自己的 API,那么线程的规则很简单:“不要使用它们”。
从其他线程调用 Twisted¶
Twisted 中的函数只能从 reactor 线程中调用,除非另有说明。Twisted 中很少有东西是线程安全的。例如,从协议写入数据到传输不是线程安全的。这意味着,如果你启动一个线程并调用 Twisted 函数,你可能会得到正确的结果……或者你可能会遇到挂起、崩溃或数据损坏。所以不要这样做。
从另一个线程调用 reactor 上的函数的正确方法,以及因此可能调用 reactor 上的函数的任何对象,是向 reactor 提供一个函数,以便在它自己的线程中执行。这可以使用 callFromThread
函数来完成。
from twisted.internet import reactor
def notThreadSafe(someProtocol, message):
someProtocol.transport.write(b"a message: " + message)
def callFromWhateverThreadYouWant():
reactor.callFromThread(notThreadSafe, b"hello")
在这个例子中,callFromWhateverThreadYouWant
是线程安全的,可以被任何线程调用,但是 notThreadSafe
只能被运行在 reactor.run
运行的线程中的代码调用。
在线程中运行代码¶
有时我们可能希望在非 reactor 线程中运行代码,以避免阻塞 reactor。Twisted 提供了一个低级 API 来执行此操作,即 reactor 上的 callInThread
函数。
例如,要在非 reactor 线程中运行一个函数,我们可以这样做
from twisted.internet import reactor
def aSillyBlockingMethod(x):
import time
time.sleep(2)
print(x)
reactor.callInThread(aSillyBlockingMethod, "2 seconds have passed")
reactor.run()
callInThread
会将你的代码放入一个队列中,以便由 reactor 线程池中的下一个可用线程运行。这意味着,根据提交到池中的其他工作,你的函数可能不会立即运行。
注意
请记住,callInThread
只能并发运行固定数量的任务,并且所有 reactor 用户都在共享这个限制。因此,你不应该提交依赖于其他任务才能完成的任务来由 callInThread
执行。这样的任务的一个例子可能是这样的
q = Queue()
def blocker():
print(q.get() + q.get())
def unblocker(a, b):
q.put(a)
q.put(b)
在这种情况下,blocker
将永远阻塞,除非 unblocker
可以成功运行以提供输入;类似地,unblocker
可能会永远阻塞,如果 blocker
没有运行来消耗它的输出。因此,如果你有一个最大大小为 X 的线程池,并且你运行了 for each in range(X): reactor.callInThread(blocker)
,那么 reactor 线程池将永远卡住,无法处理更多工作,甚至无法关闭。
请参阅下面的“管理 Reactor 线程池”以调整这些限制。
获取结果¶
callInThread
和 callFromThread
允许你分别将代码的执行从 reactor 线程中移出和移入,但这并不总是足够的。
当我们运行一些代码时,我们通常想知道它的结果是什么。为此,Twisted 提供了两种函数:deferToThread
和 blockingCallFromThread
,它们定义在 twisted.internet.threads
模块中。
为了将阻塞代码的结果返回到反应器线程,我们可以使用 deferToThread
来执行它,而不是使用 callFromThread
。
from twisted.internet import reactor, threads
def doLongCalculation():
# .... do long calculation here ...
return 3
def printResult(x):
print(x)
# run method in thread and get result as defer.Deferred
d = threads.deferToThread(doLongCalculation)
d.addCallback(printResult)
reactor.run()
类似地,如果您希望在非反应器线程中运行的代码调用反应器线程中的代码并获取其结果,您可以使用 blockingCallFromThread
from twisted.internet import threads, reactor, defer
from twisted.web.client import Agent
from twisted.web.error import Error
def inThread():
agent = Agent(reactor)
try:
result = threads.blockingCallFromThread(
reactor,
agent.request,
b"GET",
b"https://twistedmatrix.com/",
)
except Exception as exc:
print(exc)
else:
print(result)
reactor.callFromThread(reactor.stop)
reactor.callInThread(inThread)
reactor.run()
blockingCallFromThread
将返回传递给它的函数返回的对象或抛出的异常。如果传递给它的函数返回一个 Deferred
,它将返回延迟对象被触发时的值或抛出它被失败时的异常。
管理反应器线程池¶
我们可能希望修改线程池的大小,增加或减少正在使用的线程数量。我们可以这样做
from twisted.internet import reactor
reactor.suggestThreadPoolSize(30)
线程池的默认大小取决于正在使用的反应器;默认反应器使用最小大小为 0,最大大小为 10。
反应器线程池由 ThreadPool
实现。要访问此对象上的方法以进行更高级的调整和监控(有关详细信息,请参阅 API 文档),您可以使用 getThreadPool
获取线程池。