首页 文章

线程构建块concurrent_bounded_queue - 我如何“关闭”它?

提问于
浏览
3

我使用 concurrent_bounded_queue 英特尔TBB 4.1 Update 3进行 生产环境 者和消费者线程之间的通信:

队列类有一个名为 abort 的方法,它将 tbb::user_abort 抛出到阻塞队列实例的 poppush 的所有线程 . 两个线程之间的通信可能如下所示:

ConsThread | ProdThread
-----------+-------------
q.pop      |  get new data
(wait)     |  q.push
process    |  get new data
q.pop      |  no more data!
(wait)     |  q.abort
quit       |  quit

不幸的是,即使在这个简单的例子中我也无法使用它来可靠地关闭队列,因为如果某些消费者在调用 abort 之前没有完成处理之前的数据,他们将完成迭代并返回阻止 pop

ConsThread | ProdThread
-----------+-------------
q.pop      |  get new data
(wait)     |  q.push
process    |  get new data
process    |  no more data!
process    |  q.abort
process    |  quit
process    |
q.pop      |
(wait)     |
(wait)     |
(wait)     |
(so lonely)|

现在我正在使用一个中等恶心的黑客,它产生另一个非分离线程(加入消费者池线程)并等待它完成,同时为后来者发送更多 abort

bool areConsumerThreadsJoinedThankYou = false;
std::thread joiner(Joiner(consumerPool, &areConsumerThreadsJoinedThankYou));

while (!areConsumerThreadsJoinedThankYou) {
    rawQueue.abort();
    MAGIC_MSLEEP(100);
}

class Joiner 的实施非常多

void Joiner::operator()(void)
{
    for (auto it = this->m_threadPool.begin();
         it < this->m_threadPool.end();
         it++)
        (*it)->join();
    this->m_done = true;
    *(this->m_flag) = true;
}

这当然非常难看 . 有更基本的解决方案吗?

1 回答

  • 5

    创建指定的“EndOfData”项 . 如果您知道您有K个消费者,那么生成器在完成推送数据项后推送K“EndOfData”项 . 让每个消费者在弹出“EndOfData”项后退出 .

    如果事先不知道K,请让 生产环境 者推送一个“EndOfData”项 . 然后让每个弹出“EndOfData”项目的消费者在离开之前推送另一个“EndOfData”项目 . 完成所有消费者之后,将会有一个“EndOfData”项目,当队列被销毁时将被销毁 .

相关问题