首页 文章

Azure Worker角色以异步方式处理作业

提问于
浏览
2

我正在尝试实现以下用例 . 我有一个Azure辅助角色,它将监视Azure存储队列,当有消息进入时,这将触发一个异步运行的作业 . 我想尽可能使用TPL,并且需要操作来支持取消,这样当Azure Role OnStop触发时,如果可能,作业可以正常退出 . Scott Guthrie发布的MyFixIt示例几乎就是我所需要的,我已经将它用作我项目的模板 . 不支持的一个关键方面是要求异步运行作业 . 在FixIt代码中,一旦启动作业,在完成作业之前不会处理任何其他作业 . 我的应用程序将处理的一些作业长时间运行,我需要worker角色能够注意到其他传入的作业,并在长时间运行的作业运行时运行这些作业 .

这里的两个关键方法是ProcessMessagesAsync,它监视队列,ProcessMessage将在消息进入时运行作业 . 这是我拥有的,它主要工作,除了它没有正确处理CancellationRequest,以及Azure Worker角色将在不等待作业完成的情况下关闭 .

/// <summary>
    /// Continuous loop that monitors the queue and launches jobs when they are retrieved.
    /// </summary>
    /// <param name="token"></param>
    /// <returns></returns>
    public virtual async Task ProcessMessagesAsync(CancellationToken token)
    {
        CloudQueue queue = _queueClient.GetQueueReference(_queueName);
        await queue.CreateIfNotExistsAsync(token);

        while (!token.IsCancellationRequested)
        {
            Debug.WriteLine("inLoop");
            // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
            // Pass in a cancellation token, because the operation can be long-running.
            CloudQueueMessage message = await queue.GetMessageAsync(token);
            if (message != null)
            {
                ProcessMessage(message, queue, token);
            }
            else
            {
                await Task.Delay(500, token);
            }
        }
    }



    protected virtual async Task ProcessMessage(CloudQueueMessage message, CloudQueue queue, CancellationToken token)
    {
        var jobDetails = JobDetails.DeserializeJson(message.AsString);
        var result = await _jobRunner.RunJob(jobDetails, token);

        //todo handle error
        //if (result.Status == JobStatus.Error)

        await queue.DeleteMessageAsync(message);
    }

然后JobRunner运行请求的作业 . 我写了一个TestJob,其中我试图模拟一个可以注意到CancellationRequest的长时间运行的作业,并在短暂的清理期后,提前退出作业 .

public virtual async Task<JobResult> RunJob(JobDetails jobDetails, CancellationToken token)
    {
        switch (jobDetails.JobName.ToLower())
        {
            case "testjob":
                return await TestJob(jobDetails.Args, token);
        }
        return new JobResult(JobStatus.Error) { ErrorMessage = "The job requested does not exist." };
    }
    protected virtual async Task<JobResult> TestJob(List<string> jobArgs, CancellationToken token)
    {
        var message = "no message";
        if (jobArgs != null && jobArgs.Any())
            message = jobArgs[0];

        return await Task.Run(async () =>
        {
            Debug.WriteLine(string.Format("Start:{0}", message));
            for (int i = 1; i <= 800; i++)
            {
                if (token.IsCancellationRequested)
                {
                    Debug.WriteLine("CancelationRequest in TestJob");
                    //simulate short time to cleanup and exit early
                    Thread.Sleep(1500);
                    Debug.WriteLine("Cancelation Job Cleanup finsihed.");
                    token.ThrowIfCancellationRequested();
                }
                Thread.Sleep(10);
            }

            Debug.WriteLine(string.Format("Finish:{0}", message));
            return new JobResult(JobStatus.Success);
        });
    }

我一直在搜索和研究2天,包括TPL DataFlow库,但还没有找到一种方法来使这项工作正常进行 . 我觉得对ProcessMessage的调用(消息,队列,令牌)没有正确完成,甚至还有一个编译器警告'因为没有等待这个调用......' . 但我不想等待(这是FixIt示例所做的),因为在运行完成之前,其他任何工作都不会被注意到 . 这似乎不是一个不常见的用例,虽然我似乎找不到任何描述它的人 .

预先感谢您的任何帮助!

丹尼格林

2 回答

  • 0

    发生这种情况的原因是因为您没有履行从ProcessMessage返回的任务 . 因为ProcessMessageAsync可以在ProcessMessage正常完成或取消之前完成 . 请记住,您不希望等待ProcessMessage,因为它会使消息处理顺序,我建议您保留一个正在运行的任务列表 . 换句话说,在ProcessMessageAsync中创建一个List,并将从ProcessMessage返回的任务添加到此列表中 . 然后在while循环结束时,如果令牌被取消,您应循环遍历此列表以取消所有待处理任务 .

    对不起,我没有VS方便,但我希望你明白这一点 .

  • 0

    谢谢Sanjay,根据你的建议,我想出了以下内容 .

    /// <summary>
        /// Continuous loop that monitors the queue and launches jobs when they are retrieved.
        /// </summary>
        /// <param name="token"></param>
        /// <returns></returns>
        public virtual async Task ProcessMessagesAsync(CancellationToken token)
        {
            CloudQueue queue = _queueClient.GetQueueReference(_queueName);
            await queue.CreateIfNotExistsAsync(token);
    
            var runningTasks = new ConcurrentDictionary<int, Task>();
    
            while (!token.IsCancellationRequested)
            {
                Debug.WriteLine("inLoop");
                // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
                // Pass in a cancellation token, because the operation can be long-running.
                CloudQueueMessage message = await queue.GetMessageAsync(token);
                if (message != null)
                {
                    var t = ProcessMessage(message, queue, token);
                    var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks));
                    while (true)
                    {
                        if (runningTasks.TryAdd(t.Id, t))
                            break;
                        Task.Delay(25);
                    }                                    
                }                    
                else
                {
                    try
                    {
                        await Task.Delay(500, token);
                    }
                    catch (Exception ex)
                    {
                        Debug.WriteLine(ex.Message);
                    }
                }
            }
            while (!runningTasks.IsEmpty)
            {
                Debug.WriteLine("Waiting for running tasks");
                Task.Delay(500);
            }
    
        }
    
        private static void RemoveRunningTask(int id, ConcurrentDictionary<int, Task> runningTasks)
        {
            while (true)
            {
                Task outTask;
                if (runningTasks.TryRemove(id, out outTask))
                    break;
                Task.Delay(25);
            }
    
        }
    

    这似乎有效,但我觉得它有点笨拙 . 我开始像这样对'ContinueWith'进行编码,但是对于传入的任务具有不同的Id值感到惊讶(我预计它将是相同的任务):

    var task = ProcessMessage(message, queue, token).ContinueWith(x =>
                    {
                        while (true)
                        {
                            Task outTask;
                            if (runningTasks.TryRemove(x.Id, out outTask))
                                break;
                            Task.Delay(25);
                        }
    
                    });
    

    UPDATE: 事实证明这仍然不能正常工作,我在早先测试时误解了结果 . 基于MyFixIt示例,在Work Role OnStop中,我有以下代码:

    public override void OnStop()
        {
            Debug.WriteLine("OnStop_Begin");
            tokenSource.Cancel();
            tokenSource.Token.WaitHandle.WaitOne();
            base.OnStop();
            Debug.WriteLine("Onstop_End");
            tokenSource.Dispose();
        }
    

    似乎tokenSource.Token.WaitHandle.WaitOne实际上无法等到所有引用令牌的任务都完成,因此即使任务仍处于完成处理中,角色也会继续并停止 . 有什么方法可以在取消实际完成时正确使用令牌发出信号吗?

    谢谢!

    UPDATE 2

    好吧,我想我现在有一个解决方案 . 看起来在调用.Cancel时会发出CancellationToken.WaitHandle的信号,所以我不确定在调用.Cancel后立即使用它的目的是什么,似乎它总是会立即继续通过该代码?这就是FixIt示例中的情况,但我并不是真的理解它 . 出于我的目的,我已将ProcessMessagesAsync更改为现在在ManualResetEventSlim中传递,然后在所有任务完成后设置 . 然后在OnStop中,我在完成停止之前等待 .

    /// <summary>
        /// Continuous loop that monitors the queue and launches jobs when they are retrieved.
        /// </summary>
        /// <param name="token"></param>
        /// <returns></returns>
        public virtual async Task ProcessMessagesAsync(CancellationToken token, ManualResetEventSlim reset)
        {
            CloudQueue queue = _queueClient.GetQueueReference(_queueName);
            await queue.CreateIfNotExistsAsync(token);
    
            var runningTasks = new ConcurrentDictionary<int, Task>();
    
            while (!token.IsCancellationRequested)
            {
                Debug.WriteLine("inLoop");
                // The default timeout is 90 seconds, so we won’t continuously poll the queue if there are no messages.
                // Pass in a cancellation token, because the operation can be long-running.
                CloudQueueMessage message = await queue.GetMessageAsync(token);
                if (message != null)
                {
                    var t = ProcessMessage(message, queue, token);
                    var c = t.ContinueWith(z => RemoveRunningTask(t.Id, runningTasks));
    
    
                    while (true)
                    {
                        if (runningTasks.TryAdd(t.Id, t))
                            break;
                        await Task.Delay(25);
                    }                                    
                }                    
                else
                {
                    try
                    {
                        await Task.Delay(500, token);
                    }
                    catch (Exception ex)
                    {
                        Debug.WriteLine(ex.Message);
                    }
                }
            }
            while (!runningTasks.IsEmpty)
            {
                Debug.WriteLine("Waiting for running tasks");
                await Task.Delay(500);
            }
            Debug.WriteLine("All tasks have finished, exiting ProcessMessagesAsync.");
            reset.Set();
        }
            public override void OnStop()
        {
            Debug.WriteLine("OnStop_Begin");
            tokenSource.Cancel();
            tokenSource.Token.WaitHandle.WaitOne();
            _reset.Wait();
            base.OnStop();
            Debug.WriteLine("Onstop_End");
            tokenSource.Dispose();
        }
    

相关问题