我正在尝试实现支持消费者超时的并发 生产环境 者 - 消费者集合(多个 生产环境 者和消费者) .
现在实际的集合非常复杂(不幸的是,在System.Collections.Concurrent中没有任何工作),但我在这里有一个最小的示例来演示我的问题(看起来有点像 BlockingCollection<T>
) .
public sealed class ProducerConsumerQueueDraft<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly object locker = new object();
public void Enqueue(T item)
{
lock (locker)
{
queue.Enqueue(item);
/* This "optimization" is broken, as Nicholas Butler points out.
if(queue.Count == 1) // Optimization
*/
Monitor.Pulse(locker); // Notify any waiting consumer threads.
}
}
public T Dequeue(T item)
{
lock (locker)
{
// Surprisingly, this needs to be a *while* and not an *if*
// which is the core of my problem.
while (queue.Count == 0)
Monitor.Wait(locker);
return queue.Dequeue();
}
}
// This isn't thread-safe, but is how I want TryDequeue to look.
public bool TryDequeueDesired(out T item, TimeSpan timeout)
{
lock (locker)
{
if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
{
item = default(T);
return false;
}
// This is wrong! The queue may be empty even though we were pulsed!
item = queue.Dequeue();
return true;
}
}
// Has nasty timing-gymnastics I want to avoid.
public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
{
lock (locker)
{
var watch = Stopwatch.StartNew();
while (queue.Count == 0)
{
var remaining = timeout - watch.Elapsed;
if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
{
item = default(T);
return false;
}
}
item = queue.Dequeue();
return true;
}
}
}
这个想法很简单:找到空队列的消费者等待发信号通知,并且生成者(注意:不是 PulseAll
,这将是低效的)他们通知他们等待的项目 .
我的问题是Monitor.Pulse的这个属性:
当调用Pulse的线程释放锁定时,就绪队列中的下一个线程(不一定是脉冲线程)获取锁定 .
这意味着消费者线程C1可以被 生产环境 者线程唤醒以消耗一个项目,但是另一个消费者线程C2可以在C1有机会重新获取它之前获取锁定并消耗该项目,留下C1与给予控制时的空队列 .
这意味着如果队列确实非空,我必须在每个脉冲上防御性地检查消费者代码,如果不是这种情况,则返回并空手等待 .
我的主要问题是它效率低下 - 线程可能会被唤醒以进行工作,然后立即返回等待 . 这样做的一个相关结果是,当它应该是优雅的时候实现具有超时的 TryDequeue
是不必要的困难和低效的(参见 TryDequeueThatWorks
)(参见 TryDequeueDesired
) .
How can I twist Monitor.Pulse to do what I want? Alternatively, is there another synchronization primitive that does? Is there a more efficient and/or elegant way to implement a TryDequeue timeout than what I have done?
仅供参考,这是一个测试,用于演示我所需解决方案的问题:
var queue = new ProducerConsumerQueueDraft<int>();
for (int consumer = 0; consumer < 3; consumer++)
new Thread(() =>
{
while (true)
{
int item;
// This call should occasionally throw an exception.
// Switching to queue.TryDequeueThatWorks should make
// the problem go away.
if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
{
// Do nothing.
}
}
}).Start();
Thread.Sleep(1000); // Let consumers get up and running
for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
queue.Enqueue(0);
}
3 回答
它不是 . 你认为这是一种常见现象,但这种竞争很少发生 . 一旦进入蓝月亮,充其量 . while循环是必要的,以确保它确实发生时没有出错 . 它会 . 不要乱用它 .
事实上相反,锁设计是有效的,因为它确实允许比赛发生 . 处理它 . 摆弄锁定设计是非常危险的,因为比赛不会经常发生 . 它们是非常随机的,它阻止了足够的测试来证明改变也不起作用,它改变了时机 .
我写了一篇关于此的文章可能有所帮助:
Thread synchronization: Wait and Pulse demystified
特别是,它解释了为什么需要
while
循环 .这是一个简单的基于密钥的混合 生产环境 者 - 消费者队列:
(这可能需要一两次代码审查,但我认为一般来说它是理智的 . )