生产者和消费者:高效高容量流式传输

本指南的目的是描述 Twisted 的生产者消费者系统。生产者系统允许应用程序以一种既节省内存又节省 CPU 的方式流式传输大量数据,并且不会在 Reactor 中引入不可接受的延迟来源。

读者应该至少对与接口相关的术语有所了解。

推送生产者

推送生产者是一种在没有外部提示的情况下会继续生成数据直到被告知停止的生产者;拉取生产者则会根据对更多数据的明确请求,一次生成一个数据块。

推送生产者 API 由 IPushProducer 接口定义。当数据生成与事件源紧密相连时,最好创建一个推送生产者。例如,一个将来自一个套接字的传入字节转发到另一个传出套接字的代理可以使用推送生产者来实现:dataReceived 充当事件源,生产者从中生成字节,并且不需要任何外部干预即可执行此操作。

在推送生产者的生命周期中,可以在不同的时间点调用三种方法:pauseProducingresumeProducingstopProducing

pauseProducing()

为了避免使用无限的内存来缓冲无法快速处理的生产数据,有必要能够告诉推送生产者暂时停止生产数据。这可以通过 pauseProducing 方法来实现。推送生产者的实现者应该在调用此方法时暂时停止生产数据。

resumeProducing()

在推送生产者暂停一段时间后,它生产的过量数据将被处理,生产者可以再次开始生产数据。当时机成熟时,将调用推送生产者的 resumeProducing 方法。

stopProducing()

大多数生产者会生成一些有限的(尽管可能事先未知)数据量,然后停止,完成其预期目的。但是,在发生这种情况之前,可能会发生导致剩余未生产数据变得无关紧要的事件。在这种情况下,继续生产数据将是浪费的。将调用推送生产者的 stopProducing 方法。实现应该停止生产数据并清理生产者拥有的任何资源。

拉取生产者

拉取生产者 API 由 IPullProducer 接口定义。拉取生产者在没有明确的事件源参与数据生成的情况下很有用。例如,如果数据是某些仅受 CPU 时间限制的算法过程的结果,则拉取生产者是合适的。

拉取生产者仅由两种方法定义:resumeProducingstopProducing

resumeProducing()

与推送生产者不同,拉取生产者预期仅在调用 resumeProducing 时才生产数据。每当需要更多数据时,都会调用此方法。响应此方法调用要生产多少数据取决于各种因素:数据太少,运行时成本将主要由与缓冲区变空和请求更多数据进行处理相关的来回事件通知所决定;数据太多,内存使用量将比需要的高,创建如此多数据所带来的延迟会导致应用程序的整体性能下降。一个好的经验法则是每次生成 16 到 64 千字节的数据,但你应该尝试不同的值来确定最适合你的应用程序的值。

stopProducing()

此方法对拉取生产者的含义与对推送生产者的含义相同。

消费者

到目前为止,我已经讨论了 Twisted 支持的两种生产者的各种外部 API。但是,我没有提到生产者生成的数据实际去哪里,也没有提到哪个实体负责调用这些 API。这两个角色都由消费者来完成。消费者由一个接口 IConsumer 定义。

IConsumer 定义了三个方法:registerProducerunregisterProducerwrite

registerProducer(producer, streaming)

为了让消费者能够调用生产者上的方法,消费者需要被告知生产者。这可以通过 registerProducer 方法来实现。第一个参数是 IPullProducerIPushProducer 提供者;第二个参数指示提供了哪种接口:True 表示推送生产者,False 表示拉取生产者。

unregisterProducer()

最终,消费者将不再对生产者感兴趣。这可能是因为生产者已经完成了所有数据的生成,或者因为消费者正在转向其他事物,或者任何其他原因。无论如何,此方法都会逆转 registerProducer 的效果。

write(data)

正如您可能猜到的,这是生产者在生成一些数据时调用的方法。推送生产者应该尽可能频繁地调用它,只要它们没有被暂停。拉取生产者应该在每次对它们调用 resumeProducing 时调用它一次。

进一步阅读

可以在 doc/examples/streaming.py 中找到一个示例推送生产者应用程序。