首页 文章

高CPU Azure工作者角色

提问于
浏览
7

所以这是一个相当广泛的问题,但已经没有想法了 . 我们当前正在运行2个辅助角色实例,它们执行以下操作:

  • 通过为每个批次生成N个线程来监视和处理IoT Hub事件 .

  • 监视和处理来自IoT Hub的连接/断开(操作监视)消息

  • 某些服务总线是否有效(主题和队列)

  • 写入SQL,DocDB(Mongo API)和Azure表存储,以便通过NLOG进行日志记录

  • 通过IoT Hub发送 Cloud 到设备消息

我们面临的问题是在高峰期间我们的CPU显然会增加,但遗憾的是它永远不会降低,并且通常会高达100%并且坐在那里直到我重新启动实例以使其恢复原状 . 我一直在研究线程,因为我仍然觉得它可能与“while(1)”类型场景有关,即使看不清楚原因 . 让我们现在进入代码......

WorkerRole.cs

class WorkerRole : RoleEntryPoint
    {
        private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

        public override void Run()
        {
            _eventprocessor.Start(instanceId, instanceIndex);//.Wait(-1);

            //Wait for shutdown to be called, else the role will recycle
            this.runCompleteEvent.WaitOne();
        }
    }

EventProcessor.cs :我会尝试省去很多果汁但添加我认为可能值得的东西 . 将尽可能添加"pseudo code" .

public class EventProcessor : IEventProcessor
{
  private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

  public async Task Start(string serviceId, int InstanceIndex)
  {

    //Setup Topic

    //Setup Queue

    //Setup EventProcessorHost for receiving events and operations monitoring and start listening

    //Receiving cloud to device feedback from service
    ReceiveFeedbackAsync();

    runCompleteEvent.WaitOne();
  }

  async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
  {
        if (messages.Count() > 0)
        {
            if (!_cancellationSource.IsCancellationRequested)
            {
                await ProcessEventsBulk(context, messages);
            }
        }

        if (messages.Count() > 0)
        {
            await context.CheckpointAsync();               
        }
   }

  async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
        {
            List<Task> TaskList = new List<Task>();
            foreach (EventData message in messages)
            {
                var LastTask = Task.Run(() => GoBoy(context, message));
                TaskList.Add(LastTask);
            }
            await Task.WhenAll(TaskList);
        }

    async Task GoBoy(PartitionContext context, EventData message)
    {
        try
        {
            using (var db = new AppDbContext(_dbContextConnectionString))
            {
                await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
                await db.SaveChangesAsync();
            }
        }
        catch (Exception e)
        {
           //Do Some stuff...
        }
    }

  private async void ReceiveFeedbackAsync()
    {
        var feedbackReceiver = serviceClientReceiver.GetFeedbackReceiver();
        while (true)
        {
            try
            {
              var feedbackBatch = await feedbackReceiver.ReceiveAsync();
              if (feedbackBatch == null) continue;
              foreach (var records in feedbackBatch.Records)
              {

              }
              await feedbackReceiver.CompleteAsync(feedbackBatch);
            }
            catch (Exception)
            {
              Thread.Sleep(30000);                    
            }
         }

    }

}

如果有任何人需要的东西,请不要犹豫 . 我非常感谢任何帮助 .

这里显示了重新启动worker后的CPU丢弃
enter image description here

微软支持协助我要求我做一些PerfViews和一些ProcDumps . 结果是我们应该调查调用我们的集线器“https://abcxyz.azure-devices.net:443/ $ iothub / websocket”的线程 . 这就是为什么我决定添加ReceiveFeedbackAsync()方法,因为我知道它依赖于永久连接到我们的集线器来收集反馈 .

从我可以看到我们正确地注册到我们的EVPH,但让我知道是否有人想查看该代码 .

1 回答

  • 0

    您是否单步执行代码并确保没有创建无限循环条件,而不会抛出任何异常,因此您的Thead.Sleep将执行 . 由于您希望在代码中使用Sleep,因此最好避免使用Exception来触发它 . 也许在处理完每批反馈后将其编码为Sleep . 例外是错误处理和特殊情况,而不是帮助控制逻辑流程 .

相关问题