首页 文章

如何(以及如果)使用TPL编写单个使用者队列?

提问于
浏览
16

我最近听说过很多关于.NET 4.0中TPL的播客 . 他们中的大多数使用任务描述后台活动,如下载图像或进行计算,以便工作不会干扰GUI线程 .

我工作的大多数代码都有更多的多 生产环境 者/单一消费者风格,其中来自多个来源的工作项必须排队,然后按顺序处理 . 一个示例是日志记录,其中来自多个线程的日志行被顺序化为单个队列,以便最终写入文件或数据库 . 来自任何单一来源的所有记录必须保持有序,并且来自同一时刻的记录应该在最终输出中彼此“接近” .

因此,多个线程或任务或任何调用队列的任何东西:

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

专用工作线程处理队列:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

将工作线程专门用于这些任务的消费者方面似乎总是合理的 . 我应该使用TPL中的某些构造来编写未来的程序吗?哪一个?为什么?

4 回答

  • 2

    您可以使用长时间运行的任务来处理Wilka建议的BlockingCollection中的项目 . 这是一个非常符合您的应用程序要求的示例 . 你会看到这样的输出:

    Log from task B
    Log from task A
    Log from task B1
    Log from task D
    Log from task C
    

    并非A,B,C&D的输出看起来是随机的,因为它们取决于线程的开始时间,但B总是出现在B1之前 .

    public class LogItem 
    {
        public string Message { get; private set; }
    
        public LogItem (string message)
        {
            Message = message;
        }
    }
    
    public void Example()
    {
        BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();
    
        // Start queue listener...
        CancellationTokenSource canceller = new CancellationTokenSource();
        Task listener = Task.Factory.StartNew(() =>
            {
                while (!canceller.Token.IsCancellationRequested)
                {
                    LogItem item;
                    if (_queue.TryTake(out item))
                        Console.WriteLine(item.Message);
                }
            },
        canceller.Token, 
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
    
        // Add some log messages in parallel...
        Parallel.Invoke(
            () => { _queue.Add(new LogItem("Log from task A")); },
            () => { 
                _queue.Add(new LogItem("Log from task B")); 
                _queue.Add(new LogItem("Log from task B1")); 
            },
            () => { _queue.Add(new LogItem("Log from task C")); },
            () => { _queue.Add(new LogItem("Log from task D")); });
    
        // Pretend to do other things...
        Thread.Sleep(1000);
    
        // Shut down the listener...
        canceller.Cancel();
        listener.Wait();
    }
    
  • 3

    我知道这个答案大约晚了一年,但看看MSDN .

    其中显示了如何从TaskScheduler类创建LimitedConcurrencyLevelTaskScheduler . 通过将并发性限制为单个任务,然后应该按顺序处理您的任务,因为它们通过以下方式排队:

    LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
    TaskFactory factory = new TaskFactory(lcts);
    
    factory.StartNew(()=> 
    {
       // your code
    });
    
  • 13

    我不确定TPL在你的用例中是否足够 . 根据我的理解,TPL的主要用例是将一个巨大的任务分成几个可以并行运行的小任务 . 例如,如果您有一个大的列表,并且您想要对每个元素应用相同的转换 . 在这种情况下,您可以在列表的子集上应用转换 .

    你描述的案例对我来说似乎不适合这张照片 . 在您的情况下,您没有几个并行执行相同操作的任务 . 你有几个不同的任务,每个任务都是自己的工作( 生产环境 者)和一个消耗的任务 . 如果你想拥有多个消费者,也许TPL可以用于消费者部分,因为在这种情况下,每个消费者都做同样的工作(假设你找到一个逻辑来强制你寻找的时间一致性) .

    嗯,这当然只是我对这个问题的个人看法

    Health 长寿·繁荣昌盛

  • 5

    这听起来像BlockingCollection对你来说会很方便 . 因此,对于上面的代码,您可以使用类似的东西(假设 _queueBlockingCollection 实例):

    // for your producers 
    _queue.Add(some_work);
    

    处理队列的专用工作线程:

    foreach (var some_work in _queue.GetConsumingEnumerable())
    {
        deal_with(some_work);
    }
    

    注意:当你的所有制作人都完成制作时,你需要在 _queue 上调用 CompleteAdding() ,否则你的消费者会等待更多的工作 .

相关问题