首页 文章

在Java中实现多个使用者多 生产环境 者的问题

提问于
浏览
3

我用一个阻塞队列编写了一个简单的消费者 - 生产环境 者问题,该队列有多个 生产环境 者和多个消费者接受并将整数放在队列中 . 但是,当我尝试测试它时,结果并不理想,例如队列的大小不正确 . 我不认为消费者和 生产环境 者规模正在同步 . 此外,我对 生产环境 者和消费者都进行了2秒钟的睡眠,但在测试时,每两秒就打印出2个 生产环境 者和2个消费者的结果 . 有谁知道我做错了什么?也许我开始错误的线程?我评论了另一种方式,但结果仍然是错误的 .

结果:

run:
Producing 425     Thread-0 size left 0
Consuming 890     Thread-3 size left 0
Consuming 425     Thread-2 size left 0
Producing 890     Thread-1 size left 0
Consuming 192     Thread-2 size left 0
Consuming 155     Thread-3 size left 0
Producing 155     Thread-1 size left 0
Producing 192     Thread-0 size left 0
Consuming 141     Thread-2 size left 1
Producing 141     Thread-0 size left 0
Producing 919     Thread-1 size left 0
Consuming 919     Thread-3 size left 0
Producing 361     Thread-1 size left 0
Producing 518     Thread-0 size left 0
Consuming 518     Thread-3 size left 0
Consuming 361     Thread-2 size left 0
Producing 350     Thread-0 size left 1
Consuming 350     Thread-3 size left 0
Consuming 767     Thread-2 size left 0
Producing 767     Thread-1 size left 0

制片人

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Producer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    private int generateRandomNumber(int start, int end) {
        Random rand = new Random();
        int number = start + rand.nextInt(end - start + 1);
        return number;
    }

    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            try {
                items.put(rand);
                System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

消费者

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Consumer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    public void run() {
        while (true) {
            try {
                System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

测试

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer {
    public static void main(String args[]) {
        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

        Producer producer = new Producer(items);
        Consumer consumer = new Consumer(items);
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        Thread t3 = new Thread(consumer);
        Thread t4 = new Thread(consumer);
        /*
        Thread t1 = new Thread(new Producer());
        Thread t2 = new Thread(new Producer());
        Thread t3 = new Thread(new Consumer());
        Thread t4 = new Thread(new Consumer());
        */
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

UPDATE: I tried to implement the reentrant lock but my program stops at the lock line. Any help? 消费者

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Consumer implements Runnable { 

    //private BlockingQueue<Integer> items = new LinkedBlockingQueue<>(); 
    private MyBlockingQ items;

    public Consumer(MyBlockingQ q) { 
        this.items = q; 
    } 

    public void run() { 
        while (true) { 
            items.remove();
            //Thread.sleep(1000);
        }
    }
}

制片人

import java.util.Random;

public class Producer implements Runnable {
    private MyBlockingQ items;
    public Producer(MyBlockingQ q) {
        this.items = q;
    }

    private int generateRandomNumber(int start, int end) {
        Random rand = new Random();
        int number = start + rand.nextInt(end - start + 1);
        return number;
    }

    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            items.add(rand);
        }
    }
}

MyBlockingQ(共享资源)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyBlockingQ {

    private BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public MyBlockingQ() {
    }

    public void add(Integer i) {
        lock.writeLock().lock();
        try {
            items.put(i);
            System.out.println("Producing " + i + "     " + Thread.currentThread().getName() + " size left " + items.size());
        } catch (InterruptedException ex) {
            Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void remove() {
        lock.writeLock().lock();
        try {
            int taken = items.take();
            System.out.println("Consuming " + taken + "     " + Thread.currentThread().getName() + " size left " + items.size());
        } catch (InterruptedException ex) {
            Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            lock.writeLock().unlock();
        }
    }
}

测试

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

public class ProducerConsumer { 
    public static void main(String args[]) { 
        MyBlockingQ items = new MyBlockingQ(); 

        System.out.println("starting");
        Thread t1 = new Thread(new Producer(items)); 
        Thread t2 = new Thread(new Producer(items)); 
        Thread t3 = new Thread(new Consumer(items)); 
        Thread t4 = new Thread(new Consumer(items)); 
        t1.start(); 
        t2.start(); 
        t3.start(); 
        t4.start(); 
    } 
}

3 回答

  • 0

    你可能会对这部分输出感到困惑:

    Producing 425     Thread-0 size left 0
    Consuming 890     Thread-3 size left 0
    Consuming 425     Thread-2 size left 0
    Producing 890     Thread-1 size left 0
    

    问题:在Thread-1产生之前,Thread-3如何消耗890个项目?

    答案:Thread-3在生成之前不会消耗Thread-1中的项目 .

    原因:当Thread-1将项目放入队列时,Thread-3可能已经在等待从队列中获取的项目 . 所以Thread-1放置物品:

    items.put(rand);
    

    并且在线程1之前跳转到下一行并打印有关它生成的项目的信息,线程3执行以下行:

    System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());
    

    只有这时Thread-1才会执行println:

    System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
    

    因此,您可以在控制台中看到这些有趣的结果 .

    您可能想要阅读synchronizing . 有两种方法可以解决您的问题:

    • 同步方法

    • 同步语句(brimborium使用的方法)

    同步锁定对同步块内对象的访问 . 这意味着每个其他方法必须等待轮到它才能访问对象 .

    因此,如果您对Producer和Consumer中的项目使用同步,那么:

    • 消费者在 生产环境 者放置物品时无法取物品 .

    • 生产环境 者在消费者购买时不能放置物品 .

    我的情况是当商品为空且消费者的方法锁定商品时,程序将落入所谓的 deadlock . 生产环境 者必须等待消费者解锁,但它永远不会发生,因为消费者正在等待采取物品(必须由 生产环境 者放置) .

    此外,我对 生产环境 者和消费者都进行了2秒钟的睡眠,但在测试时,每两秒就打印出2个 生产环境 者和2个消费者的结果 .

    这是你应该期待的 . 在Test类中,您将 生产环境 2个 生产环境 者和2个消费者 .

    Thread t1 = new Thread(producer);
    Thread t2 = new Thread(producer);
    Thread t3 = new Thread(consumer);
    Thread t4 = new Thread(consumer);
    
    t1.start();
    t2.start();
    t3.start();
    t4.start();
    
  • 5

    这两行

    items.put(rand);
    System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
    

    没有同步 . 生产环境 者可能会将数字放入队列中,但是当从放入其中的线程显示队列的大小时,消费者可能已经消耗了一个数字 .

  • 3

    您需要同步 items 访问权限 . 我只是略微改变了你的例子,结果看起来不错 . 由于同步,您还必须处理死锁 . 在这种情况下,只要您不在 Consumer 中的 items.take() 上进行同步就应该没问题 .

    你的新测试:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class ProducerConsumer {
        public static void main(String args[]) {
            BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
    
            Thread t1 = new Thread(new Producer(items));
            Thread t2 = new Thread(new Producer(items));
            Thread t3 = new Thread(new Consumer(items));
            Thread t4 = new Thread(new Consumer(items));
            t1.start();
            t2.start();
            t3.start();
            t4.start();
        }
    }
    

    消费者

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class Consumer implements Runnable {
    
        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
    
        public Consumer(BlockingQueue<Integer> q) {
            this.items = q;
        }
    
        public void run() {
            while (true) {
                try {
                    System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }
    

    和制片人

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class Producer implements Runnable {
    
        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
    
        public Producer(BlockingQueue<Integer> q) {
            this.items = q;
        }
    
        private int generateRandomNumber(int start, int end) {
            Random rand = new Random();
            int number = start + rand.nextInt(end - start + 1);
            return number;
        }
    
        public void run() {
            for (int i = 0; i < 5; i++) {
                int rand = generateRandomNumber(100, 1000);
                try {
                    synchronized (items) {
                        items.put(rand);
                        System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
                    }
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
    }
    

相关问题