首页 文章

设置阅读器,解码器和消费者线程

提问于
浏览
1

我有一个相当标准的 生产环境 者和消费者线程:

  • producer将文件中的字节读取并解码为阻塞队列 .

  • consumer正在从队列中轮询项目

Happenes认为解码过程是一个瓶颈,可能会从拥有更多的CPU中获益 . 这是制片人时间的70% . 如果我介绍“解码器”线程,我会获得任何显着的性能吗?

  • producer将文件中的字节读取到阻塞"object"队列

  • decoder将字节对象解码为项目

  • 消费者正在轮询"decoded"项目

由于内存占用,我需要使用一个队列 - 无法承受两个队列(字节/项),所以我猜对象“cast”开销会出现吗?

关于如何实现这个3线程解决方案的任何想法?

谢谢!

2 回答

  • 0

    您应该调整 生产环境 者和使用者的线程池 - 例如,如果消费者对 生产环境 者来说太快,那么它的线程池可以分配比 生产环境 者线程池更少的线程 . 这应该导致吞吐量的显着增加 . 应调整 生产环境 者与消费者线程的比例(例3:1) .

    在类似的行上,您可以有三个线程池,其中Producer(Reader)和Consumer具有较少的线程,而解码器(转换器)线程池具有更多的线程 . 我不确定您是否需要代码示例,在这种情况下您应该分享您目前拥有的代码 . 我将从 生产环境 者和消费者的大小为1的线程池和变换器(解码器)的大小5开始,然后测量瓶颈是什么(如果那么吞吐量满足您的期望)

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class ProducerDecoderConsumer {
        /**
         * @param args
         */
        public static void main(String[] args) {
            BlockingQueue<Integer> inputQueue = new PriorityBlockingQueue<Integer>();
            BlockingQueue<String> outputQueue = new PriorityBlockingQueue<String>();
            ExecutorService reader = Executors.newSingleThreadExecutor();
            reader.submit(new Producer(inputQueue));
            ExecutorService decoder = Executors.newFixedThreadPool(5);
            decoder.submit(new Transformer(inputQueue, outputQueue));
            ExecutorService writer = Executors.newSingleThreadExecutor();
            writer.submit(new Consumer(outputQueue));
    
        }
    
        private static class Producer implements Callable<Void> {
            final BlockingQueue<Integer> queue;
    
            public Producer(final BlockingQueue<Integer> pQueue) {
                queue = pQueue;
            }
    
            @Override
            public Void call() throws Exception {
                try {
                    Random random = new Random();
                    while (true) {
                        queue.put(random.nextInt());
                    }
                } catch (Exception e) {
    
                }
                return null;
            }
        }
    
        private static class Transformer implements Callable<Void> {
            final BlockingQueue<Integer> inputQueue;
    
            final BlockingQueue<String> outputQueue;
    
            public Transformer(final BlockingQueue<Integer> pInputQueue, final BlockingQueue<String> pOutputQueue) {
                inputQueue = pInputQueue;
                outputQueue = pOutputQueue;
            }
    
            @Override
            public Void call() throws Exception {
                try {
                    while (true) {
                        Integer input = inputQueue.take();
                        String output = String.valueOf(input); // decode input to output
                        outputQueue.put(output); // output
                    }
                } catch (Exception e) {
    
                }
                return null;
            }
        }
    
        private static class Consumer implements Callable<Void> {
            final BlockingQueue<String> queue;
    
            public Consumer(final BlockingQueue<String> pQueue) {
                queue = pQueue;
            }
    
            @Override
            public Void call() throws Exception {
                try {
                    while (true) {
                        System.out.println(queue.take());
                    }
                } catch (Exception e) {
    
                }
                return null;
            }
        }
    }
    

    我添加了一些代码来说明这个想法 - 我使用的是两个阻塞队列,不像你问题中提到的单个队列,因为我认为不会有额外队列的开销 - 我建议使用分析器证明这样的事情 . 但是,我希望你发现它很有用,如果你真的觉得有需要,可以将它改装成单队列模型 .

  • 0

    2个队列,一个用于保存多个消费者解码的未解码对象 .

    多个消费者将解码并将解码的对象写入第二队列,最终消费者将从中消费 .

    确保避免死锁(除非你真的知道你在做什么,否则请使用 notifyAll() 而不是 notify()

相关问题