我正在使用 ZMQ_PUB 看到一个奇怪的行为 .
我有一个 生产环境 者 .connect()
-s到不同的进程
那个 .bind()
在 ZMQ_SUB
套接字上 .
订阅者全部 .bind()
,发布者 .connect()
-s .
当一个 生产环境 者启动时,它会创建一个 ZMQ_PUB
套接字并将 .connect()
-s创建到不同的进程 . 然后它会立即开始定期发送消息 .
正如预期的那样,如果没有连接的用户,它将丢弃所有消息,直到用户启动 .
流程正常,然后当用户启动时,它从那一刻开始接收消息 .
Now, the problem is:
-
我断开用户连接(停止进程) .
-
此时没有活跃的订阅者,因为我停止了唯一的订阅者 . 制作人继续发送消息,应该删除,因为没有连接的用户了...
-
我重新启动原始订阅者,它绑定,发布者重新连接...并且订阅者接收在此期间生成的所有消息!
所以我看到的是,在订阅者关闭时, 生产环境 者将所有消息排入队列 . 一旦套接字重新连接,由于订户进程重新启动,它就会发送所有排队的消息 .
据我所知here,发布者应该在没有连接订阅者时删除所有已发送的消息:
ZeroMQ示例“发布者没有连接订阅者,那么它只会丢弃所有消息 . ”
Why is this happening?
顺便说一句,我使用C over linux进行这些测试 .
我尝试在绑定时在订阅者上设置不同的身份,但它不起作用 . Publisher仍会将消息排入队列,并在用户重新启动时将其全部传递 .
提前致谢,
路易斯
UPDATE:
重要更新!!!!!在发布此问题之前,我尝试了不同的解决方案一个是将ZMQ_LINGER设置为0,这不起作用 . 我添加了ZMQ:IMMEDIATE,它有效,但我发现ZMQ:IMMEDIATE不起作用 . 它还需要ZMQ_LINGER . Luis Rojas 3小时前
UPDATE: 根据请求,我添加了一些简单的测试用例来说明我的观点 . 一个是简单订阅者,它在命令行上运行并接收uri绑定的位置,例如:
$ ./sub tcp://127.0.0.1:50001
另一个是发布者,它接收要连接的uris列表,例如:
./pub tcp://127.0.0.1:50001 tcp://127.0.0.1:50002
订户最多接收5条消息,然后关闭套接字并退出 . 我们可以在wireshark上看到FIN / ACK的交换,两种方式,以及套接字如何移动到TIME_WAIT状态 . 然后,发布者开始发送SYN,尝试重新连接(探测ZMQ_PUB知道该连接已关闭)
我显然没有取消订阅套接字,只是关闭它 . 在我看来,如果套接字关闭,发布者应自动结束该连接的任何订阅 .
所以我看到的是:我启动订阅者(一个或多个),我启动发布者,它开始发送消息 . 订阅者收到5条消息并结束 . 在此期间,发布者继续发送消息,没有连接的订阅者 . 我重新启动订阅者,并立即收到几条消息,因为它们在发布者端排队 . 我认为这些排队的消息会破坏发布/订阅模式,其中消息应仅传递给已连接的订阅者 . 如果susbcriber关闭了连接,则应删除该订户的消息 . 更重要的是,当用户重新启动时,它可能决定订阅其他消息,但它仍然会接收那些在同一端口绑定的“先前版图”订阅的消息 .
我的建议是ZMQ_PUB(在连接模式下),当检测到套接字断开时,应清除该套接字上的所有订阅,直到它重新连接并且新订户决定重新订阅 .
我为语言错误道歉,但英语不是我的母语 .
酒吧代码:
#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>
#include <string>
#include <zeromq/zmq.hpp>
int main( int argc, char *argv[] )
{
if ( argc < 2 )
{
fprintf( stderr, "Usage : %s <remoteUri1> [remoteUri2...]\n",
basename( argv[0] ) );
exit ( EXIT_FAILURE );
}
std::string pLocalUri( argv[1] );
zmq::context_t localContext( 1 );
zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_PUB );
if ( NULL == pSocket )
{
fprintf( stderr, "Couldn't create socket. Aborting...\n" );
exit ( EXIT_FAILURE );
}
int i;
try
{
for ( i = 1; i < argc; i++ )
{
printf( "Connecting to [%s]\n", argv[i] );
{
pSocket->connect( argv[i] );
}
}
}
catch( ... )
{
fprintf( stderr, "Couldn't connect socket to %s. Aborting...\n", argv[i] );
exit ( EXIT_FAILURE );
}
printf( "Publisher Up and running... sending messages\n" );
fflush(NULL);
int msgCounter = 0;
do
{
try
{
char msgBuffer[1024];
sprintf( msgBuffer, "Message #%d", msgCounter++ );
zmq::message_t outTask( msgBuffer, strlen( msgBuffer ) + 1 );
printf("Sending message [%s]\n", msgBuffer );
pSocket->send ( outTask );
sleep( 1 );
}
catch( ... )
{
fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
exit ( EXIT_FAILURE );
}
}
while ( true );
exit ( EXIT_SUCCESS );
}
子代码
#include <stdio.h>
#include <stdlib.h>
#include <libgen.h>
#include <unistd.h>
#include <string>
#include <zeromq/zmq.hpp>
int main( int argc, char *argv[] )
{
if ( argc != 2 )
{
fprintf( stderr, "Usage : %s <localUri>\n", basename( argv[0] ) );
exit ( EXIT_FAILURE );
}
std::string pLocalUri( argv[1] );
zmq::context_t localContext( 1 );
zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_SUB );
if ( NULL == pSocket )
{
fprintf( stderr, "Couldn't create socket. Aborting...\n" );
exit ( EXIT_FAILURE );
}
try
{
pSocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
pSocket->bind( pLocalUri.c_str() );
}
catch( ... )
{
fprintf( stderr, "Couldn't bind socket. Aborting...\n" );
exit ( EXIT_FAILURE );
}
int msgCounter = 0;
printf( "Subscriber Up and running... waiting for messages\n" );
fflush( NULL );
do
{
try
{
zmq::message_t inTask;
pSocket->recv ( &inTask );
printf( "Message received : [%s]\n", inTask.data() );
fflush( NULL );
msgCounter++;
}
catch( ... )
{
fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
exit ( EXIT_FAILURE );
}
}
while ( msgCounter < 5 );
// pSocket->setsockopt( ZMQ_UNSUBSCRIBE, "", 0 ); NOT UNSUBSCRIBING
pSocket->close();
exit ( EXIT_SUCCESS );
}
2 回答
问:为什么会这样?
因为
SUB
实际上仍然连接(不足够"disconnected") .是的,可能会令人惊讶,但是 killing -118138-process,无论是在
.bind()
- 或.connect()
附近的套接字传输媒体, does not mean ,I / O泵的有限状态机已经"moved"断开连接 - 州 .鉴于此, PUB -side没有其他选择,只能考虑 SUB -仍然生活和连接(即使这个过程在
PUB
-旁边的视线范围内被无声地杀死了,并且对于这样的状态,有一个ZeroMQ协议定义的行为(一个PUB
-side duty)来收集a的所有临时消息(是的,SUB
-scriber,PUB
-side仍然认为是公平的(但可能只是在传输I / O级别或某些类型的远程CPU资源starvations或并发性上只有一些暂时的间歇性问题)介绍了瞬时间歇性{local | remote}阻塞状态等 .所以它缓冲......
如果您对 SUB -side代理的暗杀似乎更加优雅(在套接字资源实例上使用零化的
ZMQ_LINGER
足够的.close()
), PUB -side将识别"distributed"-系统系统范围的有限状态-Automaton转变为一个确实"DISCONNECT" -ed状态,并且"distributed-FSA"的PUB
-side会发生应有的行为改变,而不是为此存储任何消息"DISCONNECT" -ed SUB - 正如文档所述 ."Distributed-FSA"有一个很弱的方法来识别状态变化事件“超出它的本地主机控制范围. KILL - 一个远程进程,实现了"distributed-FSA"的一些显着部分是一个毁灭性的事件,而不是一个如何保持系统工作的方法这种外部风险的一个很好的选择可能是
听起来很复杂?
哦,是的,确实很复杂 . 这正是ZeroMQ为我们解决这个问题的原因,让我们自由地享受基于这些(已经解决的)低级复杂性的设计我们的应用程序架构 .
分布式系统FSA(子FSA-s的分层组合的系统范围FSA)
为了想象一下在引擎盖下默默发展的东西,想象一下只有一对简单的串联FSA-FSA - 正是这对 .Context() 实例试图在最简单的1:1 PUB/SUB 场景中为我们处理 - 其中使用 - case
KILL
-sSUB
-side上的所有子FSA-s,没有给出确认PUB
-side的意图 . 甚至TCP协议(同时生活在PUB
-side和SUB
-)也有几个从[ ESTABLISHED ]到[ CLOSED ]状态的状态转换 .分布式系统FSA-FSA-s上的快速X射线视图
(为清楚起见,仅描绘了TCP协议FSA)
PUB -side:
.socket( .. )
实例的行为FSA:SUB -side:
(礼貌nanomsg) .
绑定和连接虽然无动于衷,但在这里有特定的含义 .
Option 1:
将代码更改为这种方式,没有问题:
Publisher应该
bind
到一个地址订阅者应该
connect
到该地址'如果绑定订阅者然后中断它,那么发布者就无法知道订阅者是未绑定的,因此它将消息排队到绑定端口,当您在同一端口上重新启动时,排队的消息将被耗尽 .
Option 2:
但是如果你想按自己的方式去做,你需要做以下事情:
在订户代码中注册中断处理程序(
SIGINT
)关于用户的中断,请执行以下操作:
unsubscribe
这个话题close
子插座干净地退出订户进程,最好使用0返回代码
UPDATE:
关于身份点,不要假设设置标识将唯一地标识连接 . 如果留给zeromq,它将使用唯一的任意数字分配传入连接的标识 .
一般而言,身份不会用于回复客户 . 它们用于在使用
ROUTER
套接字的情况下响应客户端 . 'Coz ROUTERsockets是异步的,因为REQ / REP是同步的 . 在Async中,我们需要知道我们回复的对象 . 它可以是n / w地址或随机数或uuid等 .UPDATE:
我不认为这与zeromq有关,因为在整个指南中PUB / SUB的解释方式是Publisher通常是静态的(服务器并绑定到端口),订阅者一路走来(连接到的客户端)港口) .
还有另一种选择完全符合您的要求
ZMQ_IMMEDIATE
或ZMQ_DELAY_ATTACH_ON_CONNECT
在发布者上设置上面的套接字选项不会让消息在没有时排队与它的活动连接 .