我使用ExecutorService在不同的线程中运行许多任务 . 有时,在线程池中等待的Runnable实例太多可能会导致Out of Memory问题 .
我尝试编写一个阻塞作业 Actuator 来解决它 . 这有什么官方解决方案吗?
例如:
BlockingJobExecutor executor = new BlockingJobExecutor(3);
for (int i = 0; i < 1000; i++) {
executor.addJob(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LogFactory.getLog(BTest.class).info("test " + System.currentTimeMillis());
}
});
}
executor.shutdown();
这是BlockingJobExecutor类:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockingJobExecutor {
AtomicInteger counter = new AtomicInteger();
ExecutorService service;
int threads;
public BlockingJobExecutor(int threads) {
if (threads < 1) {
throw new IllegalArgumentException("threads must be greater than 1.");
}
service = Executors.newFixedThreadPool(threads);
this.threads = threads;
}
static class JobWrapper implements Runnable {
BlockingJobExecutor executor;
Runnable job;
public JobWrapper(BlockingJobExecutor executor, Runnable job) throws InterruptedException {
synchronized (executor.counter) {
while (executor.counter.get() >= executor.limit()) {
executor.counter.wait();
}
}
this.executor = executor;
this.job = job;
}
@Override
public void run() {
try {
job.run();
} finally {
synchronized (executor.counter) {
executor.counter.decrementAndGet();
executor.counter.notifyAll();
}
}
}
}
public int limit() {
return threads;
}
public void shutdown() {
service.shutdown();
try {
service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void addJob(Runnable job) {
try {
service.execute(new JobWrapper(this, job));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
1 回答
有两种方法可以实现 . 排队等待运行的runnables太多,或者同时运行的线程太多 . 如果排队的作业太多,您可以在
ExecutorService
中使用固定大小的BlockingQueue
来限制可以排队的项目数 . 然后,当您尝试对新任务进行排队时,操作将阻塞,直到队列中有空间为止 .如果一次运行的线程太多,则可以通过使用所需的线程数调用
Executors.newFixedThreadPool
来限制可用于在ExecutorService中运行任务的线程数 .