首页 文章

.NET 生产环境 者 - 消费者问题

提问于
浏览
1

我正在为一个Web服务编写一个相对简单的“代理”应用程序 . 一般的想法是TCP服务器(带有异步连接)将从客户端读取(字符串)数据,并将该数据(作为读回调函数的一部分)放入两个队列(Q1和Q2)之一 . 另一个线程将读取这些队列中的数据并将其传递给Web服务 . Q1中的数据应该优先于Q2中的任何数据 .

我一直在阅读有关 生产环境 者/消费者模式的内容,看起来这就像我正在尝试实施的队列一样 . 由于我的入队和出队操作将在不同的线程上发生,因此很明显我的队列需要是线程安全的并且支持某种锁定机制?这是一个.NET 4.0应用程序,我在新的BlockingCollection和ConcurrentQueue类上看到了文档,但是我不确定在这种情况下到底有什么不同或者我是如何实现它们的 . 任何人都可以对此有所了解吗?谢谢!

2 回答

  • 0

    我会像下面的课一样 . 当您生成项目以将其添加到其中一个队列时,可以调用 Enqueue() . 此方法始终(几乎)立即返回 . 在另一个线程中,当您准备使用项目时,请调用 Dequeue() . 它尝试首先从高优先级队列中获取 . 如果当前在任何队列中没有可用项目,则呼叫阻止 . 完成制作后,调用 Complete() . 在完成该呼叫并且两个队列都为空之后,下一个呼叫(或当前被阻止的呼叫)到 Dequeue() 将抛出 InvalidOperationException .

    如果您的 生产环境 者可能比您的消费者长时间更快,您应该绑定队列( new BlockingCollection<T>(capacity) ) . 但在这种情况下,如果您只有一个线程同时生成低优先级项目和高优先级项目,那么它将立即获得一百万个低优先级项目 .

    class Worker<T>
    {
        BlockingCollection<T> m_highPriorityQueue = new BlockingCollection<T>();
        BlockingCollection<T> m_lowPriorityQueue = new BlockingCollection<T>();
    
        public void Enqueue(T item, bool highPriority)
        {
            BlockingCollection<T> queue;
            if (highPriority)
                queue = m_highPriorityQueue;
            else
                queue = m_lowPriorityQueue;
    
            queue.Add(item);
        }
    
        public T Dequeue()
        {
            T result;
    
            if (!m_highPriorityQueue.IsCompleted)
            {
                if (m_highPriorityQueue.TryTake(out result))
                    return result;
            }
    
            if (!m_lowPriorityQueue.IsCompleted)
            {
                if (m_lowPriorityQueue.TryTake(out result))
                    return result;
            }
    
            if (m_highPriorityQueue.IsCompleted && m_lowPriorityQueue.IsCompleted)
                throw new InvalidOperationException("All work is done.");
            else
            {
                try
                {
                    BlockingCollection<T>.TakeFromAny(
                        new[] { m_highPriorityQueue, m_lowPriorityQueue },
                        out result);
                }
                catch (ArgumentException ex)
                {
                    throw new InvalidOperationException("All work is done.", ex);
                }
    
                return result;
            }
        }
    
        public void Complete()
        {
            m_highPriorityQueue.CompleteAdding();
            m_lowPriorityQueue.CompleteAdding();
        }
    }
    
  • 2

    BlockingCollection默认使用ConcurrentQueue . 应该非常适合您的应用 . 如果将F#与邮箱和异步块一起使用可能会更容易 . 我之前做了一个常见实现的示例帖子 .

    Map/reduce with F# Agent or MailboxProcessor

相关问题