首页 文章

如何指定Parallel.ForEach中执行的并行任务数? [重复]

提问于
浏览
2

这个问题在这里已有答案:

我有~500个任务,每个任务需要约5秒,大部分时间浪费在等待远程资源回复上 . 我想定义应该自己生成的线程数(经过一些测试)并在这些线程上运行任务 . 当一个任务完成时,我想在可用的线程上生成另一个任务 .

我发现 System.Threading.Tasks 最容易实现我想要的,但我认为不可能指定应该并行执行的任务数量 . 对于我的机器,它总是大约8(四核cpu) . 是否有可能以某种方式告诉应该并行执行多少任务?如果不是最简单的方法来实现我想要的东西? (我试过线程,但代码要复杂得多) . 我尝试增加 MaxDegreeOfParallelism 参数,但它只限制了最大数量,所以这里没有运气......

这是我目前的代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        private static List<string> _list = new List<string>();
        private static int _toProcess = 0;

        static void Main(string[] args)
        {   
            for (int i = 0; i < 1000; ++i)
            {
                _list.Add("parameter" + i);
            }

            var w = new Worker();
            var w2 = new StringAnalyzer();

            Parallel.ForEach(_list, new ParallelOptions() { MaxDegreeOfParallelism = 32 }, item =>
            {
                ++_toProcess;
                string data = w.DoWork(item);
                w2.AnalyzeProcessedString(data);
            });

            Console.WriteLine("Finished");           
            Console.ReadKey();
        }

        static void Done(Task<string> t)
        {            
            Console.WriteLine(t.Result);
            --_toProcess;
        }
    }

    class Worker
    {
        public string DoWork(string par)
        {
            // It's a long running but not CPU heavy task (downloading stuff from the internet)
            System.Threading.Thread.Sleep(5000);            
            return par + " processed";
        }
    }

    class StringAnalyzer
    {
        public void AnalyzeProcessedString(string data)
        {
            // Rather short, not CPU heavy
            System.Threading.Thread.Sleep(1000);
            Console.WriteLine(data + " and analyzed");
        }
    }
}

2 回答

  • 0

    正如L.B所提到的,.NET Framework具有执行I / O操作的方法(对数据库,Web服务等的请求)using IOCP internally,它们可以通过它们的名称识别 - 它按照惯例以Async结束 . 因此,您可以使用它们来构建可以同时处理多个请求的可伸缩的可伸缩应用程序 .

    EDIT :我已经完全用现代最佳实践重写了代码示例,因此它变得更易读,更简单,更易于使用 .

    对于.NET 4.5,我们可以使用async-await方法:

    class Program
    {
        static void Main(string[] args)
        {
            var task = Worker.DoWorkAsync();
            task.Wait(); //stop and wait until our async method completed
    
            foreach (var item in task.Result)
            {
                Console.WriteLine(item);
            }
    
            Console.ReadLine();
        }
    }
    
    static class Worker
    {
        public async static Task<IEnumerable<string>> DoWorkAsync()
        {
            List<string> results = new List<string>();
    
            for (int i = 0; i < 10; i++)
            {
                var request = (HttpWebRequest)WebRequest.Create("http://microsoft.com");
                using (var response = await request.GetResponseAsync())
                {
                    results.Add(response.ContentType);
                }
            }
    
            return results;
        }
    }
    

    Here是关于使用async-await进行异步编程的MSDN教程 .

  • 6

    假设您在获取资源时可以使用 HttpClient.GetStringAsync 等本机异步方法,

    int numTasks = 20;
    SemaphoreSlim semaphore = new SemaphoreSlim(numTasks);
    HttpClient client = new HttpClient();
    
    List<string> result = new List<string>();
    foreach(var url in urls)
    {
        semaphore.Wait();
    
        client.GetStringAsync(url)
              .ContinueWith(t => {
                  lock (result) result.Add(t.Result);
                  semaphore.Release();
              });
    }
    
    for (int i = 0; i < numTasks; i++) semaphore.Wait();
    

    由于 GetStringAsync 内部使用IO完成端口(与大多数其他异步IO方法一样)而不是创建新线程,因此这可以是您所追求的解决方案 .

    另见http://blog.stephencleary.com/2013/11/there-is-no-thread.html

相关问题