我正在编写一个从数千个客户端提取数据的服务器 . 我想并行处理这些数据,但必须按顺序串行处理每个客户端的数据 .
例如,如果我有客户端A,B和C,我将同时处理来自所有3个客户端的数据,但是每个客户端一次只处理一个项目 .
我的第一次尝试是创建一种分区队列,每个分区使用一个 Task
消息: ConcurrentDictionary<string, BlockingCollection<T>>
,其中 string
是客户端标识符 . 有了这个,我从每个客户端开始消耗 Task
并将它们保存在字典_2522909中 . 任务很简单,并使用 GetConsumingEnumerable
:
return Task.Run(() =>
{
foreach (var item in list.GetConsumingEnumerable())
{
this.action(item);
}
});
这种方法效果很好,但最多只有大约75个客户端 - 在此之后,每秒项目的性能迅速下降,可能是由于任务之间的争用 .
有一个更好的方法吗?也许使用较小的,固定数量的消费线程,可以某种方式旋转它们正在消耗的分区?
我在.NET Framework本身没有找到任何有用的东西,但我觉得有可能以某种方式利用TPL?
EDIT 我不是简单地询问如何处理具有多个 生产环境 者和单个消费者的单个队列 . 此外,我不仅对基于TPL的解决方案感兴趣;我只是觉得它可能有所帮助 .
1 回答
我想通过考虑为每个客户端 Build 一个单独的分区来让我陷入困境 - 我想到的是根据客户端ID的哈希码(这是一个GUID)分成固定数量的队列 .
这样做的好处是可以控制并发性,并且实现起来也很简单 .
所以我的队列定义如下:
Dictionary<int, BlockingCollection<T>> queues;
数据添加如下: