目前,我的解决方案涉及1个Web和1个Worker角色,通过Azure Service Bus相互“交谈”:
-
Web角色将消息发送到"Request"队列 .
-
Worker从"Request"队列中获取消息,执行一些工作并将响应消息发送到"Response"队列,Web角色将从这些队列中获取响应消息 .
-
由Web角色发送的每条请求消息,都必须找到您的响应,即每条请求消息都带有唯一的相关标识符's why I' m "marking" . Worker从请求中获取此标识符并使用"mark"响应 .
最初,我发送一堆消息:
var messages = tasksForExecution.Select(s =>
{
var message = new BrokeredMessage(s);
message.Properties.Add("action", "submit");
message.ReplyToSessionId = Guid.NewGuid().ToString("D");
return message;
}).ToList();
var correlationIdentifiers = messages.Select(x => x.ReplyToSessionId).ToList();
messages.ForEach(message => _requestQueueClient.SendAsync(message));
得到回应:
var sessions = correlationIdentifiers
.Select(x=>_responseQueueClient.AcceptMessageSession(x)).ToList();
var receiveMessageTasks = sessions.Select(session => session.ReceiveAsync()).ToList();
var allReadyTask = Task.WhenAll(receiveMessageTasks).ContinueWith(x =>
{
...extract data from result...
}
..........
..........
allReadyTask.Wait();
正如您所看到的,我正在创建一堆任务,当所有任务完成后,将调用回调来提取一些数据 . 但是,如果任务中出现异常,会发生什么? - >捕获此异常,并在调用 Wait()
时将其提供给我们 . 由于 ReceiveAsync()
方法的默认超时为1分钟,因此很有可能收到TimeoutException,因为worker稍后可能会完成其作业,并且队列中不会存在此会话的消息 . 我找到了许多例子,人们捕捉到这种类型的异常,消费它,再次尝试接收 - 他们给了 Worker 另外一分钟的时间来完成工作 . 我怎样才能在我的场景中完成此任务 . 谢谢!
1 回答
我发布了我对问题的解决方案 . 请参阅上一篇文章以找出变量 .
其中
receiveMessageTasks
方法如下:请注意
ConfigureAwait(false)
. Here你可以找到问题的描述:)