首页 文章

从ZMQ PULL套接字获取数据 . 如何同步计算?

提问于
浏览
0

我有一个 生产环境 者使用PULL / PUSH向多个 Worker 发送数据 . 在执行计算任务之前,所有工作人员都需要接收所有数据 .

我尝试使用发送“go”的PUB / SUB套接字进行同步,但由于PUSH套接字是非阻塞的,因此在数据流结束之前收到go ...


发件人:

context = zmq.Context()
push_socket = self.context.socket(zmq.PUSH)
push_socket.bind("tcp://127.0.0.1:5557")

pull_socket = self.context.socket(zmq.PULL)
pull_socket.bind("tcp://127.0.0.1:5558")

for index, data in range(100): 
    push_socket.send_json({"data": data, "id": index})
pub_socket.send_json({"command": "map"})

接收者:

# recieve work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://127.0.0.1:5557")

# receive commands
consumer_command = context.socket(zmq.SUB)
consumer_command.subscribe("")
consumer_command.connect("tcp://127.0.0.1:5559")

poller = zmq.Poller()
poller.register(consumer_receiver, zmq.POLLIN)
poller.register(consumer_command, zmq.POLLIN)

while True:
    events = dict(poller.poll(100))
    if consumer_command in events:
        received = consumer_command.recv_json()
        command = received["command"]
        print("received command : ", command)

    if consumer_receiver in events:
        received = consumer_receiver.recv_json()
        print("received data", received)

接收器输出:

received data {'data': ['Hi'], 'id': 0}
received command :  map   
received data {'data': ['hi'], 'id': 1}
...

我想拥有:

received data {'data': ['Hi'], 'id': 0}
received data {'data': ['hi'], 'id': 1}
...
received command :  map

我试图为PUSH套接字设置1的HWM,但它不起作用 .

在PUSH完成后,如何向所有工作人员发送同步消息?

2 回答

  • 1

    您正在寻求实施障碍 .

    ZeroMQ是关于Actor模型编程的,其中一个特征是在发送和接收消息时没有隐含的显式渲染 . 也就是说,无论另一端是否已阅读该消息,都将返回发送 .

    所以这意味着必须在ZeroMQ的Actor模型之上合成障碍(一种类型的渲染) .

    • 使用PUSH / PULL套接字对向工作人员获取数据 .

    • 使用单独的PUSH / PULL套接字对,以便 Worker 将"I have the data and am ready to proceed"消息发送回 生产环境 者 .

    • 让制作人等待这些"I can proceed"消息,

    • 当从每个工作人员收到一个时,在PUB / SUB套接字上向工作人员发送"go"消息 .

    Communicating Sequential Processes

    只是出于兴趣,您可能希望将Actor模型编程与Communicating Sequential Processes(在Rust,Erlang和(我认为?)中进行比较)Go正在卷土重来 . 在CSP发送/接收消息是一个集合 . 这有几个好处;

    • 发件人知道已收到邮件而不只是排队,

    • 如果有一个具有性能和延迟目标,它会强制一个人正确地解决架构和资源分配问题 . 您无法隐藏传输中的邮件 . 因此,如果一个人没有提供足够的工作人员, 生产环境 者显然无法卸载消息;延迟增加不能暂时隐藏缺陷 .

    • 如果你已经设法构建了一个可以死锁,活锁等的架构,那么它总会如此 . 虽然演员模型架构可能看起来几年都很好,直到有一天网络变得更加繁忙 .

    要使用CSP执行您想要的操作,您可以省略上面的步骤2和3 . 生产环境 者会知道每个 Worker 在发送给最后一个 Worker 的时候都收到了数据,并且“go”可以立即发送出去 .

    就个人而言,我真的希望ZeroMQ可以选择成为CSP,而不是演员 . 然后它会很棒,而不仅仅是非常巨大 . 真正好的是它无论是tcp,ipc,inproc等都无关紧要 . 它们的行为都相同(速度变化明显) .

    AFAIK Rust,Erlang和Go CSP Channels 不仅仅是这个过程 . ZMQ可以是内部和/或内部处理和/或计算机间,这使得它非常适合于开发可能超出一台计算机的系统 . 需要将线程卸载到另一台计算机?更改连接字符串,不需要更改其他代码 . 非常好 .

  • 0

    您正在为命令和数据使用单独的流 - 这将始终保证同步问题 . 在收件人方面,您将有两个流缓冲区 - 首先要处理大量数据,第二个只有命令和poll()将确保通知您已准备好读取它们 .

    我看到两种方法来处理这个问题:

    1)保持简单:只使用一个流 . 最后发送的所有内容都将在最后收到 . TCP保证 . 如果你正在使用json,你只需添加'type':'command'或'type':'data'来区分消息类型 .

    2)如果由于某种原因,您确实需要两个流(例如,您确实想要使用发布者/订阅者模式),接收方应在发送方发送其命令之前确认发送方接收最后一个数据批 . 如果 all worker需要在使用该命令启动 any 之前接收数据,那么此选项也是可供选择的选项 .

相关问题