Handling multiple socket read & write operations

A business logic thread pool

Our requirements for the business logic thread pool are that it should be flexible, capable of increasing and decreasing the number of worker threads as the load on the server dictates. Passing work items into the thread pool should be a non-blocking operation so that the IO threads can operate at maximum efficiency, but we need to know when a work item hasn't been picked up by a thread within a certain time period so that we can add more threads to the pool. We also need to keep track of the number of idle threads that we have and, every so often, reduce the number of threads in the pool to conserve resources when the server is lightly loaded.

As you would probably expect, the thread pool uses IO Completion Ports to dispatch work items to worker threads. To monitor how long a work item waits before a thread picks it up, and therefore to work out when we need to add more threads to the pool, we use an event. When we dispatch a work item to the IO Completion Port we wait on the event for a configurable timeout period. When a thread picks up a work item from the completion port the first thing that it does is signal the event. If all threads are busy when we dispatch our work item, our timeout may expire before a thread signals the event. In this case we may wish to add another thread to the pool to deal with the workload. The dispatch code could look something like this:

  void CThreadPool::HandleDispatch(
      ULONG_PTR completionKey,
      DWORD dwNumBytes,
      OVERLAPPED *pOverlapped)
  {
      // Clear the event before posting so a stale signal cannot satisfy the wait.
      m_dispatchCompleteEvent.Reset();

      m_workPort.PostStatus(completionKey, dwNumBytes, pOverlapped);

      // Wait for a worker thread to signal that it has picked up the work item.
      bool processed = false;
      bool threadStarted = false;

      while (!processed)
      {
          DWORD result = m_dispatchCompleteEvent.Wait(m_timeoutMillis);

          if (result == WAIT_OBJECT_0)
          {
              processed = true;
          }
          else if (result == WAIT_TIMEOUT)
          {
              // Every thread is busy; start at most one extra thread per
              // dispatch, and only if we're below the configured maximum.
              if (!threadStarted &&
                  m_processingThreads == m_activeThreads &&
                  (size_t)m_activeThreads < m_maxThreads)
              {
                  StartWorkerThread();
                  threadStarted = true;
              }
          }
          else
          {
              throw CWin32Exception(_T("CThreadPool::Dispatch()"), GetLastError());
          }
      }
  }

Whilst there are threads available to process the work items we don't need to start new threads. As soon as all of the threads in the pool are busy we may time out during the dispatch and then, if we're not already running with the maximum number of threads that we've been configured for, we start a new thread. The actual code is slightly more complex as it handles shutdown requests and adjusts the timeout when we're already running at our maximum number of threads. The dispatcher needs to know how many threads we have in the pool and how many of those threads are processing, so each worker thread calls back to the thread pool to let the pool know what state it's in.

The problem with this work item dispatch code is that it doesn't fulfill our requirement of being able to dispatch a work item to the pool in a non-blocking fashion. To achieve that, we add another level of indirection, and another IO Completion Port.

Comments

  1. 08 May 2007 at 11:15
    It's up to you. You may select any port you want, but it is better to avoid the standard ports. Read the documentation.
  2. 13 Feb 2004 at 01:46

    Hi there,


    How do I find the port of the machine for a socket connection?


    Thanks


    PPCC

  3. 01 Jan 1999 at 00:00

    This thread is for discussions of Handling multiple socket read & write operations.

Len Holgate Len has been programming for over 20 years, having first started with a Sinclair ZX-80. Now he runs his own consulting company, JetByte Limited. JetByte provides contract programming and consultanc...
