Doing the work
We now have a thread pool that fulfills our requirements of automatic expansion and contraction depending upon load and non blocking dispatch for users. The remaining thing to do is allow the derived class to provide its own WorkerThread
class to do the work. The worker thread class must implement the following interface:
virtual bool Initialise();
virtual void Process(
ULONG_PTR completionKey,
DWORD dwNumBytes,
OVERLAPPED *pOverlapped) = 0;
virtual void Shutdown();
Initialise()
is called when it's first created, Shutdown()
is called when the thread is terminating and Process()
is called for each work item.
A socket server with a business logic thread pool
Now that we have a suitable thread pool we can integrate it with our fake POP3 socket server so that the actual processing of commands can occur in the business logic pool whilst the IO pool is left to get on with the IO operations. We can also move socket closure off to the business logic pool so that we don't block the IO threads with a lingering socket close.
The first thing we need to do is create and configure our thread pool. Then we can pass a reference to it to our socket server class so that it can pass a reference to it to our IO threads.
CThreadPool pool(
5, // initial number of threads to create
5, // minimum number of threads to keep in the pool
10, // maximum number of threads in the pool
5, // maximum number of "dormant" threads
5000, // pool maintenance period (millis)
100, // dispatch timeout (millis)
10000); // dispatch timeout for when pool is at max threads
pool.Start();
CSocketServer server(
INADDR_ANY, // address to listen on
5001, // port to listen on
10, // max number of sockets to keep in the pool
10, // max number of buffers to keep in the pool
1024, // buffer size
pool);
server.Start();
When our socket server has a complete, distinct, message to process it can dispatch it to the thread pool for processing, rather than processing it on one of its IO threads.
void CSocketServer::ProcessCommand(
CSocketServer::Socket *pSocket,
CIOBuffer *pBuffer)
{
pSocket->AddRef();
pBuffer->AddRef();
m_pool.Dispatch(reinterpret_cast(pSocket), 0, pBuffer->GetAsOverlapped());
}
Since we're passing the socket and IO buffer to another thread we have to increment their reference counts so that they dont get cleared up from underneath us. Over in our business logic thread we can finally process the message, and then release the references we took on the socket and IO buffer.
void CThreadPoolWorkerThread::Process(
ULONG_PTR completionKey,
DWORD operation,
OVERLAPPED *pOverlapped)
{
Socket *pSocket = reinterpret_cast(completionKey);
CIOBuffer *pBuffer = CIOBuffer::FromOverlapped(pOverlapped);
ProcessMessage(pSocket, pBuffer);
pSocket->Release();
pBuffer->Release();
}
Since the socket class marshals all IO requests back to the IO thread pool we can safely make read and write requests from our business logic thread even though the thread may be terminated before the IO requests completes.
Maintaining per-connection state
The final thing that our server may need to do is associate some internal server state with a particular socket connection, the Socket class makes this particularly easy as it provides the following member functions:
void *GetUserPtr() const;
void SetUserPtr(void *pData);
unsigned long GetUserData() const;
void SetUserData(unsigned long data);
These provide access to a single void *
user data pointer which is stored in the Socket. The common usage pattern for this user data is as follows: When the connection is established the socket server is notified by OnConnectionEstablished()
, the server can allocate a new per-connection data structure and associate it with the socket passed to OnConnectionEstablished()
by calling SetUserPtr()
in subsequent read and write completions the pointer to the per-connection user data structure can be extracted with GetUserPtr()
. When the connection is terminated the server is notified by OnConnectionClosed and the per-connection user data can be retrieved and deleted.
Although there are two versions of the user data access functions, one for a void *
and one for an unsigned long
there is only a single storage location. The two versions are merely for convenience and to reduce casting if the user data required is simply an index into an internal server structure rather than a pointer.
The example server marshals the OnConnectionEstablished()
and OnConnectionClosed()
calls across to the business logic thread pool and maintains some fairly trivial per-connection user data there. The data we maintain is the address of the client connection (obtained from the buffer passed into OnConnectionEstablished()
and the number of messages that have been processed on this particular connection.
Comments