首页 文章

异步等待另一个请求而不阻塞

提问于
浏览
2

我有一个websocket应用程序,它是一个OWIN中间件 . 当请求进入时,会启动websocket处理程序的新实例,然后在循环中等待icoming消息,如下所示:

var buffer = new byte[1024*64];
Tuple<ArraySegment<byte>, WebSocketMessageType> received;
do
{
    received = await _webSocket.ReceiveMessage(buffer, _cancellationToken.Token);
    if (received.Item1.Count > 0 && someConditionForCallingDoSomething)
    {
        await DoSomething(received.Item1);
    }
    else if(isAnswer)
    {
        QueueAnswer(received.Item1);
    }
} while (received.Item2 != WebSocketMessageType.Close);

当数据可用时,从 _webSocket.ReceiveMessage 返回的任务将完成 .

DoSomething 处理其数据,然后通过websocket连接发送内容 . 然后它应该通过websocket连接等待消息 . 处理完此消息后,它应该做一些工作并返回(任务) . 也许这个小图解释了它更容易:

_______________
| DoSomething |
|-------------|
|    Work  ---------> Send WS message
|             |
|     ??      |
|             |
|  More Work <------- Receive WS message
|   |         |
|   V         |
|  return;    |
|_____________|

Do some work with the data
          |
          V
     Send a message
          |
          V
   Wait for an answer
          |
          V
    Process answer
          |
          V
        finish

我试图等待答案:

var answerCancel = new CancellationTokenSource();
answerCancel.CancelAfter(30 * 1000);

var answer = await Task.Run(async () => 
    {
        string tmpAnswer = null;

        while (!_concurrentAnswerDict.TryGetValue(someKey, out tmpAnswer)) {
            await Task.Delay(150, answerCancel.Token);
        }

        return tmpAnswer;
    }, answerCancel.Token);

但这似乎阻止,直到任务被取消 . 当我调试程序时,我在30秒后看到 QueueAnswer 的调用 . 我想, Task.Run 将在一个新线程中运行该函数,但它似乎没有 . 从 Task.Run 阻塞的角度来看,我认为它不起作用是合乎逻辑的,因为我等待执行 DoSomething ,因此接收新消息也将被阻止 .

我的问题是:我如何实现这样的行为?在完成之前,如何使 DoSomething 等待另一个websocket消息?

每个提示都要提前感谢您

卢卡斯

1 回答

  • 2

    首先,我建议使用SignalR,因为他们会为你处理很多这些难题 . 但如果你想自己做,请继续阅读......

    此外,我'm assuming that 1855379 and 1855380 messages can arrive on the same web socket in any order, and that you'正在使用 _concurrentAnswerDict 来协调来自 DoSomething 的传出"question"消息以及传入的"answer"消息 .

    在这种情况下,您将需要一个独立于 DoSomething 的"websocket reader"任务;你不能让你的读者 await DoSomething 因为这会妨碍你阅读答案 . 我认为's the main problem you'有 .

    这是一种罕见的情况,其中任务可能是不可接受的 . 假设 DoSomething 将捕获自己的异常并处理日志记录等等,那么我们可以将其视为独立的"main"并忽略它返回的任务:

    var buffer = new byte[1024*64];
    Tuple<ArraySegment<byte>, WebSocketMessageType> received;
    do
    {
      received = await _webSocket.ReceiveMessage(buffer, _cancellationToken.Token);
      if (received.Item1.Count > 0 && someConditionForCallingDoSomething)
      {
        var _ = DoSomething(received.Item1);
      }
      else if(isAnswer)
      {
        QueueAnswer(received.Item1);
      }
    } while (received.Item2 != WebSocketMessageType.Close);
    

    这应该允许 QueueAnswerDoSomething 尚未完成时运行 .

    我想,Task.Run将在一个新线程中运行该函数,但它似乎没有 . 从Task.Run阻塞的角度来看,我认为它不起作用是合乎逻辑的,因为我等待DoSomething的执行,因此接收新消息也将被阻止 .

    Task.Run 正在另一个线程中运行 . 但 DoSomething (异步)等待它完成,并且读取循环(异步)等待 DoSomething 完成,然后才能读取下一条消息 .

    其他说明:

    while (!_concurrentAnswerDict.TryGetValue(someKey, out tmpAnswer)) {
      await Task.Delay(150, answerCancel.Token);
    }
    

    这对我来说似乎很奇怪 . 我建议使用 TaskCompletionSource<Answer> 而不是 Answer 的键字典 . 然后, QueueAnswer 将调用 TaskCompletionSource<Answer>.SetResult ,此代码将等待 TaskCompletionSource<Answer>.Task (如果需要超时则与 Task.Delay 一起) .

相关问题