首页 文章

从MessageSession异步接收消息时处理TimeoutException

提问于
浏览
0

目前,我的解决方案涉及1个Web和1个Worker角色,通过Azure Service Bus相互“交谈”:

  • Web角色将消息发送到"Request"队列 .

  • Worker从"Request"队列中获取消息,执行一些工作并将响应消息发送到"Response"队列,Web角色将从这些队列中获取响应消息 .

  • 由Web角色发送的每条请求消息,都必须找到您的响应,即每条请求消息都带有唯一的相关标识符's why I' m "marking" . Worker从请求中获取此标识符并使用"mark"响应 .

最初,我发送一堆消息:

var messages = tasksForExecution.Select(s =>
            {
                var message = new BrokeredMessage(s);
                message.Properties.Add("action", "submit");
                message.ReplyToSessionId =  Guid.NewGuid().ToString("D");
                return message;
            }).ToList();
var correlationIdentifiers = messages.Select(x => x.ReplyToSessionId).ToList();
messages.ForEach(message => _requestQueueClient.SendAsync(message));

得到回应:

var sessions = correlationIdentifiers
            .Select(x=>_responseQueueClient.AcceptMessageSession(x)).ToList();
var receiveMessageTasks = sessions.Select(session => session.ReceiveAsync()).ToList();
var allReadyTask = Task.WhenAll(receiveMessageTasks).ContinueWith(x =>
            {
                ...extract data from result...
            }
..........
..........
allReadyTask.Wait();

正如您所看到的,我正在创建一堆任务,当所有任务完成后,将调用回调来提取一些数据 . 但是,如果任务中出现异常,会发生什么? - >捕获此异常,并在调用 Wait() 时将其提供给我们 . 由于 ReceiveAsync() 方法的默认超时为1分钟,因此很有可能收到TimeoutException,因为worker稍后可能会完成其作业,并且队列中不会存在此会话的消息 . 我找到了许多例子,人们捕捉到这种类型的异常,消费它,再次尝试接收 - 他们给了 Worker 另外一分钟的时间来完成工作 . 我怎样才能在我的场景中完成此任务 . 谢谢!

1 回答

  • 0

    我发布了我对问题的解决方案 . 请参阅上一篇文章以找出变量 .

    var receiveMessageTasks = sessions.Select(ReceiveMessageAsync).ToList();
    var allReadyTask = Task.WhenAll(receiveMessageTasks).ContinueWith(x =>
            {
                ...extract data from result...
            }
    allReadyTask.Wait();
    

    其中 receiveMessageTasks 方法如下:

    private async Task<BrokeredMessage> ReceiveMessageAsync(MessageSession session)
        {
            BrokeredMessage message = null;
    
            // We can specify dead line for processing, but since our worker will return error message if a problem arise,
            // I prefer to wait endlessly
            // var timesToRetry = 0;
    
            // if Receive operation throws an exception, then message will be null
            while (message == null /* && timesToRetry < 5*/)
            {
                try
                {
                    var task = session.ReceiveAsync().ConfigureAwait(false);
                    await task;
                    message = task.GetAwaiter().GetResult();
                    //timesToRetry++;
                }
                catch (Exception e)
                {
                    // session was idle for a long time and azure closed it -> renew it
                    session = _responseQueueClient.AcceptMessageSession(session.SessionId);
                }
            }
            return message;
        }
    

    请注意 ConfigureAwait(false) . Here你可以找到问题的描述:)

相关问题