delphi – TThreadedQueue不能多个消费者?

delphi – TThreadedQueue不能多个消费者?,第1张

概述试图在单个生产者多消费者方案中使用TThreadedQueue(Generics.Collections)。 (Delphi-XE)。 想法是将对象推入队列,并让几个工作线程排空队列。 它不能按预期工作,但。 当两个或多个工作线程调用PopItem时,会从TThreadedQueue抛出访问冲突。 如果PopItem的调用序列化了一个临界区,一切都很好。 当然TThreadedQueue应该能够处 试图在单个生产者多消费者方案中使用TThreadedQueue(Generics.Collections)。 (Delphi-XE)。
想法是将对象推入队列,并让几个工作线程排空队列。

它不能按预期工作,但。
当两个或多个工作线程调用PopItem时,会从TThreadedQueue抛出访问冲突。

如果PopItem的调用序列化了一个临界区,一切都很好。

当然TThreadedQueue应该能够处理多个消费者,所以我缺少的东西或者这是一个纯的BUG在TThreadedQueue?

这里是一个简单的例子来产生错误。

program TestThreadedQueue;{$APPTYPE CONSolE}uses//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',windows,Messages,Classes,SysUtils,SyncObJs,Generics.Collections;type TThreadTaskMsg =       class(TObject)         private           threadID  : integer;           threadMsg : string;         public           Constructor Create( ID : integer; const msg : string);       end;type TThreadReader =       class(TThread)         private           fPopQueue   : TThreadedQueue<TObject>;           fSync       : TCriticalSection;           fMsg        : TThreadTaskMsg;           fException  : Exception;           procedure DoSync;           procedure DoHandleException;         public           Constructor Create( popQueue : TThreadedQueue<TObject>;                               sync     : TCriticalSection);           procedure Execute; overrIDe;       end;Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;                                  sync     : TCriticalSection);begin  fPopQueue:=            popQueue;  fMsg:=                 nil;  fSync:=                sync;  Self.FreeOnTerminate:= FALSE;  fException:=           nil;  inherited Create( FALSE);end;procedure TThreadReader.DoSync ;begin  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadID));end;procedure TThreadReader.DoHandleException;begin  WriteLn('Exception ->' + fException.Message);end;procedure TThreadReader.Execute;var signal : TWaitResult;begin  nameThreadForDeBUGging('QueuePop worker');  while not Terminated do  begin    try      {- Calling PopItem can return empty without waittime !? Let other threads in by sleePing. }      Sleep(20);      {- Serializing calls to PopItem works }      if Assigned(fSync) then fSync.Enter;      try        signal:= fPopQueue.PopItem( TObject(fMsg));      finally        if Assigned(fSync) then fSync.Release;      end;      if (signal = wrSignaled) then      begin        try          if Assigned(fMsg) then          begin            fMsg.threadMsg:= '<Thread ID :' +IntToStr( Self.threadID) + '>';            fMsg.Free; // We are just dumPing the message in this test            //Synchronize( Self.DoSync);            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);          end;        except          on E:Exception do begin          end;        end;      end;      except       FException:= Exception(ExceptObject);      try        if not (FException is EAbort) then        begin          {Synchronize(} DoHandleException; //);        end;      finally        FException:= nil;      end;   end;  end;end;Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);begin  inherited Create;  threadID:= ID;  threadMsg:= msg;end;var    fSync : TCriticalSection;    fThreadQueue : TThreadedQueue<TObject>;    fReaderArr : array[1..4] of TThreadReader;    i : integer;begin  try    IsMultiThread:= TRUE;    fSync:=        TCriticalSection.Create;    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);    try      {- Calling without fSync throws exceptions when two or more threads calls PopItem         at the same time }      WriteLn('Creating worker threads ...');      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);      {- Calling with fSync works ! }      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);       WriteLn('Init done. Pushing items ...');      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));      ReadLn;    finally      for i:= 1 to 4 do fReaderArr[i].Free;      fThreadQueue.Free;      fSync.Free;    end;  except    on E: Exception do      begin        Writeln(E.Classname,': ',E.Message);        ReadLn;      end;  end;end.

更新:导致TThreadedQueue崩溃的TMonitor中的错误是在Delphi XE2中修复的。

更新2:上面的测试强调队列在空状态。 Darian Miller发现,在满状态下强调队列,仍然可以再现XE2中的错误。错误再次出现在TMonitor中。有关更多信息,请参阅下面的答案。还有一个链接到QC101114。

更新3:
有了Delphi-XE2更新4,有一个宣布的修正TMonitor将解决问题在TThreadedQueue。我的测试到目前为止不能再现任何错误在TThreadedQueue了。
当队列为空且已满时,测试单个生产者/多个消费者线程。
还测试了多个生产者/多个消费者。我改变了读者线程和写线程从1到100没有任何毛刺。但是知道历史,我敢于别人打破TMonitor。

解决方法 嗯,很难确定没有大量的测试,但它肯定看起来像是一个错误,无论是在TThreadedQueue或在TMonitor。无论哪种方式,它在RTL而不是你的代码。您应该将此文件作为QC报告,并使用上面的示例作为“如何再现”代码。 总结

以上是内存溢出为你收集整理的delphi – TThreadedQueue不能多个消费者?全部内容,希望文章能够帮你解决delphi – TThreadedQueue不能多个消费者?所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1283101.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-09
下一篇 2022-06-09

发表评论

登录后才能评论

评论列表(0条)

保存