Divide and conquer is a good strategy for partitioning a large job, provided you don't divide too much. Windows NT helps you guess right.
Introduction
I/O Completion Ports (IOCPs) are often cited as the best way to write scalable, multithreaded servers. In general, however, they can also be applied to any problem requiring a pool of worker threads. This article introduces code that encapsulates the functionality of Windows NT IOCPs and their associated worker threads in reusable C++ classes. It then demonstrates a multithreaded word-count program that uses these classes.
I/O Completion Port Overview
I/O Completion Ports are a mechanism provided by Windows NT version 3.5 and higher. They provide the most efficient way possible to do overlapped file I/O. Calling the function
CreateIoCompletionPort(HANDLE hFile, HANDLE hExistingPort, DWORD dwKey, DWORD dwConcurrency)
associates an overlapped file handle (which may represent a disk file, socket, named pipe, etc.) with the IOCP. From that point on, any I/O completion events for that handle are queued asynchronously to the port.
To service these events, you then create one or more worker threads that wait on the port by calling the function
GetQueuedCompletionStatus(HANDLE hIOCP, LPDWORD lpdwBytesTransferred, LPDWORD lpdwKey, LPOVERLAPPED* lpOverlapped, DWORD dwTimeout)
When a completion packet arrives, one of these waiting threads is automatically unblocked to handle the event. The operating system (OS) unblocks IOCP threads in last-in, first-out (LIFO) order, since threads that have run recently are less likely to have had their resources swapped out to disk. The dwConcurrency parameter to CreateIoCompletionPort is significant: the OS will attempt to keep the number of concurrently running threads for this IOCP at the value you specify. (A value of zero tells the OS to allow as many concurrently running threads as there are CPUs, which is the recommended default.) The best part is that if a worker thread blocks, the OS notices this and automatically unblocks another worker thread to keep the concurrency at the desired level. This kind of load balancing would be difficult or impossible to do at the application level.
The function that allows IOCPs to serve as a general-purpose thread pooling mechanism is as follows:
PostQueuedCompletionStatus(HANDLE hIOCP, DWORD dwBytesTransferred, DWORD dwKey, LPOVERLAPPED lpOverlapped)
This function allows any thread in the same process to communicate with the worker threads, independent of any I/O related communication. Through creative use of the dwKey parameter you can send any kind of information you want to a worker thread.
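As a rough illustration of how a key can carry arbitrary context to a worker, the sketch below uses a plain in-memory queue as a stand-in for the port. It is not the Win32 API: PacketQueue, Packet, SpellCheckRange, and paragraphsViaKey are hypothetical names, and the queue neither blocks nor locks. The key is declared pointer-sized here; the DWORD key shown above dates from 32-bit Windows NT, and current Windows headers declare it as ULONG_PTR so that an object address still fits on 64-bit builds.

```cpp
#include <cassert>
#include <cstdint>
#include <queue>

// Hypothetical work description that a poster wants a worker to see.
struct SpellCheckRange {
    int firstParagraph;
    int lastParagraph;
};

// Stand-in for a completion packet: a byte count plus an opaque key.
struct Packet {
    std::uint32_t bytesTransferred;
    std::uintptr_t key;  // pointer-sized so it can hold an object address
};

// Single-threaded stand-in for the port's packet queue (no blocking, no locking).
class PacketQueue {
public:
    void post(std::uint32_t bytes, std::uintptr_t key) {
        packets_.push(Packet{bytes, key});
    }
    bool get(Packet& out) {
        if (packets_.empty()) return false;
        out = packets_.front();
        packets_.pop();
        return true;
    }
private:
    std::queue<Packet> packets_;
};

// Posts the address of `range` as the key, dequeues the packet, and returns
// the paragraph count computed through the recovered pointer.
int paragraphsViaKey(SpellCheckRange& range) {
    PacketQueue port;
    port.post(0, reinterpret_cast<std::uintptr_t>(&range));

    Packet packet{};
    const bool got = port.get(packet);
    assert(got);
    if (!got) return -1;

    auto* recovered = reinterpret_cast<SpellCheckRange*>(packet.key);
    return recovered->lastParagraph - recovered->firstParagraph + 1;
}
```

The object whose address is posted must outlive the packet; the queue carries only the number, not ownership.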
When I was first learning about IOCPs, I was surprised at how simple the APIs are. I expected some additional API function to formally associate a thread with the completion port, but there is none. Any thread that calls GetQueuedCompletionStatus is automatically associated with that IOCP. While I like the fact that threads can be dynamically associated with an IOCP, for the kinds of programs I write I want a more structured approach. The classes explained below are the result of my desire to more closely integrate the notion of the IOCP-based thread pool and the worker threads that attend to it.
The CThreadPool Class
CThreadPool (see Listing 1) is a wrapper class for an IOCP that is responsible for creating and destroying the IOCP and managing the worker threads associated with it. The CThreadPool constructor takes two parameters: the number of worker threads to create and the desired thread concurrency. Both parameters have defaults: the number of threads defaults to two times the number of CPUs plus two (a value recommended by the book Multithreading Applications in Win32 [1]). The concurrency defaults to one thread per CPU. To make use of this class, you must first derive from it, since CThreadPool contains the pure virtual function
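The two defaults amount to simple arithmetic on the CPU count. The following sketch spells that out; PoolSizing and defaultPoolSizing are hypothetical names, and the CPU count is taken from the C++ standard library rather than from whatever call Listing 1 uses.

```cpp
#include <cassert>
#include <thread>

// Hypothetical pair of constructor defaults.
struct PoolSizing {
    unsigned workerThreads;
    unsigned concurrency;
};

// Applies the defaults described above for a given CPU count:
// 2 * CPUs + 2 worker threads, and a concurrency of one thread per CPU.
PoolSizing defaultPoolSizing(unsigned cpuCount) {
    assert(cpuCount > 0);
    return PoolSizing{2 * cpuCount + 2, cpuCount};
}

// Convenience overload that asks the standard library for the CPU count.
// hardware_concurrency() may return 0 when the count is unknown, so fall back to 1.
PoolSizing defaultPoolSizing() {
    const unsigned cpus = std::thread::hardware_concurrency();
    return defaultPoolSizing(cpus == 0 ? 1 : cpus);
}
```

On a four-CPU machine this yields ten worker threads with a concurrency of four, so up to six threads are held in reserve for the moments when running workers block.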
virtual CWorkerThread* CreateWorkerThread( CThreadPool* pPool ) = 0;
which is called n times by CThreadPool, where n is the number of worker threads indicated in the constructor. You can usually implement this function as a one-liner, returning the result of operator new on your CWorkerThread-derived class.
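The factory arrangement can be sketched without any threading at all. The classes below (Pool, Worker, WordCountPool, WordCountWorker) are simplified, hypothetical stand-ins and not the code from Listing 1; they show only how a base class can call a pure virtual factory once per worker while the derived class supplies the one-liner.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

class Pool;  // forward declaration so Worker can hold a back pointer

// Minimal worker base: only what the factory pattern needs.
class Worker {
public:
    explicit Worker(Pool* pool) : pool_(pool) {}
    virtual ~Worker() = default;
    virtual const char* name() const = 0;
protected:
    Pool* pool_;
};

class Pool {
public:
    explicit Pool(std::size_t workerCount) : workerCount_(workerCount) {}
    virtual ~Pool() = default;

    // Calls the factory once per requested worker. This is deliberately a
    // separate step: a virtual factory cannot be called from the constructor.
    void createWorkers() {
        for (std::size_t i = 0; i < workerCount_; ++i) {
            Worker* w = createWorker(this);
            assert(w != nullptr);
            workers_.emplace_back(w);  // the pool takes ownership
        }
    }
    std::size_t workerCount() const { return workers_.size(); }
    const Worker& worker(std::size_t i) const { return *workers_.at(i); }

protected:
    virtual Worker* createWorker(Pool* pool) = 0;  // derived class picks the type

private:
    std::size_t workerCount_;
    std::vector<std::unique_ptr<Worker>> workers_;
};

class WordCountWorker : public Worker {
public:
    using Worker::Worker;
    const char* name() const override { return "WordCountWorker"; }
};

class WordCountPool : public Pool {
public:
    using Pool::Pool;
protected:
    // The one-liner: return a new instance of the derived worker type.
    Worker* createWorker(Pool* pool) override { return new WordCountWorker(pool); }
};
```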
Once you have instantiated a CThreadPool-derived object, call Start to make it begin running. Start creates an array of worker threads as explained above, and then calls each thread's Start function. The end result is that the worker threads begin waiting for the IOCP to receive completion packets. Of course, a completion packet will never get sent to the port unless you either associate an overlapped file handle with the port, or manually post a completion packet to the port.
To associate a file with the port, call CThreadPool's AssociateFile(HANDLE hFile, DWORD dwKey) member function. This function forms a simple wrapper for the Windows NT CreateIoCompletionPort function. From that point on, any I/O operations on the specified handle will result in a completion packet being sent to the port. As explained below, this will ultimately result in a call to CWorkerThread::OnReceivedCompletionPacket. The dwKey parameter is called the "completion key" in the Win32 documentation. This parameter usually represents a user-defined context object that contains state information about the associated handle. I show an example of this when I describe the sample application below.
To manually post a completion packet to the IOCP, call CThreadPool's PostQueuedCompletionStatus member function. You can actually build an entire application around CThreadPool without ever associating a file handle with the pool. You can control a pool of worker threads simply by passing different key values to PostQueuedCompletionStatus. The possibilities are unlimited. The key could be a pointer to a user-defined structure indicating a portion of a word-processing document to spell-check. It could be an index into an array of arrays that need to be sorted, or anything else you can dream up.
To shut down a thread pool and destroy the worker threads, call CThreadPool::Stop. Deciding when to call this function is important: for instance, if you were building a server in a Windows NT Service, you would probably call Stop when the service was about to shut down. In this case, the lifetime of the thread pool would be dependent on the lifetime of the application. However, if the reverse were true (i.e., the lifetime of the application were determined by how much work the worker threads needed to accomplish) you would need to build some other means into your program to know when it was safe to shut down the thread pool. This is one of the issues I faced in the example program discussed below.
The Stop function stops all worker threads by repeatedly calling PostQueuedCompletionStatus with the special key value STOP_WORKER_THREAD, which is explained below. In the same way that worker thread creation is handled by calling an overridden version of CThreadPool::CreateWorkerThread, thread destruction is handled by calling DestroyWorkerThread(CWorkerThread* pThread) once for each thread created. By default, CThreadPool assumes ownership of the allocated worker thread objects, so the default implementation of DestroyWorkerThread simply calls delete on the pointer passed in. If you want to manage the lifetime of your worker thread objects yourself, you can override this virtual function.
The CWorkerThread Class
The CWorkerThread class (Listing 2) is an abstraction of a Win32 thread, specially designed to work with CThreadPool. A single instance of class CWorkerThread encapsulates a single Win32 thread. Like CThreadPool, you cannot use CWorkerThread directly. Instead, you must derive your own class from it and override the OnReceivedCompletionPacket function. This is the virtual function called by CThreadPool when a completion packet is received on the IOCP.
Although you could instantiate and start an instance of CWorkerThread manually, there is no need to do so. CThreadPool instantiates all the worker threads it needs via the call to CreateWorkerThread. The thread pool starts each worker thread by calling CWorkerThread::Start. A non-static C++ member function cannot be used as a Win32 thread function (because of the hidden this pointer), so CWorkerThread employs the usual workaround of making a static member function (ThreadEntryProc) the thread function and casting the thread function's parameter back into a CWorkerThread pointer. ThreadEntryProc then calls the member function ThreadMain through this pointer. Note that CWorkerThread uses a manual-reset event object to ensure that the Start function does not return until the newly created thread has begun running.
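The static-entry-point workaround and the "do not return from Start until the thread is running" handshake might look roughly like this in portable C++. ThunkedWorker is a hypothetical class, std::thread stands in for the Win32 thread-creation call, and a mutex, flag, and condition variable stand in for the manual-reset event; none of this is taken from Listing 2.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

class ThunkedWorker {
public:
    // Joins if the caller forgot to. A derived class should call join() in its
    // own destructor, before its members go away.
    virtual ~ThunkedWorker() { join(); }

    // Launches the thread and blocks until the new thread reports that it is running.
    void start() {
        assert(!thread_.joinable());
        thread_ = std::thread(&ThunkedWorker::threadEntryProc, static_cast<void*>(this));
        std::unique_lock<std::mutex> lock(mutex_);
        startedCv_.wait(lock, [this] { return started_; });
    }

    void join() {
        if (thread_.joinable()) thread_.join();
    }

    bool hasStarted() {
        std::lock_guard<std::mutex> lock(mutex_);
        return started_;
    }

    int result() const { return result_; }  // meaningful only after join()

protected:
    virtual int threadMain() { return 42; }  // placeholder for the real work loop

private:
    // C-style entry point: a static member has no hidden `this`,
    // so the object arrives as an untyped pointer and is cast back.
    static unsigned long threadEntryProc(void* param) {
        auto* self = static_cast<ThunkedWorker*>(param);
        {
            std::lock_guard<std::mutex> lock(self->mutex_);
            self->started_ = true;  // plays the role of setting the event
        }
        self->startedCv_.notify_all();
        self->result_ = self->threadMain();
        return 0;
    }

    std::mutex mutex_;
    std::condition_variable startedCv_;
    bool started_ = false;
    int result_ = 0;
    std::thread thread_;
};
```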
The heart of CWorkerThread is in its implementation of ThreadMain. ThreadMain's body consists of a loop that repeatedly calls GetQueuedCompletionStatus on the parent CThreadPool object's IOCP. This is all it takes to associate the thread with the IOCP. When a completion packet is received, one of two things occurs. Either a normal completion packet is received, in which case ThreadMain calls the derived CWorkerThread object's OnReceivedCompletionPacket function to handle the event; or, a special packet with a dwKey value of STOP_WORKER_THREAD is received. This is a special value known to CThreadPool and CWorkerThread, which causes the worker thread to exit. This method gives CThreadPool a way to perform an orderly shutdown of the thread pool and associated IOCP.
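The loop-until-stop-key pattern, together with a shutdown that posts one stop packet per worker, can be approximated as follows. This is a sketch under stated assumptions: KeyQueue is a hypothetical blocking queue standing in for the port (it has no LIFO wakeup and no concurrency limit), kStopWorkerThread is an arbitrary reserved value chosen for this example, and adding the key to a running sum stands in for OnReceivedCompletionPacket.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reserved key; the article's constant is STOP_WORKER_THREAD.
constexpr std::uintptr_t kStopWorkerThread = ~static_cast<std::uintptr_t>(0);

// Blocking FIFO queue standing in for the port.
class KeyQueue {
public:
    void post(std::uintptr_t key) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            keys_.push_back(key);
        }
        ready_.notify_one();
    }
    std::uintptr_t get() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !keys_.empty(); });
        const std::uintptr_t key = keys_.front();
        keys_.pop_front();
        return key;
    }
private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::uintptr_t> keys_;
};

// Runs `workerCount` threads over `keys` and returns the sum of all keys handled.
std::uint64_t runPool(unsigned workerCount, const std::vector<std::uintptr_t>& keys) {
    assert(workerCount > 0);
    KeyQueue port;
    std::atomic<std::uint64_t> sum{0};

    // Body of each worker: wait for a packet, exit on the stop key, otherwise handle it.
    auto threadMain = [&port, &sum] {
        for (;;) {
            const std::uintptr_t key = port.get();
            if (key == kStopWorkerThread) break;  // orderly exit
            sum += key;                           // stand-in for the packet handler
        }
    };

    std::vector<std::thread> workers;
    for (unsigned i = 0; i < workerCount; ++i) workers.emplace_back(threadMain);

    for (std::uintptr_t key : keys) port.post(key);

    // Shutdown: one stop packet per worker. Each worker consumes exactly one
    // and exits, and the FIFO order guarantees all real work is dequeued first.
    for (unsigned i = 0; i < workerCount; ++i) port.post(kStopWorkerThread);
    for (auto& w : workers) w.join();
    return sum.load();
}
```

Because a worker leaves its loop as soon as it sees a stop key, it can never swallow a second one, which is why posting exactly as many stop packets as there are workers is sufficient.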
The Multithreaded WordCount Program
MTWC (Listing 3) is a multithreaded variation on the venerable WC word-counting program. This Windows NT console-based program takes one or more filenames as arguments and reports back the total size, number of words, and number of lines in each file (see Table 1).
A standard implementation of WC would use a single thread. It would simply iterate through the list of files provided performing the count on each one. This approach is not scalable and would not benefit at all from running on a machine with more than one processor. Another approach might be to spawn a thread for each file and gather the results at the end. Unfortunately, this approach runs out of steam when the number of files being processed is large (say, 100 files), because of the high resource demand that number of threads would place on the OS. Furthermore, as the number of threads increases the OS spends more and more of its time just performing context switches between threads and not getting any real work done. This approach could also make for problems in gathering the results of the worker threads, since the standard means of waiting for multiple threads to complete (WaitForMultipleObjects) has a limit of 64 handles.
MTWC uses the IOCP-based thread pooling classes explained above to spread the work of counting words in n files among a small number of threads. The number of worker threads and the thread concurrency are specified by the constants NUM_WORKER_THREADS and THREAD_CONCURRENCY in mtwc.h. Probably the best way to get an initial feel for how the program works is to sprinkle output statements throughout the program and vary the values of these two constants. (Including the result of GetCurrentThreadId in the output is useful.)
Since a fixed number of threads must process a potentially large number of files, we need some place to store context information for each file. The context in this case consists of the file handle, name, OVERLAPPED structure, state (i.e. unopened, opened, in the process of reading, etc.), and of course the counts for characters, words, and lines. These attributes are encapsulated in the CWordCountContext class. MTWC's main function allocates a CWordCountContext instance for each file specified on the command line. It then kicks off the worker threads by calling CThreadPool::PostQueuedCompletionStatus once for each file, passing in the CWordCountContext object for that file.
The bulk of the implementation code in MTWC resides in CWordCountWorker::OnReceivedCompletionPacket. This function examines the current state of the context passed in via the dwKey parameter and takes the appropriate action. The context will be in one of the following states:
- STATE_UNOPENED: Since the CWordCountContext object's initial state is STATE_UNOPENED, the first thing the worker thread does is attempt to open the file. If the file cannot be opened, the thread sets the state to STATE_ERROR_OPENING. Otherwise, the worker thread kicks off the first overlapped read against the file. As a default I have set the worker threads to read 4 KB at a time. The input buffer is a part of the CWordCountContext object.
- STATE_OPENED: This state indicates that the file is currently being read. If the worker thread reaches the end of file, it closes the file. Otherwise, the thread counts the characters, words, and lines in the context's input buffer and issues the next overlapped read, after adjusting the offset value in the context's OVERLAPPED structure.
- STATE_CLOSED, STATE_ERROR_OPENING, STATE_ERROR_READING: When a file is successfully read and closed, the thread places the context into the STATE_CLOSED state. If the file cannot be opened, the thread sets its state to STATE_ERROR_OPENING. If an error occurs while the thread is reading the file, it sets the state to STATE_ERROR_READING. The program distinguishes between the three states so it can report a specific error when the results are displayed.
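Counting in fixed-size buffers raises one subtlety: a word may straddle two reads. One way to handle this, sketched below, is to keep an "inside a word" flag with the running totals so that it survives from one buffer to the next. Counts, countChunk, and countInChunks are hypothetical names; whether Listing 3 treats buffer boundaries this way is an assumption, and the string-slicing loop only imitates successive reads.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <cstddef>
#include <string>

// Running totals for one file. `inWord` is carried between buffers so that a
// word split across two reads is counted once.
struct Counts {
    std::size_t chars = 0;
    std::size_t words = 0;
    std::size_t lines = 0;
    bool inWord = false;
};

// Adds the contents of one buffer to the running totals.
void countChunk(const char* data, std::size_t length, Counts& totals) {
    assert(data != nullptr || length == 0);
    for (std::size_t i = 0; i < length; ++i) {
        const unsigned char c = static_cast<unsigned char>(data[i]);
        ++totals.chars;
        if (c == '\n') ++totals.lines;
        if (std::isspace(c)) {
            totals.inWord = false;
        } else if (!totals.inWord) {
            totals.inWord = true;  // first character of a new word
            ++totals.words;
        }
    }
}

// Feeds `text` through countChunk in pieces of `chunkSize` bytes,
// the way successive reads of a file would.
Counts countInChunks(const std::string& text, std::size_t chunkSize) {
    assert(chunkSize > 0);
    Counts totals;
    for (std::size_t offset = 0; offset < text.size(); offset += chunkSize) {
        const std::size_t n = std::min(chunkSize, text.size() - offset);
        countChunk(text.data() + offset, n, totals);
    }
    return totals;
}
```

The totals come out the same whether the text is processed in 4-byte or 4 KB pieces, which is the property a chunked reader needs.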
As I alluded to previously, when the application lifetime is driven by the lifetime of the worker threads (the opposite of a traditional server application) you must build your own means of knowing when it is appropriate to shut down the thread pool. To accomplish this, I store the number of files to process in my CThreadPool-derived class, CWordCountThreadPool. Whenever one of the worker threads closes a file, it also notifies the thread pool object that it is done using the file by calling CWordCountThreadPool::ProcessedFile. This action causes the count to be decremented. When the value reaches zero the class signals a manual-reset event object. main waits on this event by calling CWordCountThreadPool::WaitUntilDone. Only after this function returns does main call the thread pool's Stop function.
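The countdown-and-signal arrangement can be sketched with standard C++ primitives. CompletionCounter and simulateRun are hypothetical names; a mutex and condition variable take the place of the manual-reset event, and each demonstration thread reports exactly one file. This is an illustration of the idea rather than the code in CWordCountThreadPool.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

class CompletionCounter {
public:
    explicit CompletionCounter(std::size_t filesToProcess) : remaining_(filesToProcess) {}

    // Called by a worker when it has finished with one file.
    void processedFile() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(remaining_ > 0);
        if (--remaining_ == 0) done_.notify_all();  // plays the role of signaling the event
    }

    // Blocks until every file has been reported as processed.
    void waitUntilDone() {
        std::unique_lock<std::mutex> lock(mutex_);
        done_.wait(lock, [this] { return remaining_ == 0; });
    }

    std::size_t remaining() {
        std::lock_guard<std::mutex> lock(mutex_);
        return remaining_;
    }

private:
    std::mutex mutex_;
    std::condition_variable done_;
    std::size_t remaining_;
};

// Demonstration helper: `fileCount` threads each report one file.
// Returns the number of files still outstanding after the wait.
std::size_t simulateRun(std::size_t fileCount) {
    CompletionCounter counter(fileCount);
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < fileCount; ++i) {
        workers.emplace_back([&counter] { counter.processedFile(); });
    }
    counter.waitUntilDone();  // main would stop the pool only after this returns
    for (auto& w : workers) w.join();
    return counter.remaining();
}
```

Because the wait checks the count rather than a one-shot notification, it returns immediately when there are no files at all and cannot miss a signal that fired before the wait began.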
Further Uses
Although MTWC is not a server, it still represents a fairly traditional use of IOCPs, since it deals with overlapped files. However, there are plenty of non-file-based applications that can benefit from this type of thread pooling. Imagine an image-processing application that needs to perform some sort of transformation on a large bitmapped image. The standard approach would be to scan over the bits of the image in sequence, using a single thread. An alternate approach that would work well with the IOCP classes I've described would break the image into n pieces and create a context object for each chunk, indicating its area within the master image. The next step would be to fire off n calls to CThreadPool::PostQueuedCompletionStatus, passing in the appropriate instance of the context object. A synchronization mechanism similar to the one in MTWC could be used to gather the results of the different worker threads when they are done with their respective sections of the image.
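Breaking the image into n pieces is mostly a matter of handing out row ranges. The sketch below assumes the pieces are horizontal bands of whole rows, which is only one of several reasonable choices; ImageBand and splitIntoBands are hypothetical names. Each resulting band is the kind of context object whose address could then be posted to the pool as a key.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical context: a horizontal band covering rows
// [firstRow, firstRow + rowCount) of the master image.
struct ImageBand {
    std::size_t firstRow;
    std::size_t rowCount;
};

// Splits `imageHeight` rows into at most `pieces` contiguous bands whose
// sizes differ by no more than one row.
std::vector<ImageBand> splitIntoBands(std::size_t imageHeight, std::size_t pieces) {
    assert(pieces > 0);
    std::vector<ImageBand> bands;
    const std::size_t base = imageHeight / pieces;
    const std::size_t extra = imageHeight % pieces;  // the first `extra` bands get one more row
    std::size_t row = 0;
    for (std::size_t i = 0; i < pieces; ++i) {
        const std::size_t count = base + (i < extra ? 1 : 0);
        if (count == 0) break;  // more pieces requested than there are rows
        bands.push_back(ImageBand{row, count});
        row += count;
    }
    return bands;
}
```

Splitting a 10-row image three ways gives bands of 4, 3, and 3 rows starting at rows 0, 4, and 7.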
Multithreading with NT I/O Completion Ports is not just for servers. Any application that can be designed to process its data in parallel can benefit from using IOCP-based thread pooling. Applications built this way will automatically benefit when run on the coming wave of multiple-processor machines.
References
[1] Jim Beveridge and Robert Wiener. Multithreading Applications in Win32 (Addison-Wesley Developers Press, 1997).
[2] Ralph Davis. Win32 Network Programming (Addison-Wesley Developers Press, 1996).
[3] John Vert. "Writing Scalable Applications for Windows NT," Microsoft Developers Network Library, http://premium.microsoft.com/msdn/library/techart/scalabil.htm.
[4] "Writing Great Windows NT Server Applications," Microsoft Developers Network Library, http://premium.microsoft.com/msdn/library/bkgrnd/svrappol.htm.
Kevin Manley is a Systems Software Developer for Health Systems Technologies, Inc. in Seattle, WA. He has a BS degree in Electrical Engineering from Worcester Polytechnic Institute. He may be reached at kmanley@hst.com.