首页 文章

如何将队列连接到ZeroMQ PUB / SUB

提问于
浏览
0

考虑以下:

  • 一组3个逻辑服务: S1S2S3

  • 每个服务的两个实例正在运行,因此我们有以下过程: S1P12721212,_ 27213S2P2S3P1S3P2

  • a ZeroMQ 代理在单个进程中运行,并且可由所有服务进程访问

逻辑服务,比方说 S1 ,发布了逻辑服务 S2S3 感兴趣的消息 M1 . 每个逻辑服务只有一个进程必须接收 M1 ,所以让我们说 S2P1S3P2 .

我试过以下,但没有成功:

  • 代理线程1正在运行 XSUB/XPUB 代理

  • 代理线程2正在运行 ROUTER/DEALER 代理, ROUTER 连接到 XPUB 套接字并订阅了所有内容(对于逻辑 S1

  • 代理线程3正在运行 ROUTER/DEALER 代理, ROUTER 连接到 XPUB 套接字并订阅了所有内容(对于逻辑 S2

  • 代理线程4运行 ROUTER/DEALER 代理, ROUTER 连接到XPUB套接字并订阅了所有内容(逻辑 S3

  • 每个逻辑服务进程正在运行连接到代理 DEALER 套接字的 REP 套接字线程

我认为 XSUB/XPUB 代理会给我发布/订阅语义,并且 ROUTER/DEALER 代理会在 REP 代理之间为 REP 代理发送的消息引入竞争 .

How can I combine ZeroMQ sockets to accomplish this?

Update1

我知道“没有成功”没有帮助,我尝试了不同的配置并得到了不同的错误 . 我尝试的最新配置如下:

(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP

copyLoop是这样的:

public void start() {
    context = ZMQ.context(1);

    subSocket = context.socket(ZMQ.SUB);
    subSocket.connect(subSocketUrl);
    subSocket.subscribe("".getBytes());

    reqSocket = context.socket(ZMQ.REQ);
    reqSocket.connect(reqSocketUrl);

    while (!Thread.currentThread().isInterrupted()) {
        final Message msg = receiveNextMessage();
        resendMessage(msg);
    }
}

private Message receiveNextMessage() {
    final String header = subSocket.recvStr();
    final String entity = subSocket.recvStr();

    return new Message(header, entity);
}

private void resendMessage(Message msg) {
    reqSocket.sendMore(msg.getKey());
    reqSocket.send(msg.getData(), 0);
}

我得到的例外情况如下:

java.lang.IllegalStateException: Cannot send another request
    at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
    at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]

我正在运行JeroMQ 0.3.4,Oracle Java 8 JVM和Windows 7 .

2 回答

  • 0

    显然,我混淆了一些元素:

    • 套接字具有相同的API,无论您将其用作客户端套接字( Socket.connect )还是服务器端套接字( Socket.bind

    • 套接字具有相同的API,无论类型如何(例如, Socket.subscribe 不应在 PUSH 套接字上调用)

    • 某些套接字类型需要发送/接收响应循环(例如 REQ/REP

    • 沟通模式的一些细微差别( PUSH/PULL vs ROUTER/DEALER

    • 调试ZeroMQ设置的难度(不可能性?)

    所以非常感谢杰森的非常详细的答案(和令人敬畏的图表!),这使我指向正确的方向 .

    我最终得到了以下设计:

    • 代理线程1正在 bind(localhost:6000)bind(localhost:6001) 上运行扇出 XSUB/XPUB 代理

    • 代理线程2正在 connect(localhost:6001)bind(localhost:6002) 上运行排队 SUB/PUSH 代理;代理线程3和4使用具有不同绑定端口号的类似设计

    • 消息生成器使用 connect(localhost:6000) 上的 PUB 套接字连接到代理

    • 消息使用者使用 connect(localhost:6002) 上的 PULL 套接字连接到代理排队代理

    除了这个特定于服务的排队机制之外,我还能够简单地添加一个类似于服务的扇出机制:

    • 代理线程在 connect(localhost:6001)bind(localhost:6003) 上运行 SUB/PUB 代理

    • 消息生成器仍使用 connect(localhost:6000) 上的 PUB 套接字连接到代理

    • 消息使用者使用 connect(localhost:6003) 上的 SUB 套接字连接到代理扇出代理

    这是一个有趣的旅程 .

  • 2

    您似乎在使用 ROUTER 连接时增加了一些复杂性 - 您应该可以直接与发布者 Build 连接 .

    您当前遇到的错误是 REQ 套接字具有严格的消息排序模式 - 您不能连续两次 send() ,您必须发送/接收/发送/接收/等(同样, REP 套接字必须接收/发送/接收/发送/等) . 从它的外观来看,你只是在 REQ 套接字上进行发送/发送/发送/等,而没有收到响应 . 如果您不关心对等方的响应,那么您必须接收并丢弃它,或者使用 DEALER (或 ROUTER ,但 DEALER 在您当前的图表中更有意义) .

    我已经创建了一个如何在下面完成此体系结构的图表 - 使用您的基本流程结构 .

    Broker T1         Broker T2                Broker T3                Broker T4
    (PUB*)------>(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)
           |_____________________||____|                  ||    |                  ||
           |_____________________||_______________________||____|                  ||
                                 ||                       ||                       ||
         ========================||     ==================||            ===========||=
       ||             ||              ||              ||              ||              ||
       ||             ||              ||              ||              ||              ||
       ||             ||              ||              ||              ||              ||
    (REP*)         (REP*)          (REP*)          (REP*)          (REP*)          (REP*)
     S1P1           S1P2            S2P1            S2P2            S3P1            S3P2
    

    所以,主要区别在于我放弃了你的 (SUB copyLoop=> REQ) 步骤 . 你选择 XPUB/XSUB vs PUB/SUB 取决于你,但除非你现在想要利用 XPUB/XSUB 的额外功能,否则我会更容易开始 .

    显然,这个图表没有处理信息如何进入你的代理,你当前显示的是 XSUB 套接字 - 到目前为止提供的's out of scope for the information you' ve,大概是你're able to receive information into your broker successfully already so I won' t处理它 .

    我假设您的代理线程专用于每个服务,是否可以明智地选择是否将消息发送到他们的服务?如果是这样,那么你选择让他们订阅一切应该可以正常工作,否则可能需要更智能的订阅设置 .

    如果您在服务进程上使用 REP 套接字,则服务进程必须接收该消息并异步处理它,从不将有关该消息的任何详细信息传递给代理 . 然后它必须通过确认(如"RECEIVED")响应每条消息,以便它遵循 REP 套接字的严格接收/发送/接收/发送模式 .

    如果您需要有关服务如何处理发送回代理的消息的任何其他类型的通信, REP 不再是服务进程的适当套接字类型,并且 DEALER 可能不再是代理的正确套接字类型 . 如果您需要某种形式的负载 balancer 以便发送到下一个打开的服务进程,则需要使用 ROUTER/REQ 并让每个服务指示其可用性并让代理保留该消息,直到下一个服务进程显示其可用通过发回结果 . 如果您需要其他类型的消息处理,则必须指明这是什么,以便可以提出合适的体系结构 .

相关问题