// // Partial listing of threadPool.h // class CThreadPool { public: CThreadPool( DWORD numThreads = 0, DWORD threadConcurrency = 0 ); virtual ~CThreadPool(); void Start(); void Stop(); BOOL IsStarted() const ; void WaitForThreadCompletion(); virtual CWorkerThread* CreateWorkerThread( CThreadPool* pPool ) = 0 ; virtual void DestroyWorkerThread( CWorkerThread* pThread ); BOOL GetQueuedCompletionStatus( LPDWORD lpNumberOfBytesTransferred, LPDWORD lpdwKey, LPOVERLAPPED *lpOverlapped, DWORD dwTimeout = INFINITE ); BOOL AssociateFile( HANDLE hFile, DWORD dwKey ); BOOL PostQueuedCompletionStatus( DWORD dwKey, DWORD dwBytesTransferred=0, LPOVERLAPPED lpOverlapped=NULL ); protected: void createWorkerThreads(); void destroyWorkerThreads(); void startWorkerThreads(); void stopWorkerThreads(); protected: BOOL m_bStarted ; DWORD m_NumThreads ; DWORD m_ThreadConcurrency ; HANDLE m_hCompletionPort ; CWorkerThread** m_ThreadArray ; }; // ------------------------------------------------------------------ // Partial listing of threadPool.cpp CThreadPool::CThreadPool( DWORD numWorkerThreads, DWORD threadConcurrency ) : m_bStarted( FALSE ), m_NumThreads( numWorkerThreads ), m_ThreadConcurrency( threadConcurrency ), m_hCompletionPort( INVALID_HANDLE_VALUE ), m_ThreadArray( NULL ) { assert( numWorkerThreads >= 0 && numWorkerThreads <= 64 ); if( m_NumThreads == 0 ) { // if default value of 0 was used, create n // threads where n = 2 * number of processors + 2 SYSTEM_INFO sysinfo ; GetSystemInfo( &sysinfo ) ; m_NumThreads = sysinfo.dwNumberOfProcessors * 2 + 2; } // if m_ThreadArray = new CWorkerThread*[ m_NumThreads ]; } void CThreadPool::Start() { if( ! m_bStarted ) { m_hCompletionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, m_ThreadConcurrency ); assert( m_hCompletionPort != NULL ); createWorkerThreads(); startWorkerThreads(); m_bStarted = TRUE ; } // if } void CThreadPool::Stop() { if( m_bStarted ) { stopWorkerThreads(); destroyWorkerThreads(); CloseHandle( m_hCompletionPort ); m_bStarted = FALSE ; } // if } BOOL CThreadPool::AssociateFile( HANDLE hFile, DWORD dwKey ) { // Associate the file handle with our existing completion port HANDLE hResult = CreateIoCompletionPort( hFile, m_hCompletionPort, dwKey, m_ThreadConcurrency ); return (hResult != NULL ); } void CThreadPool::createWorkerThreads() { // Instantiate threads by calling virtual function // in derived class for( DWORD i=0; i<m_NumThreads; i++ ) { m_ThreadArray[ i ] = CreateWorkerThread( this ); assert( m_ThreadArray[ i ] != NULL ); } // for } void CThreadPool::destroyWorkerThreads() { for( DWORD i=0; i<m_NumThreads; i++ ) { DestroyWorkerThread( m_ThreadArray[i] ); m_ThreadArray[ i ] = NULL ; } // for } void CThreadPool::startWorkerThreads() { for( DWORD i=0; i<m_NumThreads; i++ ) { m_ThreadArray[i]->Start(); } // for } void CThreadPool::stopWorkerThreads() { for( DWORD i=0; i<m_NumThreads; i++ ) { BOOL bResult = PostQueuedCompletionStatus( STOP_WORKER_THREAD ); assert( bResult == TRUE ); } // while WaitForThreadCompletion(); } /* End of File */