首页 文章

生产环境 者消费者 - 使用Executors.newFixedThreadPool

提问于
浏览
7

我对 生产环境 者 - 消费者模式的理解是,它可以使用 生产环境 者和消费者之间共享的队列来实现 . 生产环境 者将工作提交给共享队列,消费者检索它并处理它 . 它也可以由 生产环境 者直接提交给消费者来实现( 生产环境 者线程直接提交给消费者的执行者服务) .

现在,我一直在查看Executors类,它提供了一些线程池的常见实现 . 根据规范,newFixedThreadPool方法“重用固定数量的线程在共享的无界队列中运行” . 他们在这里谈论哪个队列?

如果Producer直接向使用者提交任务,那么它是包含Runnables列表的ExecutorService的内部队列吗?

或者它是中间队列,以防 生产环境 者提交到共享队列?

可能是我错过了重点,但请有人澄清一下吗?

3 回答

  • 1

    你是对的, ExecutorService 不仅是一个线程池,而且是一个完整的Producer-Consumer实现 . 这个内部队列实际上是 Runnable (确切地说是 FutureTask )的线程安全队列,用于保存任务 submit() .

    池中的所有线程都在该队列上被阻塞,等待执行任务 . 当你执行任务时,只需一个线程就可以启动并运行它 . 当然 submit() 不是在等待池中的线程来完成处理 .

    另一方面,如果您提交了大量任务(或长时间运行的任务),您最终可能会占用池中的所有线程并且某些任务在队列中等待 . 一旦任何线程完成其任务,它将立即从队列中选择第一个 .

  • 4
    public class Producer extends Thread {  
        static List<String> list = new ArrayList<String>();  
    
        public static void main(String[] args) {  
            ScheduledExecutorService executor = Executors  
                    .newScheduledThreadPool(12);  
            int initialDelay = 5;  
            int pollingFrequency = 5;  
            Producer producer = new Producer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                    producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
            for (int i = 0; i < 3; i++) {  
                Consumer consumer = new Consumer();  
                @SuppressWarnings({ "rawtypes", "unused" })  
                ScheduledFuture schedFutureConsumer = executor  
                        .scheduleWithFixedDelay(consumer, initialDelay,  
                                pollingFrequency, TimeUnit.SECONDS);  
            }  
    
        }  
    
        @Override  
        public void run() {  
            list.add("object added to list is " + System.currentTimeMillis());  
                                  ///adding in list become slow also because of synchronized behavior  
        }  
    }  
    
    class Consumer extends Thread {  
    
        @Override  
        public void run() {  
            getObjectFromList();  
        }  
    
        private void getObjectFromList() {  
            synchronized (Producer.list) {  
                if (Producer.list.size() > 0) {  
                    System.out.println("Object name removed by "  
                            + Thread.currentThread().getName() + "is "  
                            + Producer.list.get(0));  
                    Producer.list.remove(Producer.list.get(0));  
                }  
            }  
        }  
    }
    
  • 0

    看一下这个:
    Producer-Consumer example in Java (RabbitMQ)(在Java中它是's written for another library but it',它清楚地表明了这个概念;)
    希望能帮助到你!

    P.S.:Actually,它有几个例子,但你明白了;)

相关问题