首页 文章

以非阻塞方式调用TaskCompletionSource.SetResult

提问于
浏览
23

我发现 TaskCompletionSource.SetResult(); 在返回之前调用等待任务的代码 . 在我的情况下,导致死锁 .

这是一个简单的版本,是在一个普通的 Thread 中启动的

void ReceiverRun()
    while (true)
    {
        var msg = ReadNextMessage();
        TaskCompletionSource<Response> task = requests[msg.RequestID];

        if(msg.Error == null)
            task.SetResult(msg);
        else
            task.SetException(new Exception(msg.Error));
    }
}

代码的“异步”部分看起来像这样 .

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

Wait实际上嵌套在非异步调用中 .

SendAwaitResponse(简化)

public static Task<Response> SendAwaitResponse(string msg)
{
    var t = new TaskCompletionSource<Response>();
    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.Task;
}

我的假设是第二个SendAwaitResponse将在ThreadPool线程中执行,但它会在为ReceiverRun创建的线程中继续 .

无论如何设置任务的结果而不继续等待代码?

该应用程序是 console application .

3 回答

  • 5

    我发现了TaskCompletionSource.SetResult();在返回之前调用等待任务的代码 . 在我的情况下,导致死锁 .

    是的,我有一个blog post记录这个(AFAIK它没有记录在MSDN上) . 死锁发生的原因有两个:

    • async 和阻塞代码的混合(即, async 方法正在调用 Wait ) .

    • 使用 TaskContinuationOptions.ExecuteSynchronously 计划任务延续 .

    我建议从最简单的解决方案开始:删除第一件事(1) . 即,不要混用 asyncWait 来电:

    await SendAwaitResponse("first message");
    SendAwaitResponse("second message").Wait();
    

    相反,始终使用 await

    await SendAwaitResponse("first message");
    await SendAwaitResponse("second message");
    

    如果需要,可以在调用堆栈的另一个位置(不是在 async 方法中)进行 Wait .

    这是我最推荐的解决方案 . 但是,如果你想尝试删除第二个东西(2),你可以做一些技巧:将 SetResult 包裹在 Task.Run 中以强制它进入一个单独的线程(我的AsyncEx library具有 *WithBackgroundContinuations 扩展方法,正是这样),或者给你的线程一个实际的上下文(比如我的AsyncContext type)并指定 ConfigureAwait(false) ,它将cause the continuation to ignore the ExecuteSynchronously flag .

    但这些解决方案要比分离 async 和阻塞代码复杂得多 .

    作为旁注,请看一下TPL Dataflow;听起来你可能觉得它很有用 .

  • 0

    由于您的应用程序是一个控制台应用程序,它运行在默认的synchronization context,其中 await 延续回调将在等待任务已完成的同一线程上调用 . 如果要在 await SendAwaitResponse 之后切换线程,可以使用 await Task.Yield() 来执行此操作:

    await SendAwaitResponse("first message");
    await Task.Yield(); 
    // will be continued on a pool thread
    // ...
    SendAwaitResponse("second message").Wait(); // so no deadlock
    

    您可以通过在 Task.Result 中存储 Thread.CurrentThread.ManagedThreadId 并将其与 await 之后的当前线程ID进行比较来进一步改进 . 如果你仍然在同一个线程上,请执行 await Task.Yield() .

    虽然我知道 SendAwaitResponse 是您实际代码的简化版本,但内部仍然完全同步(在您的问题中显示它的方式) . 你为什么期望有任何线程切换?

    无论如何,您可能应该重新设计逻辑,而不是假设您当前使用的是什么线程 . 避免混合 awaitTask.Wait() 并使所有代码异步 . 通常,可以在顶层某处坚持一个 Wait() (例如在 Main 内) .

    [EDITED]ReceiverRun 调用 task.SetResult(msg) 实际上将控制流传输到 task 上的 await 点 - 没有线程切换,因为默认同步上下文的行为 . 因此,执行实际消息处理的代码将接管 ReceiverRun 线程 . 最终,在同一个线程上调用 SendAwaitResponse("second message").Wait() ,导致死锁 .

    下面是一个控制台应用程序代码,模仿您的示例 . 它在 ProcessAsync 中使用 await Task.Yield() 来计划单独线程上的延续,因此控制流返回 ReceiverRun 并且没有死锁 .

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication
    {
        class Program
        {
            class Worker
            {
                public struct Response
                {
                    public string message;
                    public int threadId;
                }
    
                CancellationToken _token;
                readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
                readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
    
                public Worker(CancellationToken token)
                {
                    _token = token;
                }
    
                string ReadNextMessage()
                {
                    // using Thread.Sleep(100) for test purposes here,
                    // should be using ManualResetEvent (or similar synchronization primitive),
                    // depending on how messages arrive
                    string message;
                    while (!_messages.TryDequeue(out message))
                    {
                        Thread.Sleep(100);
                        _token.ThrowIfCancellationRequested();
                    }
                    return message;
                }
    
                public void ReceiverRun()
                {
                    LogThread("Enter ReceiverRun");
                    while (true)
                    {
                        var msg = ReadNextMessage();
                        LogThread("ReadNextMessage: " + msg);
                        var tcs = _requests[msg];
                        tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
                        _token.ThrowIfCancellationRequested(); // this is how we terminate the loop
                    }
                }
    
                Task<Response> SendAwaitResponse(string msg)
                {
                    LogThread("SendAwaitResponse: " + msg);
                    var tcs = new TaskCompletionSource<Response>();
                    _requests.TryAdd(msg, tcs);
                    _messages.Enqueue(msg);
                    return tcs.Task;
                }
    
                public async Task ProcessAsync()
                {
                    LogThread("Enter Worker.ProcessAsync");
    
                    var task1 = SendAwaitResponse("first message");
                    await task1;
                    LogThread("result1: " + task1.Result.message);
                    // avoid deadlock for task2.Wait() with Task.Yield()
                    // comment this out and task2.Wait() will dead-lock
                    if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                        await Task.Yield();
    
                    var task2 = SendAwaitResponse("second message");
                    task2.Wait();
                    LogThread("result2: " + task2.Result.message);
    
                    var task3 = SendAwaitResponse("third message");
                    // still on the same thread as with result 2, no deadlock for task3.Wait()
                    task3.Wait();
                    LogThread("result3: " + task3.Result.message);
    
                    var task4 = SendAwaitResponse("fourth message");
                    await task4;
                    LogThread("result4: " + task4.Result.message);
                    // avoid deadlock for task5.Wait() with Task.Yield()
                    // comment this out and task5.Wait() will dead-lock
                    if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                        await Task.Yield();
    
                    var task5 = SendAwaitResponse("fifth message");
                    task5.Wait();
                    LogThread("result5: " + task5.Result.message);
    
                    LogThread("Leave Worker.ProcessAsync");
                }
    
                public static void LogThread(string message)
                {
                    Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
                }
            }
    
            static void Main(string[] args)
            {
                Worker.LogThread("Enter Main");
                var cts = new CancellationTokenSource(5000); // cancel after 5s
                var worker = new Worker(cts.Token);
                Task receiver = Task.Run(() => worker.ReceiverRun());
                Task main = worker.ProcessAsync();
                try
                {
                    Task.WaitAll(main, receiver);
                }
                catch (Exception e)
                {
                    Console.WriteLine("Exception: " + e.Message);
                }
                Worker.LogThread("Leave Main");
                Console.ReadLine();
            }
        }
    }
    

    这与在 ReceiverRun 中执行 Task.Run(() => task.SetResult(msg)) 没有太大区别 . 我能想到的唯一优势是你可以明确控制何时切换线程 . 这样,您可以尽可能长时间保持相同的线程(例如,对于 task2task3task4 ,但是在 task4 之后仍需要另一个线程切换以避免 task5.Wait() 上的死锁) .

    这两种解决方案最终都会使线程池增长,这在性能和可伸缩性方面都很糟糕 .

    现在,如果我们 replace task.Wait() with await task 在上面的代码中 ProcessAsync 内,我们将不必使用 await Task.Yield 并且仍然没有死锁 . 但是, await 内的 await 调用之后的整个 await 调用实际上将在 ReceiverRun 线程上执行 . 如只要我们不阻止此线程与其他 Wait() 样式的调用并且不要处理消息,这种方法可能正常工作(异步IO绑定 await 样式调用仍然应该没问题,并且它们实际上可能会触发隐式线程切换) .

    也就是说,我认为您需要一个单独的线程,其上安装了序列化同步上下文来处理消息(类似于 WindowsFormsSynchronizationContext ) . 这就是包含 awaits 的异步代码应该运行的地方 . 您仍然需要避免在该线程上使用 Task.Wait . 如果单个消息处理需要大量CPU限制工作,则应使用 Task.Run 进行此类工作 . 对于异步IO绑定调用,您可以保持在同一个线程上 .

    您可能希望从@StephenClearyNito Asynchronous Library查看 ActionDispatcher / ActionDispatcherSynchronizationContext ,以获取异步消息处理逻辑 . 希望斯蒂芬跳进来并提供更好的答案 .

  • 24

    “我的假设是第二个SendAwaitResponse将在ThreadPool线程中执行,但它会在为ReceiverRun创建的线程中继续 . ”

    它完全取决于您在SendAwaitResponse中执行的操作 . 异步和并发are not the same thing .

    退房:C# 5 Async/Await - is it concurrent?

相关问题