我习惯在Azure上使用webjob来触发Azure队列 . 它就像一个魅力 .
static void Main(string[] args)
{
JobHost host = new JobHost();
host.RunAndBlock();
}
public static void ProcessQueueMessage([QueueTrigger("logqueue")] string logMessage, TextWriter logger)
{
logger.WriteLine(logMessage);
}
queueTrigger真正有用的是在消息触发的进程没有完成之前,消息是不可见的(不是删除) . 因此,如果您关闭webjob(例如,用于webjob更新),队列中的消息将被更新的webjob(完美)处理 .
现在我想做同样的事情,但在 Worker 角色 . 今天我喜欢这个 .
while (true)
{
var cloudMessage = await sourceImportationQueue.GetMessageAsync();
if (cloudMessage != null)
sourceImportationQueue.DeleteMessage(cloudMessage);
// process my job (few hours)
else
await Task.Delay(1000 * 5);
}
但如果我在工作期间停止工作,我就丢失了信息 . 那我怎么能像webJob触发一样呢?
3 回答
最后我找到一个简单的解决方案在运行几个小时的作业之前,我启动了一个任务
KeepHiddenMessageAsync
,用超时更新消息 . 在超时结束之前,将完成消息的新更新 . 如果出现问题,则将达到消息超时并且消息将变为可见 .默认情况下,一旦检索到队列消息,它将在5分钟内变为不可见 . 在此延迟之后,如果消息尚未从队列中删除,它将再次可见,以便可以再次处理它 .
在您的代码示例中,您将在从队列中获取消息后立即删除该消息 . 如果您想使其安全,则只应在流程结束时删除该消息 . 您是否尝试在处理作业结束时移动
sourceImportationQueue.DeleteMessage(cloudMessage);
?如果不使用某种持久性存储来跟踪您的工作进度,可能无法解决此问题 . 正如已经确定的那样,您将在作业开始之前删除该消息,因此如果作业因任何原因失败(包括停止该角色),则消息将丢失 . 消息的最长锁定时间为5分钟,这意味着在作业仍在运行时消息将再次出现,如果删除操作已移至最后,则会因丢失锁定而失败 .
如果长时间运行的作业由多个较小的步骤组成,这些步骤都不超过5分钟,那么您可以定期调用RenewLock()来保留对消息的锁定并阻止它重新出现在队列中 . 只要锁永不过期,在这种情况下,最终的DeleteMessage就会成功 . 尽管如此,这可能不太适合您的情况 .
一种可能的解决方案是将作业状态写入例如Azure表,并在整个作业处理过程中记录状态 . 您的工作者角色循环将检查表中是否有任何尚未完成的作业,并继续存在任何已存在的作业,如果没有找到,则检查服务总线是否有任何新作业 . 此解决方案还可以让您有机会从他们到达的地方获取失败的工作,而不是从头开始再次开始2小时的工作 .