//
// Partial listing of threadPool.h
//
class CThreadPool {
public:
CThreadPool( DWORD numThreads = 0, DWORD threadConcurrency = 0 );
virtual ~CThreadPool();
void Start();
void Stop();
BOOL IsStarted() const ;
void WaitForThreadCompletion();
virtual CWorkerThread*
CreateWorkerThread( CThreadPool* pPool ) = 0 ;
virtual void DestroyWorkerThread( CWorkerThread* pThread );
BOOL
GetQueuedCompletionStatus( LPDWORD lpNumberOfBytesTransferred,
LPDWORD lpdwKey, LPOVERLAPPED *lpOverlapped,
DWORD dwTimeout = INFINITE );
BOOL AssociateFile( HANDLE hFile, DWORD dwKey );
BOOL PostQueuedCompletionStatus( DWORD dwKey,
DWORD dwBytesTransferred=0, LPOVERLAPPED lpOverlapped=NULL );
protected:
void createWorkerThreads();
void destroyWorkerThreads();
void startWorkerThreads();
void stopWorkerThreads();
protected:
BOOL m_bStarted ;
DWORD m_NumThreads ;
DWORD m_ThreadConcurrency ;
HANDLE m_hCompletionPort ;
CWorkerThread** m_ThreadArray ;
};
// ------------------------------------------------------------------
// Partial listing of threadPool.cpp
CThreadPool::CThreadPool( DWORD numWorkerThreads,
DWORD threadConcurrency ) :
m_bStarted( FALSE ),
m_NumThreads( numWorkerThreads ),
m_ThreadConcurrency( threadConcurrency ),
m_hCompletionPort( INVALID_HANDLE_VALUE ),
m_ThreadArray( NULL )
{
assert( numWorkerThreads >= 0 && numWorkerThreads <= 64 );
if( m_NumThreads == 0 ) {
// if default value of 0 was used, create n
// threads where n = 2 * number of processors + 2
SYSTEM_INFO sysinfo ;
GetSystemInfo( &sysinfo ) ;
m_NumThreads = sysinfo.dwNumberOfProcessors * 2 + 2;
} // if
m_ThreadArray = new CWorkerThread*[ m_NumThreads ];
}
void CThreadPool::Start()
{
if( ! m_bStarted ) {
m_hCompletionPort =
CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0,
m_ThreadConcurrency );
assert( m_hCompletionPort != NULL );
createWorkerThreads();
startWorkerThreads();
m_bStarted = TRUE ;
} // if
}
void CThreadPool::Stop()
{
if( m_bStarted ) {
stopWorkerThreads();
destroyWorkerThreads();
CloseHandle( m_hCompletionPort );
m_bStarted = FALSE ;
} // if
}
BOOL CThreadPool::AssociateFile( HANDLE hFile, DWORD dwKey )
{
// Associate the file handle with our existing completion port
HANDLE hResult =
CreateIoCompletionPort( hFile, m_hCompletionPort, dwKey,
m_ThreadConcurrency );
return (hResult != NULL );
}
void CThreadPool::createWorkerThreads()
{
// Instantiate threads by calling virtual function
// in derived class
for( DWORD i=0; i<m_NumThreads; i++ ) {
m_ThreadArray[ i ] = CreateWorkerThread( this );
assert( m_ThreadArray[ i ] != NULL );
} // for
}
void CThreadPool::destroyWorkerThreads()
{
for( DWORD i=0; i<m_NumThreads; i++ ) {
DestroyWorkerThread( m_ThreadArray[i] );
m_ThreadArray[ i ] = NULL ;
} // for
}
void CThreadPool::startWorkerThreads()
{
for( DWORD i=0; i<m_NumThreads; i++ ) {
m_ThreadArray[i]->Start();
} // for
}
void CThreadPool::stopWorkerThreads()
{
for( DWORD i=0; i<m_NumThreads; i++ ) {
BOOL bResult =
PostQueuedCompletionStatus( STOP_WORKER_THREAD );
assert( bResult == TRUE );
} // while
WaitForThreadCompletion();
}
/* End of File */