首页 文章

Producer-Consumer with ExecutorService.newFixedThreadPool - 创建了多少个线程?

提问于
浏览
1
public class MainClass {

private static final int producerPoolSize = 10;
private static final int consumerPoolSize = 20;    

private ExecutorService prodExec = Executors.newFixedThreadPool(producerPoolSize);
private ExecutorService consExec = Executors.newFixedThreadPool(consumerPoolSize);

//main method here, which calls start() below

private void start(String[] args) {

    // Get list of ids, split them in to n(producerPoolSize) chunks

    for (int index = 0; index < producerPoolSize; index++) {
        Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
        prodExec.execute(producer);
    }

}

public class Producer implements Runnable {

    private ExecutorService consExec;
    private List<Long> list;

    public Producer(ExecutorService exec, List<Long> list) {
        this.consExec = exec;
        this.list = list;
    }

    public void run() {

       for (Long id: list) {
          data = get data from db for the id
          consExec.execute(new Consumer(data));
       }
    }
 }

 public class Consumer implements Runnable {

     public void run() {
        // call web service
     }
 }

在上面的代码中,我有两个线程池 - 一个用于 生产环境 者和消费者 . 我从数据库中获取了许多ID,将它们分成相等的块,以便将它们分发给Producer线程进行处理 . 生产环境 者线程接收ID列表并按顺序处理每个ID,检索每个ID的数据并将该数据提交给Consumer线程进行处理 . 现在我的问题是:

我上面创建了10个生成器线程 . 我希望Consumer线程池的大小为20.但是,在处理每个ID时,Producer创建一个新的Runnable(Consumer)并将其提交(执行)到Consumer executor服务 . 我对ExecutorService的理解是你提交给它的Runnable被包装在一个Worker线程中然后被执行 . 那么,在上面的代码中,如果每个 生产环境 者获得的ID数量是50,我实际上是在创建50 * 10 = 500个消费者线程吗?太多了吗?

或者池大小实际上是指工作线程的数量?所以在上面的代码中,我在Consumer executor上创建了500个任务,它们实际上是由20个工作线程排队并执行的?我可能没有正确解释这个问题,但在 Actuator 的内部实现方面有点混淆,并担心如果我创建了太多的Consumer线程 .

如果这不是实现这一点的方法,有人可以提出更好的方法吗?谢谢 .

2 回答

  • 2

    池大小实际上是指工作线程的数量吗?是 .

    如果消费者Runnable进程需要很长时间,则只会同时运行20个 . 其余的将在集合中等待,直到有一个线程可以运行它 .

    至于是否有更好的方法来做到这一点 . 你需要使用线程吗?除非您有20个并行运行此处理器的处理器,否则可能不会增加处理时间,因为所有线程都会在上下文切换等中花费时间来处理数据 .

    此外, 生产环境 者获取所有数据并将其存储在消费者中 . 如果消费者无法运行,因为您有500个,并且只有20个可以一次运行,那么您将存储(500减20)*您可以处理的数据 . 您可以让消费者获取自己的数据 .

    回应评论:

    代替

    for (int index = 0; index < producerPoolSize; index++) {
        Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
        prodExec.execute(producer);
    }
    

    和处理器

    for (Long id: list) {
        data = get data from db for the id
        consExec.execute(new Consumer(data));
    }
    

    消费者看起来像:

    public class Consumer implements Runnable {
    
         long myId;
    
         Consumer(long id){
           myId = id;
         }
    
         public void run() {
            data = get data from db for the id
            // do whatever a consumer does with data
         }
     }
    

    private void start(String[] args) {
    
        // Get list of ids create a new consumer for each id
    
        for (int index = 0; index < everyID.length; index++) {
            consExec.execute(new Consumer(everyID[i]));
        }
    
    }
    

    然后你松开了整个类,并且20个池更有意义,因为在IO获取数据时被阻止的消费者将被等待,并且准备就绪的消费者可以继续处理 .

  • 3

    池大小决定了工作线程的数量 . 如果在所有工作线程忙时尝试提交项目,则ExecutorService将对其进行排队,并在工作人员空闲时运行 .

    The javadocs这样说:

    创建一个线程池,该线程池重用一组在共享无界队列中运行的固定线程 . 如果任何线程由于在关闭之前执行期间的故障而终止,则如果需要执行后续任务,则新线程将取代它 .

    请注意高亮部分 . 线程数是固定的,队列是无限制的,这意味着线程忙时提交的项将始终排队,而不是被拒绝 .

相关问题