尝试在单个 生产环境 者多个消费者方案中使用TThreadedQueue(Generics.Collections) . (DELPHI-XE) . 我们的想法是将对象推入队列,让几个工作线程排空队列 .
但它没有按预期工作 . 当两个或多个工作线程调用PopItem时,将从TThreadedQueue抛出访问冲突 .
如果使用临界区序列化对PopItem的调用,则一切正常 .
当然,TThreadedQueue应该能够处理多个消费者,所以我错过了什么,或者这是TThreadedQueue中的一个纯粹的错误?
这是一个产生错误的简单示例 .
program TestThreadedQueue;
{$APPTYPE CONSOLE}
uses
// FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
Windows,
Messages,
Classes,
SysUtils,
SyncObjs,
Generics.Collections;
type TThreadTaskMsg =
class(TObject)
private
threadID : integer;
threadMsg : string;
public
Constructor Create( ID : integer; const msg : string);
end;
type TThreadReader =
class(TThread)
private
fPopQueue : TThreadedQueue<TObject>;
fSync : TCriticalSection;
fMsg : TThreadTaskMsg;
fException : Exception;
procedure DoSync;
procedure DoHandleException;
public
Constructor Create( popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
procedure Execute; override;
end;
Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
begin
fPopQueue:= popQueue;
fMsg:= nil;
fSync:= sync;
Self.FreeOnTerminate:= FALSE;
fException:= nil;
Inherited Create( FALSE);
end;
procedure TThreadReader.DoSync ;
begin
WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;
procedure TThreadReader.DoHandleException;
begin
WriteLn('Exception ->' + fException.Message);
end;
procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
NameThreadForDebugging('QueuePop worker');
while not Terminated do
begin
try
{- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
Sleep(20);
{- Serializing calls to PopItem works }
if Assigned(fSync) then fSync.Enter;
try
signal:= fPopQueue.PopItem( TObject(fMsg));
finally
if Assigned(fSync) then fSync.Release;
end;
if (signal = wrSignaled) then
begin
try
if Assigned(fMsg) then
begin
fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
fMsg.Free; // We are just dumping the message in this test
//Synchronize( Self.DoSync);
//PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
end;
except
on E:Exception do begin
end;
end;
end;
except
FException:= Exception(ExceptObject);
try
if not (FException is EAbort) then
begin
{Synchronize(} DoHandleException; //);
end;
finally
FException:= nil;
end;
end;
end;
end;
Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
Inherited Create;
threadID:= ID;
threadMsg:= msg;
end;
var
fSync : TCriticalSection;
fThreadQueue : TThreadedQueue<TObject>;
fReaderArr : array[1..4] of TThreadReader;
i : integer;
begin
try
IsMultiThread:= TRUE;
fSync:= TCriticalSection.Create;
fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
try
{- Calling without fSync throws exceptions when two or more threads calls PopItem
at the same time }
WriteLn('Creating worker threads ...');
for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
{- Calling with fSync works ! }
//for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
WriteLn('Init done. Pushing items ...');
for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
ReadLn;
finally
for i:= 1 to 4 do fReaderArr[i].Free;
fThreadQueue.Free;
fSync.Free;
end;
except
on E: Exception do
begin
Writeln(E.ClassName, ': ', E.Message);
ReadLn;
end;
end;
end.
Update :导致TThreadedQueue崩溃的TMonitor中的错误在Delphi XE2中得到修复 .
Update 2 :上面的测试强调队列处于空状态 . Darian Miller发现强调队列处于满状态,仍然可以重现XE2中的错误 . 错误再一次出现在TMonitor中 . 有关详细信息,请参阅下面的答案 . 还有QC101114的链接 .
Update 3 :随着Delphi-XE2更新4, TMonitor
已经宣布了一个可以解决 TThreadedQueue
中问题的修复程序 . 到目前为止,我的测试无法再现 TThreadedQueue
中的任何错误 . 当队列为空且已满时,测试单个 生产环境 者/多个使用者线程 . 还测试了多个 生产环境 者/多个消费者 . 我将读取器线程和写入器线程从1改为100,没有任何故障 . 但是知道历史,我敢于让别人打破 TMonitor
.
5 回答
好吧,没有大量的测试很难确定,但在TThreadedQueue或TMonitor中看起来肯定是一个bug . 无论哪种方式,它都在RTL而不是您的代码中 . 您应该将其作为QC报告提交,并使用上面的示例作为“如何重现”代码 .
我建议你在使用线程,并行性等时使用OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary.Primoz做得非常好,在网站上你会找到很多有用的文档 .
您的示例似乎在XE2下正常工作,但是如果我们填满您的队列,则它会在PushItem上失败 . (在XE2 Update1下测试)
要重现,只需将任务创建从100增加到1100(您的队列深度设置为1024)
每次在Windows 7上都会为我而死 . 我最初尝试不断推动压力测试它,它在第30循环失败...然后在第16循环...然后在第65循环,所以在不同的时间间隔但它在某些时候一直失败点 .
我找了TThreadedQueue类,但似乎没有在我的D2009中 . 我并不是真的会为此而自杀 - Delphi线程支持一直都是错误的...错误......'非最优'并且我怀疑TThreadedQueue没有什么不同:)
为什么要将泛型用于P-C( 生产环境 者/消费者)对象?一个简单的TObjectQueue后代将会很好 - 几十年来一直使用它 - 适用于多个 生产环境 者/消费者:
我不认为TThreadedQueue应该支持多个消费者 . 根据帮助文件,它是一个FIFO . 我的印象是有一个线程推动而另一个(只有一个!)弹出 .