异步消息协议概述

本指南的目的是描述 twisted.protocols.amp 的用途和用法,超出 API 文档中所解释的内容。它将向您展示如何实现一个 AMP 服务器,该服务器可以响应命令或直接与单个消息进行交互。它还将向您展示如何实现一个 AMP 客户端,该客户端可以向服务器发出命令。

AMP 是一种双向命令/响应式协议,旨在通过应用程序特定的请求类型和处理程序进行扩展。它支持各种简单数据类型,应用程序可以添加对新数据类型的支持。

设置

AMP 在面向流的基于连接的协议(如 TCP 或 SSL)上运行。在使用 AMP 协议的任何功能之前,您需要建立连接。用于建立 AMP 连接的协议类是 AMP 。连接设置与 Twisted 中几乎所有协议的工作方式相同。例如,您可以使用服务器端点设置一个监听 AMP 服务器

basic_server.tac

from twisted.application.internet import StreamServerEndpointService
from twisted.application.service import Application
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.protocol import Factory
from twisted.protocols.amp import AMP

application = Application("basic AMP server")

endpoint = TCP4ServerEndpoint(reactor, 8750)
factory = Factory()
factory.protocol = AMP
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)

您可以使用客户端端点连接到 AMP 服务器

basic_client.py

if __name__ == "__main__":
    import basic_client

    raise SystemExit(basic_client.main())

from sys import stdout

from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.internet.protocol import Factory
from twisted.protocols.amp import AMP
from twisted.python.log import err, startLogging


def connect():
    endpoint = TCP4ClientEndpoint(reactor, "127.0.0.1", 8750)
    return endpoint.connect(Factory.forProtocol(AMP))


def main():
    startLogging(stdout)

    d = connect()
    d.addErrback(err, "Connection failed")

    def done(ignored):
        reactor.stop()

    d.addCallback(done)

    reactor.run()

命令

AMP 连接的任一端都可以向另一端发出命令。每种命令都表示为 Command 的子类。一个 Command 定义了参数、响应值和错误条件。

from twisted.protocols.amp import Integer, String, Unicode, Command

class UsernameUnavailable(Exception):
    pass

class RegisterUser(Command):
    arguments = [('username', Unicode()),
                 ('publickey', String())]

    response = [('uid', Integer())]

    errors = {UsernameUnavailable: 'username-unavailable'}

命令签名的定义 - 它的参数、响应和可能的错误条件 - 与执行命令时执行的行为的实现是分开的。Command 子类只定义前者。

命令通过在连接的任一端调用 callRemote 来发出。此方法返回一个 Deferred,该 Deferred 最终会触发命令的结果。

command_client.py

if __name__ == "__main__":
    import command_client

    raise SystemExit(command_client.main())

from sys import stdout

from basic_client import connect

from twisted.internet import reactor
from twisted.protocols.amp import Command, Integer, String, Unicode
from twisted.python.log import err, startLogging


class UsernameUnavailable(Exception):
    pass


class RegisterUser(Command):
    arguments = [("username", Unicode()), ("publickey", String())]

    response = [("uid", Integer())]

    errors = {UsernameUnavailable: "username-unavailable"}


def main():
    startLogging(stdout)

    d = connect()

    def connected(protocol):
        return protocol.callRemote(
            RegisterUser,
            username="alice",
            publickey="ssh-rsa AAAAB3NzaC1yc2 alice@actinium",
        )

    d.addCallback(connected)

    def registered(result):
        print("Registration result:", result)

    d.addCallback(registered)

    d.addErrback(err, "Failed to register")

    def finished(ignored):
        reactor.stop()

    d.addCallback(finished)

    reactor.run()

定位器

处理命令的逻辑可以指定为与解释和格式化网络字节的 AMP 实例分开的对象。

from twisted.protocols.amp import CommandLocator
from twisted.python.filepath import FilePath

class UsernameUnavailable(Exception):
    pass

class UserRegistration(CommandLocator):
    uidCounter = 0

    @RegisterUser.responder
    def register(self, username, publickey):
        path = FilePath(username)
        if path.exists():
            raise UsernameUnavailable()
        self.uidCounter += 1
        path.setContent('%d %s\n' % (self.uidCounter, publickey))
        return self.uidCounter

当您定义一个单独的 CommandLocator 子类时,通过将它的实例传递给 AMP 初始化程序来使用它。

factory = Factory()
factory.protocol = lambda: AMP(locator=UserRegistration())

如果没有传递定位器,AMP 将充当自己的定位器。命令响应器可以在 AMP 子类上定义,就像响应器在上面的 UserRegistration 示例中定义的那样。

盒子接收器

AMP 会话由一系列称为盒子的消息交换组成。一个盒子由一系列键值对组成(例如,usernamealice 对)。盒子通常表示为 dict 实例。通常,盒子在来回传递以实现上面描述的命令请求/响应功能。处理每个盒子的逻辑可以指定为与 AMP 实例分开的对象。

from zope.interface import implementer

from twisted.protocols.amp import IBoxReceiver

@implementer(IBoxReceiver)
class BoxReflector(object):
    def startReceivingBoxes(self, boxSender):
        self.boxSender = boxSender

    def ampBoxReceived(self, box):
        self.boxSender.sendBox(box)

    def stopReceivingBoxes(self, reason):
        self.boxSender = None

这些方法与 IProtocol 的方法类似。启动通知由 startReceivingBoxes 提供。传递给它的参数是一个 IBoxSender 提供者,它可以用来将盒子发送回网络。ampBoxReceived 提供一个完整的盒子已接收的通知。最后,stopReceivingBoxes 通知对象不再接收盒子,也不再发送盒子。传递给它的参数是一个 Failure,它可能包含导致会话结束的原因的详细信息。

要使用自定义的 IBoxReceiver,请将其传递给 AMP 初始化程序。

factory = Factory()
factory.protocol = lambda: AMP(boxReceiver=BoxReflector())

如果没有传递盒子接收器,AMP 将充当自己的盒子接收器。它通过将盒子视为命令请求或响应,并将它们传递给相应的响应器或作为 callRemote Deferred 的结果来处理盒子。