首页 文章

确定性监视器脉冲/等待并在 生产环境 者 - 消费者集合中实现超时

提问于
浏览
2

我正在尝试实现支持消费者超时的并发 生产环境 者 - 消费者集合(多个 生产环境 者和消费者) .

现在实际的集合非常复杂(不幸的是,在System.Collections.Concurrent中没有任何工作),但我在这里有一个最小的示例来演示我的问题(看起来有点像 BlockingCollection<T> ) .

public sealed class ProducerConsumerQueueDraft<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object locker = new object();

    public void Enqueue(T item)
    {
        lock (locker)
        {
            queue.Enqueue(item);

            /* This "optimization" is broken, as Nicholas Butler points out.
            if(queue.Count == 1) // Optimization
            */
                Monitor.Pulse(locker); // Notify any waiting consumer threads.
        }
    }

    public T Dequeue(T item)
    {
        lock (locker)
        {
            // Surprisingly, this needs to be a *while* and not an *if*
            // which is the core of my problem.
            while (queue.Count == 0)
                Monitor.Wait(locker);

            return queue.Dequeue();
        }
    }

    // This isn't thread-safe, but is how I want TryDequeue to look.
    public bool TryDequeueDesired(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
            {
                item = default(T);
                return false;
            }

            // This is wrong! The queue may be empty even though we were pulsed!
            item = queue.Dequeue();
            return true;
        }
    }

    // Has nasty timing-gymnastics I want to avoid.
    public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
    {
        lock (locker)
        {
            var watch = Stopwatch.StartNew();
            while (queue.Count == 0)
            {
                var remaining = timeout - watch.Elapsed;

                if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
                {
                    item = default(T);
                    return false;
                }
            }
            item = queue.Dequeue();
            return true;
        }
    }
}

这个想法很简单:找到空队列的消费者等待发信号通知,并且生成者(注意:不是 PulseAll ,这将是低效的)他们通知他们等待的项目 .

我的问题是Monitor.Pulse的这个属性:

当调用Pulse的线程释放锁定时,就绪队列中的下一个线程(不一定是脉冲线程)获取锁定 .

这意味着消费者线程C1可以被 生产环境 者线程唤醒以消耗一个项目,但是另一个消费者线程C2可以在C1有机会重新获取它之前获取锁定并消耗该项目,留下C1与给予控制时的空队列 .

这意味着如果队列确实非空,我必须在每个脉冲上防御性地检查消费者代码,如果不是这种情况,则返回并空手等待 .

我的主要问题是它效率低下 - 线程可能会被唤醒以进行工作,然后立即返回等待 . 这样做的一个相关结果是,当它应该是优雅的时候实现具有超时的 TryDequeue 是不必要的困难和低效的(参见 TryDequeueThatWorks )(参见 TryDequeueDesired ) .

How can I twist Monitor.Pulse to do what I want? Alternatively, is there another synchronization primitive that does? Is there a more efficient and/or elegant way to implement a TryDequeue timeout than what I have done?

仅供参考,这是一个测试,用于演示我所需解决方案的问题:

var queue = new ProducerConsumerQueueDraft<int>();

for (int consumer = 0; consumer < 3; consumer++)
    new Thread(() =>
    {
        while (true)
        {
            int item;

            // This call should occasionally throw an exception.
            // Switching to queue.TryDequeueThatWorks should make
            // the problem go away.
            if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
            {
                // Do nothing.
            }
        }

    }).Start();

Thread.Sleep(1000); // Let consumers get up and running

for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
    queue.Enqueue(0);
}

3 回答

  • 1

    我的主要问题是效率低下

    它不是 . 你认为这是一种常见现象,但这种竞争很少发生 . 一旦进入蓝月亮,充其量 . while循环是必要的,以确保它确实发生时没有出错 . 它会 . 不要乱用它 .

    事实上相反,锁设计是有效的,因为它确实允许比赛发生 . 处理它 . 摆弄锁定设计是非常危险的,因为比赛不会经常发生 . 它们是非常随机的,它阻止了足够的测试来证明改变也不起作用,它改变了时机 .

  • 2

    我写了一篇关于此的文章可能有所帮助:

    Thread synchronization: Wait and Pulse demystified

    特别是,它解释了为什么需要 while 循环 .

  • 1

    这是一个简单的基于密钥的混合 生产环境 者 - 消费者队列:

    public class ConflatingConcurrentQueue<TKey, TValue>
    {
        private readonly ConcurrentDictionary<TKey, Entry> entries;
        private readonly BlockingCollection<Entry> queue;
    
        public ConflatingConcurrentQueue()
        {
            this.entries = new ConcurrentDictionary<TKey, Entry>();
            this.queue = new BlockingCollection<Entry>();
        }
    
        public void Enqueue(TValue value, Func<TValue, TKey> keySelector)
        {
            // Get the entry for the key. Create a new one if necessary.
            Entry entry = entries.GetOrAdd(keySelector(value), k => new Entry());
    
            // Get exclusive access to the entry.
            lock (entry)
            {
                // Replace any old value with the new one.
                entry.Value = value;
    
                // Add the entry to the queue if it's not enqueued yet.
                if (!entry.Enqueued)
                {
                    entry.Enqueued = true;
                    queue.Add(entry);
                }
            }
        }
    
        public bool TryDequeue(out TValue value, TimeSpan timeout)
        {
            Entry entry;
    
            // Try to dequeue an entry (with timeout).
            if (!queue.TryTake(out entry, timeout))
            {
                value = default(TValue);
                return false;
            }
    
            // Get exclusive access to the entry.
            lock (entry)
            {
                // Return the value.
                value = entry.Value;
    
                // Mark the entry as dequeued.
                entry.Enqueued = false;
                entry.Value = default(TValue);
            }
    
            return true;
        }
    
        private class Entry
        {
            public TValue Value { get; set; }
            public bool Enqueued { get; set; }
        }
    }
    

    (这可能需要一两次代码审查,但我认为一般来说它是理智的 . )

相关问题