首页 文章

boost :: asio async_receive_from线程之间共享的UDP endpoints ?

提问于
浏览
9

Boost asio特别允许多个线程在io_service上调用run()方法 . 这似乎是创建多线程UDP服务器的好方法 . 然而,我遇到了一个障碍,我正在努力寻找答案 .

查看典型的async_receive_from调用:

m_socket->async_receive_from(
        boost::asio::buffer(m_recv_buffer),
        m_remote_endpoint,
        boost::bind(
            &udp_server::handle_receive,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

远程 endpoints 和消息缓冲区不会传递给处理程序,但是处于更高的作用域级别(在我的示例中为成员变量) . UDP消息到达时处理UDP消息的代码如下所示:

void dns_server::handle_receive(const boost::system::error_code &error, std::size_t size)
{
    // process message
    blah(m_recv_buffer, size);

    // send something back
    respond(m_remote_endpoint);
}

如果有多个线程在运行,那么同步如何工作?在线程之间共享一个 endpoints 和接收缓冲区意味着在同时消息到达的情况下,asio在另一个线程中调用处理程序之前等待处理程序在单个线程内完成 . 这似乎否定了允许多个线程首先调用run的观点 .

如果我想获得并发服务的请求,看起来我需要将工作包和 endpoints 的副本一起交给一个单独的线程,允许处理程序方法立即返回,以便asio可以继续并通过另一个与另一个调用run()的线程并行的消息 .

这看起来有点令人讨厌 . 我在这里错过了什么?

1 回答

  • 4

    在线程之间共享一个 endpoints 和接收缓冲区意味着asio等待处理程序在单个线程内完成

    如果您的意思是“在使用单个线程运行服务时”,那么这是正确的 .

    否则,情况并非如此 . 相反,当您同时调用单个服务对象(即套接字,而不是io_service)上的操作时,Asio只会说行为是“未定义的” .

    这似乎否定了允许多个线程首先调用run的观点 .

    除非处理需要相当长的时间 .

    Timer.5 sample 介绍的第一段似乎是对您主题的一个很好的阐述 .

    会话

    要分离特定于请求的数据(缓冲区和 endpoints ),您需要一些会话概念 . Asio中的一种流行机制是绑定 shared_ptr 或者来自此会话类的共享(boost bind支持直接绑定到boost :: shared_ptr实例) .

    Strand

    为了避免对 m_socket 成员的并发,非同步访问,您可以添加锁或使用上面链接的Timer.5示例中记录的 strand 方法 .

    演示

    这里供您享受 Daytime.6 异步UDP日间服务器,经过修改后可与许多服务IO线程配合使用 .

    请注意,从逻辑上讲,仍然只有一个IO线程( strand ),所以我们没有记录线程安全性 .

    但是,与官方样本不同,响应可能会按顺序排队,具体取决于 udp_session::handle_request 中实际处理所花费的时间 .

    请注意

    • a udp_session 类用于保存每个请求的缓冲区和远程 endpoints

    • 一个线程池,它能够在多个内核上扩展实际处理(而不是IO)的负载 .

    #include <ctime>
    #include <iostream>
    #include <string>
    #include <boost/array.hpp>
    #include <boost/bind.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/make_shared.hpp>
    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    
    using namespace boost;
    using asio::ip::udp;
    using system::error_code;
    
    std::string make_daytime_string()
    {
        using namespace std; // For time_t, time and ctime;
        time_t now = time(0);
        return ctime(&now);
    }
    
    class udp_server; // forward declaration
    
    struct udp_session : enable_shared_from_this<udp_session> {
    
        udp_session(udp_server* server) : server_(server) {}
    
        void handle_request(const error_code& error);
    
        void handle_sent(const error_code& ec, std::size_t) {
            // here response has been sent
            if (ec) {
                std::cout << "Error sending response to " << remote_endpoint_ << ": " << ec.message() << "\n";
            }
        }
    
        udp::endpoint remote_endpoint_;
        array<char, 100> recv_buffer_;
        std::string message;
        udp_server* server_;
    };
    
    class udp_server
    {
        typedef shared_ptr<udp_session> shared_session;
      public:
        udp_server(asio::io_service& io_service)
            : socket_(io_service, udp::endpoint(udp::v4(), 1313)), 
              strand_(io_service)
        {
            receive_session();
        }
    
      private:
        void receive_session()
        {
            // our session to hold the buffer + endpoint
            auto session = make_shared<udp_session>(this);
    
            socket_.async_receive_from(
                    asio::buffer(session->recv_buffer_), 
                    session->remote_endpoint_,
                    strand_.wrap(
                        bind(&udp_server::handle_receive, this,
                            session, // keep-alive of buffer/endpoint
                            asio::placeholders::error,
                            asio::placeholders::bytes_transferred)));
        }
    
        void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
            // now, handle the current session on any available pool thread
            socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec));
    
            // immediately accept new datagrams
            receive_session();
        }
    
        void enqueue_response(shared_session const& session) {
            socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
                    strand_.wrap(bind(&udp_session::handle_sent, 
                            session, // keep-alive of buffer/endpoint
                            asio::placeholders::error,
                            asio::placeholders::bytes_transferred)));
        }
    
        udp::socket  socket_;
        asio::strand strand_;
    
        friend struct udp_session;
    };
    
    void udp_session::handle_request(const error_code& error)
    {
        if (!error || error == asio::error::message_size)
        {
            message = make_daytime_string(); // let's assume this might be slow
    
            // let the server coordinate actual IO
            server_->enqueue_response(shared_from_this());
        }
    }
    
    int main()
    {
        try {
            asio::io_service io_service;
            udp_server server(io_service);
    
            thread_group group;
            for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
                group.create_thread(bind(&asio::io_service::run, ref(io_service)));
    
            group.join_all();
        }
        catch (std::exception& e) {
            std::cerr << e.what() << std::endl;
        }
    }
    

    结束思考

    有趣的是,在大多数情况下,您会看到单线程版本的表现同样出色,并且没有理由使设计复杂化 .

    或者,您可以使用专用于IO的单线程 io_service ,并使用旧式工作池对请求进行后台处理(如果这确实是CPU密集型部分) . 首先,这简化了设计,其次,这可以提高IO任务的吞吐量,因为不再需要协调发布在链上的任务 .

相关问题