我正在使用boost ASIO作为TCP客户端 . 在大多数情况下,ASIO是一个美观的事件循环,用于读写 . 实际上只有一个客户端由ASIO管理 .
体系结构是这样的 - TCP服务器流连续消息 . 客户端将读取消息,处理消息并使用正确的代码恢复 .
我的代码在客户端运行 . 有一个运行io_service的线程 . io_service线程使用boost lockfree SPSC队列读取消息并将其分发到N个工作线程 . 处理后的工作人员将回复发布到io_service线程 .
对我来说最重要的问题是读写速度 . 所以我使用同步读写 .
阅读代码:
void read ()
{
if (_connected && !_readInProgress) {
_socket.async_read_some(boost::asio::null_buffers(),
make_boost_alloc_handler(_readAllocator,
[self = shared_from_this(), this] (ErrorType err, unsigned a)
{
connection()->handleRead(err);
_readInProgress = false;
if (err) disconnect();
else asyncRead();
});
_readInProgress = true;
}
}
基本上我使用read_some和nullbuffer()然后直接使用Unix系统调用来读取消息 . 读取给出N个消息,这些消息在循环中排队到线程 .
我想在反向使用boost SPSC队列来从worker中写入套接字 .
写:
// Get the queue to post writes
auto getWriteQ ()
{
static thread_local auto q =
std::make_shared< LFQType >(_epoch);
return q;
}
因此,每个线程使用getWriteQ获取线程本地Q.对队列的写入如下所示:
void write (Buf& buf) override
{
auto q = getWriteQ();
while (!q->enqueue(buf) && _connected);
if (!_connected) return;
_ioService.post( [self = shared_from_this(), this, q]()
{
writeHelper(q); });
}
}
现在这是无效的,因为我们为每次写入做了一个ioservice帖子 . 一次写入处理程序实际上使用sendmmsg()在单个系统调用中写入多达32条消息
所以我正在寻找两件事的帮助:
-
设计有什么好处吗?
-
任何最小化no的万无一失的方法 . 的帖子 . 我在考虑保持一个原子入队计数 . 工作线程将执行此操作 - 写入线程执行此操作 - (伪代码)
bool post = false;
if(enqueue_count == 0)post = true
//将消息排入队列
enqueue_count
if(post)//发布队列事件
io-service线程做到了这一点 -
enqueue_count -= num_processed;
if (enqueue_count)
// repost the queue for further processing
如果enqueue_count是原子的,这会工作吗?