首页 文章

对队列进行分区以进行并行处理

提问于
浏览
0

我正在编写一个从数千个客户端提取数据的服务器 . 我想并行处理这些数据,但必须按顺序串行处理每个客户端的数据 .

例如,如果我有客户端A,B和C,我将同时处理来自所有3个客户端的数据,但是每个客户端一次只处理一个项目 .

我的第一次尝试是创建一种分区队列,每个分区使用一个 Task 消息: ConcurrentDictionary<string, BlockingCollection<T>> ,其中 string 是客户端标识符 . 有了这个,我从每个客户端开始消耗 Task 并将它们保存在字典_2522909中 . 任务很简单,并使用 GetConsumingEnumerable

return Task.Run(() =>
{
    foreach (var item in list.GetConsumingEnumerable())
    {
        this.action(item);
    }
});

这种方法效果很好,但最多只有大约75个客户端 - 在此之后,每秒项目的性能迅速下降,可能是由于任务之间的争用 .

有一个更好的方法吗?也许使用较小的,固定数量的消费线程,可以某种方式旋转它们正在消耗的分区?

我在.NET Framework本身没有找到任何有用的东西,但我觉得有可能以某种方式利用TPL?

EDIT 我不是简单地询问如何处理具有多个 生产环境 者和单个消费者的单个队列 . 此外,我不仅对基于TPL的解决方案感兴趣;我只是觉得它可能有所帮助 .

1 回答

  • 0

    我想通过考虑为每个客户端 Build 一个单独的分区来让我陷入困境 - 我想到的是根据客户端ID的哈希码(这是一个GUID)分成固定数量的队列 .

    这样做的好处是可以控制并发性,并且实现起来也很简单 .

    所以我的队列定义如下: Dictionary<int, BlockingCollection<T>> queues;

    数据添加如下:

    public void Enqueue(T val)
    {
        var bucket = val.Id.GetHashCode() % this.maxConcurrent;
        this.queues[bucket].Add(val);
    }
    

相关问题