考虑以下:
-
一组3个逻辑服务:
S1
,S2
和S3
-
每个服务的两个实例正在运行,因此我们有以下过程:
S1P1
,2721212,_ 27213,S2P2
,S3P1
,S3P2
-
a
ZeroMQ
代理在单个进程中运行,并且可由所有服务进程访问
逻辑服务,比方说 S1
,发布了逻辑服务 S2
和 S3
感兴趣的消息 M1
. 每个逻辑服务只有一个进程必须接收 M1
,所以让我们说 S2P1
和 S3P2
.
我试过以下,但没有成功:
-
代理线程1正在运行 XSUB/XPUB 代理
-
代理线程2正在运行 ROUTER/DEALER 代理,
ROUTER
连接到XPUB
套接字并订阅了所有内容(对于逻辑S1
) -
代理线程3正在运行
ROUTER/DEALER
代理,ROUTER
连接到XPUB
套接字并订阅了所有内容(对于逻辑S2
) -
代理线程4运行
ROUTER/DEALER
代理,ROUTER
连接到XPUB套接字并订阅了所有内容(逻辑S3
) -
每个逻辑服务进程正在运行连接到代理
DEALER
套接字的REP
套接字线程
我认为 XSUB/XPUB
代理会给我发布/订阅语义,并且 ROUTER/DEALER
代理会在 REP
代理之间为 REP
代理发送的消息引入竞争 .
How can I combine ZeroMQ sockets to accomplish this?
Update1
我知道“没有成功”没有帮助,我尝试了不同的配置并得到了不同的错误 . 我尝试的最新配置如下:
(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP
copyLoop是这样的:
public void start() {
context = ZMQ.context(1);
subSocket = context.socket(ZMQ.SUB);
subSocket.connect(subSocketUrl);
subSocket.subscribe("".getBytes());
reqSocket = context.socket(ZMQ.REQ);
reqSocket.connect(reqSocketUrl);
while (!Thread.currentThread().isInterrupted()) {
final Message msg = receiveNextMessage();
resendMessage(msg);
}
}
private Message receiveNextMessage() {
final String header = subSocket.recvStr();
final String entity = subSocket.recvStr();
return new Message(header, entity);
}
private void resendMessage(Message msg) {
reqSocket.sendMore(msg.getKey());
reqSocket.send(msg.getData(), 0);
}
我得到的例外情况如下:
java.lang.IllegalStateException: Cannot send another request
at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]
我正在运行JeroMQ 0.3.4,Oracle Java 8 JVM和Windows 7 .
2 回答
显然,我混淆了一些元素:
套接字具有相同的API,无论您将其用作客户端套接字(
Socket.connect
)还是服务器端套接字(Socket.bind
)套接字具有相同的API,无论类型如何(例如,
Socket.subscribe
不应在PUSH
套接字上调用)某些套接字类型需要发送/接收响应循环(例如
REQ/REP
)沟通模式的一些细微差别(
PUSH/PULL
vsROUTER/DEALER
)调试ZeroMQ设置的难度(不可能性?)
所以非常感谢杰森的非常详细的答案(和令人敬畏的图表!),这使我指向正确的方向 .
我最终得到了以下设计:
代理线程1正在
bind(localhost:6000)
和bind(localhost:6001)
上运行扇出XSUB/XPUB
代理代理线程2正在
connect(localhost:6001)
和bind(localhost:6002)
上运行排队SUB/PUSH
代理;代理线程3和4使用具有不同绑定端口号的类似设计消息生成器使用
connect(localhost:6000)
上的PUB
套接字连接到代理消息使用者使用
connect(localhost:6002)
上的PULL
套接字连接到代理排队代理除了这个特定于服务的排队机制之外,我还能够简单地添加一个类似于服务的扇出机制:
代理线程在
connect(localhost:6001)
和bind(localhost:6003)
上运行SUB/PUB
代理消息生成器仍使用
connect(localhost:6000)
上的PUB
套接字连接到代理消息使用者使用
connect(localhost:6003)
上的SUB
套接字连接到代理扇出代理这是一个有趣的旅程 .
您似乎在使用
ROUTER
连接时增加了一些复杂性 - 您应该可以直接与发布者 Build 连接 .您当前遇到的错误是
REQ
套接字具有严格的消息排序模式 - 您不能连续两次send()
,您必须发送/接收/发送/接收/等(同样,REP
套接字必须接收/发送/接收/发送/等) . 从它的外观来看,你只是在REQ
套接字上进行发送/发送/发送/等,而没有收到响应 . 如果您不关心对等方的响应,那么您必须接收并丢弃它,或者使用DEALER
(或ROUTER
,但DEALER
在您当前的图表中更有意义) .我已经创建了一个如何在下面完成此体系结构的图表 - 使用您的基本流程结构 .
所以,主要区别在于我放弃了你的
(SUB copyLoop=> REQ)
步骤 . 你选择XPUB/XSUB
vsPUB/SUB
取决于你,但除非你现在想要利用XPUB/XSUB
的额外功能,否则我会更容易开始 .显然,这个图表没有处理信息如何进入你的代理,你当前显示的是
XSUB
套接字 - 到目前为止提供的's out of scope for the information you' ve,大概是你're able to receive information into your broker successfully already so I won' t处理它 .我假设您的代理线程专用于每个服务,是否可以明智地选择是否将消息发送到他们的服务?如果是这样,那么你选择让他们订阅一切应该可以正常工作,否则可能需要更智能的订阅设置 .
如果您在服务进程上使用
REP
套接字,则服务进程必须接收该消息并异步处理它,从不将有关该消息的任何详细信息传递给代理 . 然后它必须通过确认(如"RECEIVED")响应每条消息,以便它遵循REP
套接字的严格接收/发送/接收/发送模式 .如果您需要有关服务如何处理发送回代理的消息的任何其他类型的通信,
REP
不再是服务进程的适当套接字类型,并且DEALER
可能不再是代理的正确套接字类型 . 如果您需要某种形式的负载 balancer 以便发送到下一个打开的服务进程,则需要使用ROUTER/REQ
并让每个服务指示其可用性并让代理保留该消息,直到下一个服务进程显示其可用通过发回结果 . 如果您需要其他类型的消息处理,则必须指明这是什么,以便可以提出合适的体系结构 .