首页 文章

限制并行运行的队列的使用者数量

提问于
浏览
0

我正在使用队列,有两个线程 . 一个是Enqueue,另一个是Dequeue . 分别称为 生产环境 者和消费者 . 生产环境 可以是无限的 . 但我需要限制消费者同时运行 . 我读到了“任务并行库”和“Parallel.For” . 但我不确定我应该在这里实施它们的方式 . 请建议我 . 以下是一些代码段,供您更好地理解该问题

static void Main(string[] args)
{

// The Producer code comes here
// ...

// The Consumer code comes here
Thread consumer = new Thread(new ThreadStart(PendingBookingConsumer));
consumer.Start();
}

private static void PendingBookingConsumer()
{
    try
    {
        while (true)
        {
            if (pendingBookingsQueue != null && pendingBookingsQueue.Count > 0)
            {
                PendingBooking oPendingBooking = pendingBookingsQueue.Dequeue();

                //Run the Console App
                string command = @"C:\ServerAgentConsole.exe";
                string args = oPendingBooking.Id + " " + oPendingBooking.ServiceAccountEmail.Trim() + " " + oPendingBooking.ServiceAccountPassword.Trim() + " " + oPendingBooking.ServiceAccountEmail.Trim()
                    + " " + oPendingBooking.MailBoxOwnerEmail.Trim() + " " + oPendingBooking.Method.Trim();

                Process process = new Process();
                process.StartInfo.FileName = command;
                process.StartInfo.Arguments = args;
                process.EnableRaisingEvents = true;

                process.Exited += (sender, e) =>
                {
                    Process myProcess = (Process)sender;
                    Console.WriteLine("Agent for booking ID :" + myProcess.StartInfo.Arguments[0] + " Done");
                };

                process.Start();
                Thread.Sleep(2);
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

3 回答

  • 1

    Use one of the common techniques to process a BlockingCollection with a fixed degree of parallelism.Parallel.ForEach 选项中指定DOP .

    然后,使处理函数等待子进程:

    process.Start();
    process.WaitForExit();
    

    这样,您就可以随时拥有固定数量的子流程 .

  • 0

    您还可以考虑TPL Dataflow库,轻松实现 Producer/Consumer 模式:

    private static BufferBlock<int> m_buffer = new BufferBlock<int>>(
        new DataflowBlockOptions { BoundedCapacity = 10, MaxDegreeOfParallelism = 4 });
    
    // Producer
    private static async void Producer()
    {
        while(true)
        {
            await m_buffer.SendAsync(Produce());
        }
    }
    
    // Consumer
    private static async Task Consumer()
    {
        while(true)
        {
            Process(await m_buffer.ReceiveAsync());
        }
    }
    

    您可以看到 BoundedCapacity 用于限制技术,限制队列大小, MaxDegreeOfParallelism 用于限制并行消耗任务 .

    你可以下载introduction to TPL Dataflow from MSDN here .

    PS:How to: Implement a Producer-Consumer Dataflow Pattern在MSDN上

  • 0

    BlockingCollection支持上限

    BlockingCollection Constructor (Int32)

    汉斯评论说这只是收藏的大小 .
    也许你可以在消费者中并行执行此操作 .

相关问题