首页 文章

多个DefaultMessageListenerContainer提供单个TaskExecutor / ThreadPool(公平)

提问于
浏览
4

我正在开发一个Spring Boot应用程序,我打算在其中使用来自多个队列的JMS消息 - 但是我想控制处理这些消息的执行线程的总数,而不是控制每个JMS队列执行的线程数 .

简而言之:大量的JMS队列 . 一个线程池来进行处理 .

有些队列比其他队列更繁忙(有些可能总是有工作,有些则长时间闲置) - 所以我想利用可用的处理能力做任何需要完成的工作,无论源队列如何 .

我已经使用具有固定池大小的共享 TaskExecutor 设置了一系列 DefaultMessageListenerContainer . 我观察到的行为是,一个队列将消耗所有可用的插槽 - 然后(即使当第一个队列变空)时,插槽也要使用.'t become available for the other queues' DefaultMessageListenerContainer s .

这是在javadocs for DefaultMessageListenerContainer.setTaskExecutor()中详细说明的:

普通线程池不会添加太多值,因为此侦听器容器将在其整个生命周期中占用多个线程 .

  • 有办法解决这个问题吗?它在javadocs中提到的"J2EE environment"中的工作方式有何不同?

  • 我可以做一些像 MessageListenerContainer 这样可以消耗多个队列的东西吗?

  • 如果我想要的是可能的 - 那么是否可以应用一些排序,从而在先前空闲的队列上接收的消息可以被赋予更高的优先级?

1 回答

  • 6

    我试图重新创建你的问题,但我不能 . 可能是因为您在此问题中缺少源代码 . 但这就是我的尝试 .

    import java.io.File;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.jms.listener.DefaultMessageListenerContainer;
    import org.springframework.jms.listener.adapter.MessageListenerAdapter;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.util.FileSystemUtils;
    
    @Configuration
    @EnableAutoConfiguration
    public class Application {
    
        public class Receiver {
    
            private String name;
    
            public Receiver(String name) {
                this.name = name;
            }
    
            /**
             * When you receive a message, print it out, then shut down the application.
             * Finally, clean up any ActiveMQ server stuff.
             */
            public void receiveMessage(String message) {
                System.out.println("Received <" + message + "> @ "+name);
                //context.close();
                FileSystemUtils.deleteRecursively(new File("activemq-data"));
            }
        }
    
        static String mailboxDestination = "mailbox-destination";
    
        static String mailboxDestination2= "mailbox-destination2";
    
        @Bean
        MessageListenerAdapter adapter1() {
            MessageListenerAdapter messageListener
                    = new MessageListenerAdapter(new Receiver("MailBox1"));
            messageListener.setDefaultListenerMethod("receiveMessage");
            return messageListener;
        }
    
        @Bean
        MessageListenerAdapter adapter2() {
            MessageListenerAdapter messageListener
                    = new MessageListenerAdapter(new Receiver("MailBox2"));
            messageListener.setDefaultListenerMethod("receiveMessage");
            return messageListener;
        }
    
        @Bean
        DefaultMessageListenerContainer container(ConnectionFactory connectionFactory) throws Exception {
            DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
            container.setMessageListener(adapter1());
            container.setConnectionFactory(connectionFactory);
            container.setDestinationName(mailboxDestination);
            container.setTaskExecutor(taskExecutor());
    
            return container;
        }
    
        @Bean
        DefaultMessageListenerContainer container2(ConnectionFactory connectionFactory) throws Exception {
            DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
            container.setMessageListener(adapter2());
            container.setConnectionFactory(connectionFactory);
            container.setDestinationName(mailboxDestination2);
            container.setTaskExecutor(taskExecutor());
    
            return container;
        }
    
        @Bean
        TaskExecutor taskExecutor() throws Exception {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setCorePoolSize(10);
            taskExecutor.setMaxPoolSize(100);
            taskExecutor.setQueueCapacity(1000);
            taskExecutor.setThreadGroupName("MyThreads");
    
            return taskExecutor;
        }
    
        public static void main(String[] args) {
            // Clean out any ActiveMQ data from a previous run
            FileSystemUtils.deleteRecursively(new File("activemq-data"));
    
            // Launch the application
            ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
    
            // Send a message
    
            final JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
            System.out.println("Sending a new message.");
            new Thread() {
                public void run() {
                    for(int i=0;i<100;i++) {
                        final String message = (i+1)+" ping!";
                        MessageCreator messageCreator = new MessageCreator() {
                            @Override
                            public Message createMessage(Session session) throws JMSException {
                                return session.createTextMessage(message);
                            }
                        };
                        jmsTemplate.send(mailboxDestination, messageCreator);
                    }
                }
            }.start();
    
            new Thread() {
                public void run() {
                    for(int i=0;i<100;i++) {
                        final String message = (i+1)+" ping!";
                        MessageCreator messageCreator = new MessageCreator() {
                            @Override
                            public Message createMessage(Session session) throws JMSException {
                                return session.createTextMessage(message);
                            }
                        };
                        jmsTemplate.send(mailboxDestination2, messageCreator);
                    }
                }
            }.start();
    
        }
    
    }
    

    而我得到的输出是;

    Received <1 ping!> @ MailBox2
    Received <1 ping!> @ MailBox1
    Received <2 ping!> @ MailBox2
    Received <2 ping!> @ MailBox1
    Received <3 ping!> @ MailBox2
    Received <3 ping!> @ MailBox1
    Received <4 ping!> @ MailBox2
    Received <4 ping!> @ MailBox1
    Received <5 ping!> @ MailBox2
    Received <5 ping!> @ MailBox1
    Received <6 ping!> @ MailBox2
    Received <6 ping!> @ MailBox1
    Received <7 ping!> @ MailBox2
    Received <7 ping!> @ MailBox1
    Received <8 ping!> @ MailBox2
    Received <8 ping!> @ MailBox1
    Received <9 ping!> @ MailBox2
    Received <9 ping!> @ MailBox1
    Received <10 ping!> @ MailBox2
    Received <10 ping!> @ MailBox1
    Received <11 ping!> @ MailBox2
    Received <11 ping!> @ MailBox1
    Received <12 ping!> @ MailBox1
    Received <12 ping!> @ MailBox2
    Received <13 ping!> @ MailBox1
    Received <13 ping!> @ MailBox2
    Received <14 ping!> @ MailBox1
    Received <14 ping!> @ MailBox2
    Received <15 ping!> @ MailBox2
    Received <15 ping!> @ MailBox1
    Received <16 ping!> @ MailBox2
    Received <16 ping!> @ MailBox1
    Received <17 ping!> @ MailBox2
    Received <17 ping!> @ MailBox1
    Received <18 ping!> @ MailBox2
    Received <18 ping!> @ MailBox1
    Received <19 ping!> @ MailBox1
    Received <19 ping!> @ MailBox2
    Received <20 ping!> @ MailBox1
    Received <20 ping!> @ MailBox2
    Received <21 ping!> @ MailBox1
    Received <21 ping!> @ MailBox2
    Received <22 ping!> @ MailBox1
    Received <22 ping!> @ MailBox2
    Received <23 ping!> @ MailBox1
    Received <23 ping!> @ MailBox2
    Received <24 ping!> @ MailBox1
    Received <24 ping!> @ MailBox2
    Received <25 ping!> @ MailBox1
    Received <25 ping!> @ MailBox2
    Received <26 ping!> @ MailBox1
    Received <26 ping!> @ MailBox2
    Received <27 ping!> @ MailBox1
    Received <27 ping!> @ MailBox2
    Received <28 ping!> @ MailBox2
    Received <28 ping!> @ MailBox1
    Received <29 ping!> @ MailBox2
    Received <29 ping!> @ MailBox1
    Received <30 ping!> @ MailBox2
    Received <30 ping!> @ MailBox1
    Received <31 ping!> @ MailBox2
    Received <31 ping!> @ MailBox1
    Received <32 ping!> @ MailBox2
    Received <33 ping!> @ MailBox2
    Received <32 ping!> @ MailBox1
    Received <34 ping!> @ MailBox2
    Received <33 ping!> @ MailBox1
    Received <34 ping!> @ MailBox1
    Received <35 ping!> @ MailBox2
    Received <35 ping!> @ MailBox1
    Received <36 ping!> @ MailBox2
    Received <36 ping!> @ MailBox1
    Received <37 ping!> @ MailBox2
    Received <37 ping!> @ MailBox1
    Received <38 ping!> @ MailBox2
    Received <38 ping!> @ MailBox1
    Received <39 ping!> @ MailBox2
    Received <39 ping!> @ MailBox1
    Received <40 ping!> @ MailBox2
    Received <40 ping!> @ MailBox1
    Received <41 ping!> @ MailBox2
    Received <42 ping!> @ MailBox2
    Received <43 ping!> @ MailBox2
    Received <44 ping!> @ MailBox2
    Received <45 ping!> @ MailBox2
    Received <46 ping!> @ MailBox2
    Received <47 ping!> @ MailBox2
    Received <48 ping!> @ MailBox2
    Received <49 ping!> @ MailBox2
    Received <50 ping!> @ MailBox2
    Received <51 ping!> @ MailBox2
    Received <52 ping!> @ MailBox2
    Received <53 ping!> @ MailBox2
    Received <54 ping!> @ MailBox2
    Received <55 ping!> @ MailBox2
    Received <56 ping!> @ MailBox2
    Received <57 ping!> @ MailBox2
    Received <58 ping!> @ MailBox2
    Received <59 ping!> @ MailBox2
    Received <60 ping!> @ MailBox2
    Received <61 ping!> @ MailBox2
    Received <62 ping!> @ MailBox2
    Received <63 ping!> @ MailBox2
    Received <64 ping!> @ MailBox2
    Received <65 ping!> @ MailBox2
    Received <66 ping!> @ MailBox2
    Received <67 ping!> @ MailBox2
    Received <68 ping!> @ MailBox2
    Received <69 ping!> @ MailBox2
    Received <70 ping!> @ MailBox2
    Received <71 ping!> @ MailBox2
    Received <72 ping!> @ MailBox2
    Received <73 ping!> @ MailBox2
    Received <74 ping!> @ MailBox2
    Received <75 ping!> @ MailBox2
    Received <76 ping!> @ MailBox2
    Received <77 ping!> @ MailBox2
    Received <78 ping!> @ MailBox2
    Received <79 ping!> @ MailBox2
    Received <80 ping!> @ MailBox2
    Received <81 ping!> @ MailBox2
    Received <82 ping!> @ MailBox2
    Received <83 ping!> @ MailBox2
    Received <84 ping!> @ MailBox2
    Received <85 ping!> @ MailBox2
    Received <86 ping!> @ MailBox2
    Received <87 ping!> @ MailBox2
    Received <88 ping!> @ MailBox2
    Received <89 ping!> @ MailBox2
    Received <90 ping!> @ MailBox2
    Received <91 ping!> @ MailBox2
    Received <92 ping!> @ MailBox2
    Received <93 ping!> @ MailBox2
    Received <94 ping!> @ MailBox2
    Received <95 ping!> @ MailBox2
    Received <96 ping!> @ MailBox2
    Received <97 ping!> @ MailBox2
    Received <98 ping!> @ MailBox2
    Received <99 ping!> @ MailBox2
    Received <100 ping!> @ MailBox2
    Received <41 ping!> @ MailBox1
    Received <42 ping!> @ MailBox1
    Received <43 ping!> @ MailBox1
    Received <44 ping!> @ MailBox1
    Received <45 ping!> @ MailBox1
    Received <46 ping!> @ MailBox1
    Received <47 ping!> @ MailBox1
    Received <48 ping!> @ MailBox1
    Received <49 ping!> @ MailBox1
    Received <50 ping!> @ MailBox1
    Received <51 ping!> @ MailBox1
    Received <52 ping!> @ MailBox1
    Received <53 ping!> @ MailBox1
    Received <54 ping!> @ MailBox1
    Received <55 ping!> @ MailBox1
    Received <56 ping!> @ MailBox1
    Received <57 ping!> @ MailBox1
    Received <58 ping!> @ MailBox1
    Received <59 ping!> @ MailBox1
    Received <60 ping!> @ MailBox1
    Received <61 ping!> @ MailBox1
    Received <62 ping!> @ MailBox1
    Received <63 ping!> @ MailBox1
    Received <64 ping!> @ MailBox1
    Received <65 ping!> @ MailBox1
    Received <66 ping!> @ MailBox1
    Received <67 ping!> @ MailBox1
    Received <68 ping!> @ MailBox1
    Received <69 ping!> @ MailBox1
    Received <70 ping!> @ MailBox1
    Received <71 ping!> @ MailBox1
    Received <72 ping!> @ MailBox1
    Received <73 ping!> @ MailBox1
    Received <74 ping!> @ MailBox1
    Received <75 ping!> @ MailBox1
    Received <76 ping!> @ MailBox1
    Received <77 ping!> @ MailBox1
    Received <78 ping!> @ MailBox1
    Received <79 ping!> @ MailBox1
    Received <80 ping!> @ MailBox1
    Received <81 ping!> @ MailBox1
    Received <82 ping!> @ MailBox1
    Received <83 ping!> @ MailBox1
    Received <84 ping!> @ MailBox1
    Received <85 ping!> @ MailBox1
    Received <86 ping!> @ MailBox1
    Received <87 ping!> @ MailBox1
    Received <88 ping!> @ MailBox1
    Received <89 ping!> @ MailBox1
    Received <90 ping!> @ MailBox1
    Received <91 ping!> @ MailBox1
    Received <92 ping!> @ MailBox1
    Received <93 ping!> @ MailBox1
    Received <94 ping!> @ MailBox1
    Received <95 ping!> @ MailBox1
    Received <96 ping!> @ MailBox1
    Received <97 ping!> @ MailBox1
    Received <98 ping!> @ MailBox1
    Received <99 ping!> @ MailBox1
    Received <100 ping!> @ MailBox1
    

    如果我错了,请纠正我但是使用这种配置我没有看到任何问题 . 两个队列和一个任务 Actuator .

相关问题