首页 文章

基于多线程的RabbitMQ消费者

提问于
浏览
9

我们有一个Windows服务,它监听单个RabbitMQ队列并处理消息 .

我们希望扩展相同的Windows服务,以便它可以侦听RabbitMQ的多个队列并处理消息 .

不确定是否可以通过使用多线程来实现,因为每个线程都必须监听(阻塞)队列 .

由于我对多线程很陌生,需要关注以下几点的高级指南,这将有助于我开始构建原型 .

  • 是否可以使用线程在单个应用程序中侦听多个队列?

  • 如何处理如果任何单个线程被关闭(由于异常等)的情况,如何在不重新启动整个Windows服务的情况下返回 .

  • 任何设计模式或开源实现都可以帮助我处理这种情况 .

2 回答

  • 8

    我喜欢你写你的问题的方式 - 它开始时非常广泛,专注于具体细节 . 我已成功实现了一些非常相似的东西,目前我正在开发一个开源项目,以吸取我的经验并将它们带回社区 . 不幸的是,虽然 - 我还没有整齐地打包我的代码,这对你没有多大帮助!无论如何,回答你的问题:

    1. Is it possible to use threading for multiple queues.

    答:是的,但它可能充满了陷阱 . 也就是说,RabbitMQ .NET库并不是那里编写得最好的代码,我发现它是AMQP协议的一个相对麻烦的实现 . 其中一个最有害的警告是它如何处理"receiving"或"consuming"行为,如果你不小心,这很容易导致死锁 . 幸运的是,它在API文档中得到了很好的说明 . Advice - 如果可以,请使用单例连接对象 . 然后,在每个线程中,使用连接创建新的 IModel 和相应的使用者 .

    2. How to gracefully handle exceptions in threads - 我认为这是另一个话题,我不会在这里解决它,因为你可以使用几种方法 .

    3. Any open-source projects? - 我喜欢EasyNetQ背后的想法,尽管我最后还是自己动手了 . 我希望记得在我的开源项目完成后回过头来看,因为我相信它比EasyNetQ更好 .

  • 8

    您可能会发现this answer非常有帮助 . 我对RabbitMQ的工作原理有了一个非常基本的了解,但我可能会在每个线程的每个通道上继续使用一个用户,正如那里所建议的那样 .

    为此组织线程模型肯定有多个选项 . 实际实现将取决于您需要如何处理来自多个队列的消息:并行,或者通过聚合它们并序列化处理 . 以下代码是一个控制台应用程序,它实现了后一种情况的模拟 . 它使用Task Parallel LibraryBlockingCollection类(这对于这种任务非常方便) .

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace Console_21842880
    {
        class Program
        {
            BlockingCollection<object> _commonQueue;
    
            // process an individual queue
            void ProcessQueue(int id, BlockingCollection<object> queue, CancellationToken token)
            {
                while (true)
                {
                    // observe cancellation
                    token.ThrowIfCancellationRequested();
                    // get a message, this blocks and waits
                    var message = queue.Take(token);
    
                    // process this message
                    // just place it to the common queue
                    var wrapperMessage = "queue " + id + ", message: " + message;
                    _commonQueue.Add(wrapperMessage);
                }
            }
    
            // process the common aggregated queue
            void ProcessCommonQeueue(CancellationToken token)
            {
                while (true)
                {
                    // observe cancellation
                    token.ThrowIfCancellationRequested();
                    // this blocks and waits
    
                    // get a message, this blocks and waits
                    var message = _commonQueue.Take(token);
    
                    // process this message
                    Console.WriteLine(message.ToString());
                }
            }
    
            // run the whole process
            async Task RunAsync(CancellationToken token)
            {
                var queues = new List<BlockingCollection<object>>();
                _commonQueue = new BlockingCollection<object>();
    
                // start individual queue processors
                var tasks = Enumerable.Range(0, 4).Select((i) =>
                {
                    var queue = new BlockingCollection<object>();
                    queues.Add(queue);
    
                    return Task.Factory.StartNew(
                        () => ProcessQeueue(i, queue, token), 
                        TaskCreationOptions.LongRunning);
                }).ToList();
    
                // start the common queue processor
                tasks.Add(Task.Factory.StartNew(
                    () => ProcessCommonQeueue(token),
                    TaskCreationOptions.LongRunning));
    
                // start the simulators
                tasks.AddRange(Enumerable.Range(0, 4).Select((i) => 
                    SimulateMessagesAsync(queues, token)));
    
                // wait for all started tasks to complete
                await Task.WhenAll(tasks);
            }
    
            // simulate a message source
            async Task SimulateMessagesAsync(List<BlockingCollection<object>> queues, CancellationToken token)
            {
                var random = new Random(Environment.TickCount);
                while (true)
                {
                    token.ThrowIfCancellationRequested();
                    await Task.Delay(random.Next(100, 1000));
                    var queue = queues[random.Next(0, queues.Count)];
                    var message = Guid.NewGuid().ToString() + " " +  DateTime.Now.ToString();
                    queue.Add(message);
                }
            }
    
            // entry point
            static void Main(string[] args)
            {
                Console.WriteLine("Ctrl+C to stop...");
    
                var cts = new CancellationTokenSource();
                Console.CancelKeyPress += (s, e) =>
                {
                    // cancel upon Ctrl+C
                    e.Cancel = true;
                    cts.Cancel();
                };
    
                try
                {
                    new Program().RunAsync(cts.Token).Wait();
                }
                catch (Exception ex)
                {
                    if (ex is AggregateException)
                        ex = ex.InnerException;
                    Console.WriteLine(ex.Message);
                }
    
                Console.WriteLine("Press Enter to exit");
                Console.ReadLine();
            }
        }
    }
    

    另一个想法可能是使用Reactive Extensions (Rx) . 如果你能想到事件中到达的消息,Rx可以帮助将它们聚合成单个流 .

相关问题