//
// partial listing of mtwc.h
//
// Tuning constants for the word-count thread pool.
//
// Number of worker threads; main() passes this as the pool's numThreads
// argument. Per the original note, 0 requests the default.
const int NUM_WORKER_THREADS = 0 ; // use default, 2 * #CPUs + 2
// Concurrency value; main() passes this as the pool's threadConcurrency
// argument. Per the original note, 0 requests the default.
const int THREAD_CONCURRENCY = 0 ; // use default, 1 per CPU
// Size in bytes of each context's read buffer and of every ReadFile request.
const int READ_BUFFER_SIZE = 4096 ;
// Per-file state for one word-count job.
//
// main() allocates one context per file named on the command line and
// posts the context's address to the thread pool; the worker receives that
// address back as the completion key and updates the fields below.
// The constructor's definition is not part of this listing, so the initial
// field values are not visible here.
class CWordCountContext {
public:
CWordCountContext();
// Processing stage of the file.
enum enState {
STATE_UNOPENED, // worker opens the file when it sees this state
STATE_OPENED, // file is open; set just before the first read is issued
STATE_CLOSED, // assigned by CWordCountWorker::closeFile()
STATE_ERROR_OPENING, // CreateFile returned INVALID_HANDLE_VALUE
STATE_ERROR_READING // ReadFile failed with an unexpected error code
};
LPSTR lpszFilename ; // name of the file; main() points this at an argv entry
HANDLE hFile ; // result of CreateFile for lpszFilename
OVERLAPPED overlapped ; // passed to every ReadFile call on hFile
char buffer[ READ_BUFFER_SIZE ]; // destination of each ReadFile call
enState state; // current processing stage (see enState)
int spaceState ; // set to 0 on whitespace and to 1 when a word starts
DWORD dwNumChars ; // running total of bytes transferred
DWORD dwNumWords ; // running count of words found
DWORD dwNumLines ; // running count of '\n' characters found
};
// Thread pool specialised for word counting.
//
// On top of CThreadPool it tracks how many files are still outstanding and
// holds an event that ProcessedFile() signals once that count reaches zero.
class CWordCountThreadPool : public CThreadPool {
public:
// numThreads and threadConcurrency are forwarded to CThreadPool;
// numFiles is the number of files that must be processed (asserted > 0).
CWordCountThreadPool( DWORD numThreads, DWORD threadConcurrency,
DWORD numFiles );
~CWordCountThreadPool();
// NOTE(review): presumably the CThreadPool hook that creates the pool's
// workers; the definition is not in this listing -- confirm.
CWorkerThread* CreateWorkerThread( CThreadPool* pPool );
// Called by main() before it reports results. The definition is not in
// this listing; presumably it waits on m_hDone -- confirm.
void WaitUntilDone();
// Called by a worker each time it finishes (or gives up on) one file.
void ProcessedFile();
protected:
long m_NumFiles ; // files not yet processed; decremented by ProcessedFile()
HANDLE m_hDone ; // event created in the constructor, set when m_NumFiles hits 0
};
// Worker thread that performs the word counting.
//
// Each completion packet it receives carries the address of a
// CWordCountContext as its key; the worker opens the file, issues
// overlapped reads, and counts lines, words and characters in each chunk.
class CWordCountWorker : public CWorkerThread {
public:
// pPool is handed to the CWorkerThread base class.
CWordCountWorker( CWordCountThreadPool* pPool ) :
CWorkerThread( pPool ) {}
// Processes one completion packet; dwKey is cast back to the
// CWordCountContext* that was posted or associated with the file.
void OnReceivedCompletionPacket( BOOL bResult,
DWORD dwNumberOfBytesTransferred, DWORD dwKey,
LPOVERLAPPED lpOverlapped );
protected:
// Issues the next overlapped read into aCtx->buffer.
void readFile( CWordCountContext* aCtx );
// Closes aCtx's file handle (if valid) and notifies the pool.
void closeFile( CWordCountContext* aCtx );
};
// -------------------------------------------------------------------
// Partial listing of mtwc.cpp
// Handles one completion packet for a word-count job.
//
// Parameters:
//   bResult                     - result flag supplied by the caller; it is
//                                 not consulted here (see NOTE below).
//   dwNumberOfBytesTransferred  - bytes delivered by the completed read;
//                                 0 is treated as end of file.
//   dwKey                       - address of the job's CWordCountContext.
//   lpOverlapped                - OVERLAPPED of the completed read; its file
//                                 offset is advanced before the next read.
//
// Behaviour by context state:
//   STATE_UNOPENED - open the file, associate it with the pool and issue
//                    the first read (or record STATE_ERROR_OPENING).
//   STATE_OPENED   - count lines/words/chars in the chunk just read and
//                    issue the next read, or close the file at EOF.
//   anything else  - programming error (assert).
//
// NOTE(review): the completion key is a DWORD, so the pointer round-trip
// only works where pointers fit in 32 bits; widening it requires changing
// the CThreadPool/CWorkerThread interface, which is outside this listing.
// NOTE(review): a failed read that transfers 0 bytes is indistinguishable
// here from a normal end of file because bResult is ignored.
void CWordCountWorker::OnReceivedCompletionPacket(
    BOOL bResult, DWORD dwNumberOfBytesTransferred, DWORD dwKey,
    LPOVERLAPPED lpOverlapped )
{
    CWordCountContext* pContext =
        reinterpret_cast<CWordCountContext*>(dwKey);
    if( pContext->state == CWordCountContext::STATE_UNOPENED ) {
        pContext->hFile = CreateFile( pContext->lpszFilename,
            GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING,
            FILE_FLAG_OVERLAPPED | FILE_FLAG_SEQUENTIAL_SCAN, NULL );
        if( pContext->hFile == INVALID_HANDLE_VALUE ) {
            pContext->state = CWordCountContext::STATE_ERROR_OPENING;
            closeFile( pContext );
        } else {
            // Associate overlapped operations on this file
            // with our I/O completion port
            m_pThreadPool->AssociateFile( pContext->hFile,
                reinterpret_cast<DWORD>(pContext) );
            pContext->state = CWordCountContext::STATE_OPENED ;
            readFile( pContext ); // kick off first read...
        } // else
    } else if( pContext->state == CWordCountContext::STATE_OPENED ) {
        if( dwNumberOfBytesTransferred == 0 ) {
            // we are at end of file
            closeFile( pContext );
        } else {
            // This is the guts of the word-counting...
            for( DWORD i=0; i<dwNumberOfBytesTransferred; i++ ) {
                // isspace() requires a value representable as unsigned
                // char; a negative plain char is undefined behaviour.
                const unsigned char c =
                    static_cast<unsigned char>( pContext->buffer[i] );
                if( c == '\n' ) {
                    pContext->dwNumLines++ ;
                    // A newline also separates words; without this reset
                    // "foo\nbar" would be counted as a single word.
                    pContext->spaceState = 0;
                } else if( isspace( c ) ) {
                    pContext->spaceState = 0;
                } else if( pContext->spaceState == 0 ) {
                    pContext->spaceState = 1 ;
                    pContext->dwNumWords++ ;
                } // else
            } // for
            pContext->dwNumChars += dwNumberOfBytesTransferred ;
            // Advance the 64-bit file position, carrying into OffsetHigh
            // so that files of 4 GB or more do not wrap back to offset 0.
            const DWORD dwPreviousLow = lpOverlapped->Offset ;
            lpOverlapped->Offset += dwNumberOfBytesTransferred ;
            if( lpOverlapped->Offset < dwPreviousLow ) {
                lpOverlapped->OffsetHigh++ ;
            } // if
            readFile( pContext );
        } // else
    } else {
        // unknown state--should never happen
        assert(0);
    } // else
}
// Queues the next overlapped read of up to READ_BUFFER_SIZE bytes into the
// context's buffer, using the context's OVERLAPPED structure.
//
// If ReadFile reports success or ERROR_IO_PENDING nothing more is done
// here. ERROR_HANDLE_EOF closes the file; any other error records
// STATE_ERROR_READING in the context and then closes the file.
void CWordCountWorker::readFile( CWordCountContext* pContext )
{
    const BOOL bIssued = ReadFile( pContext->hFile, &(pContext->buffer),
        READ_BUFFER_SIZE, NULL, &(pContext->overlapped) );
    if( bIssued != FALSE ) {
        return ;
    } // if

    switch( GetLastError() ) {
    case ERROR_IO_PENDING:
        // asynchronous read was queued, this is the normal result
        break ;
    case ERROR_HANDLE_EOF:
        closeFile( pContext );
        break ;
    default:
        pContext->state = CWordCountContext::STATE_ERROR_READING;
        closeFile( pContext );
        break ;
    } // switch
}
// Finishes processing of one file: records the final state, closes the
// file handle if one was opened, and tells the pool that a file is done.
//
// An error state already recorded by the caller (STATE_ERROR_OPENING or
// STATE_ERROR_READING) is kept; only a non-error state becomes
// STATE_CLOSED. Previously the error state was always overwritten, so it
// could never be observed afterwards.
//
// All writes to the context happen before ProcessedFile() is called,
// because that call may signal the pool's "done" event.
void CWordCountWorker::closeFile( CWordCountContext* pContext )
{
    if( pContext->state != CWordCountContext::STATE_ERROR_OPENING &&
        pContext->state != CWordCountContext::STATE_ERROR_READING ) {
        pContext->state = CWordCountContext::STATE_CLOSED ;
    } // if
    if( pContext->hFile != INVALID_HANDLE_VALUE ) {
        CloseHandle( pContext->hFile );
        // Do not leave a stale handle value behind in the context.
        pContext->hFile = INVALID_HANDLE_VALUE ;
    } // if
    CWordCountThreadPool* pPool =
        dynamic_cast<CWordCountThreadPool*>( m_pThreadPool );
    // The worker is constructed with a CWordCountThreadPool, so the cast
    // is expected to succeed; guard anyway instead of dereferencing NULL.
    assert( pPool != NULL );
    if( pPool != NULL ) {
        pPool->ProcessedFile();
    } // if
}
// Constructs the pool and the event used to signal completion.
//
// Parameters:
//   numThreads        - forwarded to CThreadPool.
//   threadConcurrency - forwarded to CThreadPool.
//   numFiles          - number of files that will be processed; must be
//                       greater than zero. It seeds the outstanding-file
//                       counter that ProcessedFile() decrements.
//
// The event is created manual-reset and initially non-signalled.
CWordCountThreadPool::CWordCountThreadPool( DWORD numThreads,
    DWORD threadConcurrency, DWORD numFiles ) :
    CThreadPool( numThreads, threadConcurrency ),
    m_NumFiles( static_cast<long>( numFiles ) )
{
    assert( numFiles > 0 );
    m_hDone = CreateEvent( NULL, TRUE, FALSE, NULL );
    // CreateEvent returns NULL on failure; without a valid event the pool
    // could never report completion, so surface the problem immediately.
    assert( m_hDone != NULL );
}
// Records that one more file has been fully processed and signals the
// "done" event when no files remain.
//
// The decision is based on the value returned by InterlockedDecrement
// rather than on a separate, unsynchronised read of m_NumFiles, so exactly
// one caller (the one that takes the counter to zero) sets the event.
void CWordCountThreadPool::ProcessedFile()
{
    if( InterlockedDecrement( &m_NumFiles ) == 0 ) {
        SetEvent( m_hDone );
    } // if
}
// Program entry point: counts lines, words and characters in every file
// named on the command line, using one context per file and a pool of
// worker threads.
//
// Returns 0 on success and 1 when the command line is missing file names
// or an exception escapes processing. (Both failure paths used to return
// 0, which made failures invisible to calling scripts.)
int main( int argc, char* argv[] )
{
    CWordCountContext* aCtx = NULL ;
    int exitCode = 0 ;
    try {
        if( argc < 2 ) {
            cerr << "Usage: mtwc <file1> [ <file2> ... <filen> ]"
                 << endl
                 << "Counts lines, words, chars in files and reports"
                    " results" << endl;
            return 1 ;
        } // if
        int numFiles = argc-1 ; // -1 since program name
                                // is first argument
        aCtx = new CWordCountContext[ numFiles ];
        CWordCountThreadPool threadPool(NUM_WORKER_THREADS,
            THREAD_CONCURRENCY, numFiles);
        threadPool.Start();
        for( int i=0; i<numFiles; i++ ) {
            // assign file name to context
            aCtx[i].lpszFilename = argv[i+1];
            // kick off a worker thread; the context address travels as
            // the completion key
            threadPool.PostQueuedCompletionStatus(
                reinterpret_cast<DWORD>( &(aCtx[i]) ) );
        } // for
        // wait for threads to complete...
        threadPool.WaitUntilDone();
        threadPool.Stop();
        reportResults( aCtx, numFiles );
    } catch( ... ) {
        cerr << "Unhandled exception" << endl ;
        exitCode = 1 ;
    } // catch
    // delete [] on a null pointer is a no-op, so no guard is needed.
    delete [] aCtx ;
    return exitCode ;
}
/* End of File */