首页 文章

ActiveMQ一些消费者如果在 生产环境 者之后到达,就不会接受任务

提问于
浏览
0

我刚开始使用ActiveMQ,我似乎有一个奇怪的问题 . (来源如下)

有两种情况

  • 消费者连接到代理,等待队列上的任务 . 制作人稍后到达,删除任务列表,他们被不同的消费者正确地接受并执行 . 这工作正常,我也模拟了它 .

  • Producer首先连接,删除任务列表 . 此时没有消费者连接 . 现在,当我们说3个消费者 - C1,C2和C3连接到代理(按此顺序)时,我看到只有C1接收并执行 生产环境 者丢弃的任务 . C2和C3保持闲置状态 . 为什么会这样?

关于第二种情况,我还注意到了另外一件事 - 如果 生产环境 者继续在队列中删除任务,C2和C3会接收任务但是如果 生产环境 者之前已经删除了任务(如上所述)那么C2和C3就不会做任何事情 .

制片人代码

package com.activemq.apps;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.commons.helpers.Maths;

public class Publisher implements MessageListener {

    private static String _URL;
    private static String _TOPIC_PUBLISH;
    private static String _TOPIC_CONSUME;

    public Publisher (String URL, String TOPIC) {

        _URL = URL;
        _TOPIC_PUBLISH = TOPIC + "_REQUESTS";
        _TOPIC_CONSUME = TOPIC + "_RESPONSES";

    }

    public void initialize() {

        try
        {

            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
            Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);

            MessageProducer producer = session.createProducer(destinationProducer);
            MessageConsumer consumer = session.createConsumer(destinationConsumers);

            consumer.setMessageListener(this);

            int count = 0;

            System.out.println("Sending requests");

            while (true)
            {
                int randomSleepTime = Maths.rand(1000, 5000);

                String messageToSend = count + "_" + randomSleepTime;

                TextMessage message = session.createTextMessage(messageToSend);

                producer.send(message);

                System.out.println("Job #" + count + " | " + (randomSleepTime/1000) + "s");

                if (count++%10 == 0)
                    Thread.sleep(10000);

            }

//          System.out.println("Waiting for responses");

//          connection.close();
        }

        catch (JMSException ex)
        {
            ex.printStackTrace();
        }

        catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void onMessage(Message message) {

        if (message instanceof TextMessage)
        {
            TextMessage msg = (TextMessage) message;

            try {

                String response = msg.getText();
                String[] responseSplit = response.split("_");

                String clientId = responseSplit[1];
                String count = responseSplit[0];

                System.out.println("Got response from " + clientId + " Job #" + count);
            } 

            catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

}

消费者代码

package com.activemq.apps;

import java.util.UUID;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer implements MessageListener {

    private static String _URL;
    private static String _TOPIC_PUBLISH;
    private static String _TOPIC_CONSUME;
    private static String _CLIENTID;

    private MessageProducer producer;
    private Session session;

    public Consumer (String URL, String TOPIC) {

        _URL = URL;
        _TOPIC_PUBLISH = TOPIC + "_RESPONSES";
        _TOPIC_CONSUME = TOPIC + "_REQUESTS";

    }

    public void initialize() {

        try
        {

            _CLIENTID = UUID.randomUUID().toString();

            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(_URL);
            Connection connection = connectionFactory.createConnection();
            connection.start();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destinationProducer = session.createQueue(_TOPIC_PUBLISH);
            Destination destinationConsumers = session.createQueue(_TOPIC_CONSUME);

            producer = session.createProducer(destinationProducer);
            MessageConsumer consumer = session.createConsumer(destinationConsumers);

            consumer.setMessageListener(this);

            System.out.println("Client: " + _CLIENTID + "\nWaiting to pick up tasks");

//          connection.close();
        }

        catch (JMSException ex)
        {
            ex.printStackTrace();
        }

    }

    @Override
    public void onMessage(Message message) {

        if (message instanceof TextMessage)
        {
            TextMessage msg = (TextMessage) message;

            try 
            {

                String[] messageSplits = msg.getText().split("_");

                String count = messageSplits[0];
                String timeString = messageSplits[1];

                int sleepFor = Integer.parseInt(timeString);

                System.out.println("Job #" + count + " | Sleeping for " + (sleepFor/1000) + "s");

                Thread.sleep(sleepFor);

                TextMessage sendToProducer = session.createTextMessage(count + "_" + _CLIENTID);

                producer.send(sendToProducer);
            } 

            catch (JMSException e) {
                e.printStackTrace();
            } 

            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

}

2 回答

  • 1

    你提到过

    现在让我们说3个消费者 - C1,C2和C3连接到经纪人(按此顺序)

    由于C1首先连接,因此在连接后立即开始获取队列中的所有消息 . 这是预料之中的 . 所以我在这里看不到任何问题 . C2,C3没有空闲,但C1在C2和C3可以之前得到了消息 .

    我不确定制作人发送了多少条消息 . 我假设消息的数量较少 . 要查看您的期望,请尝试从 生产环境 者发送许多消息,例如数千或数百万,然后启动消费者 . 大量消息是主观的,取决于内存,网络和其他资源 . 您可能会看到您的期望 .

  • 0

    我不认为这里有什么奇怪的 . 队列代表P2P模式,该模式应该只有一个消费者 . 在我们的案例中,我们有3个消费者,这是不被禁止的,但不保证消费者将收到消息的任何特定订单 .

相关问题