2009-04-23 19:23:13 +04:00
/*
* Unix SMB / CIFS implementation .
* thread pool implementation
* Copyright ( C ) Volker Lendecke 2009
*
* This program is free software ; you can redistribute it and / or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation ; either version 3 of the License , or
* ( at your option ) any later version .
*
* This program is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
* GNU General Public License for more details .
*
* You should have received a copy of the GNU General Public License
* along with this program . If not , see < http : //www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <signal.h>
#include <assert.h>
#include <fcntl.h>
#include <time.h>
#include "pthreadpool.h"
/*
 * One queued unit of work. Jobs are chained into a singly linked list
 * through "next".
 */
struct pthreadpool_job {
	/* Following job in the queue, NULL for the last one */
	struct pthreadpool_job *next;

	/* Identifier that a worker writes to the signal pipe when done */
	int id;

	/* Function to run, and the opaque argument it is called with */
	void (*fn)(void *private_data);
	void *private_data;
};
/*
 * State of one thread pool. Everything in here is protected by "mutex".
 */
struct pthreadpool {
	/* Serializes access to all members of this struct */
	pthread_mutex_t mutex;

	/*
	 * Idle workers sleep on this; pthreadpool_destroy() also waits on
	 * it for the workers to go away.
	 */
	pthread_cond_t condvar;

	/* Head of the queue of pending jobs */
	struct pthreadpool_job *jobs;
	/* Tail of the queue of pending jobs */
	struct pthreadpool_job *last_job;

	/*
	 * Pipe used to report finished job ids: [0] is the read end,
	 * [1] the write end. Both are -1 until the pipe is created.
	 */
	int sig_pipe[2];

	/* Non-zero once the workers have been asked to terminate */
	int shutdown;

	/* Upper bound for the number of worker threads */
	int max_threads;

	/* Worker threads currently alive */
	int num_threads;

	/* Worker threads currently waiting for work */
	int num_idle;

	/*
	 * Threads that have terminated but still need a pthread_join().
	 * "exited" holds "num_exited" valid ids; the array is over-allocated
	 * together with the struct to hold at least "max_threads" entries.
	 */
	int num_exited;
	pthread_t exited[1]; /* We alloc more */
};
/*
* Initialize a thread pool
*/
int pthreadpool_init ( unsigned max_threads , struct pthreadpool * * presult )
{
struct pthreadpool * pool ;
size_t size ;
int ret ;
size = sizeof ( struct pthreadpool ) + max_threads * sizeof ( pthread_t ) ;
pool = ( struct pthreadpool * ) malloc ( size ) ;
if ( pool = = NULL ) {
return ENOMEM ;
}
ret = pthread_mutex_init ( & pool - > mutex , NULL ) ;
if ( ret ! = 0 ) {
free ( pool ) ;
return ret ;
}
ret = pthread_cond_init ( & pool - > condvar , NULL ) ;
if ( ret ! = 0 ) {
pthread_mutex_destroy ( & pool - > mutex ) ;
free ( pool ) ;
return ret ;
}
pool - > shutdown = 0 ;
pool - > jobs = pool - > last_job = NULL ;
pool - > num_threads = 0 ;
pool - > num_exited = 0 ;
pool - > max_threads = max_threads ;
pool - > num_idle = 0 ;
pool - > sig_pipe [ 0 ] = - 1 ;
pool - > sig_pipe [ 1 ] = - 1 ;
* presult = pool ;
return 0 ;
}
/*
* Create and return a file descriptor which becomes readable when a job has
* finished
*/
int pthreadpool_sig_fd ( struct pthreadpool * pool )
{
int result , ret ;
ret = pthread_mutex_lock ( & pool - > mutex ) ;
if ( ret ! = 0 ) {
errno = ret ;
return - 1 ;
}
if ( pool - > sig_pipe [ 0 ] ! = - 1 ) {
result = pool - > sig_pipe [ 0 ] ;
goto done ;
}
ret = pipe ( pool - > sig_pipe ) ;
if ( ret = = - 1 ) {
result = - 1 ;
goto done ;
}
result = pool - > sig_pipe [ 0 ] ;
done :
ret = pthread_mutex_unlock ( & pool - > mutex ) ;
assert ( ret = = 0 ) ;
return result ;
}
/*
* Do a pthread_join ( ) on all children that have exited , pool - > mutex must be
* locked
*/
static void pthreadpool_join_children ( struct pthreadpool * pool )
{
int i ;
for ( i = 0 ; i < pool - > num_exited ; i + + ) {
pthread_join ( pool - > exited [ i ] , NULL ) ;
}
pool - > num_exited = 0 ;
}
/*
* Fetch a finished job number from the signal pipe
*/
int pthreadpool_finished_job ( struct pthreadpool * pool )
{
int result , ret , fd ;
ssize_t nread ;
ret = pthread_mutex_lock ( & pool - > mutex ) ;
if ( ret ! = 0 ) {
errno = ret ;
return - 1 ;
}
/*
* Just some cleanup under the mutex
*/
pthreadpool_join_children ( pool ) ;
fd = pool - > sig_pipe [ 0 ] ;
ret = pthread_mutex_unlock ( & pool - > mutex ) ;
assert ( ret = = 0 ) ;
if ( fd = = - 1 ) {
errno = EINVAL ;
return - 1 ;
}
nread = - 1 ;
errno = EINTR ;
while ( ( nread = = - 1 ) & & ( errno = = EINTR ) ) {
nread = read ( fd , & result , sizeof ( int ) ) ;
}
/*
* TODO : handle nread > 0 & & nread < sizeof ( int )
*/
/*
* Lock the mutex to provide a memory barrier for data from the worker
* thread to the main thread . The pipe access itself does not have to
* be locked , for sizeof ( int ) the write to a pipe is atomic , and only
* one thread reads from it . But we need to lock the mutex briefly
* even if we don ' t do anything under the lock , to make sure we can
* see all memory the helper thread has written .
*/
ret = pthread_mutex_lock ( & pool - > mutex ) ;
if ( ret = = - 1 ) {
errno = ret ;
return - 1 ;
}
ret = pthread_mutex_unlock ( & pool - > mutex ) ;
assert ( ret = = 0 ) ;
return result ;
}
/*
* Destroy a thread pool , finishing all threads working for it
*/
int pthreadpool_destroy ( struct pthreadpool * pool )
{
int ret , ret1 ;
ret = pthread_mutex_lock ( & pool - > mutex ) ;
if ( ret ! = 0 ) {
return ret ;
}
if ( pool - > num_threads > 0 ) {
/*
* We have active threads , tell them to finish , wait for that .
*/
pool - > shutdown = 1 ;
if ( pool - > num_idle > 0 ) {
/*
* Wake the idle threads . They will find pool - > quit to
* be set and exit themselves
*/
ret = pthread_cond_broadcast ( & pool - > condvar ) ;
if ( ret ! = 0 ) {
pthread_mutex_unlock ( & pool - > mutex ) ;
return ret ;
}
}
while ( ( pool - > num_threads > 0 ) | | ( pool - > num_exited > 0 ) ) {
if ( pool - > num_exited > 0 ) {
pthreadpool_join_children ( pool ) ;
continue ;
}
/*
* A thread that shuts down will also signal
* pool - > condvar
*/
ret = pthread_cond_wait ( & pool - > condvar , & pool - > mutex ) ;
if ( ret ! = 0 ) {
pthread_mutex_unlock ( & pool - > mutex ) ;
return ret ;
}
}
}
ret = pthread_mutex_unlock ( & pool - > mutex ) ;
if ( ret ! = 0 ) {
return ret ;
}
ret = pthread_mutex_destroy ( & pool - > mutex ) ;
ret1 = pthread_cond_destroy ( & pool - > condvar ) ;
if ( ( ret = = 0 ) & & ( ret1 = = 0 ) ) {
free ( pool ) ;
}
if ( ret ! = 0 ) {
return ret ;
}
return ret1 ;
}
/*
* Prepare for pthread_exit ( ) , pool - > mutex must be locked
*/
static void pthreadpool_server_exit ( struct pthreadpool * pool )
{
pool - > num_threads - = 1 ;
pool - > exited [ pool - > num_exited ] = pthread_self ( ) ;
pool - > num_exited + = 1 ;
}
static void * pthreadpool_server ( void * arg )
{
struct pthreadpool * pool = ( struct pthreadpool * ) arg ;
int res ;
res = pthread_mutex_lock ( & pool - > mutex ) ;
if ( res ! = 0 ) {
return NULL ;
}
while ( 1 ) {
struct timespec timeout ;
struct pthreadpool_job * job ;
/*
* idle - wait at most 1 second . If nothing happens in that
* time , exit this thread .
*/
2009-05-01 23:34:12 +04:00
timeout . tv_sec = time ( NULL ) + 1 ;
timeout . tv_nsec = 0 ;
2009-04-23 19:23:13 +04:00
while ( ( pool - > jobs = = NULL ) & & ( pool - > shutdown = = 0 ) ) {
pool - > num_idle + = 1 ;
res = pthread_cond_timedwait (
& pool - > condvar , & pool - > mutex , & timeout ) ;
pool - > num_idle - = 1 ;
if ( res = = ETIMEDOUT ) {
if ( pool - > jobs = = NULL ) {
/*
* we timed out and still no work for
* us . Exit .
*/
pthreadpool_server_exit ( pool ) ;
pthread_mutex_unlock ( & pool - > mutex ) ;
return NULL ;
}
break ;
}
assert ( res = = 0 ) ;
}
job = pool - > jobs ;
if ( job ! = NULL ) {
int fd = pool - > sig_pipe [ 1 ] ;
ssize_t written ;
/*
* Ok , there ' s work for us to do , remove the job from
* the pthreadpool list
*/
pool - > jobs = job - > next ;
if ( pool - > last_job = = job ) {
pool - > last_job = NULL ;
}
/*
* Do the work with the mutex unlocked : - )
*/
res = pthread_mutex_unlock ( & pool - > mutex ) ;
assert ( res = = 0 ) ;
job - > fn ( job - > private_data ) ;
written = sizeof ( int ) ;
res = pthread_mutex_lock ( & pool - > mutex ) ;
assert ( res = = 0 ) ;
if ( fd ! = - 1 ) {
written = write ( fd , & job - > id , sizeof ( int ) ) ;
}
free ( job ) ;
if ( written ! = sizeof ( int ) ) {
pthreadpool_server_exit ( pool ) ;
pthread_mutex_unlock ( & pool - > mutex ) ;
return NULL ;
}
}
if ( ( pool - > jobs = = NULL ) & & ( pool - > shutdown ! = 0 ) ) {
/*
* No more work to do and we ' re asked to shut down , so
* exit
*/
pthreadpool_server_exit ( pool ) ;
if ( pool - > num_threads = = 0 ) {
/*
* Ping the main thread waiting for all of us
* workers to have quit .
*/
pthread_cond_broadcast ( & pool - > condvar ) ;
}
pthread_mutex_unlock ( & pool - > mutex ) ;
return NULL ;
}
}
}
int pthreadpool_add_job ( struct pthreadpool * pool , int job_id ,
void ( * fn ) ( void * private_data ) , void * private_data )
{
struct pthreadpool_job * job ;
pthread_t thread_id ;
int res ;
sigset_t mask , omask ;
job = ( struct pthreadpool_job * ) malloc ( sizeof ( struct pthreadpool_job ) ) ;
if ( job = = NULL ) {
return ENOMEM ;
}
job - > fn = fn ;
job - > private_data = private_data ;
job - > id = job_id ;
job - > next = NULL ;
res = pthread_mutex_lock ( & pool - > mutex ) ;
if ( res ! = 0 ) {
free ( job ) ;
return res ;
}
/*
* Just some cleanup under the mutex
*/
pthreadpool_join_children ( pool ) ;
/*
* Add job to the end of the queue
*/
if ( pool - > jobs = = NULL ) {
pool - > jobs = job ;
}
else {
pool - > last_job - > next = job ;
}
pool - > last_job = job ;
if ( pool - > num_idle > 0 ) {
/*
* We have idle threads , wake one .
*/
res = pthread_cond_signal ( & pool - > condvar ) ;
pthread_mutex_unlock ( & pool - > mutex ) ;
return res ;
}
if ( pool - > num_threads > = pool - > max_threads ) {
/*
* No more new threads , we just queue the request
*/
pthread_mutex_unlock ( & pool - > mutex ) ;
return 0 ;
}
/*
* Create a new worker thread . It should not receive any signals .
*/
sigfillset ( & mask ) ;
res = pthread_sigmask ( SIG_BLOCK , & mask , & omask ) ;
if ( res ! = 0 ) {
pthread_mutex_unlock ( & pool - > mutex ) ;
return res ;
}
res = pthread_create ( & thread_id , NULL , pthreadpool_server ,
( void * ) pool ) ;
if ( res = = 0 ) {
pool - > num_threads + = 1 ;
}
assert ( pthread_sigmask ( SIG_SETMASK , & omask , NULL ) = = 0 ) ;
pthread_mutex_unlock ( & pool - > mutex ) ;
return res ;
}