首页 文章

Java Concurrency - 使用信号量实现监视器 - 所有线程都陷入等待状态,我的理解有问题

提问于
浏览
1

我正在尝试使用Java中的信号量来实现监视器,以创建一个带有一些编写器线程和一些读取器线程的有界缓冲区 .

到目前为止,我做了以下事情:

对于我们可能想要锁定的每个类(即之前,我们将在其中有一个同步的代码块)我添加两个信号量,一个二进制一个锁定在块的开始并在最后解锁(确保仅代码的一个关键部分可以在任何时间执行)而另一个代码可以作为传递notify和notifyAll信号的单元 . 我还创建了一个整数计数器来跟踪调用wait的线程 .

然后在同步代码块的开始,我得到'监视器'的锁,然后在必要时调用我的替换等待指令 .

我有两个线程不断调用put和两个线程不断调用get . 在1秒到10秒之间的任何时间之后,所有线程都会卡住 .

不知怎的,他们都等不及了,我真的看不出来!我花了几天时间思考这个问题 . 有任何想法吗?

有没有人有任何想法导致所有这些线程在这一点上被卡住?

谢谢,

2 回答

  • 0

    我认为你的 notifyAll() 实施存在缺陷 . 要正确执行此操作,您需要进行互锁的比较和交换操作(如 AtomicInteger 等人提供的那样) . 问题在于你的循环:

    // Equivalent of notifyAll()
        for (int i = val(); i>0; i--) {
            dec();
            notifyCalled.release();
        }
    

    两个线程可以竞争并且都将 val() 视为1,然后两个 dec() 调用将成功, blocksWaitingCount 将为-1 . 然后,因为 blocksWaitingCount 不再匹配等待 notifyCalled permit的线程数,所以 notifyAll() 的未来调用将无法通知所有阻塞线程(因为即使 i == 0 ,仍然存在线程阻塞) . 重复几次迭代, notifyAll() 将最终停止释放所有线程,并且所有线程都将被阻止 .

  • 0

    问题是你无法控制notifyCalled.acquire()中哪个线程成功 .

    例如,考虑这种情况:

    • 线程A等待获取

    • 线程B执行put两次并填充缓冲区 . 只有一个线程在等待,所以它调用notifyCalled.release()

    • 线程C执行put并且由于缓冲区已满,它进入等待块 .

    • 3中的notifyCalled.release()导致notifyCalled.aquire()在线程C而不是线程A中成功

    由于缓冲区仍然是完整的,因此线程C(以及所有其他put操作)进入重新进入while循环并再次等待,并且线程B将永远不会收到它正在等待的释放 .

    SOLUTION

    当由于缓冲区已满(或与get操作相反)而未使用在放置操作块中获得的许可时,会出现问题 . 为避免这种情况,可以使用标志释放许可证,以便具有相反操作的另一个等待线程可以尝试获取它 .

    此外,正如Daniel Pryden所述 inc() 也应该在monitorSemaphore锁内移动以避免竞争条件 .

    请注意,这使得用于修改 blocksWaitingCount 的同步块变得不必要(尽管由于它们是无竞争的,因此它们对性能的影响很小) .

    这是修改后的代码

    public class BufferNonSync {
    
        private int[] buffer = new int[] { 0};
        private int start = 0;
        private int last = 0;
        private final int size = 1;
        private int numberInBuffer = 0;
    
        // Monitor variables
        private Semaphore monitorSemaphore = new Semaphore(1);
        private Semaphore notifyCalled = new Semaphore(0);
    
        private int blocksWaitingCount = 0;
    
        public void put(int input, int id) throws InterruptedException {
            monitorSemaphore.acquire();
    
            boolean acquired = false;
            while (numberInBuffer == size) {
                // Equivalent of wait()
                if (acquired) {
                    dec();
                    notifyCalled.release();
                }
                inc();
                monitorSemaphore.release();
                notifyCalled.acquire();
                monitorSemaphore.acquire();
                acquired = true;
            }
    
            // Critical section
            buffer[last] = input;
            last = (last + 1) % size;
            numberInBuffer++;
    
            // Equivalent of notifyAll()
            for (int i = val(); i > 0; i--) {
                dec();
                notifyCalled.release();
            }
    
            monitorSemaphore.release();
        }
    
        public int get(int id) throws InterruptedException {
            monitorSemaphore.acquire();
    
            boolean acquired = false;
            while (numberInBuffer == 0) {
                // Equivalent of wait()
                if (acquired) {
                    dec();
                    notifyCalled.release();
                }
                inc();
                monitorSemaphore.release();
                notifyCalled.acquire();
                monitorSemaphore.acquire();
                acquired = true;
            }
    
            // Critical section
            int temp = buffer[start];
            start = (start + 1) % size;
            numberInBuffer--;
    
            // Equivalent of notifyAll()
            for (int i = val(); i > 0; i--) {
                dec();
                notifyCalled.release();
            }
    
            monitorSemaphore.release(); 
    
            return temp;
        }
    
        private void inc() {
            blocksWaitingCount++;
        }
    
        private void dec() {
            blocksWaitingCount--;
        }
    
        private int val() {
            return blocksWaitingCount;
        }
    }
    

相关问题