所以这是一个相当广泛的问题,但已经没有想法了 . 我们当前正在运行2个辅助角色实例,它们执行以下操作:
-
通过为每个批次生成N个线程来监视和处理IoT Hub事件 .
-
监视和处理来自IoT Hub的连接/断开(操作监视)消息
-
某些服务总线是否有效(主题和队列)
-
写入SQL,DocDB(Mongo API)和Azure表存储,以便通过NLOG进行日志记录
-
通过IoT Hub发送 Cloud 到设备消息
我们面临的问题是在高峰期间我们的CPU显然会增加,但遗憾的是它永远不会降低,并且通常会高达100%并且坐在那里直到我重新启动实例以使其恢复原状 . 我一直在研究线程,因为我仍然觉得它可能与“while(1)”类型场景有关,即使看不清楚原因 . 让我们现在进入代码......
在 WorkerRole.cs :
class WorkerRole : RoleEntryPoint
{
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
public override void Run()
{
_eventprocessor.Start(instanceId, instanceIndex);//.Wait(-1);
//Wait for shutdown to be called, else the role will recycle
this.runCompleteEvent.WaitOne();
}
}
在 EventProcessor.cs :我会尝试省去很多果汁但添加我认为可能值得的东西 . 将尽可能添加"pseudo code" .
public class EventProcessor : IEventProcessor
{
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
public async Task Start(string serviceId, int InstanceIndex)
{
//Setup Topic
//Setup Queue
//Setup EventProcessorHost for receiving events and operations monitoring and start listening
//Receiving cloud to device feedback from service
ReceiveFeedbackAsync();
runCompleteEvent.WaitOne();
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
if (messages.Count() > 0)
{
if (!_cancellationSource.IsCancellationRequested)
{
await ProcessEventsBulk(context, messages);
}
}
if (messages.Count() > 0)
{
await context.CheckpointAsync();
}
}
async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
{
List<Task> TaskList = new List<Task>();
foreach (EventData message in messages)
{
var LastTask = Task.Run(() => GoBoy(context, message));
TaskList.Add(LastTask);
}
await Task.WhenAll(TaskList);
}
async Task GoBoy(PartitionContext context, EventData message)
{
try
{
using (var db = new AppDbContext(_dbContextConnectionString))
{
await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
await db.SaveChangesAsync();
}
}
catch (Exception e)
{
//Do Some stuff...
}
}
private async void ReceiveFeedbackAsync()
{
var feedbackReceiver = serviceClientReceiver.GetFeedbackReceiver();
while (true)
{
try
{
var feedbackBatch = await feedbackReceiver.ReceiveAsync();
if (feedbackBatch == null) continue;
foreach (var records in feedbackBatch.Records)
{
}
await feedbackReceiver.CompleteAsync(feedbackBatch);
}
catch (Exception)
{
Thread.Sleep(30000);
}
}
}
}
如果有任何人需要的东西,请不要犹豫 . 我非常感谢任何帮助 .
这里显示了重新启动worker后的CPU丢弃
微软支持协助我要求我做一些PerfViews和一些ProcDumps . 结果是我们应该调查调用我们的集线器“https://abcxyz.azure-devices.net:443/ $ iothub / websocket”的线程 . 这就是为什么我决定添加ReceiveFeedbackAsync()方法,因为我知道它依赖于永久连接到我们的集线器来收集反馈 .
从我可以看到我们正确地注册到我们的EVPH,但让我知道是否有人想查看该代码 .
1 回答
您是否单步执行代码并确保没有创建无限循环条件,而不会抛出任何异常,因此您的Thead.Sleep将执行 . 由于您希望在代码中使用Sleep,因此最好避免使用Exception来触发它 . 也许在处理完每批反馈后将其编码为Sleep . 例外是错误处理和特殊情况,而不是帮助控制逻辑流程 .