本页使用了标题或全文手工转换

线程池

维基百科,自由的百科全书
跳转至: 导航搜索

线程池:一种线程使用模式。线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。

任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。

线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。

  • 半同步/半异步模式又称为生产者消费者模式,是比较常见的实现方式,比较简单。分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。由于线程间有数据通信,因此不适于大数据量交换的场合。
  • 领导者跟随者模式,在线程池中的线程可处在3种状态之一:领导者leader、追随者follower或工作者processor。任何时刻线程池只有一个领导者线程。事件到达时,领导者线程负责消息分离,并从处于追随者线程中选出一个来当继任领导者,然后将自身设置为工作者状态去处置该事件。处理完毕后工作者线程将自身的状态置为追随者。这一模式实现复杂,但避免了线程间交换任务数据,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了领导者跟随者模式实现。

线程池的伸缩性对性能有较大的影响。

  • 创建太多线程,将会浪费一定的资源,有些线程未被充分使用。
  • 銷毀太多執行緒,將導致之後浪費時間再次創建它們。
  • 创建线程太慢,将会导致長時間的等待,性能變差。
  • 銷毀執行緒太慢,导致其它執行緒资源饥饿。

Windows API的线程池函数[编辑]

Windows操作系统API提供了一套线程池的实现接口。[1]可以方便地创建、使用线程池。使用下述用户模式的对象来管理线程池及相关的数据:

  • 线程池对象(pool object)包含了一组工作线程(worker threads)。每个进程可以创建多个线程池对象,也可以直接使用进程确省的线程池。
  • 回调环境(Threadpool Callback Environment)线程池可以关联多个回调环境,回调环境可单独指定在线程池内的调度优先级。
  • 清理群(clean-up group)对象关联于一个回调环境。用于追溯(track)线程缓冲池回调。每次创建线程池的IO、work、timer、wait等对象,会在cleanup group中增加一个成员。关闭线程池的IO、work、timer、wait等对象,会在cleanup group中删除相应的成员。
  • 线程池工作对象(work object)可异步多次投寄给线程池去执行它的回调函数。实际上就是以异步方式调用回调函数。通过SetThreadpoolWait投递工作对象到任务队列。
  • 线程池定时器对象(timer object)在定时器到期时投寄给线程池去执行它的回调函数。 创建(CreateThreadpoolTimer)并设定(SetThreadpoolTimer)定时器对象。当定时器对象到期时,工作线程会执行定时器对象的回调函数。
  • 线程池等待对象(wait object)在可等待句柄(waitable handle)被触发(signaled)时投寄给线程池去执行它的回调函数。创建(CreateThreadpoolWait)并设定(SetThreadpoolWait)等待对象。当可等待对象(waitable object)的句柄被通知(signaled)或超时到期时,工作线程会执行等待对象的回调函数。
  • 线程池I/O对象把文件句柄关联到线程池的I/O完成端口(completion port)。当异步I/O操作完成,一个工作线程取得操作的状态并调用I/O对象的回调函数。

相关的API函数:[2]

  • CreateThreadpool 创建一个线程池数据结构
  • CloseThreadpool 关闭一个线程池
  • SetThreadpoolThreadMaximum 设置一个线程池的线程数量上限
  • SetThreadpoolThreadMinimum 设置一个线程池的线程数量下限
  • InitializeThreadpoolEnvironment 初始化一个回调环境
  • DestroyThreadpoolEnvironment 删除回调环境
  • SetThreadpoolCallbackPool 把指定的线程缓冲池关联到指定的线程池回调环境。创建线程池的IO、work、timer、wait等对象会关联到一个回调环境上。如果使用系统默认的回调环境,需要自行负责回调函数的DLL保持装入状态,自行管理线程池内的各种对象的生存期。线程池可以关联多个回调环境,回调环境可单独指定在线程池内的调度优先级。
  • SetThreadpoolCallbackLibrary 指示在回调函数未执行完的时候其所在的DLL库不能卸载
  • SetThreadpoolCallbackPriority 设定回调环境在所属线程池内的优先级
  • SetThreadpoolCallbackRunsLong 指示当前回调函数将运行较长时间。这有助于线程池判断是否创建新线程
  • CreateThreadpoolCleanupGroup 创建一个cleanup group数据结构,关联到一个回调环境上
  • SetThreadpoolCallbackCleanupGroup 把指定的cleanup group关联到指定的回调环境
  • CloseThreadpoolCleanupGroup 关闭一个cleanup group数据结构。cleanup group不能包含任何成员。
  • CloseThreadpoolCleanupGroupMembers 把指定cleanup group的所有work、timer、wait等对象释放掉。对于正在执行的回调函数,会等待其执行完毕。参数fCancelPendingCallbacks确定是否尚未开始执行的任务执行。
  • CreateThreadpoolWork 创建一个线程池工作对象的数据结构
  • SubmitThreadpoolWork 投递(post)一个工作对象到线程池中。一个工作线程将会调用执行这个工作对象的回调函数。
  • CreateThreadpoolTimer 创建一个线程池定时器对象的数据结构
  • SetThreadpoolTimer 设定一个线程定时器对象的参数。当定时器到期时,一个工作线程会调用这个定时器对象的回调函数
  • IsThreadpoolTimerSet
  • CreateThreadpoolWait 创建一个线程池等待(wait)对象的数据结构
  • SetThreadpoolWait 设定一个线程等待对象与一个可等待对象(waitable object)的句柄绑定并设定超时时间。当可等待对象的句柄被通知(signaled)或指定的超时到期,一个工作线程会调用这个等待对象的回调函数。如果可等待对象的句柄被设置为NULL,线程池等待对象将停止把回调函数入任务队列,但已在队列的回调函数仍会被调度执行。
  • CloseThreadpoolWait 关闭一个线程等待对象,然后才能关闭可等待对象(waitable object)的句柄。
  • WaitForThreadpoolWaitCallbacks 等待指定的线程池等待对象执行完成或者取消还没开始执行的线程池等待对象。
  • WaitForThreadpoolIoCallbacks 等待指定的线程池IO对象执行完成或者取消还没开始执行的线程池IO对象。
  • WaitForThreadpoolTimerCallbacks 等待指定的线程池定时器对象执行完成或者取消还没开始执行的线程池定时器对象。
  • WaitForThreadpoolWorkCallbacks 等待指定的线程池工作对象执行完成或者取消还没开始执行的线程池工作对象。
  • CreateThreadpoolIo 创建一个IO完成对象
  • CancelThreadpoolIo 在IO失败或者IO直接同步完成时,取消通知(notification)
  • CloseThreadpoolIo 释放IO完成对象。应等待所有IO操作完成,关闭IO文件句柄后再调用本函数
  • StartThreadpoolIo 每次初始化异步IO操作前应先调用本函数,否则线程池会忽略IO操作的完成并内存泄露。
  • TrySubmitThreadpoolCallback 请求线程池一个工作线程调用指定的回调函数
  • DisassociateCurrentThreadFromCallback 把执行当前回调函数的线程与初始化当前回调函数的对象去关联。这使得所有等待当前回调函数完成的线程被释放
  • FreeLibraryWhenCallbackReturns
  • LeaveCriticalSectionWhenCallbackReturns
  • ReleaseMutexWhenCallbackReturns
  • ReleaseSemaphoreWhenCallbackReturns
  • SetEventWhenCallbackReturns

示例程序如下:

#include <windows.h>
#include <tchar.h>
#include <stdio.h>

//
// Thread pool wait callback function template
//
VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult)
{
	// Instance, Parameter, Wait, and WaitResult not used in this example.
	UNREFERENCED_PARAMETER(Instance); 	UNREFERENCED_PARAMETER(Parameter);	UNREFERENCED_PARAMETER(Wait); 	UNREFERENCED_PARAMETER(WaitResult);

	// Do something when the wait is over.
	_tprintf(_T("MyWaitCallback: wait is over.\n"));
}


//
// Thread pool timer callback function template
//
VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_TIMER Timer)
{
	// Instance, Parameter, and Timer not used in this example.
	UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Timer);

	// Do something when the timer fires.
	_tprintf(_T("MyTimerCallback: timer has fired.\n"));
}


//
// This is the thread pool work callback function.
//
VOID CALLBACK MyWorkCallback( PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WORK Work)
{
	// Instance, Parameter, and Work not used in this example.
	UNREFERENCED_PARAMETER(Instance);UNREFERENCED_PARAMETER(Parameter);UNREFERENCED_PARAMETER(Work);

	// Do something when the work callback is invoked.
	_tprintf(_T("MyWorkCallback: Task performed.\n"));
}

 

int main(void)
{
	PTP_WAIT Wait = NULL;
	PTP_WAIT_CALLBACK waitcallback = MyWaitCallback;
	HANDLE hEvent = NULL;
	UINT i = 0;
	UINT rollback = 0;

	// Create an auto-reset event and initialized as nonsignaled.
	hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);

	if (NULL == hEvent) {
		// Error Handling
		return 0;
	}

	rollback = 1; // CreateEvent succeeded

	Wait = CreateThreadpoolWait(waitcallback,
		NULL,     // 回调函数的输入参数
		NULL);    // 使用确省的回调环境

	if (NULL == Wait) {
		_tprintf(_T("CreateThreadpoolWait failed. LastError: %u\n"),GetLastError());
		goto new_wait_cleanup;
	}

	rollback = 2; // CreateThreadpoolWait succeeded

				  // must re-register the event with the wait object before signaling it each time to trigger the wait callback
				  // each time before signaling the event to trigger the wait callback.
	for (i = 0; i < 5; i++) {
		SetThreadpoolWait(Wait, //线程池等待对象
			hEvent,             //内核等待对象
			NULL);              //超时设定

		SetEvent(hEvent);       //触发内核等待对象

		// Delay for the waiter thread to act if necessary.
		Sleep(500);

		// Block here until the callback function is done executing.
		WaitForThreadpoolWaitCallbacks(Wait, FALSE);
	}

new_wait_cleanup:
	switch (rollback) {
	case 2:
		// Unregister the wait by setting the event to NULL.
		SetThreadpoolWait(Wait, NULL, NULL); //the wait object will cease to queue new callbacks (but callbacks already queued will still occur

		// Close the wait.
		CloseThreadpoolWait(Wait);

	case 1:
		// Close the event.
		CloseHandle(hEvent);

	default:
		break;
	}
	

	BOOL bRet = FALSE;
	PTP_WORK work = NULL;
	PTP_TIMER timer = NULL;
	PTP_POOL pool = NULL;
	PTP_WORK_CALLBACK workcallback = MyWorkCallback;
	PTP_TIMER_CALLBACK timercallback = MyTimerCallback;
	TP_CALLBACK_ENVIRON CallBackEnviron;
	PTP_CLEANUP_GROUP cleanupgroup = NULL;
	FILETIME FileDueTime;
	ULARGE_INTEGER ulDueTime;
	rollback = 0;

	InitializeThreadpoolEnvironment(&CallBackEnviron); //不使用确省的线程池与确省的回调环境

	// Create a custom, dedicated thread pool.
	pool = CreateThreadpool(NULL);

	if (NULL == pool) {
		_tprintf(_T("CreateThreadpool failed. LastError: %u\n"), GetLastError());
		goto main_cleanup;
	}

	rollback = 1; // pool creation succeeded

				  // The thread pool is made persistent simply by setting both the minimum and maximum threads to 1.
	SetThreadpoolThreadMaximum(pool, 1);
	bRet = SetThreadpoolThreadMinimum(pool, 1);

	if (FALSE == bRet) {
		_tprintf(_T("SetThreadpoolThreadMinimum failed. LastError: %u\n"),GetLastError());
		goto main_cleanup;
	}

	// Create a cleanup group for this thread pool.
	cleanupgroup = CreateThreadpoolCleanupGroup();

	if (NULL == cleanupgroup) {
		_tprintf(_T("CreateThreadpoolCleanupGroup failed. LastError: %u\n"),GetLastError());
		goto main_cleanup;
	}

	rollback = 2;  // Cleanup group creation succeeded

				   // Associate the callback environment with our thread pool.
	SetThreadpoolCallbackPool(&CallBackEnviron, pool);

	// Associate the cleanup group with our thread pool.
	// Objects created with the same callback environment as the cleanup group become members of the cleanup group.
	SetThreadpoolCallbackCleanupGroup(&CallBackEnviron, //回调环境
		cleanupgroup,                                   //Cleanup Group
		NULL);                                          //Cleanup Group的回调函数,当释放其所包含的对象之前先调用该回调函数


	// Create work with the callback environment.
	work = CreateThreadpoolWork(workcallback, //回调函数
		NULL,                                 //回调函数的输入参数
		&CallBackEnviron);                    //回调环境

	if (NULL == work) {
		_tprintf(_T("CreateThreadpoolWork failed. LastError: %u\n"), GetLastError());
		goto main_cleanup;
	}

	rollback = 3;  // Creation of work succeeded


				   // Submit the work to the pool. Because this was a pre-allocated work item (using CreateThreadpoolWork), it is guaranteed to execute.
	SubmitThreadpoolWork(work);	

	// Create a timer with the same callback environment.
	timer = CreateThreadpoolTimer(timercallback, //回调函数
		NULL,                                    //回调函数的输入参数
		&CallBackEnviron);                       //回调环境


	if (NULL == timer) {
		_tprintf(_T("CreateThreadpoolTimer failed. LastError: %u\n"), GetLastError());
		goto main_cleanup;
	}

	rollback = 4;  // Timer creation succeeded

				   // Set the timer to fire in one second.
	ulDueTime.QuadPart = (ULONGLONG)-(1 * 10 * 1000 * 1000);
	FileDueTime.dwHighDateTime = ulDueTime.HighPart;
	FileDueTime.dwLowDateTime = ulDueTime.LowPart;

	SetThreadpoolTimer(timer, //线程池定时器对象
		&FileDueTime,         //到期时间
		0,                    //周期时期,为0则表示一次性定时器
		0);                   //操作系统调用回调函数的最大延迟时间


	// Delay for the timer to be fired
	Sleep(1500);


	// Wait for all callbacks to finish.
	// CloseThreadpoolCleanupGroupMembers also releases objects that are members of the cleanup group, 
	// so it is not necessary to call close functions on individual objects after calling CloseThreadpoolCleanupGroupMembers.
	CloseThreadpoolCleanupGroupMembers(cleanupgroup, //Cleanup Group
		FALSE,                                       //为真则取消还未开始执行的pending的回调函数
		NULL);                                       //CleanupGroup回调函数的输入参数

	// Already cleaned up the work item with the
	// CloseThreadpoolCleanupGroupMembers, so set rollback to 2.
	rollback = 2;
	goto main_cleanup;

main_cleanup:
	// Clean up any individual pieces manually
	// Notice the fall-through structure of the switch.
	// Clean up in reverse order.

	switch (rollback) {
	case 4:
	case 3:
		// Clean up the cleanup group members.
		CloseThreadpoolCleanupGroupMembers(cleanupgroup,FALSE, NULL);
	case 2:
		// Clean up the cleanup group.
		CloseThreadpoolCleanupGroup(cleanupgroup);
	case 1:
		// Clean up the pool.
		CloseThreadpool(pool);
	default:
		break;
	}

	return 0;
}

.NET Framework的线程池实现[编辑]

命名空间System.Threading中的类ThreadPool提供一个线程池,该线程池可用于执行任务、发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。[3]

参看[编辑]

参考文献[编辑]

  1. ^ MSDN:Using the Thread Pool Functions
  2. ^ MSDN:Thread Pool API
  3. ^ MSDN: ThreadPool 类