
How to implement a Pub-Sub Network with a Proxy using XPUB and XSUB in ZeroMQ (jzmq)


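I am trying to implement the network shown in the figure below using XPUB and XSUB. I have gone through the examples they provide, but could not find one for XPUB and XSUB in Java. Here they give an example in C, which is hard for me to follow because I am new to ZeroMQ.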

[Figure: Pub-Sub Network with a Proxy in ZeroMQ]

I am trying to use the JNI-wrapped version on Android. Please help me with an example of how to implement this Pub-Sub Network with a Proxy in ZeroMQ using Java.

At the moment I am referring to http://zguide.zeromq.org/page:all

I tried to port it as follows.

Subscriber.java

import android.util.Log;
import org.zeromq.ZMQ;

public class Subscriber extends Thread {

    private static final String TAG = "Subscriber";
    // Qualified as ZMQ.Context so it cannot be confused with android.content.Context.
    private final ZMQ.Context ctx;

    public Subscriber(ZMQ.Context z_context) {
        this.ctx = z_context;
    }

    @Override
    public void run() {
        // Connect to the proxy's XPUB endpoint and subscribe to two topic prefixes.
        ZMQ.Socket mulServiceSubscriber = ctx.socket(ZMQ.SUB);
        mulServiceSubscriber.connect("tcp://localhost:6001");
        mulServiceSubscriber.subscribe("A".getBytes());
        mulServiceSubscriber.subscribe("B".getBytes());

        while (true) {
            Log.d(TAG, "Subscriber loop started..");
            String content = new String(mulServiceSubscriber.recv(0));
            Log.d(TAG, "Subscriber Received : " + content);
        }
    }
}
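The two `subscribe` calls above register topic prefixes: a SUB socket delivers a message only when its leading bytes match one of the subscribed prefixes, which is why the published messages start with "A". The following is a minimal plain-Java sketch of that matching rule. `PrefixFilterDemo` and its methods are hypothetical names used only for illustration; they are not part of jzmq, and the real filtering happens inside ZeroMQ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of any ZeroMQ binding: it only models
// the prefix test applied to incoming messages on a SUB socket.
class PrefixFilterDemo {

    // True if the message starts with the subscription prefix, byte for byte.
    static boolean matches(byte[] prefix, byte[] message) {
        if (prefix.length > message.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (prefix[i] != message[i]) {
                return false;
            }
        }
        return true;
    }

    // True if at least one subscription accepts the message.
    static boolean accepted(List<byte[]> subscriptions, byte[] message) {
        for (byte[] prefix : subscriptions) {
            if (matches(prefix, message)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<byte[]> subs = Arrays.asList(
                "A".getBytes(StandardCharsets.UTF_8),
                "B".getBytes(StandardCharsets.UTF_8));
        for (String text : new String[] {"A Hello 42", "B Hello 7", "C Hello 1"}) {
            boolean ok = accepted(subs, text.getBytes(StandardCharsets.UTF_8));
            System.out.println(text + " -> " + (ok ? "delivered" : "dropped"));
        }
    }
}
```

Under this rule an empty prefix matches every message, which corresponds to subscribing to everything.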

Publisher.java

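import android.util.Log;
import java.util.Random;
import org.zeromq.ZMQ;

public class Publisher extends Thread {

    private static final String TAG = "Publisher";
    private final ZMQ.Context ctx;
    // Created once: a fresh new Random(100) on every pass would repeat the same value.
    private final Random random = new Random();

    public Publisher(ZMQ.Context z_context) {
        this.ctx = z_context;
    }

    @Override
    public void run() {
        // Connect to the proxy's XSUB endpoint.
        ZMQ.Socket publisher = ctx.socket(ZMQ.PUB);
        publisher.connect("tcp://localhost:6000");

        while (true) {
            Log.d(TAG, "Publisher loop started..");
            // The leading "A" is the topic prefix the subscriber filters on.
            publisher.send(("A Hello " + random.nextInt()).getBytes(), 0);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop publishing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}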
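A side note on the random suffix in the published message: `java.util.Random` is deterministic for a given seed, so a generator that is rebuilt with the same seed on every pass returns the same first value each time. A small sketch using only the standard library (`SeededRandomDemo` is a hypothetical name chosen for illustration):

```java
import java.util.Random;

// Hypothetical demo class: shows why a seeded generator should be
// created once rather than on every loop iteration.
class SeededRandomDemo {

    // Rebuilds the generator with a fixed seed on every call,
    // the way an expression inside a loop body would.
    static int freshSeeded() {
        return new Random(100).nextInt();
    }

    public static void main(String[] args) {
        // Both calls start from seed 100, so the values are identical.
        System.out.println("same value twice: " + (freshSeeded() == freshSeeded()));

        // A single shared generator advances its state between calls.
        Random shared = new Random();
        System.out.println(shared.nextInt() + " " + shared.nextInt());
    }
}
```

Keeping one generator for the lifetime of the thread is enough to make each published message carry a different number.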

XListener.java (for now, a simple forwarder)

import android.util.Log;
import org.zeromq.ZMQ;

// Forwards published messages from the XSUB side to the XPUB side.
public class XListener extends Thread {

    private static final String TAG = "XListener";
    private final ZMQ.Context ctx;
    private final ZMQ.Socket subscriberX;
    private final ZMQ.Socket publisherX;

    public XListener(ZMQ.Context ctx, ZMQ.Socket subscriberX,
            ZMQ.Socket publisherX) {
        this.ctx = ctx;
        this.subscriberX = subscriberX;
        this.publisherX = publisherX;
    }

    @Override
    public void run() {
        while (true) {
            Log.d(TAG, "XListener loop started..");

            // Pass the raw bytes through unchanged; the String is only for logging.
            byte[] msg = subscriberX.recv(0);
            Log.v(TAG, "Listener Received: MSG :" + new String(msg));
            publisherX.send(msg, 0);
        }
    }
}

In the application's main()

private void main() {
    ZMQ.Context ctx = ZMQ.context(1);

    // Frontend: publishers connect here.
    ZMQ.Socket subscriberX = ctx.socket(ZMQ.XSUB);
    subscriberX.bind("tcp://*:6000");
    pause(500);

    // Backend: subscribers connect here.
    ZMQ.Socket publisherX = ctx.socket(ZMQ.XPUB);
    publisherX.bind("tcp://*:6001");
    pause(500);

    // Forwarder thread from the XSUB side to the XPUB side.
    new XListener(ctx, subscriberX, publisherX).start();
    pause(500);

    // Second forwarder thread (XSender, given in the answer).
    new XSender(ctx, subscriberX, publisherX).start();
    pause(500);

    new Subscriber(ctx).start();
    new Publisher(ctx).start();
}

// Sleeps briefly so each step can settle before the next one starts.
private static void pause(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

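With this code I am unable to listen on XSUB. While porting espresso.c, I could not find any wrapper in the Java bindings of ZMQ. How do I implement a simple proxy, or what am I missing?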

1 Answer


    Wow, I am answering my own question. I had missed adding a forwarder from publisherX to subscriberX. Here is the missing code. Now XSUB and XPUB are able to send and receive data.

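    import android.util.Log;
    import org.zeromq.ZMQ;

    // Forwards subscription messages from the XPUB side back to the XSUB side.
    // Caution: ZeroMQ sockets are not thread-safe, so sharing these two
    // sockets with the XListener thread is fragile.
    public class XSender extends Thread {

        private static final String TAG = "XSender";
        private final ZMQ.Context ctx;
        private final ZMQ.Socket subscriberX;
        private final ZMQ.Socket publisherX;

        public XSender(ZMQ.Context ctx, ZMQ.Socket subscriberX,
                ZMQ.Socket publisherX) {
            this.ctx = ctx;
            this.subscriberX = subscriberX;
            this.publisherX = publisherX;
        }

        @Override
        public void run() {
            while (true) {
                Log.d(TAG, "XSender loop started..");

                // Read from publisherX (XPUB) and write to subscriberX (XSUB),
                // the reverse of XListener, passing the raw bytes through unchanged.
                byte[] msg = publisherX.recv(0);
                Log.v(TAG, "Sender Received: MSG :" + new String(msg));
                subscriberX.send(msg, 0);
            }
        }
    }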
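The reverse direction matters because an XPUB socket hands incoming subscriptions to the application as ordinary messages, and the upstream publisher only learns about them if they are written to the XSUB socket. Each such message is a single flag byte (1 to subscribe, 0 to unsubscribe) followed by the topic prefix. The plain-Java sketch below only models that layout; `SubscriptionFrameDemo` and its methods are hypothetical names for illustration and are not part of jzmq.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of jzmq: models the subscription frames
// that an XPUB socket delivers and an XSUB socket accepts.
class SubscriptionFrameDemo {

    static final byte UNSUBSCRIBE = 0;
    static final byte SUBSCRIBE = 1;

    // Builds a frame: one flag byte followed by the topic prefix bytes.
    static byte[] encode(boolean subscribe, String topic) {
        byte[] prefix = topic.getBytes(StandardCharsets.UTF_8);
        byte[] frame = new byte[prefix.length + 1];
        frame[0] = subscribe ? SUBSCRIBE : UNSUBSCRIBE;
        System.arraycopy(prefix, 0, frame, 1, prefix.length);
        return frame;
    }

    // True if the frame's flag byte marks a subscribe request.
    static boolean isSubscribe(byte[] frame) {
        return frame.length > 0 && frame[0] == SUBSCRIBE;
    }

    // Extracts the topic prefix that follows the flag byte.
    static String topic(byte[] frame) {
        if (frame.length == 0) {
            return "";
        }
        return new String(frame, 1, frame.length - 1, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] frame = encode(true, "A");
        System.out.println(Arrays.toString(frame));
        System.out.println((isSubscribe(frame) ? "subscribe " : "unsubscribe ") + topic(frame));
    }
}
```

Because the first byte carries meaning, a forwarder is safest when it passes such frames along as raw bytes instead of converting them to and from strings.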
    
