首页 文章

Azure WorkerRole在WebJob等队列上触发

提问于
浏览
1

我习惯在Azure上使用webjob来触发Azure队列 . 它就像一个魅力 .

Azure tutorial webjob + queue

static void Main(string[] args)
{
    JobHost host = new JobHost();
    host.RunAndBlock();
}

public static void ProcessQueueMessage([QueueTrigger("logqueue")] string logMessage, TextWriter logger)
{
    logger.WriteLine(logMessage);
}

queueTrigger真正有用的是在消息触发的进程没有完成之前,消息是不可见的(不是删除) . 因此,如果您关闭webjob(例如,用于webjob更新),队列中的消息将被更新的webjob(完美)处理 .

现在我想做同样的事情,但在 Worker 角色 . 今天我喜欢这个 .

while (true)
{
     var cloudMessage = await sourceImportationQueue.GetMessageAsync();
     if (cloudMessage != null)
           sourceImportationQueue.DeleteMessage(cloudMessage);
      // process my job (few hours)
     else
           await Task.Delay(1000 * 5);
}

但如果我在工作期间停止工作,我就丢失了信息 . 那我怎么能像webJob触发一样呢?

3 回答

  • 2

    最后我找到一个简单的解决方案在运行几个小时的作业之前,我启动了一个任务 KeepHiddenMessageAsync ,用超时更新消息 . 在超时结束之前,将完成消息的新更新 . 如果出现问题,则将达到消息超时并且消息将变为可见 .

    private bool jobIsComplete;
    
            private void Run()
            {
                 while (true)
                {
                     jobIsComplete = false;
                     //get the message
                     var cloudMessage = await queue.GetMessageAsync();
    
                     if (cloudMessage != null)
                            //run the task to keep the message until end of the job and worker role stopping for an update for example 
                           var keepHiddenMessageTask = KeepHiddenMessageAsync(cloudMessage);
    
                            //
                            // process my job (few hours)
                            //
    
                          jobIsComplete = true;
                          await keepHiddenMessageTask;
                          await _queue.DeleteMessageAsync(cloudMessage);
                     else
                           await Task.Delay(1000 * 5);
                }
            }
    
            private async Task KeepHiddenMessageAsync(CloudQueueMessage iCloudQueueMessage)
            {
                while (true)
                {
                    //Update message and hidding during 5 new minutes
                    await _queue.UpdateMessageAsync(iCloudQueueMessage, TimeSpan.FromMinutes(5), MessageUpdateFields.Visibility);
    
                    //Wait 4 minutes
                    for (int i = 0; i < 60 * 4; i++)
                    {
                        if (JobIsComplete)
                            return;
                        else
                            await Task.Delay(1000);
                    }
                }
            }
    
  • 0

    默认情况下,一旦检索到队列消息,它将在5分钟内变为不可见 . 在此延迟之后,如果消息尚未从队列中删除,它将再次可见,以便可以再次处理它 .

    在您的代码示例中,您将在从队列中获取消息后立即删除该消息 . 如果您想使其安全,则只应在流程结束时删除该消息 . 您是否尝试在处理作业结束时移动 sourceImportationQueue.DeleteMessage(cloudMessage);

  • 0

    如果不使用某种持久性存储来跟踪您的工作进度,可能无法解决此问题 . 正如已经确定的那样,您将在作业开始之前删除该消息,因此如果作业因任何原因失败(包括停止该角色),则消息将丢失 . 消息的最长锁定时间为5分钟,这意味着在作业仍在运行时消息将再次出现,如果删除操作已移至最后,则会因丢失锁定而失败 .

    如果长时间运行的作业由多个较小的步骤组成,这些步骤都不超过5分钟,那么您可以定期调用RenewLock()来保留对消息的锁定并阻止它重新出现在队列中 . 只要锁永不过期,在这种情况下,最终的DeleteMessage就会成功 . 尽管如此,这可能不太适合您的情况 .

    一种可能的解决方案是将作业状态写入例如Azure表,并在整个作业处理过程中记录状态 . 您的工作者角色循环将检查表中是否有任何尚未完成的作业,并继续存在任何已存在的作业,如果没有找到,则检查服务总线是否有任何新作业 . 此解决方案还可以让您有机会从他们到达的地方获取失败的工作,而不是从头开始再次开始2小时的工作 .

相关问题