异步消息协议概述¶
本指南的目的是描述 twisted.protocols.amp
的用途和用法,超出 API 文档中所解释的内容。它将向您展示如何实现一个 AMP 服务器,该服务器可以响应命令或直接与单个消息进行交互。它还将向您展示如何实现一个 AMP 客户端,该客户端可以向服务器发出命令。
AMP 是一种双向命令/响应式协议,旨在通过应用程序特定的请求类型和处理程序进行扩展。它支持各种简单数据类型,应用程序可以添加对新数据类型的支持。
设置¶
AMP 在面向流的基于连接的协议(如 TCP 或 SSL)上运行。在使用 AMP 协议的任何功能之前,您需要建立连接。用于建立 AMP 连接的协议类是 AMP
。连接设置与 Twisted 中几乎所有协议的工作方式相同。例如,您可以使用服务器端点设置一个监听 AMP 服务器
from twisted.application.internet import StreamServerEndpointService
from twisted.application.service import Application
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.protocol import Factory
from twisted.protocols.amp import AMP
application = Application("basic AMP server")
endpoint = TCP4ServerEndpoint(reactor, 8750)
factory = Factory()
factory.protocol = AMP
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)
您可以使用客户端端点连接到 AMP 服务器
if __name__ == "__main__":
import basic_client
raise SystemExit(basic_client.main())
from sys import stdout
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.internet.protocol import Factory
from twisted.protocols.amp import AMP
from twisted.python.log import err, startLogging
def connect():
endpoint = TCP4ClientEndpoint(reactor, "127.0.0.1", 8750)
return endpoint.connect(Factory.forProtocol(AMP))
def main():
startLogging(stdout)
d = connect()
d.addErrback(err, "Connection failed")
def done(ignored):
reactor.stop()
d.addCallback(done)
reactor.run()
命令¶
AMP 连接的任一端都可以向另一端发出命令。每种命令都表示为 Command
的子类。一个 Command
定义了参数、响应值和错误条件。
from twisted.protocols.amp import Integer, String, Unicode, Command
class UsernameUnavailable(Exception):
pass
class RegisterUser(Command):
arguments = [('username', Unicode()),
('publickey', String())]
response = [('uid', Integer())]
errors = {UsernameUnavailable: 'username-unavailable'}
命令签名的定义 - 它的参数、响应和可能的错误条件 - 与执行命令时执行的行为的实现是分开的。Command
子类只定义前者。
命令通过在连接的任一端调用 callRemote
来发出。此方法返回一个 Deferred
,该 Deferred
最终会触发命令的结果。
if __name__ == "__main__":
import command_client
raise SystemExit(command_client.main())
from sys import stdout
from basic_client import connect
from twisted.internet import reactor
from twisted.protocols.amp import Command, Integer, String, Unicode
from twisted.python.log import err, startLogging
class UsernameUnavailable(Exception):
pass
class RegisterUser(Command):
arguments = [("username", Unicode()), ("publickey", String())]
response = [("uid", Integer())]
errors = {UsernameUnavailable: "username-unavailable"}
def main():
startLogging(stdout)
d = connect()
def connected(protocol):
return protocol.callRemote(
RegisterUser,
username="alice",
publickey="ssh-rsa AAAAB3NzaC1yc2 alice@actinium",
)
d.addCallback(connected)
def registered(result):
print("Registration result:", result)
d.addCallback(registered)
d.addErrback(err, "Failed to register")
def finished(ignored):
reactor.stop()
d.addCallback(finished)
reactor.run()
定位器¶
处理命令的逻辑可以指定为与解释和格式化网络字节的 AMP
实例分开的对象。
from twisted.protocols.amp import CommandLocator
from twisted.python.filepath import FilePath
class UsernameUnavailable(Exception):
pass
class UserRegistration(CommandLocator):
uidCounter = 0
@RegisterUser.responder
def register(self, username, publickey):
path = FilePath(username)
if path.exists():
raise UsernameUnavailable()
self.uidCounter += 1
path.setContent('%d %s\n' % (self.uidCounter, publickey))
return self.uidCounter
当您定义一个单独的 CommandLocator
子类时,通过将它的实例传递给 AMP
初始化程序来使用它。
factory = Factory()
factory.protocol = lambda: AMP(locator=UserRegistration())
如果没有传递定位器,AMP
将充当自己的定位器。命令响应器可以在 AMP
子类上定义,就像响应器在上面的 UserRegistration
示例中定义的那样。
盒子接收器¶
AMP 会话由一系列称为盒子的消息交换组成。一个盒子由一系列键值对组成(例如,username
和 alice
对)。盒子通常表示为 dict
实例。通常,盒子在来回传递以实现上面描述的命令请求/响应功能。处理每个盒子的逻辑可以指定为与 AMP
实例分开的对象。
from zope.interface import implementer
from twisted.protocols.amp import IBoxReceiver
@implementer(IBoxReceiver)
class BoxReflector(object):
def startReceivingBoxes(self, boxSender):
self.boxSender = boxSender
def ampBoxReceived(self, box):
self.boxSender.sendBox(box)
def stopReceivingBoxes(self, reason):
self.boxSender = None
这些方法与 IProtocol
的方法类似。启动通知由 startReceivingBoxes
提供。传递给它的参数是一个 IBoxSender
提供者,它可以用来将盒子发送回网络。ampBoxReceived
提供一个完整的盒子已接收的通知。最后,stopReceivingBoxes
通知对象不再接收盒子,也不再发送盒子。传递给它的参数是一个 Failure
,它可能包含导致会话结束的原因的详细信息。
要使用自定义的 IBoxReceiver
,请将其传递给 AMP
初始化程序。
factory = Factory()
factory.protocol = lambda: AMP(boxReceiver=BoxReflector())
如果没有传递盒子接收器,AMP
将充当自己的盒子接收器。它通过将盒子视为命令请求或响应,并将它们传递给相应的响应器或作为 callRemote
Deferred
的结果来处理盒子。