首页 文章

从java客户端的多个RabbitMQ交换中读取没有轮询

提问于
浏览
1

请解释如何配置Java客户端以便在没有轮询的情况下从两个不同的RabbitMQ交换中读取 . 我希望客户端在消息到达时唤醒,然后再次阻止 .

在我的小系统集成问题中,一个RabbitMQ交换使用各种路由密钥(我知道如何使用通配符来捕获它们)来携带工作消息,而另一个交换机携带控制消息(例如,“停止”) . 所以我的客户必须听取来自两个地方的消息 . 这是一个相对低容量的系统问题,我不是在询问负载分配或公平性等 .

当然,我可以运行一个线程,轮询每个交易所,睡觉,发送,永远 . 但我想避免投票 .

我以某种方式提醒Unix select()系统调用,当数据在传递给它的任何文件描述符上准备就绪时唤醒它 . RabbitMQ有类似的东西吗?

我目前的解决方案是一个适配器,它在每个输入交换上旋转一个线程来阻塞;收到后,每个线程都写入java.util.concurrent集合;我使用另一个线程阻止该集合,并在消息到达最终消费者时传递消息 . 它工作正常,但如果我能把这种复杂性砍掉,那就太好了 .

这些SO帖子围绕这个问题跳舞,如果我在这些帖子中忽略了它,请随意在解决方案中嗤之以鼻:

对于java:RabbitMQ by Example: Multiple Threads, Channels and Queues

对于C#:Reading from multiple queues, RabbitMQ

提前致谢 .

1 回答

  • 1

    谢谢,抢劫狼,评论 . 是的,我已阅读教程,我知道每个消费者需要一个帖子 .

    事实证明,使用单个线程从多个交换中读取是直截了当的,并且根本没有轮询:获取新队列,并将其绑定到所有相关交换 . 适用于主题和扇动 . 用SSCE测试过,见下文 .

    我感叹RabbitMQ javadoc缺乏细节,Channel#queueBind(String,String,String)方法中的一些选择词会有很多帮助 .

    HTH

    package rabbitExample;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    /**
     * Demonstrates reading messages from two exchanges via a single queue monitored
     * by a single thread.
     * 
     */
    public class MultiExchangeReadTest implements Runnable {
    
    private final String exch1 = "my.topic.exchange";
    private final String exch2 = "my.fanout.exchange";
    private final Channel channel;
    private final QueueingConsumer consumer;
    
    public MultiExchangeReadTest(final String mqHost) throws Exception {
    
        // Connect to server
        System.out.println("Connecting to host " + mqHost);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(mqHost);
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
    
        // Declare exchanges; use defaults for durable etc.
        channel.exchangeDeclare(exch1, "topic");
        channel.exchangeDeclare(exch2, "fanout");
    
        // Get a new, unique queue name
        final String queue = channel.queueDeclare().getQueue();
    
        // Bind the queue to the exchanges; topic gets non-empty routing key
        channel.queueBind(queue, exch1, "my.key");
        channel.queueBind(queue, exch2, "");
    
        // Configure the channel to fetch one message at a time, auto-ACK
        channel.basicQos(1);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(queue, true, consumer);
    }
    
    public void run() {
        // Reads messages until interrupted
        try {
            while (true) {
                // Wait for a message
                System.out.println("Awaiting message");
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                // Show contents using default encoding scheme
                String body = new String(delivery.getBody());
                System.out.println("Message from exch "
                        + delivery.getEnvelope().getExchange() + ", key '"
                        + delivery.getEnvelope().getRoutingKey() + "':\n"
                        + body);
            } // while
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err
                    .println("Usaage: MultiExchangeReadTest.main mq-host-name");
        } else {
            MultiExchangeReadTest multiReader = new MultiExchangeReadTest(
                    args[0]);
            multiReader.run();
        }
    }
    }
    

相关问题