首页 文章

在多个线程上同步Java中的队列

提问于
浏览
3

我理解同步的概念,但我现在确定它为什么以这种方式实现,所以我需要一点帮助:

我有2个帖子:

PeriodicalThread 将定期接收数据(假设每5秒钟)并将其放入 Queue (暂时使用 ArrayDeque ,但我不知道其他任何 Queue 实现是否会更好)

ProccessThread 将不断检查 Queue 以查看它是否为空 . 如果它不为空,它将处理数据 (FIFO) .

所以,起初我的实现将是:

// Both threads are inner class so they have access to Queue

private Queue queue;
private boolean isReadyToProccess;


class PeriodicalThread extends Thread {
    public void run() {
        while(true) {
           if(isNewDataAvailable) {
                // create Data object
                queue.add(data);
           }
        }
    }
}

class ProcessThread extends Thread {
    public void run() {
        while(true) {
           if(!queue.isEmpty() && isReadyToProccess) {
               Data data = queue.poll();
               processData(data);
           }
        }
    }
}

private void processData(Data data) {
    // this method send data over network, and the server response callback
    // changes isReadyToProcess value to true.
}

然后当想要处理 synchronization 时,我不知道我是否应该使用 lock 对象(以及它是如何实现的)或者是否已经存在一个线程安全的包 Queue 实现(因为 add()poll() 方法)

Edit: 我忘记了标志 isReadyToProcess ,表示下一个队列 Data 对象是......好吧,准备好被处理了 . 此标志也应同步 .

5 回答

  • 3

    ArrayDeque 不支持并发 . 相反,使用支持并发工作的真实队列,如BlockingQueue,以及 java.util.concurrent 包中的一个实现 . 我建议使用LinkedBlockingQueue .

    如果您需要在线程之间共享标志,最好使用AtomicBoolean而不是手动同步原始 boolean 字段 .

    注意:如果您将使用并发进程,最好使用已经支持锁定和同步开箱即用的java.util.concurrent包提供的类 .

  • 1

    您正在寻找Blocking Queue实施

    这提供了您正在寻找的开箱即用的功能 . 这就是为什么它最适合 生产环境 者消费者的例子 .

    这是一个例子

    public class BlockingQueueExample {
    
        public static void main(String[] args) throws Exception {
    
            BlockingQueue queue = new ArrayBlockingQueue(1024);
    
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
    
            new Thread(producer).start();
            new Thread(consumer).start();
    
            Thread.sleep(4000);
        }
    }
    
    
    public class Producer implements Runnable{
    
        protected BlockingQueue queue = null;
    
        public Producer(BlockingQueue queue) {
            this.queue = queue;
        }
    
        public void run() {
            try {
                queue.put("1");
                Thread.sleep(1000);
                queue.put("2");
                Thread.sleep(1000);
                queue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    public class Consumer implements Runnable{
    
        protected BlockingQueue queue = null;
    
        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }
    
        public void run() {
            try {
                System.out.println(queue.take());
                System.out.println(queue.take());
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
  • 1

    查看ArrayBlockingQueue和另一个BlockingQueue实现 .

    来自文档:

    一个队列,它还支持在检索元素时等待队列变为非空的操作,并在存储元素时等待队列中的空间可用 .

    另一种方法是ConcurrentLinkedQueue .

    您可以在此处阅读有关并发队列的更多信息:

    http://docs.oracle.com/javase/tutorial/collections/implementations/queue.html

  • 0

    在Java 5之前,您必须实现等待/通知机制 . 从Java 5开始,您可以使用BlockingQueue接口的实现来解决 生产环境 者/消费者问题 .

    看看这个:

    http://www.javamex.com/tutorials/synchronization_producer_consumer.shtml

  • 2

    你可以使用java Blocking Queue或Collections.synchronizedList(new LinkedList <>()),我喜欢Collections.synchronized ...因为它有效地使你的集合线程安全,你不会打扰锁和标志和类似的东西..只是写出逻辑可读代码 .

    LinkedList将为您提供addFirst()addLast()和ofcourse getFirst(),getLast()来实现FIFO或LIFO行为......

相关问题