我正在使用boost ASIO作为TCP客户端 . 在大多数情况下,ASIO是一个美观的事件循环,用于读写 . 实际上只有一个客户端由ASIO管理 .

体系结构是这样的 - TCP服务器流连续消息 . 客户端将读取消息,处理消息并使用正确的代码恢复 .

我的代码在客户端运行 . 有一个运行io_service的线程 . io_service线程使用boost lockfree SPSC队列读取消息并将其分发到N个工作线程 . 处理后的工作人员将回复发布到io_service线程 .

对我来说最重要的问题是读写速度 . 所以我使用同步读写 .

阅读代码:

void read ()
 {

     if (_connected && !_readInProgress) {
         _socket.async_read_some(boost::asio::null_buffers(),
                 make_boost_alloc_handler(_readAllocator,
                     [self = shared_from_this(), this] (ErrorType err, unsigned a)
                     {
                          connection()->handleRead(err);
                          _readInProgress = false;

                          if (err) disconnect();
                          else asyncRead();

                     });
         _readInProgress = true;
     }
 }

基本上我使用read_some和nullbuffer()然后直接使用Unix系统调用来读取消息 . 读取给出N个消息,这些消息在循环中排队到线程 .

我想在反向使用boost SPSC队列来从worker中写入套接字 .

写:

// Get the queue to post writes
 auto getWriteQ ()
 {
     static thread_local auto q = 
         std::make_shared< LFQType >(_epoch);
     return q;
 }

因此,每个线程使用getWriteQ获取线程本地Q.对队列的写入如下所示:

void write (Buf& buf) override
 {
     auto q = getWriteQ();

     while (!q->enqueue(buf) && _connected);

     if (!_connected) return;

      _ioService.post( [self = shared_from_this(), this, q]() 
      {
           writeHelper(q); });
      }
}

现在这是无效的,因为我们为每次写入做了一个ioservice帖子 . 一次写入处理程序实际上使用sendmmsg()在单个系统调用中写入多达32条消息

所以我正在寻找两件事的帮助:

  • 设计有什么好处吗?

  • 任何最小化no的万无一失的方法 . 的帖子 . 我在考虑保持一个原子入队计数 . 工作线程将执行此操作 - 写入线程执行此操作 - (伪代码)

bool post = false;

if(enqueue_count == 0)post = true

//将消息排入队列

enqueue_count

if(post)//发布队列事件

io-service线程做到了这一点 -

enqueue_count -= num_processed;
if (enqueue_count) 
// repost the queue for further processing

如果enqueue_count是原子的,这会工作吗?