我在类似的主题上发现了很多问题,但它们并没有帮助我解决我的问题 .
使用:
-
Linux Ubuntu 14.04
-
python 3.4
-
zmq:4.0.4 // pyZMQ 14.3.1
TL; DR
即使设置了HWM,ZMQ SUB套接字中的接收器队列也会无限增长 . 当订阅者比发布者慢时会发生这种情况 . 我该怎么做才能防止它?
背景
我在人机交互领域工作 . 我们有一个巨大的代码库来控制鼠标光标,这种事情 . 我想在几个模块中“打破它”,与ZMQ沟通 . 它必须具有尽可能小的延迟,但丢失(丢失)消息并不重要 .
另一个有趣的方面是在节点之间添加“ Spy ”的可能性 . 因此PUB / SUB插座似乎是最合适的 .
像这样的东西:
+----------+ +-----------+ +------------+
| | PUB | | PUB | |
| Input | +----+------> | Filter | +----+------> | Output |
| | | SUB | | | SUB | |
+----------+ v +-----------+ v +------------+
+-----+ +-----+
|Spy 1| |Spy 2|
+-----+ +-----+
问题
除非我们添加 Spy ,否则一切正常 . 如果我们使用matplotlib添加一个使用matplotlib进行实时可视化等“重物”的 Spy ,我们会注意到图中的延迟增加 . IE:在上图中,过滤器和输出很快,没有看到延迟,但是在Spy 2上,延迟可以在运行20分钟后达到10分钟(!!)
看起来接收器上的队列无限增长 . 我们调查了ZMQ的高水位标记(HWM)功能,将其设置为低以丢弃旧消息,但没有任何改变 .
最小代码
建筑:
+------------+ +-------------+
| | PUB | |
| sender | -------------> | receiver |
| | SUB| |
+------------+ +-------------+
接收器是一个慢接收器(在第一个图中充当 Spy )
代码:
Sender.py
import time
import zmq
ctx = zmq.Context()
sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')
print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10
i = 0
while True:
mess = "{} {}".format(i, time.time())
sender.send_string(mess)
print("Send : {}".format(mess))
i+= 1
receiver.py:
import time
import zmq
ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)
front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)
front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')
print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1
while True:
mess = front_end.recv_string()
i, t = mess.split(" ")
mess = "{} {}".format(i, time.time() - float(t))
print("received : {}".format(mess))
time.sleep(1) # slow
我不认为这是ZMQ Pub / Sub的正常行为 . 我尝试在接收器中,在订户中设置HWM,但两者都没有改变 .
我错过了什么?
Edit :
当我解释我的问题时,我不认为我很清楚 . 我做了一个移动鼠标光标的实现 . 输入是在ZMQ中以200Hz(带有 .sleep( 1.0 / 200 )
)发送的鼠标光标位置,完成了一些处理并更新了鼠标光标位置(我的最小例子中没有这个睡眠) .
即使我发射了 Spy ,一切都很顺利 . 然而 Spy 的延迟时间越来越长(因为处理速度慢) . 延迟不会出现在光标中,在“管道”的末尾 .
我认为问题来自于消息的缓慢 subscriber queuing .
在我的示例中,如果我们终止发送方并让接收方处于活动状态,则将继续显示消息,直到显示所有(?)提交的消息 .
Spy 正在绘制光标的位置以提供一些反馈,这样的滞后仍然非常不方便......我只是想要发送最后一条消息,这就是我试图降低HWM的原因 .
2 回答
来自http://zguide.zeromq.org/page:all#toc50:
所以SUB套接字并没有真正丢弃旧消息 . 您可以使用路由器来实现丢弃订户,或考虑可以满足缓慢元素的设计 . 对零的好处是你的核心代码很多都可以保持不变,并且你可能会绕过处理套接字的包装器 .
缺少更好的实时设计/验证
ZeroMQ是一个功能强大的消息传递层 .
也就是说, check 它在原始
while True:
杀手循环中每秒发送的消息数量是多少Measure 它 . 设计事实,而不是感情 .
事实很重要 .
ZeroMQ最好将该雪崩纳入该计划 .
这是相当不错的 .
您的[ Filter ] [ Spy1 ] [ Output ] [ Spy2 ]管道处理,端到端, has either
要么
这种排队链问题可以通过另一种架构设计来解决 .
重新思考的事情:
sub-sample [ Filter ] .send()cadency(交错因子取决于你控制下的实时过程的稳定性问题 - 是1毫秒(顺便说一下O / S定时器分辨率,所以没有量子物理实验是可以使用COTS O / S定时器控制:o)),10毫秒用于双向语音流,50毫秒用于电视/ GUI流,300毫秒用于键盘事件流等等)
online v / s offline 后期处理/可视化(您注意到重
matplotlib
处理, there you typically bear about 800 - 1600 - 3600 msec overheads ,即使在简单的2D图形上 - measure it ,然后才决定PUB / SUB- <proc1> -PUB / SUB- <proc2>中的更改处理架构(你已经注意到,<spy2>导致增长<proc2> -PUB-feeding&sending的问题开销) .线程数与执行它们的localhost核心数 - 从localhost ip可以看出,所有进程都驻留在同一个localhost上 . 另外,如果所有线程都是从同一个Python解释器实例化的,那么每个使用的ZMQ.Context添加一个线程,并查看Python GIL锁定开销......阻塞增长 . 阻止伤害 . 更好的分布式架构可以改善这些性能方面 . 但是,首先回顾[1]和[2]
n.b. 调用20分钟处理流水线延迟(实时系统TimeDOMAIN偏斜)延迟是很多委婉的