SPProcPool: 在多线程或事件驱动环境下使用的进程池实现
作者:iunknown
SPProcPool 是一个能够在多线程或事件驱动环境下使用的进程池实现。
主页:
http://code.google.com/p/spprocpool/
下载:
spprocpool
多线程的好处是各个线程能够共享一个地址空间,因此对一些需要全局排序来调度的任务,使用多线程可以比较方便地实现。比如类似 postfix/qmgr 的
模块,如果使用多线程的话,那么所有的邮件能够在一个优先队列中排队,按队列顺序进行投递;如果投递失败了,那么重新插入队列。
但另一方面,如果具体的任务处理部分已经有了实现,但不是线程安全的,这种问题要怎么来解决呢?
一个最直观的解决方法是每个任务直接 fork 一次。但是这种做法有几个细节问题需要认真考虑:
1.在子进程中如何把结果返回给调度进程?常见的方法是使用 pipe
2.每个任务 fork 一次,估计很多人的第一反应就是性能会怎么样?
3.如果调度进程中,除了负责 fork 的线程,还有其他的线程存在,那么就存在 fork-with-thread 的问题。
>>具体的内容可以参考:
http://www.opengroup.org/onlinepubs/000095399/functions/fork.html
针对上面存在的问题,在每个任务直接 fork 一次的基础上,做了一些改进,就形成了这样的一个进程池实现:
1.在前面的方案中,存在调度进程(Sched)和工作进程(Worker)这两类进程。
>>为了避免 fork-with-thread 的问题,再增加一类管理进程(Manager)。管理进程的主要职责就是负责 fork 工作进程。
2.通常 Manager 是由 Sched fork 出来的,它们之间存在一条通信的 pipe (MgrPipe) 。
>>创建一个新的工作进程的流程如下:Sched 创建一个 pipe (WorkerPipe),把其中的一端用 send_fd 的方法发送给 Manager,
>>然后 Manager fork 一个 Worker 出来,并且把 WorkerPipe 传递给 Worker 。这样就在 Sched 和 Worker 之间建立了一个 Pipe 。
3.Worker 在被 fork 出来之后,通常就阻塞在读 WorkerPipe 上面。Sched 通过 WorkerPipe 发送任务给 Worker 。
>>Worker 完成任务之后,通过 WorkerPipe 发送结果给 Sched 。Worker 可以不断地重复这个过程,这样就达到了一个池的效果。
4.对于使用 libevent 这类事件驱动类型的
程序,这个进程池也能方便地被调用。
>>因为 Worker 曝露出来的是一个 PipeFd,能够方便地加入到 libevent 的事件循环中。这类事件驱动类的程序,
>>通常使用单线程实现,当具体的任务处理可能需要耗费比较长时间的时候,就需要使用多线程或者多进程来辅助了。
按惯例,展示一个 Echo 的例子。
复制内容到剪贴板
代码:
class SP_EchoWorker : public SP_ProcWorker {
public:
SP_EchoWorker(){}
virtual ~SP_EchoWorker(){}
virtual void process( const SP_ProcInfo * procInfo ) {
for( ; ; ) {
char buff[ 256 ] = { 0 };
int len = read( procInfo->getPipeFd(), buff, sizeof( buff ) );
if( len > 0 ) {
char newBuff[ 256 ] = { 0 };
snprintf( newBuff, sizeof( newBuff ), "<%d> %s", getpid(), buff );
write( procInfo->getPipeFd(), newBuff, strlen( newBuff ) );
} else {
break;
}
}
}
};
class SP_EchoWorkerFactory : public SP_ProcWorkerFactory {
public:
SP_EchoWorkerFactory(){}
virtual ~SP_EchoWorkerFactory(){}
virtual SP_ProcWorker * create() const {
return new SP_EchoWorker();
}
};
void * echoWorkerCaller( void * args )
{
const char * text = "Hello, world!";
SP_ProcPool * procPool = (SP_ProcPool*)args;
SP_ProcInfo * info = procPool->get();
if( write( info->getPipeFd(), text, strlen( text ) ) > 0 ) {
char buff[ 256 ] = { 0 };
memset( buff, 0, sizeof( buff ) );
if( read( info->getPipeFd(), buff, sizeof( buff ) - 1 ) ) {
printf( "read: %d - %s\n", info->getPid(), buff );
}
}
procPool->save( info );
return NULL;
}
int main( int argc, char * argv[] )
{
SP_ProcManager manager( new SP_EchoWorkerFactory() );
manager.start();
SP_ProcPool * procPool = manager.getProcPool();
static int MAX_TEST_THREAD = 10;
pthread_t threadArray[ MAX_TEST_THREAD ];
for( int i = 0; i < MAX_TEST_THREAD; i++ ) {
pthread_create( &( threadArray[i] ), NULL, echoWorkerCaller, procPool );
}
for( int i = 0; i < MAX_TEST_THREAD; i++ ) {
pthread_join( threadArray[i], NULL );
}
procPool->dump();
return 0;
}