首页 文章

为什么使用队列作为 生产环境 者和消费者之间的通信缓冲区?

提问于
浏览
0

我看到许多使用队列作为共享数据缓冲区的 生产环境 者 - 消费者模式的描述和示例 . 使用队列而不是单个元素数据缓冲区的原因是什么?

在每个 生产环境 者 - 消费者实现中, 生产环境 者将比消费者处理数据更快地生成数据,或者 生产环境 者将生成比消费者处理数据更慢的数据 . 这意味着队列最终将填充到其最大容量,或者在消费者使用该数据元素之前它将仅包含一个数据元素 . 一旦队列已满, 生产环境 者将需要等待消费者才能添加另一个数据元素 . 当队列只包含一个元素时,消费者将需要等待 生产环境 者才能消耗另一个元素 . 换句话说,队列的行为类似于单个元素缓冲区 .

与处理单个数据元素相比,队列操作需要更多的处理开销和程序复杂性 . 应该有某种优势来证明增加的复杂性和开销 .

生产环境 者 - 消费者模式有两个广泛的子模式 . 第一个是同步的 生产环境 者 - 消费者模式 . 这种模式确保消费者消费消费者产生的每个数据元素,并且仅消费一次 . 上面讨论了同步模式 . 第二个是抽样消费者 .

抽样消费者仅消费 生产环境 者生成的数据元素的样本 . 如果消费者比 生产环境 者 生产环境 数据元素更快地消费数据元素,则消费者被称为过度采样缓冲区 . 如果消费者使用比 生产环境 者生成数据元素更慢的数据元素,则消费者被称为对数据元素进行欠采样 . 欠采样设计的一个例子是气象站,它以100Hz(每秒100次)的速率产生温度读数,同时以1Hz(每秒1次)的速率产生报告 . 消费者不需要每秒读取100个数据点并产生这些读数的平均值 . 相反,它可以每秒读取1次并报告该读数 . 如果消费者仅对1%的读数进行采样,则无需在缓冲区中提供数据元素队列 .

有界队列通常用于控制内存消耗 . 无界队列总是有可能耗尽堆内存,这可能导致应用程序失败 .

当有许多 生产环境 者时,有界的多元素队列似乎是个好主意 . 如果许多 生产环境 者生成的所有数据都需要以完全相同的方式处理,那么将所有数据写入公共缓冲区以便一个或多个消费者可以处理数据可能是合理的 . 与多元素队列相关联的成本很高 . 多元素队列使用的内存速率大约等于每个数据元素的大小乘以有界队列中的元素数 . 多元素队列需要比单个元素队列更复杂的逻辑 . 复杂性始终是正确性的敌人 .

当使用多元素队列时,使用者总是读取队列中最长时间的数据元素 . 毕竟,队列是FIFO数据结构 . 生产环境 者将新元素写入队列,直到队列填满 . 此时,生成器等待队列上的空间,或者它们覆盖队列中的最新元素 . 无论 生产环境 者在做什么,消费者仍然会读取队列中最旧的元素 .

当队列填充队列的行为时,就像单个元素队列的行为一样 . 在阻塞生成器/消费者模式中,完整队列强制 生产环境 者等待消费者从队列中读取 . 单个元素队列始终为full或empty . 完整的多元素队列只包含许多等待消费的数据元素,以及一个有资格消费的元素 .

单个元素队列简单地消除了队列中等待符合条件的数据的时间 .

在模式的消费者方面,消费者只能读取队列中最旧的元素,无论队列中有多少元素 . 消费者可以使用元素,也可以不使用元素 . 队列的大小对于消费者是不可见的 . 如果队列为空,则消费者必须暂停等待可用数据,或者它必须主动为队列提取可用数据 . 对于采样队列,如上所述,使用者可以简单地处理队列中最旧的值,并且队列从不将自身标记为空 .

使用多元素队列实现采样使用者非常困难 . 单个元素队列就可以了 . 生产环境 者只需覆盖队列中的任何内容,消费者就会读取队列中的任何内容 .

以下是用Ada编写的抽样 生产环境 者/消费者模式的示例 .

------------------------------------------------------------------
-- Sampling Consumer --
------------------------------------------------------------------
with Ada.Text_IO; use Ada.Text_IO;

procedure Sampling_PC is
   protected Buf is
      procedure Write(Item : in Integer);
      function Read return Integer;
      procedure Set_Done;
      function Get_Done return Boolean;
   private
      Value : Integer := Integer'First;
      Is_Done : Boolean := False;
   end Buf;

   protected body Buf is
      procedure Write(Item : in Integer) is
      begin
         Value := Item;
      end Write;
      function Read return Integer is
      begin
         return Value;
      end Read;
      procedure Set_Done is
      begin
         Is_Done := True;
      end Set_Done;
      function Get_Done return Boolean is
      begin
         return Is_Done;
      end Get_Done;
   end Buf;

   task Consumer;

   task body Consumer is
   begin
      while not Buf.Get_Done loop
         Put_Line("Consumer read" & Integer'Image(Buf.Read));
      end loop;
   end Consumer;

begin
   for I in 1..10 loop
      Put_Line("Producer writing" & Integer'Image(I));
      Buf.Write(I);
   end loop;
   Buf.Set_Done;
end Sampling_PC;

对于那些不熟悉Ada任务的人来说,可能需要一些解释 . 在上面的示例中,Buf是受保护的对象 . 在Ada术语中,受保护对象是在任务之间用作共享缓冲区的对象(类似于线程) . 每个缓冲区都实现了访问其内部数据元素的方法 . 方法的种类是过程,条目和功能 . 过程具有对受保护对象的无条件读/写访问权限 . 每个过程都自动操作受保护对象的读/写锁 . 一个条目非常类似于一个过程,除了它添加一个控制条件,就像线程中的条件变量一样 . 条目不仅实现读/写锁定以防止多个同时写入器和重叠读/写操作,还为等待条件变为TRUE的任务实现队列 . 受保护对象的函数提供对受保护对象的只读访问 . 函数自动操作共享读锁,以便多个任务可以同时从受保护对象读取 . 同时读取不会破坏受保护对象 .

上面的示例仅使用过程和函数 . Consumer任务通过使用函数读取受保护对象,而 生产环境 者(在这种情况下是程序主要任务)使用过程写入受保护对象 .

使用多个 生产环境 者和多个消费者的 生产环境 者 - 消费者模式的一个例子是:

------------------------------------------------------------------
-- Multiple producers and consumers sharing the same buffer --
------------------------------------------------------------------
with Ada.Text_IO; use Ada.Text_Io;

procedure N_Prod_Con is
   protected Buffer is
      Entry Write(Item : in Integer);
      Entry Read(Item : Out Integer);
   private
      Value  : Integer := Integer'Last;
      Is_New : Boolean := False;
   end Buffer;
   protected body Buffer is
      Entry Write(Item : in Integer) when not Is_New is
      begin
         Value := Item;
         Is_New := True;
      end Write;
      Entry Read(Item : out Integer) when Is_New is
      begin
         Item := Value;
         Is_New := False;
      end Read;
   end Buffer;

   task type Producers(Id : Positive) is
      Entry Stop;
   end Producers;
   task body Producers is
      Num : Positive := 1;
   begin
      loop
         select
            accept Stop;
            exit;
         or
            delay 0.0001;
         end select;
         Put_Line("Producer" & Integer'Image(Id) & " writing" & Integer'Image(Num));
         Buffer.Write(Num);
         Num := Num + 1;
      end loop;
   end Producers;

   task type Consumers(Id : Positive) is
      Entry Stop;
   end Consumers;

   task body Consumers is
      Num : Integer;
   begin
      loop
         select
            accept stop;
            exit;
         or
            delay 0.0001;
         end select;
         Buffer.Read(Num);
         Put_Line("Consumer" & Integer'Image(ID) & " read" & Integer'Image(Num));
      end loop;
   end Consumers;
   P1 : Producers(1);
   P2 : Producers(2);
   P3 : Producers(3);
   C1 : Consumers(1);
   C2 : Consumers(2);
   C3 : Consumers(3);
begin
   delay 0.2;
   P1.Stop;
   P2.Stop;
   P3.Stop;
   C1.Stop;
   C2.Stop;
   C3.Stop;
end N_Prod_Con;

1 回答

  • 2

    在每个 生产环境 者 - 消费者实施中, 生产环境 者将比消费者处理数据更快地生成数据,或者 生产环境 者将生成比消费者处理数据更慢的数据 . 这意味着队列最终将填充到其最大容量,或者在消费者使用该数据元素之前它将仅包含一个数据元素 .

    这假设 生产环境 者以恒定的速度 生产环境 ,消费者以恒定的速度消费 . 它还假设消费者的数量是固定的,例如,线程池实现不一定是这种情况,其可以响应于负载而改变线程的数量 .

    一旦我们放松了所有这些假设,拥有一个队列就有意义了 . 除此之外,它还允许系统优雅地处理需求中的临时峰值 .

    这并不是说容量为1的队列(阻塞或覆盖)没有其用途 . 在某些情况下,这正是所需要的 .

相关问题