使用进程¶
概述¶
除了连接到互联网上的服务器之外,Twisted 还使用与 API 非常相似的 API 连接到本地进程。API 在以下文档中进行了更详细的描述:
运行另一个进程¶
进程通过 reactor 运行,使用 reactor.spawnProcess
。创建到子进程的管道,并将其添加到 reactor 核心,以便应用程序在向新进程发送数据或从新进程中提取数据时不会阻塞。 reactor.spawnProcess
需要两个参数,processProtocol
和 executable
,并且可以选择接受更多参数:args
、environment
、path
、userID
、groupID
、usePTY
和 childFDs
。并非所有这些参数在 Windows 上都可用。
from twisted.internet import reactor
processProtocol = MyProcessProtocol()
reactor.spawnProcess(processProtocol, executable, args=[program, arg1, arg2],
env={'HOME': os.environ['HOME']}, path,
uid, gid, usePTY, childFDs)
processProtocol
应该是twisted.internet.protocol.ProcessProtocol
子类的实例。接口将在下面描述。executable
是要运行的程序的完整路径。它将连接到 processProtocol。args
是要传递给进程的命令行参数列表。args[0]
应该是进程的名称。env
是一个字典,包含要传递给进程的环境。path
是运行进程的目录。子进程将在启动新程序之前切换到给定的目录。默认情况下,它将保留在当前目录中。uid
和gid
是以其运行子进程的用户 ID 和组 ID。当然,如果你以 root 身份启动,更改身份会更有可能成功。usePTY
指定子进程是否应该使用 pty 运行,或者它是否应该只获得一对管道。程序是否需要使用 PTY 取决于该程序的具体情况。通常,主要通过终端与用户交互的程序确实需要 PTY。childFDs
允许你指定子进程的文件描述符应如何设置。每个键都是子进程看到的文件描述符编号(整数)。0、1 和 2 通常是 stdin、stdout 和 stderr,但某些程序可能会被指示通过命令行参数或环境变量使用额外的 fds。每个值都是一个整数,指定父进程的当前文件描述符之一,字符串“r”创建父进程可以从中读取的管道,或者字符串“w”创建父进程可以写入的管道。如果未提供childFDs
,则将使用默认值,该默认值创建通常的 stdin-writer、stdout-reader 和 stderr-reader 管道。
args
和 env
具有空默认值,但许多程序依赖于它们被正确设置。至少,args[0]
可能应该与 executable
相同。如果你只为 env
提供 os.environ
,子程序将从当前进程继承环境,这通常是文明的做法(除非你希望明确地清理环境作为安全预防措施)。默认情况下,会向子进程提供一个空的 env
。
reactor.spawnProcess
返回一个实现了 IProcessTransport
的实例。
编写 ProcessProtocol¶
传递给 spawnProcess
的 ProcessProtocol 是你与进程交互的接口。它的签名与常规 Protocol 非常相似,但它有一些额外的用于处理特定于进程的事件的方法。在我们的示例中,我们将与“wc”交互以创建用户给定文本的字数统计。首先,我们将从导入所需的模块开始,并编写 ProcessProtocol 的初始化。
from twisted.internet import protocol
class WCProcessProtocol(protocol.ProcessProtocol):
def __init__(self, text):
self.text = text
当 ProcessProtocol 连接到协议时,会调用 connectionMade 方法。在我们的协议中,我们将文本写入进程的标准输入,然后关闭标准输入,以让进程知道我们已完成写入。
...
def connectionMade(self):
self.transport.write(self.text)
self.transport.closeStdin()
此时,进程已收到数据,现在该我们读取结果了。数据不是在 dataReceived
中接收的,而是从标准输出接收的,在 outReceived
中接收。这是为了将其与标准错误上的数据区分开来。
...
def outReceived(self, data):
fieldLength = len(data) / 3
lines = int(data[:fieldLength])
words = int(data[fieldLength:fieldLength*2])
chars = int(data[fieldLength*2:])
self.transport.loseConnection()
self.receiveCounts(lines, words, chars)
现在,进程已解析输出并结束了与进程的连接。然后,它将结果发送到最终方法 receiveCounts。这是供类的用户覆盖的,以便对数据执行其他操作。为了演示,我们将只打印结果。
...
def receiveCounts(self, lines, words, chars):
print('Received counts from wc.')
print('Lines:', lines)
print('Words:', words)
print('Characters:', chars)
我们完成了!要使用 WCProcessProtocol,我们创建一个实例,并将其传递给 spawnProcess。
from twisted.internet import reactor
wcProcess = WCProcessProtocol("accessing protocols through Twisted is fun!\n")
reactor.spawnProcess(wcProcess, 'wc', ['wc'])
reactor.run()
可能发生在 ProcessProtocol 上的事情¶
这些是你可以在 ProcessProtocol
子类中有效覆盖的方法
.connectionMade()
:当程序启动时,会调用此方法,这是一个将数据写入 stdin 管道(使用self.transport.write
)的好地方。.outReceived(data)
:当从进程的 stdout 管道接收数据时,会调用此方法。管道往往比套接字提供更大块的数据(一 KB 是常见的缓冲区大小),因此你可能不会遇到网络套接字典型的“随机少量数据”行为,但无论如何,你应该做好准备,以防你没有在一次调用中获得所有数据。要正确执行此操作,outReceived
应该只累积数据,并在进程完成之前不要执行任何操作。.errReceived(data)
:当从进程的 stderr 管道接收数据时,会调用此方法。它的行为与outReceived
相同。.inConnectionLost
:当 reactor 发现进程的 stdin 管道已关闭时,会调用此方法。程序通常不会关闭自己的 stdin,因此当你的 ProcessProtocol 使用self.transport.loseConnection
关闭写入端时,这可能会被调用。.outConnectionLost
: 当程序关闭其 stdout 管道时调用。这通常发生在程序终止时。.errConnectionLost
: 与outConnectionLost
相同,但针对 stderr 而不是 stdout。.processExited(status)
: 当子进程被回收时调用,并接收有关进程退出状态的信息。状态以Failure
实例的形式传递,该实例使用.value
创建,该实例要么包含一个ProcessDone
对象(如果进程正常终止(它因自然原因死亡而不是接收信号,并且如果退出代码为 0)),要么包含一个ProcessTerminated
对象(带有.exitCode
属性)(如果出现问题)。.processEnded(status)
: 当与子进程关联的所有文件描述符都被关闭并且进程已被回收时调用。这意味着它是对ProcessProtocol
进行的最后一个回调。status
参数与processExited
中的含义相同。
大多数这些函数的基类定义都是空操作。这将导致所有 stdout 和 stderr 被丢弃。请注意,对于您不关心的数据,将其丢弃非常重要:如果管道没有被读取,子进程最终会在尝试写入满管道时阻塞。
您可以从 ProcessProtocol 中执行的操作¶
以下是控制子进程的基本方法
self.transport.write(data)
: 在 stdin 管道中放入一些数据。请注意,此write
方法将排队任何无法立即写入的数据。当管道再次可写时,写入将在将来恢复。self.transport.closeStdin
: 关闭 stdin 管道。充当过滤器(从 stdin 读取、修改数据、写入 stdout)的程序通常将此视为完成工作并终止的信号。对于这些程序,在您完成操作后关闭 stdin 很重要,否则子进程永远不会退出。self.transport.closeStdout
: 通常不调用,因为您将进程置于任何尝试写入 stdout 都会导致 SIGPIPE 错误的状态。这对可怜的进程来说不是一件好事。self.transport.closeStderr
: 通常不调用,原因与closeStdout
相同。self.transport.loseConnection
: 关闭所有三个管道。self.transport.signalProcess('KILL')
: 杀死子进程。这最终将导致调用processEnded
。
详细示例¶
这是一个关于所有方法何时被调用的详细示例。它将多行写入 wc
程序,然后解析输出。
#!/usr/bin/env python
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
import re
from twisted.internet import protocol, reactor
class MyPP(protocol.ProcessProtocol):
def __init__(self, verses):
self.verses = verses
self.data = ""
def connectionMade(self):
print("connectionMade!")
for i in range(self.verses):
self.transport.write(
"Aleph-null bottles of beer on the wall,\n"
+ "Aleph-null bottles of beer,\n"
+ "Take one down and pass it around,\n"
+ "Aleph-null bottles of beer on the wall.\n"
)
self.transport.closeStdin() # tell them we're done
def outReceived(self, data):
print("outReceived! with %d bytes!" % len(data))
self.data = self.data + data
def errReceived(self, data):
print("errReceived! with %d bytes!" % len(data))
def inConnectionLost(self):
print("inConnectionLost! stdin is closed! (we probably did it)")
def outConnectionLost(self):
print("outConnectionLost! The child closed their stdout!")
# now is the time to examine what they wrote
# print("I saw them write:", self.data)
(dummy, lines, words, chars, file) = re.split(r"\s+", self.data)
print("I saw %s lines" % lines)
def errConnectionLost(self):
print("errConnectionLost! The child closed their stderr.")
def processExited(self, reason):
print("processExited, status %d" % (reason.value.exitCode,))
def processEnded(self, reason):
print("processEnded, status %d" % (reason.value.exitCode,))
print("quitting")
reactor.stop()
pp = MyPP(10)
reactor.spawnProcess(pp, "wc", ["wc"], {})
reactor.run()
此程序的确切输出取决于一些不同步事件的相对时间。特别是,程序可能会在从 stdout 管道读取数据之前或之后观察到子进程关闭其 stderr 管道。一个可能的记录将如下所示
% ./process.py
connectionMade!
inConnectionLost! stdin is closed! (we probably did it)
errConnectionLost! The child closed their stderr.
outReceived! with 24 bytes!
outConnectionLost! The child closed their stdout!
I saw 40 lines
processEnded, status 0
quitting
Main loop terminated.
%
简单方法¶
通常,人们只需要一种简单的方法来获取程序的所有输出。在阻塞世界中,您可能会使用标准库中的 commands.getoutput
,但在事件驱动的程序中使用它会导致所有其他操作都停滞,直到命令完成。(此外,该函数使用的 SIGCHLD 处理程序与 Twisted 自己的信号处理不兼容)。对于这些情况,可以使用 twisted.internet.utils.getProcessOutput()
函数。这是一个简单的示例
from cStringIO import StringIO
from twisted.internet import protocol, reactor, utils
from twisted.python import failure
class FortuneQuoter(protocol.Protocol):
fortune = "/usr/games/fortune"
def connectionMade(self):
output = utils.getProcessOutput(self.fortune)
output.addCallbacks(self.writeResponse, self.noResponse)
def writeResponse(self, resp):
self.transport.write(resp)
self.transport.loseConnection()
def noResponse(self, err):
self.transport.loseConnection()
if __name__ == "__main__":
f = protocol.Factory()
f.protocol = FortuneQuoter
reactor.listenTCP(10999, f)
reactor.run()
如果您只需要最终的退出代码(如 commands.getstatusoutput(cmd)[0]
),则 twisted.internet.utils.getProcessValue()
函数很有用。这是一个示例
from twisted.internet import reactor, utils
def printTrueValue(val):
print("/bin/true exits with rc=%d" % val)
output = utils.getProcessValue("/bin/false")
output.addCallback(printFalseValue)
def printFalseValue(val):
print("/bin/false exits with rc=%d" % val)
reactor.stop()
output = utils.getProcessValue("/bin/true")
output.addCallback(printTrueValue)
reactor.run()
映射文件描述符¶
“stdin”、 “stdout” 和 “stderr” 只是约定。充当过滤器的程序通常接受 fd0 上的输入,将它们的输出写入 fd1,并在 fd2 上发出错误消息。这很常见,以至于标准 C 库提供了像 “stdin” 这样的宏来表示 fd0,并且 shell 解释管道字符 “|” 表示 “将 fd1 从一个命令重定向到下一个命令的 fd0”。
但这些只是约定,程序可以自由使用其他文件描述符,甚至完全忽略标准的三个文件描述符。“childFDs” 参数允许您精确指定子进程应该获得什么样的文件描述符。
每个子 FD 可以处于三种状态之一
映射到父 FD:这会导致子进程的读写来自或去往与父进程相同的源/目标。
馈送到父进程可以读取的管道。
从父进程写入的管道中馈送。
将子 FD 映射到父进程的 FD 非常普遍地用于将子进程的 stderr 输出发送到与父进程的 stderr 输出相同的位置。当您从 shell 运行程序时,它通常会将 fds 0、1 和 2 映射到 shell 的 0、1 和 2,允许您在用于启动子进程的同一终端上看到子程序的输出。同样,inetd 通常会将 stdin 和 stdout 映射到网络套接字,并且可能会将 stderr 映射到同一个套接字或某种日志记录机制。这允许子程序在不知道网络的情况下实现:它只是通过对 fd0 进行读取和对 fd1 进行写入来执行其协议。
馈送到父进程的读取管道用于收集来自子进程的输出,并且是与子进程交互的最常见方法。
从父进程的写入管道中馈送允许父进程控制子进程。像 “bc” 或 “ftp” 这样的程序可以通过将命令写入其 stdin 流来控制。
“childFDs” 字典将文件描述符编号(如子进程所见)映射到这三种状态之一。要将 fd 映射到父进程的 fds 之一,只需提供 fd 编号作为值即可。要将其映射到读取管道,请使用字符串 “r” 作为值。要将其映射到写入管道,请使用字符串 “w”。
例如,默认映射设置了标准的 stdin/stdout/stderr 管道。它使用以下字典实现
childFDs = { 0: "w", 1: "r", 2: "r" }
要启动一个进程,该进程读取和写入与父 Python 程序相同的位置,请使用以下方法
childFDs = { 0: 0, 1: 1, 2: 2}
要写入额外的 fd(假设它是 fd 编号 4),请使用以下方法
childFDs = { 0: "w", 1: "r", 2: "r" , 4: "w"}
具有额外文件描述符的 ProcessProtocols¶
当您提供一个 “childFDs” 字典,其中包含超过正常三个 fds 时,您需要额外的用于访问这些管道的方法。这些方法比上面描述的 .outReceived
方法更通用。事实上,这些方法(outReceived
和 errReceived
)实际上只是为了与旧代码(在实现这种通用 fd 映射之前编写的代码)兼容而保留的包装器。您可以对 ProcessProtocol 执行的新操作列表如下
.connectionMade
: 当程序启动时调用。.childDataReceived(childFD, data)
: 当从进程的输出管道之一(即 childFDs 值为 “r” 的地方)接收数据时调用。实际文件编号(从子进程的角度来看)位于 “childFD” 中。为了兼容性,.childDataReceived
的默认实现会在 “childFD” 为 1 或 2 时分派到.outReceived
或.errReceived
。.childConnectionLost(childFD)
: 当反应器注意到进程的管道之一已关闭时调用。这意味着您刚刚关闭了管道父端的管道(使用.transport.closeChildFD
),子进程显式关闭了管道(有时表示 EOF),或者子进程已终止并且内核已关闭其所有管道。“childFD” 参数告诉您哪个管道已关闭。请注意,您只能找到已映射到管道的文件描述符:当它们映射到父进程已有的 fds 时,父进程无法知道它们何时被关闭。为了兼容性,默认实现分派到.inConnectionLost
、.outConnectionLost
或.errConnectionLost
。.processEnded(status)
: 当子进程被回收并且所有管道都被关闭时调用。这确保了在子进程死亡之前子进程写入的所有数据都将在调用.processEnded
之前被接收。
除了这些方法之外,还有其他方法可用于影响子进程
self.transport.writeToChild(childFD, data)
: 在输入管道中放入一些数据。.write
只是写入 childFD=0。self.transport.closeChildFD(childFD)
: 关闭子进程的管道之一。关闭输入管道是向子进程指示 EOF 的一种常见方法。关闭输出管道既不友好也不实用。
示例¶
加密程序 GnuPG 可以使用额外的文件描述符来接收密码并输出状态信息。这些描述符与 stdin(用于接收密文)、stdout(用于输出明文)和 stderr(用于输出人类可读的状态/警告消息)不同。密码 FD 读取直到管道关闭,并使用结果字符串来解锁执行实际解密的密钥。状态 FD 发出机器可解析的状态消息,以指示签名的有效性、消息加密到的密钥等。
gpg 接受命令行参数来指定这些 fd 是什么,然后假设它们在 gpg 进程启动之前由父进程打开。它只是对这些 fd 编号进行读写操作。
要以解密/验证模式调用 gpg,您可以执行以下操作:
class GPGProtocol(ProcessProtocol):
def __init__(self, crypttext):
self.crypttext = crypttext
self.plaintext = ""
self.status = ""
def connectionMade(self):
self.transport.writeToChild(3, self.passphrase)
self.transport.closeChildFD(3)
self.transport.writeToChild(0, self.crypttext)
self.transport.closeChildFD(0)
def childDataReceived(self, childFD, data):
if childFD == 1: self.plaintext += data
if childFD == 4: self.status += data
def processEnded(self, status):
rc = status.value.exitCode
if rc == 0:
self.deferred.callback(self)
else:
self.deferred.errback(rc)
def decrypt(crypttext):
gp = GPGProtocol(crypttext)
gp.deferred = Deferred()
cmd = ["gpg", "--decrypt", "--passphrase-fd", "3", "--status-fd", "4",
"--batch"]
p = reactor.spawnProcess(gp, cmd[0], cmd, env=None,
childFDs={0:"w", 1:"r", 2:2, 3:"w", 4:"r"})
return gp.deferred
在这个例子中,状态输出可以在事后解析。当然,它也可以在运行时解析,因为它是一个简单的面向行的协议。可以将 LineReceiver 的方法混合使用,使这种解析更加方便。
使用的 stderr 映射(“2:2”)将导致任何 GPG 错误由父程序发出,就像这些错误是由父程序本身引起的。这在某些情况下是可取的(它大致对应于让异常向上传播),尤其是在您不希望在子进程中遇到错误并且希望它们对最终用户更可见的情况下。另一种方法是将 stderr 映射到一个读管道,并从 ProcessProtocol 中处理任何此类输出(大致对应于在本地捕获异常)。