首页 文章

关于使用阻塞队列方法的java中的 生产环境 者和消费者模式

提问于
浏览
1

关于java中的线程,我正在研究 生产环境 者和消费者设计模式,我最近在Java 5中进行了探讨,随着Java 5中的BlockingQueue数据结构的介绍,它现在变得更加简单,因为BlockingQueue通过引入阻塞方法隐式地提供了这种控制 . put()和take() . 现在,您不需要使用wait和notify在Producer和Consumer之间进行通信 . 如果队列队列中的队列已满,则BlockingQueue put()方法将被阻止,如果队列为空,则take()将阻塞 . 在下一节中,我们将看到Producer Consumer设计模式的代码示例 . 我已经开发了以下程序,但也请让我知道waut()和notify()的旧样式方法,我想用旧式方法开发相同的逻辑

请问如何实现这一点,经典方式是使用wait()和notify()方法在Producer和Consumer线程之间进行通信,并在完整队列和空队列等个别条件下阻塞它们?

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){

     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();

     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }

}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue sharedQueue;

    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue sharedQueue;

    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }


}

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9

3 回答

  • 2

    我可以在睡梦中做这个等待通知的东西(或者至少我认为我可以) . Java 1.4源代码提供了所有这些的漂亮例子,但他们已经转而使用原子来做所有事情,现在它变得更加复杂 . wait-notify确实提供了灵活性和强大功能,但其他方法可以保护您免受并发性的危害并使代码更简单 .

    为此,您需要一些字段,如下所示:

    private final ConcurrentLinkedQueue<Intger>  sharedQueue =
                                                        new ConcurrentLinkedQueue<>();
    private volatile   boolean  waitFlag = true;
    

    你的Producer.run看起来像这样:

    public void run()  {
        for (int i = 0; i < 100000, i++)  {
            System.out.println( "Produced: " + i );
            sharedQueue.add( new Integer( i ) );
            if (waitFlag)       // volatile access is cheaper than synch.
                synchronized (sharedQueue)  { sharedQueue.notifyAll(); }
        }
    }
    

    和Consumer.run:

    public void run()  {
        waitFlag = false;
        for (;;)  {
            Integer  ic = sharedQueue.poll();
            if (ic == null)  {
                synchronized (sharedQueue)  {
                    waitFlag = true;
                    // An add might have come through before waitFlag was set.
                    ic = sharedQueue.poll();
                    if (ic == null)  {
                        try  { sharedQueue.wait(); }
                        catch (InterruptedException ex)  {}
                        waitFlag = false;
                        continue;
                    }
                    waitFlag = true;
                }
            }
            System.out.println( "Consumed: " + ic );
        }
    }
    

    这使同步保持最小化 . 如果一切顺利,那么's only one look at a volatile field per add. You should be able to run any number of producers simultaneously. (Consumer'将会更加棘手 - 你必须放弃 waitFlag . )你可以使用不同的对象进行wait / notifyAll .

  • 4

    如果您想知道另一种方法,请尝试使用ExecutorService

    public static void main(String... args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            System.out.println("Produced: " + i);
    
            final int finalI = i;
            service.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Consumed: " + finalI);
                }
            });
        }
        service.shutdown();
    }
    

    只有10个任务, 生产环境 者可以在消费者开始之前完成 . 如果您尝试100个任务,您可能会发现它们是交错的 .

  • 1

    如果您想了解BlockingQueue的工作原理,出于教育目的,您可以随时查看its source code .

    最简单的方法可能是 synchronizetake() 方法,一旦队列满了,有人试图 offer() 一个元素 - 调用 wait() . 当有人拿一个元素时,睡觉的线程 . (尝试从空队列中 take() 时也一样) .
    请记住确保所有 wait() 调用都嵌套在循环中,以便在每次唤醒线程时检查是否满足条件 .

    如果您计划从头开始实现产品目的 - 我强烈反对它 . 您应该尽可能使用现有的,经过测试的库和组件 .

相关问题