我有一个应用程序,使用PUB / SUB设置从ZeroMQ发布者获取消息 . 读者有时很慢,所以我在发送器和接收器上都设置了HWM . 我希望接收器将填充缓冲区并跳转以便在处理速度减慢时恢复 . 但我观察到的行为是它永远不会掉落! ZeroMQ似乎忽视了HWM . 难道我做错了什么?
这是一个最小的例子:
publisher.py
import zmq
import time
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.setsockopt(zmq.SNDHWM, 1)
sock.bind("tcp://*:5556")
i = 0
while True:
sock.send(str(i))
print i
time.sleep(0.1)
i += 1
subscriber.py
import zmq
import time
ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt(zmq.SUBSCRIBE, "")
sock.setsockopt(zmq.RCVHWM, 1)
sock.connect("tcp://localhost:5556")
while True:
print sock.recv()
time.sleep(0.5)
2 回答
我相信这里有几件事情在起作用:
High Water Marks are not exact(参见链接部分的最后一段) - 通常这意味着真正的队列大小将小于列出的数字,我不知道这将如何在
1
表现 .您的
PUB
HWM
永远不会丢弃消息...由于PUB
套接字的工作方式,它将始终立即处理消息,无论是否有可用的订阅者 . 因此,除非实际需要ZMQ .1秒来处理队列中的消息,否则HWM
将永远不会在PUB
端发挥作用 .应该发生的事情类似于以下内容(我假设一个操作顺序,允许您实际接收第一个发布的消息):
启动subscriber.py并等待一段合适的时间以确保它完全旋转(基本上立即)
启动publisher.py
PUB
处理并发送第一条消息,SUB
接收并处理第一条消息PUB
睡眠.1秒并处理并发送第二条消息SUB
睡眠时间为.5秒,套接字收到第二条消息但是在队列中,直到下一次调用sock.recv()
处理它PUB
睡眠.1秒并处理并发送第三条消息SUB
仍在睡眠另一个.3秒,所以第三条消息应该在第二条消息后面的队列,这将在队列中发出2条消息,第三条消息应该因为HWM
而丢失等等等
我建议进行以下更改以帮助解决问题:
删除发布者的
HWM
除了在测试用例中添加我们不需要处理的变量之外什么都不做,因为我们从不期望它会改变任何东西 . 如果您的 生产环境 环境需要它,请将其重新添加并在以后的大批量方案中进行测试 .将订阅者的
HWM
更改为50.它是'll make the test take longer, but you won' t处于最边缘的情况,并且由于ZMQ文档声明HWM不是't exact, the extreme edge cases could cause unexpected behavior. Mind you, I believe your test (being small numbers) wouldn' t这样做,但我没有确切地说,并且它可能是您的数据足够小,有效HWM
实际上更大 .将您的订阅者睡眠时间更改为3整秒......理论上,如果您的队列最多包含50条消息,那么您必须等待2.5分钟才能完成这些消息,看看您是否开始跳过这些消息前50条消息应该开始跳过大组数字 . 但是我被数据的小小所困扰了 .
这当然不能解决如果你仍然没有跳过任何消息会发生什么......如果你这样做并且仍然遇到同样的问题,那么我们可能需要更多地了解水印实际上有多么有效,可能有我们缺少的东西 .
我遇到了完全相同的问题,我的演示与您的演示几乎相同,订阅者或发布者在zmq.RCVHWM或zmq.SNDHWM设置为1后不会丢弃任何消息 .
我在zguide的第5章中提到了慢速用户检测的suicidal snail pattern之后走来走去 . 希望能帮助到你 .
顺便说一下:如果你已经解决了zmq.HWM的错误,请告诉我吗?