首页 文章

发布者在订阅者和消息丢失之前完成 - 为什么?

提问于
浏览
1

对zeromq来说相当新,并试图让一个基本的pub / sub工作 . 当我运行以下(在pub之前启动子站)时,发布者完成但订阅者挂起但没有收到所有消息 - 为什么?

我认为套接字正在关闭,但邮件已被发送?有没有办法确保收到所有邮件?

出版商:

import zmq
import random
import time
import tnetstring

context=zmq.Context()
socket=context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

y=0
for x in xrange(5000):
    st = random.randrange(1,10) 
    data = []
    data.append(random.randrange(1,100000))
    data.append(int(time.time()))
    data.append(random.uniform(1.0,10.0))

    s = tnetstring.dumps(data)
    print 'Sending ...%d %s' % (st,s)

    socket.send("%d %s" % (st,s))
    print "Messages sent: %d" % x
    y+=1

print '*** SERVER FINISHED. # MESSAGES SENT = ' + str(y)

订阅者: -

import sys
import zmq
import tnetstring

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.connect("tcp://localhost:5556")
filter = "" # get all messages

socket.setsockopt(zmq.SUBSCRIBE, filter)

x=0
while True:
    topic,data = socket.recv().split()
    print "Topic: %s, Data = %s. Total # Messages = %d" % (topic,data,x)
    x+=1

2 回答

  • 0

    在ZeroMQ中,客户端和服务器总是尝试重新连接;他们赢了't go down if the other side disconnects (because in many cases you'如果对方再次出现,他们希望他们继续说话 . 因此,在您的测试代码中,客户端将等到服务器再次开始发送消息,除非您在某个时刻停止 recv() 消息 .

  • 0

    在您的特定实例中,您可能希望使用socket.close()和context.term()进行调查 . 它将阻塞,直到所有消息都已发送 . 你也有一个慢木匠的问题 . 您可以在绑定后添加睡眠,但在开始发布之前 . 这适用于测试用例,但您需要真正了解解决方案与创可贴的关系 .

    您需要像收音机一样考虑PUB / SUB模式 . 发送方和接收方都是异步的 . 即使没有人在收听,发布者也会继续发送 . 订户只有在收听时才会收到数据 . 如果网络中间发生故障,数据将丢失 .

    您需要了解这一点才能设计您的消息 . 例如,如果您将消息设计为“幂等”,则丢失数据无关紧要 . 一个例子是状态类型消息 . 如果您具有以前的任何状态,则无关紧要 . 最新的一个是正确的,消息丢失无关紧要 . 这种方法的好处是你最终得到了一个更强大和高性能的系统 . 缺点是你无法以这种方式设计你的消息 .

    您的示例包含一种不需要丢失的消息 . 另一种类型的消息是事务性的 . 例如,如果您刚刚发送了系统中已更改内容的增量,则无法丢失消息 . 数据库复制通常以这种方式进行管理,这就是数据库复制通常非常脆弱的原因 . 要尝试提供保证,您需要做几件事 . 一件事是添加持久缓存 . 发送的每条消息都需要记录在持久性缓存中 . 需要为每条消息分配唯一的ID(最好是序列),以便客户端可以确定它们是否缺少消息 . 需要添加第二个套接字(ROUTER / REQ),以便客户端单独请求丢失的消息 . 或者,您可以使用辅助套接字请求通过PUB / SUB重新发送 . 然后,客户端将再次接收消息(适用于多播版本) . 客户会忽略他们已经看过的消息 . 注意:这遵循ZeroMQ指南中的MAJORDOMO模式 .

    另一种方法是使用ROUTER / DEALER套接字创建自己的代理 . 当ROUTER套接字看到每个DEALER连接时,它将存储其ID . 当ROUTER需要发送数据时,它将迭代所有客户端ID并发布消息 . 每条消息都应包含一个序列,以便客户端可以知道要请求的丢失消息 . 注意:这是来自linkedin的Kafka的一种重新实现 .

相关问题