首页 文章

如何修复仅限于从Azure ServiceBus读取的1个客户端

提问于
浏览
3

我有一些对我来说很头疼的东西......

我有一个小服务,从Azure ServiceBus队列中读取消息并将数据存储在CosmosDB集合中 .

问题是我无法扩展我的服务 . 我已经能够优化一些事情来改善一个服务实例每秒读取的消息数量 . 但是,添加更多服务实例会略微降低每秒读取的消息数量!

重要的是要注意,批量发送消息就像一个魅力,我可以每秒向队列发送1000-2000条消息,没有任何问题 . 从队列中读取是个问题 .

我的处理程序有点CPU密集,消息的大小约为2 KB到900 KB,平均值大约为25 KB . 我现在有一个实例来处理大约每秒41.5条消息 .

如果我添加该服务的第二个实例(顺便说一下是Azure Web App),则所有实例每秒读取的消息总数将下降到大约40个 . 添加另一个实例会将其减少到接近38 .

从队列中读取消息(并处理重试,破坏等)的实际代码是内部公司框架的一部分,许多其他服务使用该框架,但没有一个具有此问题 . 其他服务具有预期的行为,即性能与服务实例的数量呈线性关系(显然,ServiceBus可以处理的最大值) .

我在两个使用Premium ServiceBus层的不同Azure订阅(TEST和PROD)上遇到了同样的问题 .

我没有在队列中使用会话 .

Has anyone here ever had a similar issue, and how did you solve it?

我尝试过的事情:

  • 只关闭与从Blob存储中读取代码而从ServiceBus中取出的代码 . 这给了我CosmosDB更高吞吐量的数量级(大约15,000个文档,pr . 第二个,41.6个,当源是ServiceBus时) . 通过扩展我获得了如此高的吞吐量,这就是我在使用ServiceBus时遇到的问题所以CosmosDb肯定不是瓶颈 .

  • 我尝试删除并重新创建队列以及调整代码中的各种内容 - 但从逻辑上讲,在我看来,无论我在代码中做什么,都只能影响单个服务实例的性能 .

  • 我尝试在我的计算机上本地运行该服务,同时它也在Azure中运行 . 从Splunk日志中我可以看到,在代表性的一分钟内,该服务的Azure实例处理了1371条消息,而该服务的本地实例仅处理了23条消息 . 所以,正如我一直在说的,这里似乎存在某种僵局或某些事情 . 进一步的分析表明,Azure中的实例平均花费了247毫秒来处理消息,而本地实例平均花费了66秒!如果锁定过期或发生未处理的异常,则会在队列中放回一条消息,并在10次传递尝试失败后进行破坏 . 所以看起来似乎大多数本地处理的消息都失败了,然后被放回到队列中,最后由Azure实例处理(这是我的猜测) .

我的Web应用程序实例之间唯一的共享资源是ServiceBus和CosmosDb,如上所述,我已经排除了CosmosDb . 然而,看到我在我们的TEST和PROD订阅中遇到同样的问题(我们的DEV订阅不允许扩展),我尝试以各种不同的方式重新创建队列几次,它不能也是队列本身,并且在同一ServiceBus实例上使用的其他队列都没有遇到此问题 .

正如预期的那样,调整/优化代码只会影响一个实例的性能 . 据我所知,可能已经排除了外部瓶颈的可能性 . 剩下的一件事,我们的内部框架处理队列中消息的实际读取,也被排除在框架的完全相同版本用于许多其他网络应用程序的事实,其中扩展已被证明工作 .

我觉得这里很漂亮...

解决方案:忘了更新这个问题,所以最后这里是......我们最终设法留出时间完全专注于这个问题,并通过各种测试我们得出结论,它是在SDK中使用ReadBatchAsync方法的组合并且有相当大的消息是造成这个问题的原因 . 切换到使用OnMessageAsync修复它 .

2 回答

  • 1

    拥有 async void 操作通常不是一个好主意 .

    此外,您还可以重构要批量调用的处理 .

    第一种方法假定无法使 StartProcessMessage 异步

    void StartProcessMessage(Message m) {
        //...
    }
    
    public async Task Start() {
        while (true) {
            var messages = (await _queueClient.ReceiveBatchAsync(Math.Max(1, _configuration.MaxConcurrentCalls - _messagesInProgress))).ToArray();
            Interlocked.Add(ref _messagesInProgress, messages.Length);
            var tasks = messages.Select(m => Task.Run(() => StartProcessMessage(m)));
            await Task.WhenAll(tasks); //process in parallel.
            while (_messagesInProgress > _configuration.MaxConcurrentCalls) {
                await Task.Delay(100);
            }
        }
    }
    

    第二种方法假设 StartProcessMessage 可以重构为异步

    Task StartProcessMessage(Message m) {
        //...
    }
    
    public async Task Start() {
        while (true) {
            var messages = (await _queueClient.ReceiveBatchAsync(Math.Max(1, _configuration.MaxConcurrentCalls - _messagesInProgress))).ToArray();
            Interlocked.Add(ref _messagesInProgress, messages.Length);
            var tasks = messages.Select(m => StartProcessMessage(m));
            await Task.WhenAll(tasks); //process in parallel.
            while (_messagesInProgress > _configuration.MaxConcurrentCalls) {
                await Task.Delay(100);
            }
        }
    }
    
  • 1

    我建议首先消除处理代码是问题的可能性 . 尝试使用虚拟 StartProcessMessage 运行,它不会确保它不是问题/瓶颈,即太多的写入者写入某些共享资源或类似的东西 .

    您可以尝试的另一个选项是使用最新的.Net库 Microsoft.Azure.ServiceBus . 那里可用的类允许运行内置循环,以更自然的方式和简单的方式允许 MaxConcurrentCalls . 但是确保它不是处理程序是你应该尝试的第一件事 . 如果你已经做过,也许你应该分享它 .

相关问题