A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 左手神刀 中级黑马   /  2013-7-20 10:37  /  1364 人查看  /  7 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

看完交通灯和银行系统后最大的疑惑就是对线程池还不是很了解。在百度上也找了些文章看,还是有点儿一知半解。谁有关于线程池的通俗易懂文章?求分享。

评分

参与人数 1黑马币 +3 收起 理由
杨兴庭 + 3 赞一个!

查看全部评分

7 个回复

倒序浏览
在以前的文章中,我已经描述了使用IOCP来编写一个可以支持大量数据连接的网络通信部分的内容。但是IOCP的作用仅限于用来编写网络通信吗?答案是否定的。在《windows核心编程》一书中,作者也说明了“使用IO完成端口允许我们向一个设备同时发送多个IO请求,它允许一个线程发出IO请求,另一个线程对结果进行处理,这项技术具有高度的伸缩性和最佳的灵活性”(windows核心编程第297页)。这就说明了IOCP不只是用来开发网络代码,只要是IO的投递处理都可以用IOCP来处理。这样的话我们就可以使用IOCP来开发我们的线程池了。
为什么非要用IOCP线程池呢?让我们来分析一下线程池的作用,以及IOCP线程池的好处。
当我们做服务端开发的时候,经常需要对一些慢设备进行操作(例如数据库,文件等等)。当我们只使用一个线程来处理慢设备的时候,会发现我们的程序运行起来CPU利用率过低,处理速度过慢。如果我们使用多线程同时来操作,就可以大大提高效率。但是我们自己写多线程调度的时候却存在2个难以处理的问题:
1:我们需要开多少个线程合适?因为我们知道过多的线程在CPU调度切换的时候会有相当大的消耗,从而造成效率的低下(一些初学者理解的线程越多速度越快的观点是错误的)。
2:我们如何调度线程?当我们操作多线程的时候我们如何选取合适的线程来处理呢?如果只用一个指定的线程那自然不行(这样的话和单线程没有任何区别了)。那我们又如何选择合适的线程呢?我想大家一定可以实现,但是实现的结果就是需要进行大量的判断,然后来选择合适的线程处理。这样本身增加了编码复杂度,而且降低了效率。(大道至简,这句话用在开发上简直是太精辟了)。
而这两个问题,IOCP却恰恰已经为我们解决了。
1:IOCP在一台机器上开的线程数量是有一定规定的。一般来说是CPU数量的*2 + 2。(有的书上说是*2)。
2:对于线程的调度,IOCP采用了后进先出的原则。例如当有A、B、C、D 4个等待线程队列,那么当数据投递过来以后IOCP会选择最后一次调用GetQueuedCompletionStatus的线程去处理这个数据。例如最后调用GetQueuedCompletionStatus的线程为A,那么线程A会被唤醒。同理当线程A正在处于繁忙状态时,则剩余的等待线程队列中的最后一个调用GetQueuedCompletionStatus函数的线程会被唤醒,去进行处理。
通过以上两点来看,使用IOCP来编写线程池对于开发服务端程序是非常有好处的。
那么我们怎样用IOCP来编写线程池呢?
首先我们来分析一下IOCP在网络通信过程中是如何实现的,这有助于我们用IOCP来编写线程池。
在网络通信中,当我们使用IOCP来接收数据的时候,我们首先需要投递一个WSARecv函数,用来通知系统我们已经投递了接收请求,当系统接收到数据以后,会自动填充重叠结构,并从函数GetQueuedCompletionStatus返回这个重叠结构和数据的长度。而且我们也可以使用函数
PostQueuedCompletionStatus来投递我们自己定义的消息。这一点在IOCP编写线程池的时候尤为重要。

下来我们看一下如何使用Delphi编写一个IOCP线程池。
创建一个IOCP线程池,至少需要2个基本类。
首先:我们定义一个管理工作线程的线程池类(TTheadPool),这个类用来将接收到的数据转发给相应的工作线程类,同时将工作线程类返回的数据进行排队处理。
其次:我们定义真正处理数据的工作线程类,此类用来对线程池类发来的数据进行相应的处理,同时将处理返回的数据通过线程池类返回。
现在我们分别来看一下这两个类如何定义和编写。
TTheadPool的作用是用来管理工作线程类,那么它至少需要 创建工作线程函数和释放工作线程函数。
//IOCP管理
m_hCompPort: THandle;
m_iWorkThreads: Word;
m_pWorkThreads: array of TWorkThread;
function  CreateWorkThreads: Boolean;
function  DestroyWorkThreads: Boolean;
procedure CreateCompPort;
procedure CloseCompPort;
m_hCompPort是完成端口
m_iWorkThreads是用来记录创建工作线程的数量
m_pWorkThreads是工作线程的动态数组
CreateWorkThreads函数用来创建工作线程
DestroyWorkThreads函数用来销毁工作线程
CreateCompPort函数用来创建完成端口
CloseCompPort函数用来关闭完成端口
下来看一下以上函数的实现
function TTheadPool.CreateWorkThreads: Boolean;
var
  I: Integer;
begin
  if m_pWorkThreads <> nil then
  begin
    Result:=True;
    Exit;
  end;
  m_iWorkThreads:=0;
  if m_WorkCount = 0 then
    SetLength(m_pWorkThreads, getNumberOfProcessor * 2 + 2)
  else
    SetLength(m_pWorkThreads, m_WorkCount);
  for I:=Low(m_pWorkThreads) to High(m_pWorkThreads) do
  begin
    m_pWorkThreads:=TWorkThread.Create(Self);
    Inc(m_iWorkThreads);
  end;
  if m_iWorkThreads = 0 then
  begin
    Result:=False;
    Exit;
end;
  Result:=True;
end;
function TTheadPool.DestroyWorkThreads: Boolean;
var
  I: Integer;
begin
  if m_pWorkThreads = nil then
  begin
    Result:=True;
    Exit;
  end;
  for i:=0 to m_iWorkThreads - 1 do
  begin
    PostQueuedCompletionStatus(m_hCompPort,0, 0, Pointer(SHUTDOWN_FLAG));
  end;
  Sleep(0);
  for I:=Low(m_pWorkThreads) to High(m_pWorkThreads)  do
  begin
    m_pWorkThreads.Destroy;
  end;
Result:=True;
end;
procedure TTheadPool.CreateCompPort;
begin
  if m_hCompPort = 0 then
  begin
    m_hCompPort:=CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  end;
end;
procedure TTheadPool.CloseCompPort;
begin
  if m_hCompPort <> 0 then
  begin
  CloseHandle(m_hCompPort);
  m_hCompPort:=0;
end;
end;

//数据合并与管理
QueueCS: TRTLCriticalSection;
First, Last: PMsgBuffer;
procedure Reponse;
procedure FreeQueue;
这段代码是用来加载消息队列,
Reponse函数对消息队列的消息进行处理
FreeQueue函数是用来当程序结束时,销毁未处理的队列消息
procedure TTheadPool.Reponse;
var
  pWork,
  pNext: PMsgBuffer;
begin
  pWork:=nil;
  pNext:=nil;
  EnterCriticalSection(QueueCS);
  try
    if not Assigned(pWork) then
    begin
      pWork:=First;
      First:=nil;
      Last:=nil;
    end;
  finally
    LeaveCriticalSection(QueueCS);
  end;
  while Assigned(pWork) do
  begin
    pNext:=pWork.Next;
    PostQueuedCompletionStatus(m_hCompPort, 0, 0, Pointer(pWork));
    pWork:=pNext;
  end;
end;
procedure TTheadPool.FreeQueue;
var
  pNext: PMsgBuffer;
begin
  while Assigned(First) do
  begin
    pNext:=First.Next;
    if Assigned(First.Buffer) then
    begin
      FreeMem(First.Buffer);
    end;
    Dispose(First);
    First:=pNext;
  end;
  First:=nil;
  Last:=nil;
end;
以上就是线程池管理类(TTheadPool)的处理过程和主要函数。
对于工作线程类来说处理起来就比较简单了。
TWorkThread = class(TThread)
  protected
    procedure Execute; override;
  public
    procedure Reponse(pMsg: PMsgBuffer);
    procedure TestWork(pBuf: Pchar; iBufLen, iSocketHandle: Integer);
  public
    Parent: TTheadPool;
    Constructor Create(TheadPool: TTheadPool);
    Destructor Destroy; Override;
  end;
  
Reponse函数是用来对于线程池管理类发送过来的消息进行分发的函数。
TestWork函数是对于线程池管理类发送过来的数据进行处理的实际的函数。
procedure TWorkThread.Execute;
var
  bRet: BOOL;
  iRecvlen: Cardinal;
  pMsg: PCRCMsgBuffer;
  iBufferlen: Cardinal;
begin
  while not Terminated do
  begin
    try
      pMsg:=nil;
      bRet:=GetQueuedCompletionStatus(Parent.m_hCompPort, iBufferlen, iRecvlen, POverlapped(pMsg), INFINITE);
      //主动断开
      if (Cardinal(pMsg) = SHUTDOWN_FLAG) then
      begin
        Terminate;
      end;
      //关闭
      if Terminated then
      begin
        Break;
      end;
      if bRet and Assigned(pMsg) then
      begin
        Reponse(pMsg);
      end;
    except
      Continue;
    end;
  end;
end;
procedure TCRCWorkThread.Reponse(pMsg: PCRCMsgBuffer);
begin
  case pMsg.MainType of
  USER_TEST:         //用户发送来的相关协议
    TestWork(pMsg.pData, pMsg.iDatalen, pMsg.SocketHandle);
  else
    g_MainThread.AddLog(Format('未知的信息:%d',[pMsg.MainType]));
  end;
  if Assigned(pMsg.pData) then
  begin
    FreeMem(pMsg.pData);
  end;
  Dispose(pMsg);
end;
这样我们就将一个IOCP的线程池编写完成,其中需要注意的2点。
1:SHUTDOWN_FLAG是自己定义的一个常量,当线程池类需要关闭的时候,线程池类可以通过PostQueuedCompletionStatus向每一个工作线程发送消息,通知工作线程关闭
2:在线程池类中申请的内存PMsgBuffer 是在工作线程中进行释放的。

评分

参与人数 1黑马币 +3 收起 理由
杨兴庭 + 3 赞一个!

查看全部评分

回复 使用道具 举报
深圳在漂移 发表于 2013-7-20 10:55
在以前的文章中,我已经描述了使用IOCP来编写一个可以支持大量数据连接的网络通信部分的内容。但是IOCP的作 ...

哥们,你这个复制粘贴太无脑了,Delphi的文章都粘贴过来了,提醒一下这样很容易被扣分的。

评分

参与人数 1黑马币 +3 收起 理由
杨兴庭 + 3 赞一个!

查看全部评分

回复 使用道具 举报
看来楼主只能靠多敲几遍代码慢慢体悟了。、。。。。。
回复 使用道具 举报
看文字还能念下去 看代码就蒙圈了。。。
回复 使用道具 举报
我怎么看着像是用c语言写的代码啊 ?
有java的没?
回复 使用道具 举报
wyy283 中级黑马 2013-7-25 11:40:54
7#
1:IOCP在一台机器上开的线程数量是有一定规定的。一般来说是CPU数量的*2 + 2。(有的书上说是*2)。
2:对于线程的调度,IOCP采用了后进先出的原则。例如当有A、B、C、D 4个等待线程队列,那么当数据投递过来以后IOCP会选择最后一次调用GetQueuedCompletionStatus的线程去处理这个数据。例如最后调用GetQueuedCompletionStatus的线程为A,那么线程A会被唤醒。同理当线程A正在处于繁忙状态时,则剩余的等待线程队列中的最后一个调用GetQueuedCompletionStatus函数的线程会被唤醒,去进行处理。
通过以上两点来看,使用IOCP来编写线程池对于开发服务端程序是非常有好处的。
那么我们怎样用IOCP来编写线程池呢?
首先我们来分析一下IOCP在网络通信过程中是如何实现的,这有助于我们用IOCP来编写线程池。
在网络通信中,当我们使用IOCP来接收数据的时候,我们首先需要投递一个WSARecv函数,用来通知系统我们已经投递了接收请求,当系统接收到数据以后,会自动填充重叠结构,并从函数GetQueuedCompletionStatus返回这个重叠结构和数据的长度。而且我们也可以使用函数
PostQueuedCompletionStatus来投递我们自己定义的消息。这一点在IOCP编写线程池的时候尤为重要。
回复 使用道具 举报
网上有java.util.current包的教程,可以去搜索看一下。不仅有说线程池,
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马