Handling multiple socket read & write operations

A business logic thread pool

Our requirements for the business logic thread pool are that it should be flexible, capable of increasing and decreasing the number of worker threads as the load on the server dictates. Passing work items into the thread pool should be a non-blocking operation so that the IO threads can operate at maximum efficiency, but we need to be able to know when a work item hasn't been picked up by a thread within a certain time period so that we can add more threads to the pool. We also need to keep track of the number of idle threads that we have and, every so often, reduce the number of threads in the pool to conserve resources in times of low server loading.

As you would probably expect, the thread pool uses IO Completion Ports to dispatch work items to worker threads. To monitor how long a work item takes to be picked up, and therefore work out when we need to add more threads to the pool, we use an event. When we dispatch a work item to the IO Completion Port we wait on the event for a configurable timeout period. When a thread picks up a work item from the completion port, the first thing it does is signal the event. If all threads are busy when we dispatch our work item, our timeout may expire before a thread signals the event; in that case we may wish to add another thread to the pool to deal with the work load. The dispatch code could look something like this:

  void CThreadPool::HandleDispatch(
      ULONG_PTR completionKey,
      DWORD dwNumBytes,
      OVERLAPPED *pOverlapped)
  {
      m_dispatchCompleteEvent.Reset();

      bool processed = false;
      bool threadStarted = false;

      m_workPort.PostStatus(completionKey, dwNumBytes, pOverlapped);

      // Wait for a worker thread to signal that it has picked up the item
      while (!processed)
      {
          DWORD result = m_dispatchCompleteEvent.Wait(m_timeoutMillis);

          if (result == WAIT_OBJECT_0)
          {
              processed = true;
          }
          else if (result == WAIT_TIMEOUT)
          {
              // All threads are busy; grow the pool, but only once per
              // dispatch and never beyond the configured maximum
              if (!threadStarted &&
                  m_processingThreads == m_activeThreads &&
                  (size_t)m_activeThreads < m_maxThreads)
              {
                  StartWorkerThread();
                  threadStarted = true;
              }
          }
          else
          {
              throw CWin32Exception(_T("CThreadPool::HandleDispatch()"), GetLastError());
          }
      }
  }

Whilst there are threads available to process the work items we don't need to start new threads. As soon as all of the threads in the pool are active we may timeout during the dispatch and then, if we're not already running with the maximum number of threads that we've been configured for, we start a new thread. The actual code is slightly more complex as it handles shutdown requests and adjusts the timeout when we're already running at our maximum number of threads. The dispatcher needs to know how many threads we have in the pool and how many of those threads are processing so each worker thread calls back to the thread pool to let the pool know what state it's in.

The problem with this piece of work item dispatch code is that it doesn't fulfill our requirement of being able to dispatch a work item to the pool in a non-blocking fashion. To achieve that, we add another level of indirection, and another IO Completion Port.

About the author

Len Holgate United Kingdom

Len has been programming for over 20 years, having first started with a Sinclair ZX-80. Now he runs his own consulting company, JetByte Li...

