首页 文章

移动到Poison Queue的队列消息仍显示为队列计数,但保持隐藏状态

提问于
浏览
2

我正在测试我正在构建的Webjob的Poison消息处理 .

一切似乎都按预期工作,除了一件奇怪的事情:

当消息移动到“-poison”队列时,其重影似乎在主作业队列中保持隐藏(不可见) . 这意味着如果我将6个有害消息移动到“-poison”队列,则存储资源管理器会显示“ Showing 0 of 6 messages in queue ” . 我无法在Storage Explorer中看到6条隐藏的消息 .

我试图删除作业队列并重新创建它,但是在我运行测试后仍然会发生奇怪的问题 . 存储资源管理器显示“在队列中显示6条消息中的0条” .

现场背后发生了什么?

Update 1

我做了一些调查,我认为WebJob SDK不会删除有害消息 .

我浏览了WebJob SDK源代码,我认为这行代码由于某种原因没有被执行:

https://github.com/Azure/azure-webjobs-sdk/blob/dev/src/Microsoft.Azure.WebJobs.Host/Queues/QueueProcessor.cs#L119

这是我的功能,可以帮助重现问题:

public class Functions
{
    public static void ProcessQueueMessage([QueueTrigger("%QueueName%")] string message, TextWriter log)
    {
        if (message.Contains("Break"))
        {
            throw new Exception($"Error while processing message {message}");
        }

        log.WriteLine($"Processed message {message}");
    }

}

Update 2

这是我正在使用的WebJob SDK:

enter image description here

3 回答

  • 3

    据我所知,azure存储SDK 8.与Azure webjobs SDK2.0(related issue)不兼容 .

    如果您使用存储SDK 8.毒性消息保持未删除但不可见 .

    解决方法方法是使用低azure存储SDK 7.2.1 .

    它会运作良好 .

    这个问题将在未来的SDK版本中得到解决 .

  • 1

    我也有同样的问题 .

    问题是当毒性队列中的消息副本通过ref没有可见时间https://github.com/Azure/azure-webjobs-sdk/blob/dev/src/Microsoft.Azure.WebJobs.Host/Queues/QueueProcessor.cs#L145时,并且当尝试从原始队列中删除消息时,服务返回404未找到 . 是azure-webjobs-sdk中的问题,解决方案是进行此更改

    await AddMessageAndCreateIfNotExistsAsync(poisonQueue, new CloudQueueMessage(message.AsString), cancellationToken);
    

    https://github.com/Azure/azure-webjobs-sdk/blob/dev/src/Microsoft.Azure.WebJobs.Host/Queues/QueueProcessor.cs#L145
    我们等待这个修复的新版本

    Custom solution

    要解决此问题,请创建自己的CustomProcessor,并在 CopyMessageToPoisonQueueAsync 函数中创建新的CloudMessage,从原始队列传入毒性队列,请参阅下面的示例 .

    var config = new JobHostConfiguration
    config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
    
    public QueueProcessor Create(QueueProcessorFactoryContext context)
        {
            // demonstrates how the Queue.ServiceClient options can be configured
            context.Queue.ServiceClient.DefaultRequestOptions.ServerTimeout = TimeSpan.FromSeconds(30);
    
            // demonstrates how queue options can be customized
            context.Queue.EncodeMessage = true;
    
            // return the custom queue processor
            return new CustomQueueProcessor(context);
        }
    
        /// <summary>
        /// Custom QueueProcessor demonstrating some of the virtuals that can be overridden
        /// to customize queue processing.
        /// </summary>
        private class CustomQueueProcessor : QueueProcessor
        {
            private QueueProcessorFactoryContext _context;
            public CustomQueueProcessor(QueueProcessorFactoryContext context)
                : base(context)
            {
                _context = context;
            }
    
            public override async Task CompleteProcessingMessageAsync(CloudQueueMessage message, FunctionResult result, CancellationToken cancellationToken)
            {
                await base.CompleteProcessingMessageAsync(message, result, cancellationToken);
            }
            protected override async Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
            {
                var msg = new CloudQueueMessage(message.AsString);
                await base.CopyMessageToPoisonQueueAsync(msg, poisonQueue, cancellationToken);
            }
            protected override void OnMessageAddedToPoisonQueue(PoisonMessageEventArgs e)
            {
                base.OnMessageAddedToPoisonQueue(e);
            }
        }
    
  • 1

    对于任何人仍然有这个问题 . 这应该在2.1.0-beta1-10851之后修复 . 缺点是目前还没有稳定发布的2.1.0版本 .

相关问题