首页 文章

在zeromq中使用XSUB / XPUB代理的简单示例是什么

提问于
浏览
1

我跟进How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)?

该问题请求使用XSUB和XPUB的C代理 . 给出的答案基本上是下面引用的代理main()函数 .

我将此代理扩展为包含发布者和订阅者的完整工作示例 . 问题是我的代码只适用于经销商/路由器选项(如下面的评论所示) . 使用下面的实际(未注释)XPUB / XSUB选项,订阅者不会收到消息 . 出了什么问题?是否有调整来获取消息?

Proxy 不使用XPUB / XSUB(评论中有经验的经销商/路由器)

#include <zmq.hpp>

int main(int argc, char* argv[]) {
    zmq::context_t ctx(1);
    zmq::socket_t frontend(ctx, /*ZMQ_ROUTER*/ ZMQ_XSUB);
    zmq::socket_t backend(ctx, /*ZMQ_DEALER*/ ZMQ_XPUB);
    frontend.bind("tcp://*:5570");
    backend.bind("tcp://*:5571");
    zmq::proxy(frontend, backend, nullptr);
    return 0;
}

Subscriber 无法使用ZMQ_SUB(评论中有工作的经销商/路由器选项)

#include <iostream>
#include <zmq.hpp>

std::string GetStringFromMessage(const zmq::message_t& msg) {
    char* tmp = new char[msg.size()+1];
    memcpy(tmp,msg.data(),msg.size());
    tmp[msg.size()] = '\0';
    std::string rval(tmp);
    delete[] tmp;
    return rval;
}

int main(int argc, char* argv[]) {
    zmq::context_t ctx(1);
    zmq::socket_t socket(ctx, /*ZMQ_DEALER*/ ZMQ_SUB);
    socket.connect("tcp://localhost:5571");
    while (true) {
        zmq::message_t identity;
        zmq::message_t message;
        socket.recv(&identity);
        socket.recv(&message);
        std::string identityStr(GetStringFromMessage(identity));
        std::string messageStr(GetStringFromMessage(message));
        std::cout << "Identity: " << identityStr << std::endl;
        std::cout << "Message: "  << messageStr  << std::endl;
    }
}

Publisher 不使用ZMQ_PUB(评论中有工作的经销商/路由器选项)

#include <unistd.h>
#include <sstream>
#include <zmq.hpp>

int main (int argc, char* argv[])
{
    // Context
    zmq::context_t ctx(1);

    // Create a socket and set its identity attribute
    zmq::socket_t socket(ctx, /*ZMQ_DEALER*/ ZMQ_PUB);
    char identity[10] = {};
    sprintf(identity, "%d", getpid());
    socket.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
    socket.connect("tcp://localhost:5570");

    // Send some messages
    unsigned int counter = 0;
    while (true) {
        std::ostringstream ss;
        ss << "Message #" << counter << " from PID " << getpid();
        socket.send(ss.str().c_str(),ss.str().length());
        counter++;
        sleep(1);
    }
    return 0;
}

1 回答

  • 0

    在订阅者代码中,您尚未订阅接收来自发布者的消息 . 尝试添加该行:

    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    

    行之前/之后:

    socket.connect("tcp://localhost:5571");
    

    在您的订阅者代码中

相关问题