我正在编写一个应该通过ZeroMQ发送C结构的程序 . 因此我使用Google的ProtocolBuffers来序列化结构 .
我现在遇到的问题是我的订户方没有收到任何东西 . 发布者打印出“消息已成功发送”,因此我认为错误发生在订阅者端 .
出版商:
int main (void)
{
Message protomsg = MESSAGE__INIT;
void *buf;
unsigned len;
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context, ZMQ_PUB);
zmq_bind(subscriber, "ipc://my.sock");
//Initialising protomsg (not so important)
//sending message
len = message__get_packed_size(&protomsg);
buf = malloc(len);
message__pack(&protomsg, buf);
zmq_msg_t output;
zmq_msg_init_size(&output, len);
zmq_msg_init_data(&output, buf, len, NULL, NULL);
if(zmq_msg_send(&output, subscriber, 0) == -1)
perror("Error sending message \n");
else
printf("Message successfully sent \n");
zmq_msg_close(&output);
free(buf);
zmq_close (subscriber);
zmq_ctx_destroy (context);
return 0;
}
订户:
int main (void){
Message *protomsg;
void *context = zmq_ctx_new ();
void *publisher = zmq_socket (context, ZMQ_SUB);
zmq_connect(publisher, "ipc://my.sock");
zmq_setsockopt(publisher, ZMQ_SUBSCRIBE, "", 0);
// Read packed message from ZMQ.
zmq_msg_t msg;
zmq_msg_init(&msg);
if(zmq_msg_recv(&msg, publisher, 0) == -1)
perror("Error receiving message \n");
else
printf("Message received");
memcpy((void *)protomsg, zmq_msg_data(&msg), zmq_msg_size(&msg));
// Unpack the message using protobuf-c.
protomsg = message__unpack(NULL, zmq_msg_size(&msg), (void *)&data);
if (protomsg == NULL)
{
fprintf(stderr, "error unpacking incoming message\n");
exit(1);
}
printf("Address: %u, Type: %u, Information[0]: %u, Information[1]: %u \n", protomsg->address-48, protomsg->frametype, protomsg->information[0], protomsg->information[1]);
zmq_msg_close (&msg);
// Free the unpacked message
message__free_unpacked(protomsg, NULL);
//close context,socket..
}
1 回答
不知道是否还有人关心这一点,但是这里......我同意@ Steve-o这是一个时间问题,虽然我认为问题是你过早关闭了发布者套接字 .
您的发布者代码发布消息然后立即关闭套接字并终止上下文 . 因此,消息在发布者中存在毫秒,然后永远消失 .
如果您首先运行发布者,那就是它,退出并且消息消失了 . 当您启动订户时,它会尝试连接到不再存在的IPC套接字 . ZeroMQ允许这样,并且订户将阻塞,直到有一个IPC套接字连接到 .
我没有查看ZeroMQ IPC源代码,但我怀疑,在用户定期下,订阅者会定期尝试连接到发布者套接字 . 现在,如果您再次运行发布者,它可能会有效,但您有严重的竞争条件 . 如果您在ZeroMQ工作者尝试重试的确切时刻启动发布者,则可能会发生连接,甚至可能在发布者销毁所有内容之前收到您的消息 .
我很确定这个问题与结构和protobuf无关 . 从ZeroMQ的角度来看,您只是发送字节 . 没有区别 . 如果ZeroMQ字符串的测试用例与ZeroMQ结构的测试用例完全相同 - 那么代码更改可能会添加或删除几纳秒,这样就能以错误的方式打破竞争条件 .
具体建议:
将发布者中的套接字重命名为"publisher"而不是订阅者(复制/粘贴错误)
在zmq_close(发布者)之前添加睡眠30秒;
希望这将解决您的测试代码的问题
如果这不能解决问题,请考虑切换到tcp传输并使用wireshark来诊断实际情况 .