首页 文章

并行使用TPL中预期的双线程

提问于
浏览
1

我将从一个基本的解释开始,我将如何理解一些工作,然后用tldr结束这一切;如果人们只是希望达到我在这里的实际问题 . 如果我对这里的任何事情的理解是错误的,请纠正我 .

TPL代表任务并行库,它是.NET 4.0的答案,试图进一步简化线程以方便开发人员使用 . 如果你不熟悉它,(在一个非常基础的层面上)你启动一个新的Task对象并传递一个委托,然后在一个从线程池中取出的后台线程上运行(通过使用线程池而不是真正制作一个通过使用这些现有线程而不是创建和处理新线程来保存新线程,时间和资源 .

根据我的理解,C#中的Parallel.ForEach命令将为它应该执行的每个委托生成一个新线程(可能来自一个线程池),可能的例外是自动内联一个甚至可能更多的迭代,如果编译器决定它们将足够快地发生以使其更有效 .

关于我的目标最相关的背景信息:

我正在尝试制作一个快速程序,启动一个Task与程序的其余部分同时运行 . 在这个Task中,Parallel.ForEach运行3'次迭代 . 总的来说,我们希望程序现在运行总共5个线程(最多):主线程为1,实际任务为1,Parallel.ForEach最多为3 . 每个线程都有自己的目标要完成(虽然Parallel.ForEach都有相同的目标,其相关的itemNumber值不同,以便计算 . 当主线程完成所有目标时,它使用Task.Wait()等待在完成任务,等待Parallel.ForEach完成 . 然后使用和验证值 .

tldr;实际问题:

当运行上述想法时,Parallel.ForEach似乎正在初始化两倍于SynchronizationContexts(一个TPL对象本质上是另一个线程),正如我所期望的那样并运行所有这些,但是只等待它们的预期数量 . 因为Parallel.ForEach() . Wait()命令在预期的线程数量运行时完成,所以Task也会完成,因为它认为一切都已完成 . 然后主程序选择任务已完成,并且当它验证当前没有更多后台线程在运行时,有时剩余的Parrallel.ForEach()尚未完成,因此抛出错误 .

已验证线程数量与我在每次SynchronizationContext的后调用(Async方法踢球者)上打印到调试窗口时所说的相匹配 . 每个线程也由主线程对象引用,否则计划在完成任务时将其处理掉,但由于未完成线程的引用仍然是由于未完全预期创建的,因此处理不能正确进行 .

Thread testThread = Thread.CurrentThread;
Task backgroundTask = taskFactory.StartNew(() =>
{
    Thread rootTaskThread = Thread.CurrentThread;
    Assert.AreNotEqual(testThread, rootTaskThread, "First task should not inline");
    Thread.Sleep(TimeSpan.FromSeconds(2));

    Parallel.ForEach(new[] { 1, 2, 3, 4 },
       new ParallelOptions { TaskScheduler = taskFactory.Scheduler }, (int item) => {
        Thread.Sleep(TimeSpan.FromSeconds(1));
     });
});

在上面的示例中,主线程,backgroundTask任务和8个Parallel.ForEach线程最终存在,其中最后9个在SynchronizationContexts上创建 .

在我的自定义的SynchronizationContext中重写的唯一方法是post,如下所示:

public override void Post(SendOrPostCallback d, object state){
    Request requestOrNull = Request.ExistsForCurrentThread() ? Request.GetForCurrentThread() as Request : null;
    Request.IAsyncContextData requestData = null;

    if (requestOrNull != null){
       requestData = requestOrNull.CaptureDataForNewThreadAndIncrementReferenceCount();
    }

    Debug.WriteLine("Task started - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));

    base.Post((object internalState) => {
        // Capture the spawned thread state and restore the originating thread state
        try{
            if (requestData != null){
                Request.AttachToAsynchronousContext(requestData);
            }
            d(state);
        }
        finally{
            // Restore original spawned thread state
            if (requestData != null){
            // Disposes the request if this is the last reference to it
                Request.DetachFromAsynchronousContext(requestData);
            }
        Debug.WriteLine("Task completed - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));
        }
    }, state);
 }

我相信TaskScheduler只做了它所需的基本内容:

private readonly RequestSynchronizationContext context;
private readonly ConcurrentQueue<Task> tasks = new ConcurrentQueue<Task>();

public RequestTaskScheduler(RequestSynchronizationContext synchronizationContext)
{
    this.context = synchronizationContext;
}

protected override void QueueTask(Task task){
    this.tasks.Enqueue(task);
    this.context.Post((object state) => {
        Task nextTask;
        if (this.tasks.TryDequeue(out nextTask)) 
            this.TryExecuteTask(nextTask);
    }, null);
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){
    if (SynchronizationContext.Current == this.context)
        return this.TryExecuteTask(task);
    else
        return false;
}

protected override IEnumerable<Task> GetScheduledTasks(){
    return this.tasks.ToArray();
}

TaskFactory:

public RequestTaskFactory(RequestTaskScheduler taskScheduler)
    : base(taskScheduler)
{ }

有关为何可能发生这种情况的任何想法?

1 回答

  • 0

    任务本身不会创建线程 . 由TaskScheduler决定如何使操作异步,如果可以的话 . 例如某些操作使用异步IO,在这种情况下,硬件使其成为异步,而不是另一个工作线程 .

    如何调用continuation取决于同步上下文 . 该上下文不是另一个线程,它只是抽象了可以运行操作的标准 . 例如,在WPF,WinForms,Silverlight等中,有一个UI同步上下文,需要在 specific thread (UI线程或主线程,以避免异常)上执行操作 .

    ForEach将尝试创建线程(更具体地说,它将尝试询问同步上下文以启动多个异步操作) . 调度程序实际上定义了它是如何做到的 . 如果你给它三个任务,它可能会创建三个线程,也可能不会 . 它决定了三个并发线程是否是好事 . 如果你只有两个核心,为例如,ForEach不会创建两个以上的线程,因为由于上下文切换开销,使用单个线程并按顺序运行代码可能会更糟 .

    目前尚不清楚“初始化两倍于SynchronizationContext”的含义 . 这些不是线程 . 你只是意味着它创造了比你预期更多的线程吗?或者你的意思是Post被称为超出预期?您的SynchronizationContext类基于什么? (即什么是它的基类) . 基础的作用很大程度上定义了Post调用的方式 . 它可能感觉需要创建另一个异步操作来跟踪其他操作...如何使调度程序使用此上下文?

    SynchronizationContext早在TPL之前就已存在(首先出现在.NET 2.0中) . 它所做的一件事是管理异步操作请求 . 你发布的是否明白这一点并不清楚 .

    Update: 第一次调用QueueTask来自StartNew . 对QueueTask的第二次调用来自对ForEach的调用,对QueueTask的第三次调用间接来自QueueTask中的TryExecuteTask . 接下来的4次QueueTask调用是传递给ForEach的主体 .

    根据负载,QueueTask最多可被调用3次 . 如果我在QueueTask上调试和中断,QueueTask只被调用7次 .

    在这一点上,因为你很难说为什么有时会有一些额外的QueueTask调用 . 它可能来自你有效地要求调度程序从's already executing asynchronously. My guess is that it's简单计时的任务中排队另一个异步任务的方式 . QueueTask可以如此快速地调用(因为_12000059_t知道任务已经排队并强制执行任务(强制另一次调用QueueTask) .

    如果QueueTask实际上导致对QueueTask的另一次调用,因为它还没有调度它所在的任务,这就解释了为什么最多有10次调用QueueTask . 这是TryExeucteTask调用导致每个ForEach主体的“双重”调用......

相关问题