首页 文章

QueueClient.Receive()的异步方法?

提问于
浏览
4

我正在使用服务总线来连接Web角色和worker角色 . 我的worker角色处于连续循环中,我收到了Web角色使用QueueClient.Receive()方法发送的消息 .

但是使用此方法,如果服务总线队列上没有消息,它将等待几秒钟来接收消息,而不是移动到下一行以进一步执行 . 我希望有一些接收消息的异步方法?或者至少某种方式设置这个等待时间?

我从QueueClient的msdn文档中找到了这个BeginReceive方法,我希望这将是我的问题的答案,但我不知道如何使用这种方法 . 方法参数是异步回调和对象状态,我不知道它们是什么 .

有任何想法吗?

UPDATE: 感谢Sandrino的一个很好的解决方案,它的工作是异步的 . 但异步现在给了我一些问题 . 我的VS崩溃了 . 不确定是什么问题 . 以下是我正在使用的代码 .

Worker 角色:

public override void Run()
    {
while (!IsStopped)
        {                
                // Receive the message from Web Role to upload the broadcast to queue
                BroadcastClient.BeginReceive(OnWebRoleMessageReceived, null);                    

                // Receive the message from SignalR BroadcastHub
                SignalRClient.BeginReceive(OnSignalRMessageReceived, null);                    

            }
}


public void OnWebRoleMessageReceived(IAsyncResult iar)
    {
        BrokeredMessage receivedBroadcastMessage = null;
        receivedBroadcastMessage = BroadcastClient.EndReceive(iar);

        if (receivedBroadcastMessage != null)
        {   //process message
           receivedBroadcastMessage.Complete();
        }
    }

public void OnSignalRMessageReceived(IAsyncResult iar)
    {
        BrokeredMessage receivedSignalRMessage = null;
        receivedSignalRMessage = SignalRClient.EndReceive(iar);

        if (receivedSignalRMessage != null)
        {
            //process message
           receivedSignalRMessage.Complete();

           WorkerRoleClient.Send(signalRMessage);
        }
     }

我是否错过任何使VS过度工作和崩溃的事情?因为在转移到BeginReceive之前,当使用QueueClient.Receive时,它工作正常并且不会崩溃 .

谢谢

3 回答

  • 1

    BeginReceive 方法是你的方法 . 您通常会这样称呼它:

    void SomeMethod() 
    {
         ...
         client.BeginReceive(TimeSpan.FromMinutes(5), OnMessageReceived, null);
         ...
    }
    
    void OnMessageReceived(IAsyncResult iar)
    {
         var msg = client.EndReceive(iar);
         if (msg != null)
         {
             var body = msg.GetBody<MyMessageType>();
             ...
         }
    }
    
  • 6

    这就是我做的方式(扩展Sandrino De Mattia的解决方案):

    void SomeMethod() 
    {
        ...
        client.BeginReceive(TimeSpan.FromSeconds(5), OnMessageReceived, null);
        ...
    }
    
    void OnMessageReceived(IAsyncResult iar)
    {
        if(!IsStopped)
        {
            var msg = client.EndReceive(iar);
            if (msg != null)
            {
                var body = msg.GetBody<MyMessageType>();
    
                ... //Do something interesting with the message
    
                //Remove the message from the queue
                msg.Complete();
    
                client.BeginReceive(TimeSpan.FromSeconds(5), OnMessageReceived, null);
            }
        }
    }
    

    这样我就有了一个带有停止机制的“无限循环” .

  • 1

    最新版本的Azure ServiceBus SDK(download link)提供了对异步接收消息的完全支持:

    async Task TestMethod()
    {
        string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
    
        QueueClient Client = QueueClient.CreateFromConnectionString(connectionString, "TestQueue");
        var message = await Client.ReceiveAsync();
    }
    

相关问题