我正在尝试编写一个Windows服务,其 生产环境 者和消费者的工作方式如下:
-
Producer: 在计划的时间,获取所有未处理的项目(在数据库中的行上
Processed = 0
)并将每个项目添加到工作队列中尚未存在的工作队列中 -
Consumer: 不断从工作队列中提取项目并处理它们并更新数据库(行上的
Processed = 1
)
我试图在C#.NET中查找这个确切数据流的示例,以便我可以利用现有的库 . 但到目前为止,我还没有找到那个 .
我在https://blog.stephencleary.com/2012/11/async-producerconsumer-queue-using.html上看到了这个例子
private static void Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
foreach (var value in values)
{
queue.Post(value);
}
queue.Complete();
}
private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
var ret = new List<int>();
while (await queue.OutputAvailableAsync())
{
ret.Add(await queue.ReceiveAsync());
}
return ret;
}
这是我想要修改它的“想法”:
while(true)
{
if(await WorkQueue.OutputAvailableAsync())
{
ProcessItem(await WorkQueue.ReceiveAsync());
}
else
{
await Task.Delay(5000);
}
}
将是消费者如何工作和
MyTimer.Elapsed += Produce;
static async void Produce(object source, ElapsedEventArgs e)
{
IEnumerable<Item> items = GetUnprocessedItemsFromDb();
foreach(var item in items)
if(!WorkQueue.Contains(w => w.Id == item.Id))
WorkQueue.Enqueue(item);
}
将是制片人的工作方式 .
这是我想要做的一个粗略的想法 . 你们中的任何人都可以告诉我正确的方法,或者将我链接到解决此类问题的正确文档吗?