首页 文章

修改的 生产环境 者消费算法

提问于
浏览
2

我正在修改 生产环境 者/消费者问题 . 然而,有竞争条件,我正在讨论最好的方法 . 可能有更清洁的方式,我想知道是否有人做过类似的事情,如果可能的话,可以分享更好的解决方案

它使用队列作为普通的 生产环境 者/消费者开始 . 一个 生产环境 者线程从磁盘读取项目并在共享队列上排队 . 然后,多个消费者线程尝试将项目出列以进行处理 . 但是每个项目都有一个必须与消费者标签匹配的标签(如线程ID) . 消费者线程查看队列的前面并检查项目的标记 . 如果它与消费者线程的标记不匹配,则消费者必须进入休眠状态并等待,直到队列的前面有一个与其标记匹配的项目 . 有点令人困惑,但下面的伪代码有希望澄清算法:

struct item
{
   // This is unique tag that only a specific consumer can consumer
    int    consumerTag; 
    // data for the consumer to consume
    void   *pData;
}

///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// producer reads items
// each item has a tag to a specific consumer
while (item = read())
{
    lock(queue)
    if (queueNotFull)
    {
        enqueue(item);
    }
    else
    {
       // check front of the queue, notify worker.
       Sleep(); // Releases Queue Mutex upon entering
       // requires the mutex after it has been awaken
    }
    unlock(queue);
    wakeUpAllConsumers();
}
-------------------------------------------------------
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// my tag is it like at thread id,
// each consumer has a unique tag
myTag = getThreadTAG()
while (true)
{
    lock (queue);
    if (queueNotEmpty)
    {
        item = queueFront()
        if (myTag == item->id)
        {
           // this item is for me, let's dequeue and process
           item = dequeue();
           process();
        }
        else
        {
           // This is not for me let's go to sleep
           Sleep(); // Releases Queue Mutex
          // re-acquire mutex
        }
    }
    else
    {
        Sleep();    // Releases Queue Mutex
       // re-acquire mutex
    }

    unlock (queue);
    wakeUpProducer();
}

然而,上述算法存在问题 . 让我们考虑以下事件并假设:

item.tag = 1表示此项必须仅由具有相同标记的使用者使用 . 我将此表示为consumer.tag = 1

  • 生产环境 者读取 item.tag=1 并排队

  • 生产环境 者唤醒所有消费者线程( consumer.tag=1consumer.tag=2 等等......现在都清醒并检查队列的前面)

  • 制作人读取 item.tag=2 并入队

  • Producer唤醒所有消费者线程

  • 队列现在有 [item.tag=1, item.tag=2]

  • consumer.tag=2 wakes up and peek at the front of the queue, but item.tag = 1与 consumer.tag=1 不匹配;因此,它会入睡 . consumer.tag=2 现在正在睡觉 .

  • consumer.tag=1 醒来并查看队列的前面, item.tag=1consumer.tag=1 匹配 . 出列并通知 生产环境 者它可以消耗更多 .

  • 制作人完成阅读数据并退出 . 现在队列有 item.tag=2consumer.tag=2 正在休眠,从不消耗该数据 . 请注意,可能有很多消费者 . 所以最后许多消费者最终都会睡觉和排队

我想在生成器线程的末尾添加一个循环,它一直唤醒所有休眠线程,直到队列为空 .

// PRODUCER THREAD
// Process the rest
while (queueIsNotEmpty)
{
     WakeUpAllConsumer();
     Sleep();
}

但我相信必须有一种更优雅的方式来处理这个问题 . 任何想法让我知道

谢谢!

1 回答

  • 0

    我曾经遇到类似的事情(在一个所有线程都可以处理所有项目的设置中),以及我在那里使用的解决方案,尽管不是那么优雅,但是当制作人完成阅读数据时,最后一次唤醒所有人 .
    在这里,这不会真正起作用,因为如果队列中有第三个项目,那么该项目可能会被遗忘 . 我建议的是两种方式之一:

    • 每当消费者将项目排队时唤醒所有线程 . 这是我能想到的唯一方法,以确保不遗余力 . (只有在 isProducerFinishedReading == true 保存一个资源/时间时才能执行此模式 .

    • 重新设计系统有10个队列,然后当一个项目添加到队列 n 时,消费者线程 n 被唤醒 . 完成元素后,它会再次检查队列中是否有新项目要处理 . 在任何情况下, 生产环境 者应该在读取完成后检查所有队列的长度并唤醒相应的线程 .

    希望有所帮助 .

    EDIT:

    • 每次线程完成工作时,它应该再次检查队列,如果项目有"his",则它完成工作 . 如果一个线程可以唤醒其他线程,那么它应该唤醒相应的线程 .

相关问题