Threaded IO: implement handleClientsWithPendingWritesUsingThreads().

This is just an experiment for now, there are a couple of race
conditions, mostly harmless for the performance gain experiment that
this commit represents so far.

The general idea here is to take Redis single threaded and instead
fan-out on expensive kernel calls: write(2) in this case, but the same
concept could be easily implemented for read(2) and protocol parsing.

However just threading writes like in this commit, is enough to evaluate
if the approach is sound.
This commit is contained in:
antirez 2017-10-24 08:35:05 +02:00
parent 0a6090bfd8
commit f468e653b5
3 changed files with 162 additions and 9 deletions

View File

@ -1065,9 +1065,17 @@ void freeClient(client *c) {
* a context where calling freeClient() is not possible, because the client
* should be valid for the continuation of the flow of the program. */
void freeClientAsync(client *c) {
    /* server.clients_to_close is the only list that I/O threads may touch
     * concurrently with the main thread, so this is the single place where
     * a lock is required: every other access happens while the I/O threads
     * are idle. */
    static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;

    /* Already scheduled for closing, or a fake Lua client: nothing to do. */
    if (c->flags & (CLIENT_CLOSE_ASAP|CLIENT_LUA)) return;
    c->flags |= CLIENT_CLOSE_ASAP;

    pthread_mutex_lock(&async_free_queue_mutex);
    listAddNodeTail(server.clients_to_close,c);
    pthread_mutex_unlock(&async_free_queue_mutex);
}
void freeClientsInAsyncFreeQueue(void) {
@ -1091,7 +1099,12 @@ client *lookupClientByID(uint64_t id) {
}
/* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed. */
* is still valid after the call, C_ERR if it was freed because of some
* error.
*
 * This function may run inside an I/O thread, but in that case it is
 * always invoked with handler_installed set to 0; therefore every code
 * path taken when handler_installed is 0 must be thread safe. */
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
@ -1153,14 +1166,15 @@ int writeToClient(int fd, client *c, int handler_installed) {
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
/* FIXME: use an atomic var for this. */
server.stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (errno == EAGAIN) {
nwritten = 0;
} else {
serverLog(LL_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
// serverLog(LL_VERBOSE,
// "Error writing to client: %s", strerror(errno));
freeClientAsync(c);
return C_ERR;
}
}
@ -1173,11 +1187,15 @@ int writeToClient(int fd, client *c, int handler_installed) {
}
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
/* Note that writeToClient() is called in a threaded way, but
 * aeDeleteFileEvent() is not thread safe: however writeToClient()
 * is always called with handler_installed set to 0 from threads
 * so we are fine. */
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
freeClientAsync(c);
return C_ERR;
}
}
@ -2452,3 +2470,131 @@ int processEventsWhileBlocked(void) {
}
return count;
}
/* =============================================================================
 * Threaded I/O
 * =========================================================================== */

/* Hard upper bound on the size of the I/O thread pool; the actual number
 * used is server.io_threads_num, set in initThreadedIO(). */
#define SERVER_MAX_IO_THREADS 32

pthread_t io_threads[SERVER_MAX_IO_THREADS];        /* Thread handles, filled by initThreadedIO(). */
pthread_mutex_t io_threads_done_mutex = PTHREAD_MUTEX_INITIALIZER; /* Protects io_threads_done. */
pthread_cond_t io_threads_done_cond = PTHREAD_COND_INITIALIZER;    /* Signaled as each thread finishes a run. */
pthread_mutex_t io_threads_idle_mutex = PTHREAD_MUTEX_INITIALIZER; /* Protects io_threads_idle. */
pthread_cond_t io_threads_idle_cond = PTHREAD_COND_INITIALIZER;    /* Signaled as each thread becomes idle. */
pthread_cond_t io_threads_start_cond = PTHREAD_COND_INITIALIZER;   /* Broadcast by the main thread to start a run. */
int io_threads_done = 0; /* Number of threads that completed the work. */
int io_threads_idle = 0; /* Number of threads in idle state ready to go. */
list *io_threads_list[SERVER_MAX_IO_THREADS]; /* Per-thread sub-list of clients to write. */
/* Entry point of every I/O thread.
 *
 * Per-iteration protocol, driven by handleClientsWithPendingWritesUsingThreads():
 *  1. Declare ourselves idle (io_threads_idle++, signal io_threads_idle_cond)
 *     and block on io_threads_start_cond until the main thread starts a run.
 *  2. Write out every client in our private sub-list io_threads_list[id],
 *     calling writeToClient() with handler_installed == 0 (the thread-safe
 *     calling mode), then empty the sub-list.
 *  3. Report completion (io_threads_done++, signal io_threads_done_cond).
 *
 * NOTE(review): the wait on io_threads_start_cond has no predicate loop, so
 * a spurious wakeup would start a run early; also the main thread broadcasts
 * io_threads_start_cond while holding io_threads_done_mutex, not the
 * io_threads_idle_mutex used here — a known race, acknowledged by the commit
 * message as acceptable for this experiment. The printf() calls are debug
 * output for the same experiment. */
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;

    while(1) {
        /* ... Wait for start ... */
        pthread_mutex_lock(&io_threads_idle_mutex);
        io_threads_idle++;
        pthread_cond_signal(&io_threads_idle_cond);
        printf("[%ld] Waiting start...\n", id);
        pthread_cond_wait(&io_threads_start_cond,&io_threads_idle_mutex);
        printf("[%ld] Started\n", id);
        pthread_mutex_unlock(&io_threads_idle_mutex);
        printf("%d to handle\n", (int)listLength(io_threads_list[id]));

        /* ... Process: flush the output buffer of every assigned client.
         * On write errors writeToClient() uses freeClientAsync(), so no
         * client is freed from this thread. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            writeToClient(c->fd,c,0);
        }
        listEmpty(io_threads_list[id]);

        /* Report success. */
        pthread_mutex_lock(&io_threads_done_mutex);
        io_threads_done++;
        pthread_cond_signal(&io_threads_done_cond);
        pthread_mutex_unlock(&io_threads_done_mutex);
        printf("[%ld] Done\n", id);
    }
}
/* Initialize the data structures needed for threaded I/O: spawn the
 * server.io_threads_num I/O threads and create the per-thread client
 * sub-lists they consume. Called once from initServer(); on thread
 * creation failure the server logs and exits.
 *
 * Fix: the per-thread list is created *before* the thread is spawned.
 * In the previous ordering a freshly started thread could call
 * listLength(io_threads_list[id]) in IOThreadMain() before the main
 * thread had assigned the pointer, reading uninitialized memory. */
void initThreadedIO(void) {
    pthread_t tid;

    /* XXX: hardcoded thread count for the experiment; should come from
     * the configuration (bounded by SERVER_MAX_IO_THREADS). */
    server.io_threads_num = 4;
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Create the sub-list first: the new thread reads it right away. */
        io_threads_list[i] = listCreate();
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
/* Threaded counterpart of handleClientsWithPendingWrites(): called from
 * beforeSleep() to flush the output buffers of server.clients_pending_write
 * by fanning the write(2) calls out across the I/O threads.
 *
 * Phases:
 *  1. Wait until every I/O thread has declared itself idle.
 *  2. Round-robin the pending clients into io_threads_list[0..N-1].
 *  3. Broadcast io_threads_start_cond to kick off the run.
 *  4. Wait until io_threads_done reaches the thread count.
 *  5. Re-scan the pending list to install write handlers for clients that
 *     still have unsent replies, then clear the list.
 *
 * Returns the number of clients processed (0 if none were pending).
 *
 * NOTE(review): io_threads_start_cond is broadcast under io_threads_done_mutex
 * while the threads wait on it with io_threads_idle_mutex — mixing mutexes on
 * one condvar is racy. The commit message acknowledges these races as
 * harmless for this performance experiment. The printf() calls (including the
 * misspelled "finshed", preserved verbatim) are temporary debug output. */
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */
    printf("%d TOTAL\n", processed);

    /* Wait for all threads to be ready. */
    pthread_mutex_lock(&io_threads_idle_mutex);
    while(io_threads_idle < server.io_threads_num) {
        pthread_cond_wait(&io_threads_idle_cond,&io_threads_idle_mutex);
    }
    printf("All threads are idle: %d\n", io_threads_idle);
    io_threads_idle = 0;
    pthread_mutex_unlock(&io_threads_idle_mutex);

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        /* Simple round-robin assignment to the per-thread sub-lists. */
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Start all threads. */
    printf("Send start condition\n");
    pthread_mutex_lock(&io_threads_done_mutex);
    io_threads_done = 0;
    pthread_cond_broadcast(&io_threads_start_cond);
    pthread_mutex_unlock(&io_threads_done_mutex);

    /* Wait for all threads to end their work. */
    pthread_mutex_lock(&io_threads_done_mutex);
    while(io_threads_done < server.io_threads_num) {
        pthread_cond_wait(&io_threads_done_cond,&io_threads_done_mutex);
    }
    pthread_mutex_unlock(&io_threads_done_mutex);
    printf("All threads finshed\n");

    /* Run the list of clients again to install the write handler where
     * needed. Clients that could not get a handler installed are scheduled
     * for asynchronous freeing. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                sendReplyToClient, c) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}

View File

@ -1981,9 +1981,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
flushAppendOnlyFile(0);
}
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect.*/
@ -2075,7 +2072,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
/* XXX: Put a condition based on number of waiting clients: if we
* have less than a given number of clients, use non threaded code. */
handleClientsWithPendingWritesUsingThreads();
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
@ -2861,6 +2863,7 @@ void initServer(void) {
slowlogInit();
latencyMonitorInit();
bioInit();
initThreadedIO();
server.initial_memory_usage = zmalloc_used_memory();
}

View File

@ -1062,6 +1062,8 @@ struct redisServer {
int protected_mode; /* Don't accept external connections. */
int gopher_enabled; /* If true the server will reply to gopher
queries. Will still serve RESP2 queries. */
int io_threads_num; /* Number of IO threads to use. */
/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
off_t loading_total_bytes;
@ -1576,12 +1578,14 @@ void pauseClients(mstime_t duration);
int clientsArePaused(void);
int processEventsWhileBlocked(void);
int handleClientsWithPendingWrites(void);
int handleClientsWithPendingWritesUsingThreads(void);
int clientHasPendingReplies(client *c);
void unlinkClient(client *c);
int writeToClient(int fd, client *c, int handler_installed);
void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
void initThreadedIO(void);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)