首页 文章

std :: condition_variable - 等待多个线程通知观察者

提问于
浏览
0

我的问题看起来像这样:

我有一个观察者,它持有一个std :: condition_variable和一个std :: mutex,我的工作线程对象有一个指向观察者的指针 . 每次工作线程完成其工作时,它调用m_Observer-> NotifyOne(),然后调用condition_variable的notify_one()函数 . 现在我想要做的是,启动一堆工作线程,每个线程具有不同的工作和不同的(独立的)数据,并等待所有这些线程发信号(使用m_Observer-> NotifyOne())观察者,以便我能够根据所有工作线程的结果继续工作 .

我的观察者看起来像这样:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

    void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
    {
        uint32_t uNotifyCount = 0;
        while (uNotifyCount < _uNumOfNotifications)
        {
            std::unique_lock<std::mutex> Lock(m_Mutex);
            m_bNotified = false;
            m_ObserverCV.wait(Lock);

            if (m_bNotified)
                ++uNotifyCount;
        }
    }

}; // IAsyncObserver

其中_uNumOfNotifications是我想要等待的工作线程数 .

现在每个工作线程都应该运行一个模拟函数,该函数执行一次/秒数据垃圾的实际工作,然后暂停/等待,直到观察者通知 Worker 继续 .

worker的线程函数可能如下所示:

do{
    //suspend simulation
    while (m_PauseSimulation.load())
    {
        std::unique_lock<std::mutex> wait(m_WaitMutex);
        m_CV.wait(wait);
        if (m_RunSimulation.load() == false)
        {
            SignalObserver();
            return;
        }
    }

    //lock the data while simulating
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        //update simulation 
        Simulate(static_cast<float>(m_fDeltaTime));

        m_PauseSimulation.store(true);
    }

    //notify the physics manager that work is done here
    SignalObserver();       

} while (m_RunSimulation.load());

SignalObserver()只调用m_Observer-> NotifyOne() .

现在的问题是,经过一段时间后,线程会在某处遇到死锁,并且观察者不会通知他们继续下一个时间步骤 . 问题可能在WaitForNotifications()函数中的某个地方,但我不确定 . Atm我只有一个工作线程,所以uNumOfNotifications = 1,但它仍然遇到问题,它在m_ObserverCV.wait(Lock)和m_CV.wait(等待)等待,我甚至不确定它是否真的是一个死锁或什么的与condition_variable,因为我试图从几个线程访问它 .

在这一点上,我想引用Ned Flanders的父亲的话:“我们什么都没做,而且都是出于想法!”

谢谢你的帮助 . 我很感激 .

法比安

编辑:

感谢所有有用的信息和建议 . 我最终实现了迈克尔的第二个想法,因为我没有找到关于std :: barrier的任何信息 . 所以这就是我所做的:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

    uint32_t m_uNumOfNotifications;
    uint32_t m_uNotificationCount;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
        m_uNumOfNotifications = 0;
        m_uNotificationCount = 0;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void SetBarrier(uint32_t _uNumOfNotifications = 1)
    {
        m_uNumOfNotifications = _uNumOfNotifications;
    }

    void NotifyBarrier()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        if (++m_uNotificationCount >= m_uNumOfNotifications)
        {
            m_bNotified = true;
            m_ObserverCV.notify_one();
        }
    }

    void WaitForNotifications()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        while (m_bNotified == false)
        {
            m_ObserverCV.wait(Lock);
        }
        m_uNotificationCount = 0;
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

}; // IAsyncObserver

在我的“主要”功能中:MassSpringSystem和RigidBodySystem是我的工作人员

//update systems here:
    {
        SetBarrier(m_uTotalNotifyCount);

        {   //start MassSpringSystems
            std::lock_guard<std::mutex> lock(m_LockMutex);
            for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
            {
                MSS->SetDeltaTime(fDeltaTime);
                MSS->Continue();
            }
        }

        //ATTENTION this system works directly on the m_OctreeEntities!
        {   //start RigidBodySystems
            m_RigidBodySystem.SetDeltaTime(fDeltaTime);
            m_RigidBodySystem.AddData(m_RigidBodies);
            m_RigidBodySystem.Continue();
        }

        //wait for all systems to finish -> till they call SignalObserver
        WaitForNotifications();
    }

并且在上面的工作者的线程函数中,但这次SignalObserver调用NotifyBarrier()

现在一切都很好 . 一个简单而强大的解决方案,谢谢!

1 回答

  • 2

    您尝试以不打算使用的方式使用条件变量 - 在这种情况下,您假设您可以计算通知 . 你不能 . 您可能会因此失去通知,并且您正在计算标准允许的虚假唤醒 .

    相反,您应该使用在互斥锁下递增的计数器,并仅在计数器达到工作人员数量时发出条件变量的信号 . (最后在每个 Worker 中这样做) . 主线程在条件变量上保持休眠状态,直到计数器达到预期值 . (当然,必须使用您用于递增的互斥锁来验证计数器) . 根据我的意见,用原子替换互斥计数器(不要将其静音)似乎是不可能的,因为你无法原子地检查计数器并睡在condvar上,所以你将获得一个竞争条件而不会使计数器静音 .

    从boost线程中获知的另一个同步原语是屏障,它没有进入C 11.你构造了一个屏障,并将工作线程的数量加上一个作为构造函数参数传递给它 . 所有工作线程都应该在结束时等待条件变量,主线程应该在构造工作线程之后等待 . 所有线程都将阻塞该障碍,直到所有工作线程和主线程都阻塞,并且此时将被释放 . 因此,如果主线程被释放,您就知道所有工作者都已完成 . 这有一个问题:没有工作线程完成(并释放相关的管理资源),直到所有工作线程都被完成,这可能是也可能不是你的问题 . This question presents an implementation of boost::barrier using C++11 threading facilities.

相关问题