我正在尝试编写一个Windows服务,其 生产环境 者和消费者的工作方式如下:

  • Producer: 在计划的时间,获取所有未处理的项目(在数据库中的行上 Processed = 0 )并将每个项目添加到工作队列中尚未存在的工作队列中

  • Consumer: 不断从工作队列中提取项目并处理它们并更新数据库(行上的 Processed = 1

我试图在C#.NET中查找这个确切数据流的示例,以便我可以利用现有的库 . 但到目前为止,我还没有找到那个 .

我在https://blog.stephencleary.com/2012/11/async-producerconsumer-queue-using.html上看到了这个例子

private static void Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    foreach (var value in values)
    {
        queue.Post(value);
    }

    queue.Complete();
}

private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    {
        ret.Add(await queue.ReceiveAsync());
    }

    return ret;
}

这是我想要修改它的“想法”:

while(true)
{
    if(await WorkQueue.OutputAvailableAsync())
    {
        ProcessItem(await WorkQueue.ReceiveAsync());
    }
    else
    {
        await Task.Delay(5000);
    }
}

将是消费者如何工作和

MyTimer.Elapsed += Produce;

static async void Produce(object source, ElapsedEventArgs e)
{
     IEnumerable<Item> items = GetUnprocessedItemsFromDb();
     foreach(var item in items)
         if(!WorkQueue.Contains(w => w.Id == item.Id))
             WorkQueue.Enqueue(item);  
}

将是制片人的工作方式 .

这是我想要做的一个粗略的想法 . 你们中的任何人都可以告诉我正确的方法,或者将我链接到解决此类问题的正确文档吗?