首页 文章

使用队列的 生产环境 者/消费者线程

提问于
浏览
53

我想创建一些 Producer/Consumer 线程应用程序 . 但我不确定在两者之间实现队列的最佳方法是什么 .

所以我有两个想法(两者都可能是完全错误的) . 我想知道哪个更好,如果它们都吮吸那么什么是实现队列的最佳方式 . 这主要是我在这些例子中实现的队列,我很关心 . 我正在扩展一个内部类的Queue类,并且是线程安全的 . 以下是两个示例,每个示例包含4个类 .

主类 -

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
}

消费者类 -

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

制片人类 -

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

队列类 -

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

OR

主类 -

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
}

消费者类 -

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

制片人类 -

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

队列类 -

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

去!

6 回答

  • 17

    Java 5拥有您需要的所有工具 . 你会想:

    • 将所有制作人放在一个ExecutorService;

    • 将所有消费者放在另一个消费者身上 ExecutorService ;

    • 如有必要,使用BlockingQueue在两者之间进行通信 .

    我说“如果有必要”(3),因为根据我的经验,这是一个不必要的步骤 . 您所做的就是向消费者执行者服务提交新任务 . 所以:

    final ExecutorService producers = Executors.newFixedThreadPool(100);
    final ExecutorService consumers = Executors.newFixedThreadPool(100);
    while (/* has more work */) {
      producers.submit(...);
    }
    producers.shutdown();
    producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    consumers.shutdown();
    consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    

    所以 producers 直接提交给 consumers .

  • 1

    好的,正如其他人所说,最好的办法是使用 java.util.concurrent 包 . 我强烈推荐"Java Concurrency in Practice" . 这本书很棒,几乎涵盖了你需要知道的一切 .

    至于你的特定实现,正如我在评论中提到的,不要从构造函数启动线程 - 它可能是不安全的 .

    抛开这一点,第二个实现似乎更好 . 您不希望将队列放在静态字段中 . 你可能只是失去了灵活性 .

    如果你想继续你自己的实现(我猜是为了学习目的?),至少提供一个 start() 方法 . 您应该构造对象(您可以实例化 Thread 对象),然后调用 start() 来启动该线程 .

    编辑: ExecutorService 有自己的队列,所以这可能会令人困惑..这是让你开始的东西 .

    public class Main {
        public static void main(String[] args) {
            //The numbers are just silly tune parameters. Refer to the API.
            //The important thing is, we are passing a bounded queue.
            ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));
    
            //No need to bound the queue for this executor.
            //Use utility method instead of the complicated Constructor.
            ExecutorService producer = Executors.newSingleThreadExecutor();
    
            Runnable produce = new Produce(consumer);
            producer.submit(produce);   
        }
    }
    
    class Produce implements Runnable {
        private final ExecutorService consumer;
    
        public Produce(ExecutorService consumer) {
            this.consumer = consumer;
        }
    
        @Override
        public void run() {
            Pancake cake = Pan.cook();
            Runnable consume = new Consume(cake);
            consumer.submit(consume);
        }
    }
    
    class Consume implements Runnable {
        private final Pancake cake;
    
        public Consume(Pancake cake){
            this.cake = cake;
        }
    
        @Override
        public void run() {
            cake.eat();
        }
    }
    

    进一步编辑:对于 生产环境 者而不是 while(true) ,您可以执行以下操作:

    @Override
    public void run(){
        while(!Thread.currentThread().isInterrupted()){
            //do stuff
        }
    }
    

    这样,您可以通过调用 .shutdownNow() 来关闭执行程序 . 如果您使用 while(true) ,它将不会关闭 .

    另请注意 Producer 仍然容易受到 RuntimeExceptions (即一个 RuntimeException 将停止处理)

  • 72

    You are reinventing the wheel.

    如果您需要持久性和其他企业功能使用JMS(我建议ActiveMq) .

    如果你需要快速的内存中队列,请使用java的Queue的一种强制性 .

    如果您需要支持java 1.4或更早版本,请使用Doug Lea优秀的concurrent包 .

  • 8

    我已经扩展了cletus提出的工作代码示例的答案 .

    • 一个 ExecutorService (pes)接受 Producer 任务 .

    • 一个 ExecutorService (ces)接受 Consumer 任务 .

    • ProducerConsumer 共享 BlockingQueue .

    • 多个 Producer 任务生成不同的数字 .

    • Consumer 任务中的任何一个都可以消耗由 Producer 生成的数字

    码:

    import java.util.concurrent.*;
    
    public class ProducerConsumerWithES {
        public static void main(String args[]){
             BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
    
             ExecutorService pes = Executors.newFixedThreadPool(2);
             ExecutorService ces = Executors.newFixedThreadPool(2);
    
             pes.submit(new Producer(sharedQueue,1));
             pes.submit(new Producer(sharedQueue,2));
             ces.submit(new Consumer(sharedQueue,1));
             ces.submit(new Consumer(sharedQueue,2));
             // shutdown should happen somewhere along with awaitTermination
             / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
             pes.shutdown();
             ces.shutdown();
        }
    }
    class Producer implements Runnable {
        private final BlockingQueue<Integer> sharedQueue;
        private int threadNo;
        public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
            this.threadNo = threadNo;
            this.sharedQueue = sharedQueue;
        }
        @Override
        public void run() {
            for(int i=1; i<= 5; i++){
                try {
                    int number = i+(10*threadNo);
                    System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                    sharedQueue.put(number);
                } catch (Exception err) {
                    err.printStackTrace();
                }
            }
        }
    }
    
    class Consumer implements Runnable{
        private final BlockingQueue<Integer> sharedQueue;
        private int threadNo;
        public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
            this.sharedQueue = sharedQueue;
            this.threadNo = threadNo;
        }
        @Override
        public void run() {
            while(true){
                try {
                    int num = sharedQueue.take();
                    System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
                } catch (Exception err) {
                   err.printStackTrace();
                }
            }
        }   
    }
    

    输出:

    Produced:11:by thread:1
    Produced:21:by thread:2
    Produced:22:by thread:2
    Consumed: 11:by thread:1
    Produced:12:by thread:1
    Consumed: 22:by thread:1
    Consumed: 21:by thread:2
    Produced:23:by thread:2
    Consumed: 12:by thread:1
    Produced:13:by thread:1
    Consumed: 23:by thread:2
    Produced:24:by thread:2
    Consumed: 13:by thread:1
    Produced:14:by thread:1
    Consumed: 24:by thread:2
    Produced:25:by thread:2
    Consumed: 14:by thread:1
    Produced:15:by thread:1
    Consumed: 25:by thread:2
    Consumed: 15:by thread:1
    

    注意 . 如果您不需要多个 生产环境 者和消费者,请保留单个 生产环境 者和消费者 . 我添加了多个 生产环境 者和消费者,以在多个 生产环境 者和消费者之间展示BlockingQueue的功能 .

  • 7

    这是一个非常简单的代码 .

    import java.util.*;
    
    // @author : rootTraveller, June 2017
    
    class ProducerConsumer {
        public static void main(String[] args) throws Exception {
            Queue<Integer> queue = new LinkedList<>();
            Integer buffer = new Integer(10);  //Important buffer or queue size, change as per need.
    
            Producer producerThread = new Producer(queue, buffer, "PRODUCER");
            Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");
    
            producerThread.start();  
            consumerThread.start();
        }   
    }
    
    class Producer extends Thread {
        private Queue<Integer> queue;
        private int queueSize ;
    
        public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
            super(ThreadName);
            this.queue = queueIn;
            this.queueSize = queueSizeIn;
        }
    
        public void run() {
            while(true){
                synchronized (queue) {
                    while(queue.size() == queueSize){
                        System.out.println(Thread.currentThread().getName() + " FULL         : waiting...\n");
                        try{
                            queue.wait();   //Important
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
    
                    //queue empty then produce one, add and notify  
                    int randomInt = new Random().nextInt(); 
                    System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
                    queue.add(randomInt); 
                    queue.notifyAll();  //Important
                } //synchronized ends here : NOTE
            }
        }
    }
    
    class Consumer extends Thread {
        private Queue<Integer> queue;
        private int queueSize;
    
        public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
            super (ThreadName);
            this.queue = queueIn;
            this.queueSize = queueSizeIn;
        }
    
        public void run() {
            while(true){
                synchronized (queue) {
                    while(queue.isEmpty()){
                        System.out.println(Thread.currentThread().getName() + " Empty        : waiting...\n");
                        try {
                            queue.wait();  //Important
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
    
                    //queue not empty then consume one and notify
                    System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
                    queue.notifyAll();
                } //synchronized ends here : NOTE
            }
        }
    }
    
  • 2
    • Java代码"BlockingQueue"已经同步了put和get方法 .

    • Java代码"Producer",生成者线程生成数据 .

    • Java代码"Consumer",消费者线程消耗所产生的数据 .

    • Java代码"ProducerConsumer_Main",启动 生产环境 者和消费者线程的主要功能 .

    BlockingQueue.java

    public class BlockingQueue 
    {
        int item;
        boolean available = false;
    
        public synchronized void put(int value) 
        {
            while (available == true)
            {
                try 
                {
                    wait();
                } catch (InterruptedException e) { 
                } 
            }
    
            item = value;
            available = true;
            notifyAll();
        }
    
        public synchronized int get()
        {
            while(available == false)
            {
                try
                {
                    wait();
                }
                catch(InterruptedException e){
                }
            }
    
            available = false;
            notifyAll();
            return item;
        }
    }
    

    Consumer.java

    package com.sukanya.producer_Consumer;
    
    public class Consumer extends Thread
    {
        blockingQueue queue;
        private int number;
        Consumer(BlockingQueue queue,int number)
        {
            this.queue = queue;
            this.number = number;
        }
    
        public void run()
        {
            int value = 0;
    
            for (int i = 0; i < 10; i++) 
            {
                value = queue.get();
                System.out.println("Consumer #" + this.number+ " got: " + value);
            }
        }
    }
    

    ProducerConsumer_Main.java

    package com.sukanya.producer_Consumer;
    
    public class ProducerConsumer_Main 
    {
        public static void main(String args[])
        {
            BlockingQueue queue = new BlockingQueue();
            Producer producer1 = new Producer(queue,1);
            Consumer consumer1 = new Consumer(queue,1);
            producer1.start();
            consumer1.start();
        }
    }
    

相关问题