首页 文章

TThreadedQueue不具备多个消费者的能力?

提问于
浏览
43

尝试在单个 生产环境 者多个消费者方案中使用TThreadedQueue(Generics.Collections) . (DELPHI-XE) . 我们的想法是将对象推入队列,让几个工作线程排空队列 .

但它没有按预期工作 . 当两个或多个工作线程调用PopItem时,将从TThreadedQueue抛出访问冲突 .

如果使用临界区序列化对PopItem的调用,则一切正常 .

当然,TThreadedQueue应该能够处理多个消费者,所以我错过了什么,或者这是TThreadedQueue中的一个纯粹的错误?

这是一个产生错误的简单示例 .

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

Update :导致TThreadedQueue崩溃的TMonitor中的错误在Delphi XE2中得到修复 .

Update 2 :上面的测试强调队列处于空状态 . Darian Miller发现强调队列处于满状态,仍然可以重现XE2中的错误 . 错误再一次出现在TMonitor中 . 有关详细信息,请参阅下面的答案 . 还有QC101114的链接 .

Update 3 :随着Delphi-XE2更新4, TMonitor 已经宣布了一个可以解决 TThreadedQueue 中问题的修复程序 . 到目前为止,我的测试无法再现 TThreadedQueue 中的任何错误 . 当队列为空且已满时,测试单个 生产环境 者/多个使用者线程 . 还测试了多个 生产环境 者/多个消费者 . 我将读取器线程和写入器线程从1改为100,没有任何故障 . 但是知道历史,我敢于让别人打破 TMonitor .

5 回答

  • 4

    好吧,没有大量的测试很难确定,但在TThreadedQueue或TMonitor中看起来肯定是一个bug . 无论哪种方式,它都在RTL而不是您的代码中 . 您应该将其作为QC报告提交,并使用上面的示例作为“如何重现”代码 .

  • 19

    我建议你在使用线程,并行性等时使用OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary.Primoz做得非常好,在网站上你会找到很多有用的文档 .

  • 10

    您的示例似乎在XE2下正常工作,但是如果我们填满您的队列,则它会在PushItem上失败 . (在XE2 Update1下测试)

    要重现,只需将任务创建从100增加到1100(您的队列深度设置为1024)

    for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
    

    每次在Windows 7上都会为我而死 . 我最初尝试不断推动压力测试它,它在第30循环失败...然后在第16循环...然后在第65循环,所以在不同的时间间隔但它在某些时候一直失败点 .

    iLoop := 0;
      while iLoop < 1000 do
      begin
        Inc(iLoop);
        WriteLn('Loop: ' + IntToStr(iLoop));  
        for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
      end;
    
  • 3

    我找了TThreadedQueue类,但似乎没有在我的D2009中 . 我并不是真的会为此而自杀 - Delphi线程支持一直都是错误的...错误......'非最优'并且我怀疑TThreadedQueue没有什么不同:)

    为什么要将泛型用于P-C( 生产环境 者/消费者)对象?一个简单的TObjectQueue后代将会很好 - 几十年来一直使用它 - 适用于多个 生产环境 者/消费者:

    unit MinimalSemaphorePCqueue;
    
    { Absolutely minimal P-C queue based on TobjectQueue and a semaphore.
    
    The semaphore count reflects the queue count
    'push' will always succeed unless memory runs out, then you're stuft anyway.
    'pop' has a timeout parameter as well as the address of where any received
    object is to be put.
    'pop' returns immediately with 'true' if there is an object on the queue
    available for it.
    'pop' blocks the caller if the queue is empty and the timeout is not 0.
    'pop' returns false if the timeout is exceeded before an object is available
    from the queue.
    'pop' returns true if an object is available from the queue before the timeout
    is exceeded.
    If multiple threads have called 'pop' and are blocked because the queue is
    empty, a single 'push' will make only one of the waiting threads ready.
    
    
    Methods to push/pop from the queue
    A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
    When the handle is signaled, the 'peek' method will retrieve the queued object.
    }
    interface
    
    uses
      Windows, Messages, SysUtils, Classes,syncObjs,contnrs;
    
    
    type
    
    pObject=^Tobject;
    
    
    TsemaphoreMailbox=class(TobjectQueue)
    private
      countSema:Thandle;
    protected
      access:TcriticalSection;
    public
      property semaHandle:Thandle read countSema;
      constructor create; virtual;
      procedure push(aObject:Tobject); virtual;
      function pop(pResObject:pObject;timeout:DWORD):boolean;  virtual;
      function peek(pResObject:pObject):boolean;  virtual;
      destructor destroy; override;
    end;
    
    
    implementation
    
    { TsemaphoreMailbox }
    
    constructor TsemaphoreMailbox.create;
    begin
    {$IFDEF D2009}
       inherited Create;
    {$ELSE}
      inherited create;
    {$ENDIF}
      access:=TcriticalSection.create;
      countSema:=createSemaphore(nil,0,maxInt,nil);
    end;
    
    destructor TsemaphoreMailbox.destroy;
    begin
      access.free;
      closeHandle(countSema);
      inherited;
    end;
    
    function TsemaphoreMailbox.pop(pResObject: pObject;
      timeout: DWORD): boolean;
    // dequeues an object, if one is available on the queue.  If the queue is empty,
    // the caller is blocked until either an object is pushed on or the timeout
    // period expires
    begin // wait for a unit from the semaphore
      result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
      if result then // if a unit was supplied before the timeout,
      begin
        access.acquire;
        try
          pResObject^:=inherited pop; // get an object from the queue
        finally
          access.release;
        end;
      end;
    end;
    
    procedure TsemaphoreMailbox.push(aObject: Tobject);
    // pushes an object onto the queue.  If threads are waiting in a 'pop' call,
    // one of them is made ready.
    begin
      access.acquire;
      try
        inherited push(aObject); // shove the object onto the queue
      finally
        access.release;
      end;
      releaseSemaphore(countSema,1,nil); // release one unit to semaphore
    end;
    
    function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
    begin
      access.acquire;
      try
        result:=(count>0);
        if result then pResObject^:=inherited pop; // get an object from the queue
      finally
        access.release;
      end;
    end;
    end.
    
  • 1

    我不认为TThreadedQueue应该支持多个消费者 . 根据帮助文件,它是一个FIFO . 我的印象是有一个线程推动而另一个(只有一个!)弹出 .

相关问题