我的问题看起来像这样:
我有一个观察者,它持有一个std :: condition_variable和一个std :: mutex,我的工作线程对象有一个指向观察者的指针 . 每次工作线程完成其工作时,它调用m_Observer-> NotifyOne(),然后调用condition_variable的notify_one()函数 . 现在我想要做的是,启动一堆工作线程,每个线程具有不同的工作和不同的(独立的)数据,并等待所有这些线程发信号(使用m_Observer-> NotifyOne())观察者,以便我能够根据所有工作线程的结果继续工作 .
我的观察者看起来像这样:
class IAsyncObserver
{
private:
std::condition_variable m_ObserverCV;
bool m_bNotified;
std::mutex m_Mutex;
public:
IAsyncObserver()
{
m_bNotified = false;
}
~IAsyncObserver()
{
/*m_bNotified = true;
m_ObserverCV.notify_all();*/
}
void NotifyOne()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_one();
}
void NotifyAll()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_all();
}
void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
{
uint32_t uNotifyCount = 0;
while (uNotifyCount < _uNumOfNotifications)
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = false;
m_ObserverCV.wait(Lock);
if (m_bNotified)
++uNotifyCount;
}
}
}; // IAsyncObserver
其中_uNumOfNotifications是我想要等待的工作线程数 .
现在每个工作线程都应该运行一个模拟函数,该函数执行一次/秒数据垃圾的实际工作,然后暂停/等待,直到观察者通知 Worker 继续 .
worker的线程函数可能如下所示:
do{
//suspend simulation
while (m_PauseSimulation.load())
{
std::unique_lock<std::mutex> wait(m_WaitMutex);
m_CV.wait(wait);
if (m_RunSimulation.load() == false)
{
SignalObserver();
return;
}
}
//lock the data while simulating
{
std::lock_guard<std::mutex> lock(m_LockMutex);
//update simulation
Simulate(static_cast<float>(m_fDeltaTime));
m_PauseSimulation.store(true);
}
//notify the physics manager that work is done here
SignalObserver();
} while (m_RunSimulation.load());
SignalObserver()只调用m_Observer-> NotifyOne() .
现在的问题是,经过一段时间后,线程会在某处遇到死锁,并且观察者不会通知他们继续下一个时间步骤 . 问题可能在WaitForNotifications()函数中的某个地方,但我不确定 . Atm我只有一个工作线程,所以uNumOfNotifications = 1,但它仍然遇到问题,它在m_ObserverCV.wait(Lock)和m_CV.wait(等待)等待,我甚至不确定它是否真的是一个死锁或什么的与condition_variable,因为我试图从几个线程访问它 .
在这一点上,我想引用Ned Flanders的父亲的话:“我们什么都没做,而且都是出于想法!”
谢谢你的帮助 . 我很感激 .
法比安
编辑:
感谢所有有用的信息和建议 . 我最终实现了迈克尔的第二个想法,因为我没有找到关于std :: barrier的任何信息 . 所以这就是我所做的:
class IAsyncObserver
{
private:
std::condition_variable m_ObserverCV;
bool m_bNotified;
std::mutex m_Mutex;
uint32_t m_uNumOfNotifications;
uint32_t m_uNotificationCount;
public:
IAsyncObserver()
{
m_bNotified = false;
m_uNumOfNotifications = 0;
m_uNotificationCount = 0;
}
~IAsyncObserver()
{
/*m_bNotified = true;
m_ObserverCV.notify_all();*/
}
void SetBarrier(uint32_t _uNumOfNotifications = 1)
{
m_uNumOfNotifications = _uNumOfNotifications;
}
void NotifyBarrier()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
if (++m_uNotificationCount >= m_uNumOfNotifications)
{
m_bNotified = true;
m_ObserverCV.notify_one();
}
}
void WaitForNotifications()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
while (m_bNotified == false)
{
m_ObserverCV.wait(Lock);
}
m_uNotificationCount = 0;
}
void NotifyOne()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_one();
}
void NotifyAll()
{
std::unique_lock<std::mutex> Lock(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_all();
}
}; // IAsyncObserver
在我的“主要”功能中:MassSpringSystem和RigidBodySystem是我的工作人员
//update systems here:
{
SetBarrier(m_uTotalNotifyCount);
{ //start MassSpringSystems
std::lock_guard<std::mutex> lock(m_LockMutex);
for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
{
MSS->SetDeltaTime(fDeltaTime);
MSS->Continue();
}
}
//ATTENTION this system works directly on the m_OctreeEntities!
{ //start RigidBodySystems
m_RigidBodySystem.SetDeltaTime(fDeltaTime);
m_RigidBodySystem.AddData(m_RigidBodies);
m_RigidBodySystem.Continue();
}
//wait for all systems to finish -> till they call SignalObserver
WaitForNotifications();
}
并且在上面的工作者的线程函数中,但这次SignalObserver调用NotifyBarrier()
现在一切都很好 . 一个简单而强大的解决方案,谢谢!
1 回答
您尝试以不打算使用的方式使用条件变量 - 在这种情况下,您假设您可以计算通知 . 你不能 . 您可能会因此失去通知,并且您正在计算标准允许的虚假唤醒 .
相反,您应该使用在互斥锁下递增的计数器,并仅在计数器达到工作人员数量时发出条件变量的信号 . (最后在每个 Worker 中这样做) . 主线程在条件变量上保持休眠状态,直到计数器达到预期值 . (当然,必须使用您用于递增的互斥锁来验证计数器) . 根据我的意见,用原子替换互斥计数器(不要将其静音)似乎是不可能的,因为你无法原子地检查计数器并睡在condvar上,所以你将获得一个竞争条件而不会使计数器静音 .
从boost线程中获知的另一个同步原语是屏障,它没有进入C 11.你构造了一个屏障,并将工作线程的数量加上一个作为构造函数参数传递给它 . 所有工作线程都应该在结束时等待条件变量,主线程应该在构造工作线程之后等待 . 所有线程都将阻塞该障碍,直到所有工作线程和主线程都阻塞,并且此时将被释放 . 因此,如果主线程被释放,您就知道所有工作者都已完成 . 这有一个问题:没有工作线程完成(并释放相关的管理资源),直到所有工作线程都被完成,这可能是也可能不是你的问题 . This question presents an implementation of boost::barrier using C++11 threading facilities.