
AMQP-CPP: "Broken pipe" error in the TCP handler


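In my project I always get the error message "Broken pipe" in the onError function of my event handler, and I never reach the onConnected state. The handler's monitor function is called twice with the AMQP::readable flag. After that it is called with no flags set, and that is when the pipe breaks.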

Here is what I do in my code.

First I open the connection:

int Communicator_RabbitMQ::Open(string device)
{

    AMQP::Address address(AMQP::Address("amqp://test:test@localhost/"));

            // make a connection
    m_connection = std::make_shared< AMQP::TcpConnection> (&oCommunicator_RabbitMQ_Handler, address);


    // we need a channel too
    m_channel = std::make_shared <AMQP::TcpChannel> (m_connection.get());

    m_channel->declareExchange("my-exchange", AMQP::fanout);
    m_channel->declareQueue("my-queue");
    m_channel->bindQueue("my-exchange", "my-queue", "my-routing-key");


    m_channel->declareExchange("cyos_tx_exchange", AMQP::direct);
    m_channel->declareQueue("cyos_queue");
    m_channel->bindQueue("cyos_tx_exchange", "cyos_queue", "");


    return true;
}

Then I call the Read function in a loop in my thread:

string Communicator_RabbitMQ::Read()
{

    int result = 0;
    int maxfd = 1;


    struct timeval tv
    {
        1, 0
    };

    string returnValue; // return value of the method
    string message;     // message from RabbitMQ

    try
    {

        FD_ZERO(&oCommunicator_RabbitMQ_Handler.m_set);
        FD_SET(oCommunicator_RabbitMQ_Handler.m_fd, &oCommunicator_RabbitMQ_Handler.m_set);


        if (oCommunicator_RabbitMQ_Handler.m_fd != -1)
        {
            maxfd = oCommunicator_RabbitMQ_Handler.m_fd + 1;
        }

        result = select(FD_SETSIZE, &oCommunicator_RabbitMQ_Handler.m_set, NULL, NULL, &tv);

        if ((result == -1) && errno == EINTR)
        {
            TRACE(L"Error in socket");
        }
        else if (result > 0)
        {
            if (oCommunicator_RabbitMQ_Handler.m_flags & AMQP::readable)
                TRACE(L"Got something");

            if (FD_ISSET(oCommunicator_RabbitMQ_Handler.m_fd, &oCommunicator_RabbitMQ_Handler.m_set))
            {
                m_connection->process(oCommunicator_RabbitMQ_Handler.m_fd, oCommunicator_RabbitMQ_Handler.m_flags);

            }
        }

    }
    catch (exception e)
    {
        cout << e.what();
    }

    return "";

}

This is the TCP event handler:

#pragma once

class Communicator_RabbitMQ_Handler : public AMQP::TcpHandler
{
private:


    /**
    * Method that is called when the connection succeeded
    * @param socket Pointer to the socket
    */
    virtual void onConnected(AMQP::TcpConnection* connection)
    {
        std::cout << "connected" << std::endl;
    }

        /**
         *  When the connection ends up in an error state this method is called.
         *  This happens when data comes in that does not match the AMQP protocol
         *
         *  After this method is called, the connection no longer is in a valid
         *  state and can be used. In normal circumstances this method is not called.
         *
         *  @param  connection      The connection that entered the error state
         *  @param  message         Error message
         */
    virtual void onError(AMQP::TcpConnection* connection, const char* message)
    {
        // report error
        std::cout << "AMQP TCPConnection error: " << message << std::endl;
    }

        /**
         *  Method that is called when the connection was closed.
         *  @param  connection      The connection that was closed and that is now unusable
         */
    virtual void onClosed(AMQP::TcpConnection* connection)
    {
        std::cout << "closed" << std::endl;
    }


        /**
         *  Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
         *  @param  connection  The TCP connection object that is reporting
         *  @param  fd          The filedescriptor to be monitored
         *  @param  flags       Should the object be monitored for readability or writability?
         */
    virtual void monitor(AMQP::TcpConnection* connection, int fd, int flags)
    {
        //TRACE(L"Communicator_RabbitMQ_Handler, monitor called, %d, %d, %x", fd, flags, &m_set);
        // we did not yet have this watcher - but that is ok if no filedescriptor was registered
        if (flags == 0) 
            return;


        if (flags & AMQP::readable)
        {
            FD_SET(fd, &m_set);
            m_fd = fd;
            m_flags = flags;
        }
    }



public:
    Communicator_RabbitMQ_Handler() = default;

    int m_fd = -1;
    int m_flags = 0;
    fd_set m_set;

};

RabbitMQ log entries:

2018-07-02 07:04:50.272 [info] <0.9653.0> accepting AMQP connection <0.9653.0> ([::1]:39602 -> [::1]:5672)
2018-07-02 07:04:50.273 [warning] <0.9653.0> closing AMQP connection <0.9653.0> ([::1]:39602 -> [::1]:5672):
{handshake_timeout,handshake}

1 Answer


    I finally solved this by increasing the handshake timeout to 20 seconds in the rabbitmq.config file. I just added the following to that file:

    handshake_timeout = 20000
    

    The value is in milliseconds. The default is 10 seconds, which apparently was not enough for my setup.
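
Separately from the timeout change, the handler in the question only records a descriptor when AMQP::readable is set and ignores every other monitor call. A minimal sketch of a select-based watcher that tracks both read and write interest, and forgets a descriptor when monitor is called with no flags, might look like the following. FdWatcher, kReadable, kWritable and demo_with_pipe are hypothetical names used for illustration and are not part of AMQP-CPP. The flag values are placeholders for the library's own constants. Whether this alone would avoid the handshake timeout has not been tested here.

```cpp
#include <sys/select.h>
#include <unistd.h>
#include <map>

// Hypothetical stand-ins for the library's readable/writable bits;
// the real constants come from AMQP-CPP and may have other values.
constexpr int kReadable = 1;
constexpr int kWritable = 2;

// Remembers, per descriptor, which events were requested via monitor().
class FdWatcher {
public:
    // Mirrors a monitor(fd, flags) callback: flags == 0 means "stop watching".
    void monitor(int fd, int flags) {
        if (flags == 0) m_interest.erase(fd);
        else m_interest[fd] = flags;
    }

    // Waits up to timeout_ms and returns the ready flags for each descriptor.
    std::map<int, int> poll(int timeout_ms) const {
        std::map<int, int> ready;
        fd_set rset, wset;
        FD_ZERO(&rset);
        FD_ZERO(&wset);
        int maxfd = -1;
        for (const auto& entry : m_interest) {
            if (entry.second & kReadable) FD_SET(entry.first, &rset);
            if (entry.second & kWritable) FD_SET(entry.first, &wset);
            if (entry.first > maxfd) maxfd = entry.first;
        }
        if (maxfd < 0) return ready;  // nothing registered yet

        timeval tv;
        tv.tv_sec = timeout_ms / 1000;
        tv.tv_usec = (timeout_ms % 1000) * 1000;
        // select() expects the highest descriptor plus one as first argument.
        if (select(maxfd + 1, &rset, &wset, nullptr, &tv) <= 0) return ready;

        for (const auto& entry : m_interest) {
            int flags = 0;
            if (FD_ISSET(entry.first, &rset)) flags |= kReadable;
            if (FD_ISSET(entry.first, &wset)) flags |= kWritable;
            if (flags != 0) ready[entry.first] = flags;
        }
        return ready;
    }

private:
    std::map<int, int> m_interest;
};

// Small self-check that uses a pipe instead of a real broker socket.
bool demo_with_pipe() {
    int fds[2];
    if (pipe(fds) != 0) return false;
    FdWatcher watcher;
    watcher.monitor(fds[0], kReadable);  // read end
    watcher.monitor(fds[1], kWritable);  // write end

    // An empty pipe is writable but not yet readable.
    std::map<int, int> first = watcher.poll(100);
    bool ok = first.count(fds[0]) == 0 && first.count(fds[1]) == 1;

    // Once a byte has been written, the read end becomes readable.
    ok = ok && write(fds[1], "x", 1) == 1;
    std::map<int, int> second = watcher.poll(100);
    ok = ok && second.count(fds[0]) == 1 && second[fds[0]] == kReadable;

    close(fds[0]);
    close(fds[1]);
    return ok;
}
```

In the code from the question, the flags returned for a descriptor would then be what gets passed to m_connection->process(fd, flags), in place of the value stored from the last monitor call.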
