2009-04-23 19:23:13 +04:00
/*
* Unix SMB / CIFS implementation .
* thread pool implementation
* Copyright ( C ) Volker Lendecke 2009
*
* This program is free software ; you can redistribute it and / or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation ; either version 3 of the License , or
* ( at your option ) any later version .
*
* This program is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
* GNU General Public License for more details .
*
* You should have received a copy of the GNU General Public License
* along with this program . If not , see < http : //www.gnu.org/licenses/>.
*/
2011-09-21 05:19:58 +04:00
# include "config.h"
2009-04-23 19:23:13 +04:00
# include <errno.h>
# include <stdio.h>
# include <stdlib.h>
# include <string.h>
# include <pthread.h>
# include <signal.h>
# include <assert.h>
# include <fcntl.h>
2011-09-21 05:19:58 +04:00
# include "system/time.h"
2011-10-08 14:10:20 +04:00
# include "system/filesys.h"
2012-04-09 11:17:29 +04:00
# include "replace.h"
2009-04-23 19:23:13 +04:00
# include "pthreadpool.h"
2011-04-22 13:47:11 +04:00
# include "lib/util/dlinklist.h"
2009-04-23 19:23:13 +04:00
struct pthreadpool_job {
int id ;
void ( * fn ) ( void * private_data ) ;
void * private_data ;
} ;
struct pthreadpool {
2011-04-22 13:47:11 +04:00
/*
* List pthreadpools for fork safety
*/
struct pthreadpool * prev , * next ;
2009-04-23 19:23:13 +04:00
/*
* Control access to this struct
*/
pthread_mutex_t mutex ;
/*
* Threads waiting for work do so here
*/
pthread_cond_t condvar ;
/*
2014-03-21 20:53:26 +04:00
* Array of jobs
2009-04-23 19:23:13 +04:00
*/
2014-03-21 20:53:26 +04:00
size_t jobs_array_len ;
struct pthreadpool_job * jobs ;
size_t head ;
size_t num_jobs ;
2009-04-23 19:23:13 +04:00
/*
* pipe for signalling
*/
int sig_pipe [ 2 ] ;
/*
* indicator to worker threads that they should shut down
*/
int shutdown ;
/*
* maximum number of threads
*/
int max_threads ;
/*
* Number of threads
*/
int num_threads ;
/*
* Number of idle threads
*/
int num_idle ;
/*
2011-04-25 22:05:31 +04:00
* An array of threads that require joining .
2009-04-23 19:23:13 +04:00
*/
int num_exited ;
2011-04-25 22:05:31 +04:00
pthread_t * exited ; /* We alloc more */
2009-04-23 19:23:13 +04:00
} ;
2011-04-22 13:47:11 +04:00
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER ;
static struct pthreadpool * pthreadpools = NULL ;
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT ;
static void pthreadpool_prep_atfork ( void ) ;
2009-04-23 19:23:13 +04:00
/*
* Initialize a thread pool
*/
int pthreadpool_init ( unsigned max_threads , struct pthreadpool * * presult )
{
struct pthreadpool * pool ;
int ret ;
2011-04-25 22:05:31 +04:00
pool = ( struct pthreadpool * ) malloc ( sizeof ( struct pthreadpool ) ) ;
2009-04-23 19:23:13 +04:00
if ( pool = = NULL ) {
return ENOMEM ;
}
2014-03-21 20:53:26 +04:00
pool - > jobs_array_len = 4 ;
pool - > jobs = calloc (
pool - > jobs_array_len , sizeof ( struct pthreadpool_job ) ) ;
if ( pool - > jobs = = NULL ) {
free ( pool ) ;
return ENOMEM ;
}
pool - > head = pool - > num_jobs = 0 ;
2011-04-22 13:47:11 +04:00
ret = pipe ( pool - > sig_pipe ) ;
if ( ret = = - 1 ) {
int err = errno ;
2014-03-21 20:53:26 +04:00
free ( pool - > jobs ) ;
2011-04-22 13:47:11 +04:00
free ( pool ) ;
return err ;
}
2009-04-23 19:23:13 +04:00
ret = pthread_mutex_init ( & pool - > mutex , NULL ) ;
if ( ret ! = 0 ) {
2011-04-28 00:18:12 +04:00
close ( pool - > sig_pipe [ 0 ] ) ;
close ( pool - > sig_pipe [ 1 ] ) ;
2014-03-21 20:53:26 +04:00
free ( pool - > jobs ) ;
2009-04-23 19:23:13 +04:00
free ( pool ) ;
return ret ;
}
ret = pthread_cond_init ( & pool - > condvar , NULL ) ;
if ( ret ! = 0 ) {
pthread_mutex_destroy ( & pool - > mutex ) ;
2011-04-28 00:18:12 +04:00
close ( pool - > sig_pipe [ 0 ] ) ;
close ( pool - > sig_pipe [ 1 ] ) ;
2014-03-21 20:53:26 +04:00
free ( pool - > jobs ) ;
2009-04-23 19:23:13 +04:00
free ( pool ) ;
return ret ;
}
pool - > shutdown = 0 ;
pool - > num_threads = 0 ;
pool - > num_exited = 0 ;
2011-04-25 22:05:31 +04:00
pool - > exited = NULL ;
2009-04-23 19:23:13 +04:00
pool - > max_threads = max_threads ;
pool - > num_idle = 0 ;
2011-04-22 13:47:11 +04:00
ret = pthread_mutex_lock ( & pthreadpools_mutex ) ;
if ( ret ! = 0 ) {
pthread_cond_destroy ( & pool - > condvar ) ;
pthread_mutex_destroy ( & pool - > mutex ) ;
2011-04-28 00:18:12 +04:00
close ( pool - > sig_pipe [ 0 ] ) ;
close ( pool - > sig_pipe [ 1 ] ) ;
2014-03-21 20:53:26 +04:00
free ( pool - > jobs ) ;
2011-04-22 13:47:11 +04:00
free ( pool ) ;
return ret ;
}
DLIST_ADD ( pthreadpools , pool ) ;
ret = pthread_mutex_unlock ( & pthreadpools_mutex ) ;
assert ( ret = = 0 ) ;
pthread_once ( & pthreadpool_atfork_initialized , pthreadpool_prep_atfork ) ;
2009-04-23 19:23:13 +04:00
* presult = pool ;
2011-04-22 13:47:11 +04:00
2009-04-23 19:23:13 +04:00
return 0 ;
}
2011-04-22 13:47:11 +04:00
static void pthreadpool_prepare ( void )
2009-04-23 19:23:13 +04:00
{
2011-04-22 13:47:11 +04:00
int ret ;
struct pthreadpool * pool ;
2009-04-23 19:23:13 +04:00
2011-04-22 13:47:11 +04:00
ret = pthread_mutex_lock ( & pthreadpools_mutex ) ;
assert ( ret = = 0 ) ;
pool = pthreadpools ;
while ( pool ! = NULL ) {
ret = pthread_mutex_lock ( & pool - > mutex ) ;
assert ( ret = = 0 ) ;
pool = pool - > next ;
2009-04-23 19:23:13 +04:00
}
2011-04-22 13:47:11 +04:00
}
static void pthreadpool_parent ( void )
{
int ret ;
struct pthreadpool * pool ;
2014-03-03 15:20:41 +04:00
for ( pool = DLIST_TAIL ( pthreadpools ) ;
pool ! = NULL ;
pool = DLIST_PREV ( pool ) ) {
2011-04-22 13:47:11 +04:00
ret = pthread_mutex_unlock ( & pool - > mutex ) ;
assert ( ret = = 0 ) ;
2009-04-23 19:23:13 +04:00
}
2011-04-22 13:47:11 +04:00
ret = pthread_mutex_unlock ( & pthreadpools_mutex ) ;
assert ( ret = = 0 ) ;
}
static void pthreadpool_child ( void )
{
int ret ;
struct pthreadpool * pool ;
2014-03-03 15:20:41 +04:00
for ( pool = DLIST_TAIL ( pthreadpools ) ;
pool ! = NULL ;
pool = DLIST_PREV ( pool ) ) {
2011-04-22 13:47:11 +04:00
close ( pool - > sig_pipe [ 0 ] ) ;
close ( pool - > sig_pipe [ 1 ] ) ;
ret = pipe ( pool - > sig_pipe ) ;
assert ( ret = = 0 ) ;
pool - > num_threads = 0 ;
2011-04-25 22:05:31 +04:00
2011-04-22 13:47:11 +04:00
pool - > num_exited = 0 ;
2011-04-25 22:05:31 +04:00
free ( pool - > exited ) ;
pool - > exited = NULL ;
2011-04-22 13:47:11 +04:00
pool - > num_idle = 0 ;
2014-03-21 20:53:26 +04:00
pool - > head = 0 ;
pool - > num_jobs = 0 ;
2011-04-22 13:47:11 +04:00
ret = pthread_mutex_unlock ( & pool - > mutex ) ;
assert ( ret = = 0 ) ;
2009-04-23 19:23:13 +04:00
}
2011-04-22 13:47:11 +04:00
ret = pthread_mutex_unlock ( & pthreadpools_mutex ) ;
2009-04-23 19:23:13 +04:00
assert ( ret = = 0 ) ;
2011-04-22 13:47:11 +04:00
}
static void pthreadpool_prep_atfork ( void )
{
pthread_atfork ( pthreadpool_prepare , pthreadpool_parent ,
pthreadpool_child ) ;
}
/*
* Return the file descriptor which becomes readable when a job has
* finished
*/
2011-04-24 12:09:45 +04:00
int pthreadpool_signal_fd ( struct pthreadpool * pool )
2011-04-22 13:47:11 +04:00
{
return pool - > sig_pipe [ 0 ] ;
2009-04-23 19:23:13 +04:00
}
/*
* Do a pthread_join ( ) on all children that have exited , pool - > mutex must be
* locked
*/
static void pthreadpool_join_children ( struct pthreadpool * pool )
{
int i ;
for ( i = 0 ; i < pool - > num_exited ; i + + ) {
2015-11-10 11:56:56 +03:00
int ret ;
ret = pthread_join ( pool - > exited [ i ] , NULL ) ;
if ( ret ! = 0 ) {
/*
* Severe internal error , we can ' t do much but
* abort here .
*/
abort ( ) ;
}
2009-04-23 19:23:13 +04:00
}
pool - > num_exited = 0 ;
2011-04-25 22:05:31 +04:00
/*
* Deliberately not free and NULL pool - > exited . That will be
* re - used by realloc later .
*/
2009-04-23 19:23:13 +04:00
}
/*
* Fetch a finished job number from the signal pipe
*/
2014-03-24 14:39:56 +04:00
int pthreadpool_finished_jobs ( struct pthreadpool * pool , int * jobids ,
unsigned num_jobids )
2009-04-23 19:23:13 +04:00
{
2014-03-24 14:39:56 +04:00
ssize_t to_read , nread ;
2009-04-23 19:23:13 +04:00
nread = - 1 ;
errno = EINTR ;
2014-03-24 14:39:56 +04:00
to_read = sizeof ( int ) * num_jobids ;
2009-04-23 19:23:13 +04:00
while ( ( nread = = - 1 ) & & ( errno = = EINTR ) ) {
2014-03-24 14:39:56 +04:00
nread = read ( pool - > sig_pipe [ 0 ] , jobids , to_read ) ;
2009-04-23 19:23:13 +04:00
}
2011-04-22 13:47:11 +04:00
if ( nread = = - 1 ) {
2014-03-24 14:39:56 +04:00
return - errno ;
2011-04-22 13:47:11 +04:00
}
2014-03-24 14:39:56 +04:00
if ( ( nread % sizeof ( int ) ) ! = 0 ) {
return - EINVAL ;
2009-04-23 19:23:13 +04:00
}
2014-03-24 14:39:56 +04:00
return nread / sizeof ( int ) ;
2009-04-23 19:23:13 +04:00
}
/*
* Destroy a thread pool , finishing all threads working for it
*/
int pthreadpool_destroy ( struct pthreadpool * pool )
{
int ret , ret1 ;
ret = pthread_mutex_lock ( & pool - > mutex ) ;
if ( ret ! = 0 ) {
return ret ;
}
2014-03-21 20:53:26 +04:00
if ( ( pool - > num_jobs ! = 0 ) | | pool - > shutdown ) {
2011-04-22 13:47:11 +04:00
ret = pthread_mutex_unlock ( & pool - > mutex ) ;
assert ( ret = = 0 ) ;
return EBUSY ;
}
2009-04-23 19:23:13 +04:00
if ( pool - > num_threads > 0 ) {
/*
* We have active threads , tell them to finish , wait for that .
*/
pool - > shutdown = 1 ;
if ( pool - > num_idle > 0 ) {
/*
2014-01-27 14:08:03 +04:00
* Wake the idle threads . They will find
* pool - > shutdown to be set and exit themselves
2009-04-23 19:23:13 +04:00
*/
ret = pthread_cond_broadcast ( & pool - > condvar ) ;
if ( ret ! = 0 ) {
pthread_mutex_unlock ( & pool - > mutex ) ;
return ret ;
}
}
while ( ( pool - > num_threads > 0 ) | | ( pool - > num_exited > 0 ) ) {
if ( pool - > num_exited > 0 ) {
pthreadpool_join_children ( pool ) ;
continue ;
}
/*
* A thread that shuts down will also signal
* pool - > condvar
*/
ret = pthread_cond_wait ( & pool - > condvar , & pool - > mutex ) ;
if ( ret ! = 0 ) {
pthread_mutex_unlock ( & pool - > mutex ) ;
return ret ;
}
}
}
ret = pthread_mutex_unlock ( & pool - > mutex ) ;
if ( ret ! = 0 ) {
return ret ;
}
ret = pthread_mutex_destroy ( & pool - > mutex ) ;
ret1 = pthread_cond_destroy ( & pool - > condvar ) ;
2011-04-22 13:47:11 +04:00
if ( ret ! = 0 ) {
return ret ;
}
if ( ret1 ! = 0 ) {
return ret1 ;
2009-04-23 19:23:13 +04:00
}
2011-04-22 13:47:11 +04:00
ret = pthread_mutex_lock ( & pthreadpools_mutex ) ;
2009-04-23 19:23:13 +04:00
if ( ret ! = 0 ) {
return ret ;
}
2011-04-22 13:47:11 +04:00
DLIST_REMOVE ( pthreadpools , pool ) ;
ret = pthread_mutex_unlock ( & pthreadpools_mutex ) ;
assert ( ret = = 0 ) ;
close ( pool - > sig_pipe [ 0 ] ) ;
pool - > sig_pipe [ 0 ] = - 1 ;
close ( pool - > sig_pipe [ 1 ] ) ;
pool - > sig_pipe [ 1 ] = - 1 ;
2011-04-25 22:05:31 +04:00
free ( pool - > exited ) ;
2014-03-21 20:53:26 +04:00
free ( pool - > jobs ) ;
2011-04-22 13:47:11 +04:00
free ( pool ) ;
return 0 ;
2009-04-23 19:23:13 +04:00
}
/*
* Prepare for pthread_exit ( ) , pool - > mutex must be locked
*/
static void pthreadpool_server_exit ( struct pthreadpool * pool )
{
2011-04-25 22:05:31 +04:00
pthread_t * exited ;
2009-04-23 19:23:13 +04:00
pool - > num_threads - = 1 ;
2011-04-25 22:05:31 +04:00
exited = ( pthread_t * ) realloc (
2013-05-12 14:44:41 +04:00
pool - > exited , sizeof ( pthread_t ) * ( pool - > num_exited + 1 ) ) ;
2011-04-25 22:05:31 +04:00
if ( exited = = NULL ) {
/* lost a thread status */
return ;
}
pool - > exited = exited ;
2009-04-23 19:23:13 +04:00
pool - > exited [ pool - > num_exited ] = pthread_self ( ) ;
pool - > num_exited + = 1 ;
}
2014-03-21 20:53:26 +04:00
static bool pthreadpool_get_job ( struct pthreadpool * p ,
struct pthreadpool_job * job )
{
if ( p - > num_jobs = = 0 ) {
return false ;
}
* job = p - > jobs [ p - > head ] ;
p - > head = ( p - > head + 1 ) % p - > jobs_array_len ;
p - > num_jobs - = 1 ;
return true ;
}
static bool pthreadpool_put_job ( struct pthreadpool * p ,
int id ,
void ( * fn ) ( void * private_data ) ,
void * private_data )
{
struct pthreadpool_job * job ;
if ( p - > num_jobs = = p - > jobs_array_len ) {
struct pthreadpool_job * tmp ;
size_t new_len = p - > jobs_array_len * 2 ;
tmp = realloc (
p - > jobs , sizeof ( struct pthreadpool_job ) * new_len ) ;
if ( tmp = = NULL ) {
return false ;
}
p - > jobs = tmp ;
/*
* We just doubled the jobs array . The array implements a FIFO
* queue with a modulo - based wraparound , so we have to memcpy
* the jobs that are logically at the queue end but physically
* before the queue head into the reallocated area . The new
* space starts at the current jobs_array_len , and we have to
* copy everything before the current head job into the new
* area .
*/
memcpy ( & p - > jobs [ p - > jobs_array_len ] , p - > jobs ,
sizeof ( struct pthreadpool_job ) * p - > head ) ;
p - > jobs_array_len = new_len ;
}
job = & p - > jobs [ ( p - > head + p - > num_jobs ) % p - > jobs_array_len ] ;
job - > id = id ;
job - > fn = fn ;
job - > private_data = private_data ;
p - > num_jobs + = 1 ;
return true ;
}
2009-04-23 19:23:13 +04:00
static void * pthreadpool_server ( void * arg )
{
struct pthreadpool * pool = ( struct pthreadpool * ) arg ;
int res ;
res = pthread_mutex_lock ( & pool - > mutex ) ;
if ( res ! = 0 ) {
return NULL ;
}
while ( 1 ) {
2011-04-22 13:47:11 +04:00
struct timespec ts ;
2014-03-21 20:53:26 +04:00
struct pthreadpool_job job ;
2009-04-23 19:23:13 +04:00
/*
* idle - wait at most 1 second . If nothing happens in that
* time , exit this thread .
*/
2011-06-05 23:30:16 +04:00
clock_gettime ( CLOCK_REALTIME , & ts ) ;
ts . tv_sec + = 1 ;
2009-04-23 19:23:13 +04:00
2014-03-21 20:53:26 +04:00
while ( ( pool - > num_jobs = = 0 ) & & ( pool - > shutdown = = 0 ) ) {
2009-04-23 19:23:13 +04:00
pool - > num_idle + = 1 ;
res = pthread_cond_timedwait (
2011-04-22 13:47:11 +04:00
& pool - > condvar , & pool - > mutex , & ts ) ;
2009-04-23 19:23:13 +04:00
pool - > num_idle - = 1 ;
if ( res = = ETIMEDOUT ) {
2014-03-21 20:53:26 +04:00
if ( pool - > num_jobs = = 0 ) {
2009-04-23 19:23:13 +04:00
/*
* we timed out and still no work for
* us . Exit .
*/
pthreadpool_server_exit ( pool ) ;
pthread_mutex_unlock ( & pool - > mutex ) ;
return NULL ;
}
break ;
}
assert ( res = = 0 ) ;
}
2014-03-21 20:53:26 +04:00
if ( pthreadpool_get_job ( pool , & job ) ) {
2009-04-23 19:23:13 +04:00
ssize_t written ;
2014-03-21 20:53:26 +04:00
int sig_pipe = pool - > sig_pipe [ 1 ] ;
2009-04-23 19:23:13 +04:00
/*
2011-04-22 13:47:11 +04:00
* Do the work with the mutex unlocked
2009-04-23 19:23:13 +04:00
*/
res = pthread_mutex_unlock ( & pool - > mutex ) ;
assert ( res = = 0 ) ;
2014-03-21 20:53:26 +04:00
job . fn ( job . private_data ) ;
2009-04-23 19:23:13 +04:00
res = pthread_mutex_lock ( & pool - > mutex ) ;
assert ( res = = 0 ) ;
2014-08-21 23:55:06 +04:00
written = write ( sig_pipe , & job . id , sizeof ( job . id ) ) ;
2009-04-23 19:23:13 +04:00
if ( written ! = sizeof ( int ) ) {
pthreadpool_server_exit ( pool ) ;
pthread_mutex_unlock ( & pool - > mutex ) ;
return NULL ;
}
}
2014-03-21 20:53:26 +04:00
if ( ( pool - > num_jobs = = 0 ) & & ( pool - > shutdown ! = 0 ) ) {
2009-04-23 19:23:13 +04:00
/*
* No more work to do and we ' re asked to shut down , so
* exit
*/
pthreadpool_server_exit ( pool ) ;
if ( pool - > num_threads = = 0 ) {
/*
* Ping the main thread waiting for all of us
* workers to have quit .
*/
pthread_cond_broadcast ( & pool - > condvar ) ;
}
pthread_mutex_unlock ( & pool - > mutex ) ;
return NULL ;
}
}
}
int pthreadpool_add_job ( struct pthreadpool * pool , int job_id ,
void ( * fn ) ( void * private_data ) , void * private_data )
{
pthread_t thread_id ;
int res ;
sigset_t mask , omask ;
res = pthread_mutex_lock ( & pool - > mutex ) ;
if ( res ! = 0 ) {
return res ;
}
2011-04-22 13:47:11 +04:00
if ( pool - > shutdown ) {
/*
* Protect against the pool being shut down while
* trying to add a job
*/
res = pthread_mutex_unlock ( & pool - > mutex ) ;
assert ( res = = 0 ) ;
return EINVAL ;
}
2009-04-23 19:23:13 +04:00
/*
* Just some cleanup under the mutex
*/
pthreadpool_join_children ( pool ) ;
/*
* Add job to the end of the queue
*/
2014-03-21 20:53:26 +04:00
if ( ! pthreadpool_put_job ( pool , job_id , fn , private_data ) ) {
pthread_mutex_unlock ( & pool - > mutex ) ;
return ENOMEM ;
2009-04-23 19:23:13 +04:00
}
if ( pool - > num_idle > 0 ) {
/*
* We have idle threads , wake one .
*/
res = pthread_cond_signal ( & pool - > condvar ) ;
pthread_mutex_unlock ( & pool - > mutex ) ;
return res ;
}
2011-04-25 22:05:31 +04:00
if ( ( pool - > max_threads ! = 0 ) & &
( pool - > num_threads > = pool - > max_threads ) ) {
2009-04-23 19:23:13 +04:00
/*
* No more new threads , we just queue the request
*/
pthread_mutex_unlock ( & pool - > mutex ) ;
return 0 ;
}
/*
* Create a new worker thread . It should not receive any signals .
*/
sigfillset ( & mask ) ;
res = pthread_sigmask ( SIG_BLOCK , & mask , & omask ) ;
if ( res ! = 0 ) {
pthread_mutex_unlock ( & pool - > mutex ) ;
return res ;
}
res = pthread_create ( & thread_id , NULL , pthreadpool_server ,
( void * ) pool ) ;
if ( res = = 0 ) {
pool - > num_threads + = 1 ;
}
assert ( pthread_sigmask ( SIG_SETMASK , & omask , NULL ) = = 0 ) ;
pthread_mutex_unlock ( & pool - > mutex ) ;
return res ;
}