我想使用异步 生产环境 者/消费者队列(AsyncEx lib)通过总线一次发送一条消息 . 现在我只是通过异步阻塞来实现这一点 . 它工作正常,但我无法控制队列:(
所以我提出了以下解决方案,问题是没有从队列中删除已取消的任务 . 如果我将队列限制为10(因为每个消息需要1s发送,最大队列时间应为10s左右)并且队列包含8个等待任务和2个已取消任务,则下一个排队任务将抛出InvalidOperationException,尽管无论如何都不会发送两个被取消的任务 .
也许有更好的方法来做到这一点:D
class Program
{
static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue =
new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>();
static void Main()
{
StartAsync().Wait();
}
static async Task StartAsync()
{
var sendingTask = StartSendingAsync();
var tasks = new List<Task>();
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)))
{
for (var i = 0; i < 10; i++)
{
tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token));
}
try
{
await Task.WhenAll(tasks);
Console.WriteLine("All messages sent.");
}
catch (TaskCanceledException)
{
Console.WriteLine("At least one task was canceled.");
}
}
s_Queue.CompleteAdding();
await sendingTask;
s_Queue.Dispose();
Console.WriteLine("Queue completed.");
Console.WriteLine("Press any key to continue...");
Console.ReadKey();
}
static async Task EnqueueMessageAsync(string message, CancellationToken token)
{
var tcs = new TaskCompletionSource();
using (token.Register(() => tcs.TrySetCanceled()))
{
await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs));
Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
await tcs.Task;
}
}
static async Task SendMessageAsync(string message)
{
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
}
static async Task StartSendingAsync()
{
while (await s_Queue.OutputAvailableAsync())
{
var t = await s_Queue.DequeueAsync();
if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted) continue;
await SendMessageAsync(t.Item1);
t.Item2.TrySetResult();
}
}
}
Edit 1:
正如svik指出的那样,只有在队列已经完成时才会抛出InvalidOperationException . 所以这个解决方案甚至没有解决我的等待任务的非托管“队列”的初始问题 . 如果有例如超过10次调用/ 10s我得到了一个完整的队列和一个额外的非托管“队列”等待任务,比如我的异步阻塞方法(AsyncMonitor) . 我想我必须提出一些其他的解决方案呢......
Edit 2:
我有N个不同的消息生成器(我不知道有多少消息因为它不是我的代码)而且只有一个消费者通过总线发送消息并检查它们是否被正确发送(不是真正的字符串消息) .
以下代码模拟了代码应该中断的情况(队列大小为10):
-
排队10条消息(超时为5秒)
-
等待5秒(发送消息0-4,取消消息5-9)
-
排队11条新消息(没有超时)
-
消息10 - 19应该排队,因为队列只包含已取消的消息
-
消息20应抛出异常(例如QueueOverflowException),因为队列已满,生成器代码将处理或不处理
制片人:
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); }
await Task.Delay(TimeSpan.FromSeconds(5));
for (var i = 10; i < 21; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken))); }
try
{
await Task.WhenAll(tasks);
Console.WriteLine("All messages sent.");
}
catch (TaskCanceledException)
{
Console.WriteLine("At least one task was canceled.");
Console.WriteLine("Press any key to complete queue...");
Console.ReadKey();
}
}
我的目标是,我希望完全控制所有应该发送的消息,但在我之前发布的代码中并非如此,因为我只能控制队列中的消息,但不能控制消息等待入队(可能有10000条异步等待入队的消息,我不知道=> 生产环境 者代码无论如何也无法正常工作,因为它需要永远发送等待的所有消息...)
我希望这能让我更清楚我想达到的目标;)
1 回答
我不确定回答我自己的任务是否正常,所以我不会将其标记为答案,也许有人想出一个更好的解决方案:P
首先,这里是 生产环境 者代码:
25条消息在5秒内排队
发送
16条消息
3条消息未发送(队列已满)
6条消息被取消
这里是基于List的“Queue”类 . 它是队列和消费者的组合 . 使用AsyncMonitor类(Stephen Cleary的AsyncEx)完成同步 .
现在我对这个解决方案没问题;它完成了工作:D