你如何限制ExecutorService中的线程?

我使用ExecutorService在不同的线程中运行许多任务 . 有时,在线程池中等待的Runnable实例太多可能会导致Out of Memory问题 .

我尝试编写一个阻塞作业 Actuator 来解决它 . 这有什么官方解决方案吗?

例如:

BlockingJobExecutor executor = new BlockingJobExecutor(3);
    for (int i = 0; i < 1000; i++) {
        executor.addJob(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                LogFactory.getLog(BTest.class).info("test " + System.currentTimeMillis());
            }
        });
    }
    executor.shutdown();

这是BlockingJobExecutor类:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingJobExecutor {

    AtomicInteger counter = new AtomicInteger();
    ExecutorService service;
    int threads;

    public BlockingJobExecutor(int threads) {
        if (threads < 1) {
            throw new IllegalArgumentException("threads must be greater than 1.");
        }
        service = Executors.newFixedThreadPool(threads);
        this.threads = threads;
    }

    static class JobWrapper implements Runnable {
        BlockingJobExecutor executor;
        Runnable job;

        public JobWrapper(BlockingJobExecutor executor, Runnable job) throws InterruptedException {
            synchronized (executor.counter) {
                while (executor.counter.get() >= executor.limit()) {
                    executor.counter.wait();
                }
            }
            this.executor = executor;
            this.job = job;
        }

        @Override
        public void run() {
            try {
                job.run();
            } finally {
                synchronized (executor.counter) {
                    executor.counter.decrementAndGet();
                    executor.counter.notifyAll();
                }
            }
        }
    }

    public int limit() {
        return threads;
    }

    public void shutdown() {
        service.shutdown();
        try {
            service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void addJob(Runnable job) {
        try {
            service.execute(new JobWrapper(this, job));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

回答(1)

3 years ago

有两种方法可以实现 . 排队等待运行的runnables太多,或者同时运行的线程太多 . 如果排队的作业太多,您可以在 ExecutorService 中使用固定大小的 BlockingQueue 来限制可以排队的项目数 . 然后,当您尝试对新任务进行排队时,操作将阻塞,直到队列中有空间为止 .

如果一次运行的线程太多,则可以通过使用所需的线程数调用 Executors.newFixedThreadPool 来限制可用于在ExecutorService中运行任务的线程数 .