首页 文章

从ZMQ PUB套接字获取订户过滤器

提问于
浏览
13

我在常见问题解答中注意到,在Monitoring section中,无法获得连接对等列表或在对等连接/断开连接时收到通知 .

这是否意味着它也不可能从上游反馈中知道PUB / XPUB套接字知道它应该发布哪些主题?或者有没有办法访问这些数据?

我知道ZMQ> = 3.0“supports PUB/SUB filtering at the publisher”,但我真正想要的是使用ZMQ有关订阅哪些主题的知识来过滤我的应用程序代码 .

我的用例是我想发布有关机器人状态的信息 . 一些主题涉及主要的硬件操作,例如切换ADC上的选择线以读取IR值 .

我在机器人上运行了一个发布者线程,当有实际订阅者时,它应该只执行“读取”以获取IR数据 . 但是,因为我只能将一个字符串提供给我的pub_sock.send,所以我总是要做昂贵的操作,即使ZMQ即将在没有订阅者时丢弃该消息 .

我有一个实现,它使用反向通道REQ / REP套接字发送主题信息,我的应用程序可以在其发布循环中检查,从而只收集需要收集的数据 . 这似乎非常不优雅,因为ZMQ必须已经拥有我需要的数据,这可以通过它对发布者的过滤来证明 .

我注意到在这个mailing list message中,OP似乎能够看到订阅消息被发送到XPUB套接字 .

但是,没有提到他们是如何做到这一点的,而且我没有在文档中看到任何这样的能力(仍在寻找) . 也许他们只是使用Wireshark(查看XPUB套接字的上游订阅消息) .

2 回答

  • 6

    使用 zmq.XPUB 套接字类型,有一种方法可以检测新用户和离开用户 . 以下代码示例显示了如何:

    # Publisher side
    import zmq
    
    ctx = zmq.Context.instance()
    xpub_socket = ctx.socket(zmq.XPUB)
    xpub_socket.bind("tcp://*:%d" % port_nr)
    poller = zmq.Poller()
    poller.register(xpub_socket)
    
    events = dict(poller.poll(1000))
    if xpub_socket in events:
        msg = xpub_socket.recv()
        if msg[0] == b'\x01':
            topic = msg[1:]
            print "Topic '%s': new subscriber" % topic
        elif msg[0] == b'\x00':
            topic = msg[1:]
            print "Topic '%s': subscriber left" % topic
    

    请注意, zmq.XSUB 套接字类型的订阅方式与"normal" zmq.SUB 的方式不同 . 代码示例:

    # Subscriber side
    import zmq
    ctx = zmq.Context.instance()
    
    # Subscribing of zmq.SUB socket
    sub_socket = ctx.socket(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # OK
    sub_socket.connect("tcp://localhost:%d" % port_nr)
    
    # Subscribing zmq.XSUB socket
    xsub_socket = ctx.socket(zmq.XSUB)
    xsub_socket.connect("tcp://localhost:%d" % port_nr)
    # xsub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # NOK, raises zmq.error.ZMQError: Invalid argument
    xsub_socket.send_multipart([b'\x01', b'sometopic']) # OK, triggers the subscribe event on the publisher
    

    我还想指出 zmq.XPUB_VERBOSE 套接字选项 . 如果设置,则在套接字上接收所有订阅事件 . 如果未设置,则会过滤重复的订阅 . 另请参阅以下帖子:ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)

  • 1

    至少对于XPUB / XSUB套接字案例,您可以通过手动转发和处理软件包来保存订阅状态:

    context = zmq.Context()
    
    xsub_socket = context.socket(zmq.XSUB)
    xsub_socket.bind('tcp://*:10000')
    xpub_socket = context.socket(zmq.XPUB)
    xpub_socket.bind('tcp://*:10001')
    
    poller = zmq.Poller()
    poller.register(xpub_socket, zmq.POLLIN)
    poller.register(xsub_socket, zmq.POLLIN)
    
    while True:
        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            break
    
        if xpub_socket in events:
            message = xpub_socket.recv_multipart()
    
            # HERE goes some subscription handle code which inspects
            # message
    
            xsub_socket.send_multipart(message)
        if xsub_socket in events:
            message = xsub_socket.recv_multipart()
            xpub_socket.send_multipart(message)
    

    (这是Python代码,但我猜C / C看起来非常相似)

    我目前正在研究这个主题,我会尽快添加更多信息 .

相关问题