首页 文章

C pthread阻塞队列死锁(我认为)

提问于
浏览
6

我遇到了pthreads的问题,我认为我遇到了僵局 . 我创建了一个我认为正在工作的阻塞队列,但在做了一些测试之后我发现如果我尝试取消阻塞在blocking_queue上的多个线程,我似乎陷入了僵局 .

阻塞队列非常简单,如下所示:

template <class T> class Blocking_Queue
{
public:
    Blocking_Queue()
    {
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_cond, NULL);
    }

    ~Blocking_Queue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

    void put(T t)
    {
        pthread_mutex_lock(&_lock);
        _queue.push(t);
        pthread_cond_signal(&_cond);
        pthread_mutex_unlock(&_lock);
    }

     T pull()
     {
        pthread_mutex_lock(&_lock);
        while(_queue.empty())
        {
            pthread_cond_wait(&_cond, &_lock);
        }

        T t = _queue.front();
        _queue.pop();

        pthread_mutex_unlock(&_lock);

        return t;
     }

priavte:
    std::queue<T> _queue;
    pthread_cond_t _cond;
    pthread_mutex_t _lock;
}

为了测试,我创建了4个线程来拉动这个阻塞队列 . 我向阻塞队列添加了一些print语句,每个线程都进入pthread_cond_wait()方法 . 但是,当我尝试在每个线程上调用pthread_cancel()和pthread_join()时,程序就会挂起 .

我还用一个线程对它进行了测试,它完美无缺 .

根据文档,pthread_cond_wait()是一个取消点,因此在这些线程上调用cancel会导致它们停止执行(这只适用于1个线程) . 但是pthread_mutex_lock不是取消点 . 可能会在调用pthread_cancel()时发生某些事情,取消的线程在终止之前获取互斥锁并且不解锁它,然后当下一个线程被取消时它无法获取互斥锁和死锁?或者还有别的我做错了 .

任何建议都很可爱 . 谢谢 :)

3 回答

  • 1

    我和pthread_cond_wait()/ pthread_cancel()有过类似的经历 . 由于某种原因,在线程返回后仍然存在锁定问题,并且无法解锁它,因为您必须在锁定的同一个线程中解锁 . 我在做pthread_mutex_destroy()时注意到了这些错误,因为我有一个 生产环境 者,单个消费者情况所以没有发生死锁 .

    pthread_cond_wait()应该在返回时锁定互斥锁,这可能已经发生了,但是由于我们强行取消了线程,最后的解锁没有通过 . 为了安全起见,我通常会尽量避免使用pthread_cancel(),因为有些平台甚至不支持这一点 . 您可以使用volatile bool或atomics并检查线程是否应该关闭 . 这样,互斥体也将被干净地处理 .

  • 1

    pthread_cancel() 最好避免 .

    您可以通过从那里抛出异常来解除阻塞阻塞在Blocking_Queue :: pull()上的所有线程 .

    队列中的一个弱点是 T t = _queue.front(); 调用可能引发异常的T的复制构造函数,使得队列互斥锁永久锁定 . 更好地使用C范围的锁 .

    以下是优雅线程终止的示例:

    $ cat test.cc
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/thread.hpp>
    #include <boost/thread/condition_variable.hpp>
    #include <exception>
    #include <list>
    #include <stdio.h>
    
    struct BlockingQueueTerminate
        : std::exception
    {};
    
    template<class T>
    class BlockingQueue
    {
    private:
        boost::mutex mtx_;
        boost::condition_variable cnd_;
        std::list<T> q_;
        unsigned blocked_;
        bool stop_;
    
    public:
        BlockingQueue()
            : blocked_()
            , stop_()
        {}
    
        ~BlockingQueue()
        {
            this->stop(true);
        }
    
        void stop(bool wait)
        {
            // tell threads blocked on BlockingQueue::pull() to leave
            boost::mutex::scoped_lock lock(mtx_);
            stop_ = true;
            cnd_.notify_all();
    
            if(wait) // wait till all threads blocked on the queue leave BlockingQueue::pull()
                while(blocked_)
                    cnd_.wait(lock);
        }
    
        void put(T t)
        {
            boost::mutex::scoped_lock lock(mtx_);
            q_.push_back(t);
            cnd_.notify_one();
        }
    
        T pull()
        {
            boost::mutex::scoped_lock lock(mtx_);
    
            ++blocked_;
            while(!stop_ && q_.empty())
                cnd_.wait(lock);
            --blocked_;
    
            if(stop_) {
                cnd_.notify_all(); // tell stop() this thread has left
                throw BlockingQueueTerminate();
            }
    
            T front = q_.front();
            q_.pop_front();
            return front;
        }
    };
    
    void sleep_ms(unsigned ms)
    {
        // i am using old boost
        boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(ms));
        // with latest one you can do this
        //boost::thread::sleep(boost::posix_time::milliseconds(10));
    }
    
    void thread(int n, BlockingQueue<int>* q)
    try
    {
        for(;;) {
            int m = q->pull();
            printf("thread %u: pulled %d\n", n, m);
            sleep_ms(10);
        }
    }
    catch(BlockingQueueTerminate&)
    {
        printf("thread %u: finished\n", n);
    }
    
    int main()
    {
        BlockingQueue<int> q;
    
        // create two threads
        boost::thread_group tg;
        tg.create_thread(boost::bind(thread, 1, &q));
        tg.create_thread(boost::bind(thread, 2, &q));
        for(int i = 1; i < 10; ++i)
            q.put(i);
        sleep_ms(100); // let the threads do something
        q.stop(false); // tell the threads to stop
        tg.join_all(); // wait till they stop
    }
    
    $ g++ -pthread -Wall -Wextra -o test -lboost_thread-mt test.cc
    
    $ ./test
    thread 2: pulled 1
    thread 1: pulled 2
    thread 1: pulled 3
    thread 2: pulled 4
    thread 1: pulled 5
    thread 2: pulled 6
    thread 1: pulled 7
    thread 2: pulled 8
    thread 1: pulled 9
    thread 2: finished
    thread 1: finished
    
  • 5

    我对pthread_cancel()并不熟悉 - 我更喜欢合作终止 .

    pthread_cancel()不会让你的互斥锁被锁定吗?我想你需要使用取消处理程序进行清理 .

相关问题