
Azure queue worker role multithreading example

We have 4 Azure queues that are populated either directly through the REST API or through a WCF service that we provide.

  • We want a single worker role to monitor all 4 of these queues.

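  • I'm considering a multithreaded approach that reads the queue names (and similar settings) from configuration and spins up a process for each queue (reading messages from it and processing them).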

Could someone please give me an example or some guidance on how to achieve this in a worker role?

I'm not sure whether this can be done without multithreading, and I'm quite new to multithreading.

Thanks

3 Answers

  • Answer (score 1)

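    You could start separate threads for the different tasks, but also consider a non-threaded approach (depending on what you do with the messages, it may perform better or worse):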

    while (true)
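    {
        // queue1, queue2, ...: assumed to be CloudQueue references created at startup
        var didSomething = false; // reset on every pass
        var msg = queue1.GetMessage();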
        if (msg != null)
        {
            didSomething = true;
            // do something with it
            queue1.DeleteMessage(msg);
        }
        msg = queue2.GetMessage();
        if (msg != null)
        {
            didSomething = true;
            // do something with it
            queue2.DeleteMessage(msg);
        }
        // ...
        if (!didSomething) Thread.Sleep(TimeSpan.FromSeconds(1)); // so I don't enter a tight loop with nothing to do
    }
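
    The same single-threaded loop generalises to any number of queues, for example the list of names the question wants to read from configuration. The sketch below is self-contained and deliberately not Azure code: each in-memory ConcurrentQueue<string> stands in for one storage queue, and RoundRobinPoller, PollOnce and Run are hypothetical names. With real storage queues the dequeue step would become a get, then a delete once the message has been handled, as in the loop above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in: each ConcurrentQueue<string> plays the role of one
// storage queue. TryDequeue removes the message immediately; with a real queue
// you would get the message, handle it, and only then delete it.
public static class RoundRobinPoller
{
    // One pass over every queue, taking at most one message from each.
    // Returns true if any queue yielded a message.
    public static bool PollOnce(IReadOnlyList<ConcurrentQueue<string>> queues,
                                Action<int, string> handle)
    {
        var didSomething = false;
        for (var i = 0; i < queues.Count; i++)
        {
            if (queues[i].TryDequeue(out var message))
            {
                handle(i, message); // i identifies which queue the message came from
                didSomething = true;
            }
        }
        return didSomething;
    }

    // Polls until cancelled, sleeping only after a pass that found nothing,
    // so a busy queue is drained without extra waiting.
    public static void Run(IReadOnlyList<ConcurrentQueue<string>> queues,
                           Action<int, string> handle,
                           TimeSpan idleDelay,
                           CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (!PollOnce(queues, handle))
            {
                // Like Thread.Sleep(idleDelay), but wakes early when cancellation is requested.
                token.WaitHandle.WaitOne(idleDelay);
            }
        }
    }
}
```

    Taking at most one message per queue on each pass keeps a single busy queue from starving the others, which is the same fairness the hand-written loop gets by checking queue1, then queue2, and so on.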
    
  • Answer (score 0)

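    Below is our current implementation, which does what you're asking in a better way (or so we think). That said, this code still needs some heavy cleanup; it is a functional version 0.1.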

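    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using Microsoft.WindowsAzure.ServiceRuntime; // RoleEntryPoint, RoleEnvironment
    using StructureMap; // assumption: ObjectFactory looks like StructureMap's static container
    
    // AzureStorageObject, AzureQueue, AzureQueueMetaData, IProcessQueues and the
    // QueueN.Processor classes are project-specific types that are not shown here.
    public class WorkerRole : RoleEntryPoint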
    {
        public override void Run()
        {
            var logic = new WorkerAgent();
            logic.Go(false);
        }
    
        public override bool OnStart()
        {
            // Initialize our Cloud Storage Configuration.
            AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration);
    
            return base.OnStart();
        }
    }
    
    public class WorkerAgent
    {
        private const int _resistance_to_scaling_larger_queues = 9;
        private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int>
                                                           {
                                                               {typeof (Queue1.Processor), 1},
                                                               {typeof (Queue2.Processor), 1},
                                                               {typeof (Queue3.Processor), 1},
                                                               {typeof (Queue4.Processor), 1},
                                                           };
    
        private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay")));
        private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay")));
        protected TimeSpan CurrentDelay { get; set; }
    
        public Func<string> GetSpecificQueueTypeToProcess { get; set; }
    
        /// <summary>
        /// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive.
        /// </summary>
        public Dictionary<Type, int> QueueWeights
        {
            get
            {
                return _queueWeights;
            }
            set
            {
                _queueWeights = value;
            }
        }
    
        public static TimeSpan QueueWeightCalibrationDelay
        {
            get { return TimeSpan.FromMinutes(15); }
        }
    
    
        protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>();
    
    
        protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; }
    
        public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null)
        {
            CurrentDelay = _minDelay;
            GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess;
        }
    
        protected IProcessQueues CurrentProcessor { get; set; }
    
        /// <summary>
        /// Processes queue request(s).
        /// </summary>
        /// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param>
        public void Go(bool onlyProcessOnce)
        {
            if (onlyProcessOnce)
            {
                ProcessOnce(false);
            }
            else
            {
                ProcessContinuously();
            }
        }
    
        public void ProcessContinuously()
        {
            while (true)
            {
                // temporary hack to get this started.
                ProcessOnce(true);
            }
        }
    
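        // Shared instance: on .NET Framework, Random objects created in quick succession
        // are seeded from the clock and can return the same sequence.
        private static readonly Random _random = new Random();
    
        /// <summary>
        /// Attempts to fetch and process a single queued request.
        /// </summary>
        public void ProcessOnce(bool shouldDelay)
        {
            // Retry in a loop rather than recursively, so a long run of picks that land
            // on queues that are still backing off cannot grow the call stack.
            while (true)
            {
                PopulateQueueMetaData(QueueWeightCalibrationDelay);
    
                if (shouldDelay)
                {
                    Thread.Sleep(CurrentDelay);
                }
    
                // Each type is added once per unit of weight, so heavier queues are
                // proportionally more likely to be picked.
                var typesToPickFrom = new List<Type>();
                foreach (var item in QueueWeights)
                {
                    for (var i = 0; i < item.Value; i++)
                    {
                        typesToPickFrom.Add(item.Key);
                    }
                }
    
                var typeToTryAndProcess = typesToPickFrom[_random.Next(typesToPickFrom.Count)];
    
                CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues;
                CleanQueueDelays();
    
                if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess))
                {
                    var errors = CurrentProcessor.Go();
    
                    var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any()
                                       ? _maxDelay // the queue was empty
                                       : _minDelay; // else
    
                    // Keyed by the picked type so it matches the ContainsKey check above.
                    QueueDelays[typeToTryAndProcess] = DateTime.Now + amountToDelay;
                    return;
                }
    
                // The picked queue is still backing off (or has no processor): wait, then pick again.
                shouldDelay = true;
            }
        }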
    
        /// <summary>
        /// This method populates/refreshes the QueueMetaData collection.
        /// </summary>
        /// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param>
        private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit)
        {
            if (QueueMetaData == null)
            {
                QueueMetaData = new Dictionary<Type, AzureQueueMetaData>();
            }
    
            var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList();
            var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList();
            var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList();
            var results = new Dictionary<Type, AzureQueueMetaData>();
    
            // Missing and expired entries are refreshed the same way, and the weight is
            // recalculated on every refresh, so QueueWeightCalibrationDelay really recalibrates it.
            foreach (var queueProcessorType in queuesWithoutMetaData.Concat(expiredQueueMetaData))
            {
                if (results.ContainsKey(queueProcessorType))
                {
                    continue;
                }
    
                var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
                if (queueProcessor == null)
                {
                    continue;
                }
    
                var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
                var metaData = queue.GetMetaData();
                results.Add(queueProcessorType, metaData);
    
                // Weight is 1 for an empty queue, else floor(log_r(count)) + 1,
                // where r = _resistance_to_scaling_larger_queues.
                QueueWeights[queueProcessorType] = metaData.ApproximateMessageCount == 0
                                                   ? 1
                                                   : (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1;
            }
    
            QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value);
        }
    
        private void CleanQueueDelays()
        {
            QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value);
        }
    }
    

    With this, we have a separate class for each queue that knows how to process it, and each of those classes implements IProcessQueues. We load the _queueWeights collection with every type we want the agent to process, and we set the _resistance_to_scaling_larger_queues constant to control how strongly larger queues are favoured. Note that this scales logarithmically (see the PopulateQueueMetaData method), and no queue ever gets a weight below 1, even if it has 0 items. If you set _resistance_to_scaling_larger_queues to 10, then every tenfold increase in a queue's message count raises that type's weight by 1.

    For example, if QueueA has 0 items, QueueB has 0 items and QueueC has 10 items, the weights are 1, 1 and 2. This means QueueC has a 50% chance of being processed next, while QueueA and QueueB each have only a 25% chance. If QueueC has 100 items, the weights become 1, 1 and 3, and the chances of being processed are 20%, 20% and 60%. This ensures that your empty queues don't get forgotten.
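
    To make the arithmetic above concrete, here is a small, self-contained sketch of the weighting and the weighted pick. It is not the answer's code: QueueWeighting, WeightFor, PickProbabilities and PickIndex are hypothetical names, WeightFor computes the logarithm with integer division instead of Math.Log, and PickIndex walks cumulative weights rather than building the repeated-type list that ProcessOnce uses (both give each queue a chance proportional to its weight).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper reproducing the weighting idea: an empty queue gets weight 1,
// and every resistance-fold growth in the approximate message count adds 1.
public static class QueueWeighting
{
    // Mathematically the same as (int)Math.Log(count, resistance) + 1 for count > 0,
    // but computed with integer division so results at exact powers (10, 100, ...)
    // do not depend on floating-point rounding.
    public static int WeightFor(long approximateMessageCount, int resistance)
    {
        if (resistance < 2) throw new ArgumentOutOfRangeException(nameof(resistance));
        if (approximateMessageCount < 0) throw new ArgumentOutOfRangeException(nameof(approximateMessageCount));

        var weight = 1;
        for (var n = approximateMessageCount; n >= resistance; n /= resistance)
        {
            weight++;
        }
        return weight;
    }

    // Probability that each queue is picked next: its weight divided by the total.
    public static double[] PickProbabilities(IReadOnlyList<int> weights)
    {
        var total = 0;
        foreach (var w in weights) total += w;

        var result = new double[weights.Count];
        for (var i = 0; i < weights.Count; i++)
        {
            result[i] = (double)weights[i] / total;
        }
        return result;
    }

    // Weighted random pick: subtract each weight from a roll in [0, total)
    // until the roll falls inside one queue's share.
    public static int PickIndex(IReadOnlyList<int> weights, Random random)
    {
        var total = 0;
        foreach (var w in weights) total += w;
        if (total <= 0) throw new ArgumentException("At least one weight must be positive.", nameof(weights));

        var roll = random.Next(total); // 0 <= roll < total
        for (var i = 0; i < weights.Count; i++)
        {
            if (roll < weights[i]) return i;
            roll -= weights[i];
        }
        throw new InvalidOperationException("Unreachable when all weights are non-negative.");
    }
}
```

    With a resistance of 10, WeightFor(0, 10) is 1, WeightFor(10, 10) is 2 and WeightFor(100, 10) is 3, which reproduces the 1, 1, 2 and 1, 1, 3 weights in the example, and PickProbabilities turns those into the 25/25/50 and 20/20/60 percentages.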

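    Another thing this does is use _minDelay and _maxDelay. If the code believes a queue has at least one item, it keeps processing that queue at the _minDelay rate. However, if the queue had 0 items last time, it won't be processed again any sooner than _maxDelay. So if the random number generator picks a queue that came back empty (regardless of its weight), the agent simply skips it and tries again on the next iteration. (This part could be optimized further for better storage-transaction efficiency, but it is already a nice improvement.)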
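
    The _minDelay/_maxDelay rule can be isolated into a small class so it is easy to test. This is a sketch under assumptions: QueueBackoff, CanProcess and Record are hypothetical names, queues are keyed by name rather than by processor Type, and the current time is passed in explicitly (in real code, DateTime.UtcNow avoids the jumps DateTime.Now makes at daylight-saving changes).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper mirroring QueueDelays, _minDelay and _maxDelay:
// a queue that came back empty is left alone for maxDelay; otherwise it
// may be tried again after minDelay.
public sealed class QueueBackoff
{
    private readonly TimeSpan _minDelay;
    private readonly TimeSpan _maxDelay;
    private readonly Dictionary<string, DateTime> _notBefore = new Dictionary<string, DateTime>();

    public QueueBackoff(TimeSpan minDelay, TimeSpan maxDelay)
    {
        if (minDelay > maxDelay) throw new ArgumentException("minDelay must not exceed maxDelay.");
        _minDelay = minDelay;
        _maxDelay = maxDelay;
    }

    // True if the queue has never been tried or its back-off window has passed.
    public bool CanProcess(string queueName, DateTime nowUtc)
    {
        return !_notBefore.TryGetValue(queueName, out var notBefore) || nowUtc >= notBefore;
    }

    // Records the outcome of one processing pass. As in ProcessOnce, only a pass
    // with nothing processed and no errors counts as "empty".
    public void Record(string queueName, int processedCount, bool hadErrors, DateTime nowUtc)
    {
        var wasEmpty = processedCount == 0 && !hadErrors;
        _notBefore[queueName] = nowUtc + (wasEmpty ? _maxDelay : _minDelay);
    }
}
```

    In ProcessOnce terms, CanProcess plays the role of the !QueueDelays.ContainsKey(...) check after CleanQueueDelays(), and Record replaces the amountToDelay assignment.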

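    We use a couple of custom classes here (for example AzureQueue and AzureQueueMetaData): one is basically a wrapper around CloudQueue, and the other stores some information such as the queue's approximate message count. There is nothing interesting in them; they just simplify the code.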

    Again, I wouldn't call this "pretty" code, but it implements some fairly clever ideas and it is fully functional. Use it for whatever you like. :)

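    Finally, writing code like this lets us have a single project that can handle many more queues. If we find that it isn't keeping up, we spin this role up to 20 instances until it catches up, then spin them back down. Have a particularly nasty queue? Comment that queue out of the _queueWeights collection and deploy to handle the rest of the queues. Then redeploy with every other queue in _queueWeights commented out, put that deployment on a separate set of instances, and debug it without a) other QueueProcessors interfering with your debugging and b) your debugging interfering with your other QueueProcessors. Ultimately, this gives a lot of flexibility and efficiency.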

  • Answer (score 2)

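    Inside the worker role's while loop, start 4 threads, just as you would in any multithreaded C# application. You need to define four thread functions, and each of them should have its own while loop that polls its queue. At the end of the worker's while loop, simply wait for the threads to finish.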
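
    A sketch of the thread-per-queue approach, using one parameterised loop instead of four hand-written thread functions. Assumptions: the in-memory ConcurrentQueue<string> instances stand in for Azure queues, the dictionary keys would come from configuration, and PerQueueWorkers, RunAll and PollLoop are hypothetical names. Each loop runs as a long-running task, and RunAll blocks until cancellation stops every loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PerQueueWorkers
{
    // Starts one polling loop per queue and blocks until every loop has stopped.
    // Keys are queue names (in a worker role, perhaps read from configuration);
    // the in-memory ConcurrentQueue<string> values are stand-ins for real queues.
    public static void RunAll(IDictionary<string, ConcurrentQueue<string>> queues,
                              Action<string, string> handle,
                              TimeSpan idleDelay,
                              CancellationToken token)
    {
        var workers = queues
            .Select(pair => Task.Factory.StartNew(
                () => PollLoop(pair.Key, pair.Value, handle, idleDelay, token),
                CancellationToken.None,           // the loops observe token themselves
                TaskCreationOptions.LongRunning,  // hint: a dedicated thread per loop
                TaskScheduler.Default))
            .ToArray();

        // An exception from handle ends only that queue's loop; Task.WaitAll
        // rethrows it (wrapped in AggregateException) after the other loops stop.
        Task.WaitAll(workers);
    }

    private static void PollLoop(string queueName, ConcurrentQueue<string> queue,
                                 Action<string, string> handle,
                                 TimeSpan idleDelay, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (queue.TryDequeue(out var message))
            {
                handle(queueName, message); // loops run concurrently: handle must be thread-safe
            }
            else
            {
                token.WaitHandle.WaitOne(idleDelay); // idle wait, cut short by cancellation
            }
        }
    }
}
```

    Because each loop only returns on cancellation, the loops are started once rather than on every pass of an outer while loop. RunAll could plausibly be the whole body of the worker role's Run() method, with cancellation requested from OnStop().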
