首页 文章

使用PUB-SUB(慢速用户)的ZMQ延迟

提问于
浏览
1

我在类似的主题上发现了很多问题,但它们并没有帮助我解决我的问题 .

使用:

  • Linux Ubuntu 14.04

  • python 3.4

  • zmq:4.0.4 // pyZMQ 14.3.1

TL; DR

即使设置了HWM,ZMQ SUB套接字中的接收器队列也会无限增长 . 当订阅者比发布者慢时会发生这种情况 . 我该怎么做才能防止它?

背景

我在人机交互领域工作 . 我们有一个巨大的代码库来控制鼠标光标,这种事情 . 我想在几个模块中“打破它”,与ZMQ沟通 . 它必须具有尽可能小的延迟,但丢失(丢失)消息并不重要 .

另一个有趣的方面是在节点之间添加“ Spy ”的可能性 . 因此PUB / SUB插座似乎是最合适的 .

像这样的东西:

+----------+                +-----------+                 +------------+
|          | PUB            |           |  PUB            |            |
|  Input   | +----+------>  |  Filter   |  +----+------>  |   Output   |
|          |      |     SUB |           |       |     SUB |            |
+----------+      v         +-----------+       v         +------------+
               +-----+                       +-----+                   
               |Spy 1|                       |Spy 2|                   
               +-----+                       +-----+

问题

除非我们添加 Spy ,否则一切正常 . 如果我们使用matplotlib添加一个使用matplotlib进行实时可视化等“重物”的 Spy ,我们会注意到图中的延迟增加 . IE:在上图中,过滤器和输出很快,没有看到延迟,但是在Spy 2上,延迟可以在运行20分钟后达到10分钟(!!)

看起来接收器上的队列无限增长 . 我们调查了ZMQ的高水位标记(HWM)功能,将其设置为低以丢弃旧消息,但没有任何改变 .

最小代码

建筑:

+------------+                +-------------+
|            |  PUB           |             |
|   sender   | -------------> |  receiver   |
|            |             SUB|             |
+------------+                +-------------+

接收器是一个慢接收器(在第一个图中充当 Spy )

代码:

Sender.py

import time
import zmq

ctx = zmq.Context()

sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10

i = 0
while True:
    mess = "{} {}".format(i, time.time())
    sender.send_string(mess)
    print("Send : {}".format(mess))
    i+= 1

receiver.py:

import time
import zmq

ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)

front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)

front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1

while True:
    mess = front_end.recv_string()
    i, t = mess.split(" ")
    mess = "{} {}".format(i, time.time() - float(t))
    print("received : {}".format(mess))
    time.sleep(1)  # slow

我不认为这是ZMQ Pub / Sub的正常行为 . 我尝试在接收器中,在订户中设置HWM,但两者都没有改变 .

我错过了什么?

Edit :

当我解释我的问题时,我不认为我很清楚 . 我做了一个移动鼠标光标的实现 . 输入是在ZMQ中以200Hz(带有 .sleep( 1.0 / 200 ) )发送的鼠标光标位置,完成了一些处理并更新了鼠标光标位置(我的最小例子中没有这个睡眠) .

即使我发射了 Spy ,一切都很顺利 . 然而 Spy 的延迟时间越来越长(因为处理速度慢) . 延迟不会出现在光标中,在“管道”的末尾 .

我认为问题来自于消息的缓慢 subscriber queuing .

在我的示例中,如果我们终止发送方并让接收方处于活动状态,则将继续显示消息,直到显示所有(?)提交的消息 .

Spy 正在绘制光标的位置以提供一些反馈,这样的滞后仍然非常不方便......我只是想要发送最后一条消息,这就是我试图降低HWM的原因 .

2 回答

  • 3

    来自http://zguide.zeromq.org/page:all#toc50

    当套接字到达其HWM时,它将阻止或丢弃数据,具体取决于套接字类型 . 如果PUB和ROUTER套接字到达其HWM,则它们将丢弃数据,而其他套接字类型将阻塞 . 在inproc传输中,发送方和接收方共享相同的缓冲区,因此真正的HWM是双方设置的HWM的总和 .

    所以SUB套接字并没有真正丢弃旧消息 . 您可以使用路由器来实现丢弃订户,或考虑可以满足缓慢元素的设计 . 对零的好处是你的核心代码很多都可以保持不变,并且你可能会绕过处理套接字的包装器 .

  • 3

    缺少更好的实时设计/验证

    ZeroMQ是一个功能强大的消息传递层 .

    也就是说, check 它在原始 while True: 杀手循环中每秒发送的消息数量是多少

    Measure 它 . 设计事实,而不是感情 .

    事实很重要 .

    start_CLK = time.time()                                    # .SET _CLK
    time.sleep( 0.001)                                         # .NOP avoid DIV/0!
    i = 0                                                      # .SET CTR
    while True:                                                # .LOOP
        sender.send_string( "{} {}".format( i, time.time() ) ) # .SND ZMQ-PUB 
        print i / ( time.time() - start_CLK )                  # .GUI perf [msg/sec]
        i+= 1                                                  # .INC CTR
    

    ZeroMQ最好将该雪崩纳入该计划 .

    这是相当不错的 .

    您的[ Filter ] [ Spy1 ] [ Output ] [ Spy2 ]管道处理,端到端, has either

    • 要快,包括两个.send() . recv_string()开销比[ Input ] -sender

    要么

    • 是阻塞病态元素的主要因素,导致内部PUB / SUB排队增长,成长,增长

    这种排队链问题可以通过另一种架构设计来解决 .

    重新思考的事情:

    • sub-sample [ Filter ] .send()cadency(交错因子取决于你控制下的实时过程的稳定性问题 - 是1毫秒(顺便说一下O / S定时器分辨率,所以没有量子物理实验是可以使用COTS O / S定时器控制:o)),10毫秒用于双向语音流,50毫秒用于电视/ GUI流,300毫秒用于键盘事件流等等)

    • online v / s offline 后期处理/可视化(您注意到重 matplotlib 处理, there you typically bear about 800 - 1600 - 3600 msec overheads ,即使在简单的2D图形上 - measure it ,然后才决定PUB / SUB- <proc1> -PUB / SUB- <proc2>中的更改处理架构(你已经注意到,<spy2>导致增长<proc2> -PUB-feeding&sending的问题开销) .

    • 线程数与执行它们的localhost核心数 - 从localhost ip可以看出,所有进程都驻留在同一个localhost上 . 另外,如果所有线程都是从同一个Python解释器实例化的,那么每个使用的ZMQ.Context添加一个线程,并查看Python GIL锁定开销......阻塞增长 . 阻止伤害 . 更好的分布式架构可以改善这些性能方面 . 但是,首先回顾[1]和[2]

    n.b. 调用20分钟处理流水线延迟(实时系统TimeDOMAIN偏斜)延迟是很多委婉的

相关问题