Business logic processing in a socket server

Page 2 of 4
  1. Introduction
  2. A business logic thread pool
  3. Doing the Work
  4. Conclusion

A business logic thread pool

Our requirements for the business logic thread pool are that it should be flexible and capable of increasing and decreasing the number of worker threads as the load on the server dictates. Passing work items into the thread pool should be a non blocking operation so that the IO threads can operate at maximum efficiency but we need to be able to know when a work item hasnt been picked up by a thread within a certain time period so that we can add more threads to the pool. We also need to keep a track off the number of idle threads that we have and, every so often, reduce the number of threads in the pool to conserve resources in times of low server loading.

As you would probably expect, the thread pool uses IO Completion Ports to dispatch work items to worker threads. To be able to monitor how long a work item takes to be processed and therefore be able to work out when we need to add more threads to the pool we use an event. When we dispatch a work item to the IO Completion Port we wait on the event for a configurable timeout period. When a thread picks up a work item from the completion port the first thing that it does is signal the event. If all threads are busy when we dispatch our work item our timeout may expire before a thread signals the event. In this case we may wish to add another thread to the pool to deal with the work load. The dispatch code could look something like this:

void CThreadPool::HandleDispatch(
      ULONG_PTR completionKey,
      DWORD dwNumBytes,
      OVERLAPPED *pOverlapped)
  {
      m_dispatchCompleteEvent.Reset();
      bool processed = false;
 
      m_workPort.PostStatus(completionKey, dwNumBytes, pOverlapped);
      // wait for someone to toggle the 'got message' event?
      bool threadStarted = false;
      while (!processed)
      {
        DWORD result = m_dispatchCompleteEvent.Wait(m_timeoutMillis);
        if (result == WAIT_OBJECT_0)
        {
            processed = true;
        }
        else if (result == WAIT_TIMEOUT)
        {
            if (!threadStarted && m_processingThreads == m_activeThreads && (size_t)m_activeThreads < m_maxThreads)
            {           
              StartWorkerThread();
              threadStarted = true;
            }
        }
        else
        {
            throw CWin32Exception(_T("CThreadPool::Dispatch()"), GetLastError());
        }
      }
  }

Whilst there are threads available to process the work items we don't need to start new threads. As soon as all of the threads in the pool are active we may timeout during the dispatch and then, if we're not already running with the maximum number of threads that we've been configured for, we start a new thread. The actual code is slightly more complex as it handles shutdown requests and adjusts the timeout when we're already running at our maximum number of threads. The dispatcher needs to know how many threads we have in the pool and how many of those threads are processing so each worker thread calls back to the thread pool to let the pool know what state it's in.

The problem with this piece of work item dispatch code is that it doesn't fulfill our requirement of being able to dispatch a work item to the pool in a non blocking fashion. To achieve that, we add another level of indirection, and another IO Completion Port.

Non blocking dispatch

To ensure that users wishing to dispatch a work item to the thread pool can do so without blocking we implement the user level dispatch function as follows:

void CThreadPool::Dispatch(
      ULONG_PTR completionKey,
      DWORD dwNumBytes /*= 0*/,
      OVERLAPPED *pOverlapped /*= 0*/)
  {
      if (completionKey == 0)
      {
        throw CException(_T("CThreadPool::Dispatch()"), _T("0 is an invalid value for completionKey"));
      }
      m_dispatchPort.PostStatus(completionKey, dwNumBytes, pOverlapped);
  }

The restriction on 0 valued completion keys is unfortunate but allows us to shut down the thread pool's dispatch thread by posting a 0 to its completion port. The thread pool now has two IO Completion Ports. The dispatch port is serviced by a single maintenance thread which executes the HandleDispatch() method to dispatch work items to the worker threads. Users dispatch without blocking and the maintenance thread dispatches in a blocking manner so that it can expand the thread pool when it needs to. The work item port is serviced by a variable number of threads. We've seen how we know when we need to expand the number of threads, now we'll look at how we reduce the number of threads when the work load is low.

Shutting down dormant threads

Often work items come in batches, the thread pool gets busy, expands, services all of the work items and then becomes less busy. At this point the pool contains threads which aren't being used but which are still consuming resources. These dormant threads can be safely shutdown as the pool can expand again as load increases. The question is, how do we decide when to shut down some threads?

The maintenance thread that handles our blocking dispatch also handles checking for dormant threads. Every so often (a configurable amount) the maintenance thread uses an algorithm to determine if it should shut some threads down. The current algorithm is as follows:

void CThreadPool::HandleDormantThreads()
  {
      if ((size_t)m_activeThreads > m_minThreads)
      {
        const size_t dormantThreads = m_activeThreads - m_processingThreads;
        if (dormantThreads > m_maxDormantThreads)
        {
            const size_t threadsToShutdown = (dormantThreads - m_maxDormantThreads) / 2 + 1;
            StopWorkerThreads(threadsToShutdown);
        }
      }
  }

If we have more threads than the minimum number we're allowed to have, find out how many threads aren't currently processing work items and if that number is more than the number of dormant threads that we're allowed to have, shut half of them down (rounding up). Stopping worker threads is a simple case of posting an IO completion key of 0 to the work port for each worker thread that we want to shut down.

You might also like...

Comments

About the author

Len Holgate United Kingdom

Len has been programming for over 20 years, having first started with a Sinclair ZX-80. Now he runs his own consulting company, JetByte Li...

Interested in writing for us? Find out more.

Contribute

Why not write for us? Or you could submit an event or a user group in your area. Alternatively just tell us what you think!

Our tools

We've got automatic conversion tools to convert C# to VB.NET, VB.NET to C#. Also you can compress javascript and compress css and generate sql connection strings.

“Some people, when confronted with a problem, think "I know, I’ll use regular expressions." Now they have two problems.” - Jamie Zawinski