public class MainClass {
private static final int producerPoolSize = 10;
private static final int consumerPoolSize = 20;
private ExecutorService prodExec = Executors.newFixedThreadPool(producerPoolSize);
private ExecutorService consExec = Executors.newFixedThreadPool(consumerPoolSize);
//main method here, which calls start() below
private void start(String[] args) {
// Get list of ids, split them in to n(producerPoolSize) chunks
for (int index = 0; index < producerPoolSize; index++) {
Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
prodExec.execute(producer);
}
}
public class Producer implements Runnable {
private ExecutorService consExec;
private List<Long> list;
public Producer(ExecutorService exec, List<Long> list) {
this.consExec = exec;
this.list = list;
}
public void run() {
for (Long id: list) {
data = get data from db for the id
consExec.execute(new Consumer(data));
}
}
}
public class Consumer implements Runnable {
public void run() {
// call web service
}
}
在上面的代码中,我有两个线程池 - 一个用于 生产环境 者和消费者 . 我从数据库中获取了许多ID,将它们分成相等的块,以便将它们分发给Producer线程进行处理 . 生产环境 者线程接收ID列表并按顺序处理每个ID,检索每个ID的数据并将该数据提交给Consumer线程进行处理 . 现在我的问题是:
我上面创建了10个生成器线程 . 我希望Consumer线程池的大小为20.但是,在处理每个ID时,Producer创建一个新的Runnable(Consumer)并将其提交(执行)到Consumer executor服务 . 我对ExecutorService的理解是你提交给它的Runnable被包装在一个Worker线程中然后被执行 . 那么,在上面的代码中,如果每个 生产环境 者获得的ID数量是50,我实际上是在创建50 * 10 = 500个消费者线程吗?太多了吗?
或者池大小实际上是指工作线程的数量?所以在上面的代码中,我在Consumer executor上创建了500个任务,它们实际上是由20个工作线程排队并执行的?我可能没有正确解释这个问题,但在 Actuator 的内部实现方面有点混淆,并担心如果我创建了太多的Consumer线程 .
如果这不是实现这一点的方法,有人可以提出更好的方法吗?谢谢 .
2 回答
池大小实际上是指工作线程的数量吗?是 .
如果消费者Runnable进程需要很长时间,则只会同时运行20个 . 其余的将在集合中等待,直到有一个线程可以运行它 .
至于是否有更好的方法来做到这一点 . 你需要使用线程吗?除非您有20个并行运行此处理器的处理器,否则可能不会增加处理时间,因为所有线程都会在上下文切换等中花费时间来处理数据 .
此外, 生产环境 者获取所有数据并将其存储在消费者中 . 如果消费者无法运行,因为您有500个,并且只有20个可以一次运行,那么您将存储(500减20)*您可以处理的数据 . 您可以让消费者获取自己的数据 .
回应评论:
代替
和处理器
消费者看起来像:
和
然后你松开了整个类,并且20个池更有意义,因为在IO获取数据时被阻止的消费者将被等待,并且准备就绪的消费者可以继续处理 .
池大小决定了工作线程的数量 . 如果在所有工作线程忙时尝试提交项目,则ExecutorService将对其进行排队,并在工作人员空闲时运行 .
The javadocs这样说:
请注意高亮部分 . 线程数是固定的,队列是无限制的,这意味着线程忙时提交的项将始终排队,而不是被拒绝 .