Business logic processing in a socket server

Doing the work

We now have a thread pool that fulfills our requirements: automatic expansion and contraction depending upon load, and non-blocking dispatch for users. The remaining thing to do is allow the derived class to provide its own WorkerThread class to do the work. The worker thread class must implement the following interface:

  virtual bool Initialise();
  virtual void Process(
      ULONG_PTR completionKey,
      DWORD dwNumBytes,
      OVERLAPPED *pOverlapped) = 0;
  virtual void Shutdown();

Initialise() is called when the worker thread is first created, Shutdown() is called when the thread is terminating, and Process() is called for each work item.
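The call order described above can be sketched in portable C++. This is only an illustration, not the article's implementation: the `ULONG_PTR`, `DWORD` and `OVERLAPPED` definitions are stand-ins for the Windows types so that the sketch compiles anywhere, and `WorkerThreadBase`, `CountingWorker` and `RunWorker` are hypothetical names. The default bodies of `Initialise()` and `Shutdown()`, and the decision to skip all work when `Initialise()` fails, are assumptions.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Stand-ins for the Windows types (assumptions, so the sketch is portable).
using ULONG_PTR = std::uintptr_t;
using DWORD = std::uint32_t;
struct OVERLAPPED {};

// Hypothetical base class exposing the interface shown in the article.
class WorkerThreadBase
{
public:
    virtual ~WorkerThreadBase() = default;
    virtual bool Initialise() { return true; }   // assumed default: succeed
    virtual void Process(
        ULONG_PTR completionKey,
        DWORD dwNumBytes,
        OVERLAPPED *pOverlapped) = 0;
    virtual void Shutdown() {}                   // assumed default: do nothing
};

// Hypothetical worker that records the order of the calls it receives.
class CountingWorker : public WorkerThreadBase
{
public:
    std::vector<std::string> calls;

    bool Initialise() override { calls.push_back("Initialise"); return true; }
    void Process(ULONG_PTR, DWORD, OVERLAPPED *) override { calls.push_back("Process"); }
    void Shutdown() override { calls.push_back("Shutdown"); }
};

// Simulates one pool thread: initialise once, process each item, shut down once.
inline void RunWorker(WorkerThreadBase &worker, int numItems)
{
    assert(numItems >= 0);
    if (!worker.Initialise())
    {
        return;   // assumed: a worker that fails to initialise does no work
    }
    OVERLAPPED overlapped;
    for (int i = 0; i < numItems; ++i)
    {
        worker.Process(static_cast<ULONG_PTR>(i), 0, &overlapped);
    }
    worker.Shutdown();
}
```

Passing a `CountingWorker` to `RunWorker()` with two items leaves `calls` holding one `Initialise`, two `Process` and one `Shutdown` entry, in that order.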

A socket server with a business logic thread pool

Now that we have a suitable thread pool we can integrate it with our fake POP3 socket server so that the actual processing of commands can occur in the business logic pool whilst the IO pool is left to get on with the IO operations. We can also move socket closure off to the business logic pool so that we don't block the IO threads with a lingering socket close.

The first thing we need to do is create and configure our thread pool. We can then pass a reference to it to our socket server class, which in turn passes it on to its IO threads.

  CThreadPool pool(
      5,                            // initial number of threads to create
      5,                            // minimum number of threads to keep in the pool
      10,                          // maximum number of threads in the pool
      5,                            // maximum number of "dormant" threads
      5000,                        // pool maintenance period (millis)
      100,                          // dispatch timeout (millis)
      10000);                      // dispatch timeout for when pool is at max threads
  pool.Start();
  CSocketServer server(
      INADDR_ANY,                  // address to listen on
      5001,                        // port to listen on
      10,                          // max number of sockets to keep in the pool
      10,                          // max number of buffers to keep in the pool
      1024,                        // buffer size
      pool);
  server.Start();

When our socket server has a complete, distinct, message to process it can dispatch it to the thread pool for processing, rather than processing it on one of its IO threads.

  void CSocketServer::ProcessCommand(
      CSocketServer::Socket *pSocket,
      CIOBuffer *pBuffer)
  {
      pSocket->AddRef();
      pBuffer->AddRef();
      m_pool.Dispatch(reinterpret_cast<ULONG_PTR>(pSocket), 0, pBuffer->GetAsOverlapped());
  }

Since we're passing the socket and IO buffer to another thread, we have to increment their reference counts so that they don't get cleaned up from underneath us. Over in our business logic thread we can finally process the message, and then release the references we took on the socket and IO buffer.

  void CThreadPoolWorkerThread::Process(
      ULONG_PTR completionKey,
      DWORD operation,
      OVERLAPPED *pOverlapped)
  {
      Socket *pSocket = reinterpret_cast<Socket *>(completionKey);
      CIOBuffer *pBuffer = CIOBuffer::FromOverlapped(pOverlapped);

      ProcessMessage(pSocket, pBuffer);
      pSocket->Release();
      pBuffer->Release();
  }
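The reference counting hand-off above can be illustrated with a small portable sketch. It is not the article's code: `RefCounted` is a hypothetical stand-in for the Socket and CIOBuffer classes, and `WorkQueue`, `DispatchWithReference` and `Drain` are hypothetical names that replace the real thread pool with a simple queue of deferred work, so the ordering is easy to follow.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <queue>

// Hypothetical intrusive reference-counted object (stand-in for Socket or CIOBuffer).
class RefCounted
{
public:
    // The creator starts out owning one reference.
    explicit RefCounted(bool &destroyed) : m_refs(1), m_destroyed(destroyed) {}

    void AddRef() { ++m_refs; }

    void Release()
    {
        // Whoever drops the last reference destroys the object.
        if (--m_refs == 0)
        {
            delete this;
        }
    }

private:
    ~RefCounted() { m_destroyed = true; }   // private: only Release() may delete

    std::atomic<long> m_refs;
    bool &m_destroyed;                      // lets the caller observe destruction
};

// Hypothetical stand-in for the pool: work is queued now and run later.
using WorkQueue = std::queue<std::function<void()>>;

inline void DispatchWithReference(WorkQueue &queue, RefCounted *pObject)
{
    pObject->AddRef();                      // reference held by the queued work item
    queue.push([pObject] {
        // ... the message would be processed here ...
        pObject->Release();                 // balances the AddRef() above
    });
}

// Runs every queued work item, as a business logic thread would.
inline void Drain(WorkQueue &queue)
{
    while (!queue.empty())
    {
        queue.front()();
        queue.pop();
    }
}
```

In this sketch the dispatching side can call `Release()` on its own reference straight after `DispatchWithReference()`; the object stays alive until the queued work item has run and released the reference taken on its behalf.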

Since the socket class marshals all IO requests back to the IO thread pool, we can safely make read and write requests from our business logic thread, even though that thread may be terminated before the IO request completes.

Maintaining per-connection state

The final thing that our server may need to do is associate some internal server state with a particular socket connection. The Socket class makes this particularly easy, as it provides the following member functions:

  void *GetUserPtr() const;
  void SetUserPtr(void *pData);
  unsigned long GetUserData() const;
  void SetUserData(unsigned long data);

These provide access to a single void * user data pointer which is stored in the Socket. The common usage pattern for this user data is as follows. When the connection is established, the socket server is notified by OnConnectionEstablished(); the server can allocate a new per-connection data structure and associate it with the socket passed to OnConnectionEstablished() by calling SetUserPtr(). In subsequent read and write completions, the pointer to the per-connection user data structure can be extracted with GetUserPtr(). When the connection is terminated, the server is notified by OnConnectionClosed() and the per-connection user data can be retrieved and deleted.
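The usage pattern just described might look like the sketch below. Everything here is a simplified assumption rather than the server's real code: `Socket` is reduced to the two pointer accessors, `ConnectionState` is a hypothetical per-connection structure, and the three free functions take simplified parameters instead of the real notification signatures.

```cpp
#include <cassert>
#include <string>

// Hypothetical minimal Socket exposing only the user pointer accessors.
class Socket
{
public:
    void *GetUserPtr() const { return m_pUserData; }
    void SetUserPtr(void *pData) { m_pUserData = pData; }

private:
    void *m_pUserData = nullptr;
};

// Hypothetical per-connection state kept by the server.
struct ConnectionState
{
    std::string clientAddress;
    unsigned long messagesProcessed = 0;
};

// Connection established: allocate the state and attach it to the socket.
inline void OnConnectionEstablished(Socket &socket, const std::string &address)
{
    ConnectionState *pState = new ConnectionState();
    pState->clientAddress = address;
    socket.SetUserPtr(pState);
}

// Later completions: recover the state from the socket and update it.
inline unsigned long OnMessageProcessed(Socket &socket)
{
    ConnectionState *pState = static_cast<ConnectionState *>(socket.GetUserPtr());
    return ++pState->messagesProcessed;
}

// Connection closed: retrieve the state, delete it and clear the pointer.
inline void OnConnectionClosed(Socket &socket)
{
    delete static_cast<ConnectionState *>(socket.GetUserPtr());
    socket.SetUserPtr(nullptr);
}
```

Clearing the pointer after deletion is a choice made for this sketch; it makes a stale `GetUserPtr()` call return a null pointer rather than a dangling one.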

Although there are two versions of the user data access functions, one for a void * and one for an unsigned long, there is only a single storage location. The two versions are merely for convenience and to reduce casting if the user data required is simply an index into an internal server structure rather than a pointer.
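One way to back both pairs of accessors with a single storage location is sketched below. This is an assumption about how such a class could be written, not the Socket class's actual implementation; `UserDataSlot` is a hypothetical name, and the sketch stores the value in a `std::uintptr_t` so that a pointer always fits, since `unsigned long` is narrower than a pointer on 64-bit Windows.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical class with one storage location behind two views of the data.
class UserDataSlot
{
public:
    void *GetUserPtr() const { return reinterpret_cast<void *>(m_userData); }
    void SetUserPtr(void *pData) { m_userData = reinterpret_cast<std::uintptr_t>(pData); }

    unsigned long GetUserData() const { return static_cast<unsigned long>(m_userData); }
    void SetUserData(unsigned long data) { m_userData = data; }

private:
    std::uintptr_t m_userData = 0;   // the single storage location
};
```

Because the storage is shared, setting the value through one pair of accessors overwrites whatever was stored through the other, so a given connection should use either the pointer form or the index form, not both.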

The example server marshals the OnConnectionEstablished() and OnConnectionClosed() calls across to the business logic thread pool and maintains some fairly trivial per-connection user data there. The data we maintain is the address of the client connection (obtained from the buffer passed into OnConnectionEstablished()) and the number of messages that have been processed on this particular connection.

About the author

Len Holgate United Kingdom

Len has been programming for over 20 years, having first started with a Sinclair ZX-80. Now he runs his own consulting company, JetByte Li...