From 472b96d6d3ddc05d0e52f23e6142ffdf0a0d0a35 Mon Sep 17 00:00:00 2001
From: Andrew Tridgell
Date: Thu, 3 May 2007 12:16:03 +1000
Subject: [PATCH 1/2] first stage of efficient non-blocking ctdb traverse

(This used to be ctdb commit 4c23e6f26bde421bb56b55de9d6cd3e319b2be40)
---
 ctdb/Makefile.in            |   2 +-
 ctdb/common/ctdb_io.c       |  10 +-
 ctdb/common/ctdb_traverse.c | 229 ++++++++++++++++++++++++++++++++++++
 ctdb/include/ctdb_private.h |   1 +
 ctdb/tools/ctdb_control.c   |   1 +
 5 files changed, 239 insertions(+), 4 deletions(-)
 create mode 100644 ctdb/common/ctdb_traverse.c

diff --git a/ctdb/Makefile.in b/ctdb/Makefile.in
index 1fc3ea9a405..154dc9abe8e 100644
--- a/ctdb/Makefile.in
+++ b/ctdb/Makefile.in
@@ -32,7 +32,7 @@ CTDB_COMMON_OBJ = common/ctdb.o common/ctdb_daemon.o common/ctdb_client.o \
 	common/ctdb_io.o common/util.o common/ctdb_util.o \
 	common/ctdb_call.o common/ctdb_ltdb.o common/ctdb_lockwait.o \
 	common/ctdb_message.o common/cmdline.o common/ctdb_control.o \
-	lib/util/debug.o common/ctdb_recover.o
+	lib/util/debug.o common/ctdb_recover.o common/ctdb_traverse.o
 
 CTDB_TCP_OBJ = tcp/tcp_connect.o tcp/tcp_io.o tcp/tcp_init.o
 
diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 6a5aa928b04..d09339561ae 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -214,9 +214,13 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
 	struct ctdb_queue_pkt *pkt;
 	uint32_t length2;
 
-	/* enforce the length and alignment rules from the tcp packet allocator */
-	length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
-	*(uint32_t *)data = length2;
+	if (queue->alignment) {
+		/* enforce the length and alignment rules from the tcp packet allocator */
+		length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
+		*(uint32_t *)data = length2;
+	} else {
+		length2 = length;
+	}
 
 	if (length2 != length) {
 		memset(data+length, 0, length2-length);
diff --git a/ctdb/common/ctdb_traverse.c b/ctdb/common/ctdb_traverse.c
new file mode 100644
index 00000000000..7ecedc82564
--- /dev/null
+++ b/ctdb/common/ctdb_traverse.c
@@ -0,0 +1,229 @@
+/*
+   efficient async ctdb traverse
+
+   Copyright (C) Andrew Tridgell 2007
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "db_wrap.h"
+#include "lib/tdb/include/tdb.h"
+#include "../include/ctdb_private.h"
+
+/*
+  per-record callback run in the parent. It is called once for each
+  record received from the child, and one final time with key and
+  data both set to tdb_null when the traverse has ended
+ */
+typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
+
+/*
+  structure used to pass the data between the child and parent.
+  length is the total packet size in bytes including this header;
+  data[] holds keylen bytes of key followed by datalen bytes of data
+ */
+struct ctdb_traverse_data {
+	uint32_t length;
+	uint32_t keylen;
+	uint32_t datalen;
+	uint8_t data[1];
+};
+
+/*
+  handle returned to caller - freeing this handle will kill the child and
+  terminate the traverse
+ */
+struct ctdb_traverse_handle {
+	struct ctdb_db_context *ctdb_db;
+	int fd[2];
+	pid_t child;
+	void *private_data;
+	ctdb_traverse_fn_t callback;
+	struct timeval start_time;
+	struct ctdb_queue *queue;
+};
+
+/*
+  called when data is available from the child. A NULL, truncated or
+  inconsistent packet is treated as the end of the traverse: the
+  handle is freed and the callback is told via tdb_null key and data
+ */
+static void ctdb_traverse_handler(uint8_t *rawdata, size_t length, void *private_data)
+{
+	struct ctdb_traverse_handle *h = talloc_get_type(private_data,
+							 struct ctdb_traverse_handle);
+	TDB_DATA key, data;
+	ctdb_traverse_fn_t callback = h->callback;
+	void *p = h->private_data;
+	struct ctdb_traverse_data *tdata = (struct ctdb_traverse_data *)rawdata;
+	const size_t hdrlen = offsetof(struct ctdb_traverse_data, data);
+
+	/* the whole header must be present before any of its fields are
+	   read, and the key and data must both fit inside the packet */
+	if (rawdata == NULL || length < hdrlen || length != tdata->length ||
+	    tdata->keylen > length - hdrlen ||
+	    tdata->datalen > length - hdrlen - tdata->keylen) {
+		/* end of traverse - h is gone after this, so only the
+		   saved callback and private pointer may be used */
+		talloc_free(h);
+		callback(p, tdb_null, tdb_null);
+		return;
+	}
+
+	key.dsize = tdata->keylen;
+	key.dptr = &tdata->data[0];
+	data.dsize = tdata->datalen;
+	data.dptr = &tdata->data[tdata->keylen];
+
+	callback(p, key, data);
+}
+
+/*
+  destroy an in-flight traverse operation
+ */
+static int traverse_destructor(struct ctdb_traverse_handle *h)
+{
+	close(h->fd[0]);
+	kill(h->child, SIGKILL);
+	/* reap the child so that it does not remain as a zombie */
+	waitpid(h->child, NULL, 0);
+	return 0;
+}
+
+/*
+  write a whole buffer to a file descriptor, coping with short writes.
+  Returns 0 on success and -1 on failure
+ */
+static int traverse_write_all(int fd, const uint8_t *buf, size_t len)
+{
+	while (len > 0) {
+		ssize_t n = write(fd, buf, len);
+		if (n <= 0) {
+			return -1;
+		}
+		buf += n;
+		len -= (size_t)n;
+	}
+	return 0;
+}
+
+/*
+  callback from tdb_traverse_read(). This runs in the child: each
+  record is packed into a ctdb_traverse_data packet and written to the
+  pipe. Returns 0 on success and -1 on failure
+ */
+static int ctdb_traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+	struct ctdb_traverse_handle *h = talloc_get_type(p, struct ctdb_traverse_handle);
+	struct ctdb_traverse_data *d;
+	size_t length = offsetof(struct ctdb_traverse_data, data) + key.dsize + data.dsize;
+	int ret;
+
+	d = (struct ctdb_traverse_data *)talloc_size(h, length);
+	if (d == NULL) {
+		/* error handling is tricky in this child code .... */
+		return -1;
+	}
+	d->length = length;
+	d->keylen = key.dsize;
+	d->datalen = data.dsize;
+	memcpy(&d->data[0], key.dptr, key.dsize);
+	memcpy(&d->data[key.dsize], data.dptr, data.dsize);
+
+	/* h->queue is only created in the parent after the fork, so it
+	   is NULL here. Write straight to the child's end of the pipe */
+	ret = traverse_write_all(h->fd[1], (uint8_t *)d, d->length);
+
+	/* release each packet once sent, otherwise the child grows by
+	   one packet for every record in the database */
+	talloc_free(d);
+
+	return ret;
+}
+
+/*
+  setup a non-blocking traverse of a tdb. The callback function will
+  be called on every record in the local ltdb. To stop the traverse,
+  talloc_free() the traverse handle.
+
+  Returns NULL on failure. Once the traverse has ended (final callback
+  with tdb_null key and data) the handle has already been freed and
+  must not be freed again by the caller
+ */
+struct ctdb_traverse_handle *ctdb_traverse(struct ctdb_db_context *ctdb_db,
+					   ctdb_traverse_fn_t callback,
+					   void *private_data)
+{
+	struct ctdb_traverse_handle *h;
+	int ret;
+
+	ctdb_db->ctdb->status.traverse_calls++;
+
+	if (!(h = talloc_zero(ctdb_db, struct ctdb_traverse_handle))) {
+		return NULL;
+	}
+
+	ret = pipe(h->fd);
+
+	if (ret != 0) {
+		talloc_free(h);
+		return NULL;
+	}
+
+	h->child = fork();
+
+	if (h->child == (pid_t)-1) {
+		close(h->fd[0]);
+		close(h->fd[1]);
+		talloc_free(h);
+		return NULL;
+	}
+
+	h->callback = callback;
+	h->private_data = private_data;
+	h->ctdb_db = ctdb_db;
+
+	if (h->child == 0) {
+		/* start the traverse in the child. The child only writes,
+		   so it drops the read end of the pipe */
+		close(h->fd[0]);
+		tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_fn, h);
+		_exit(0);
+	}
+
+	/* the parent only reads. Dropping the write end here leaves the
+	   child as the only writer on the pipe */
+	close(h->fd[1]);
+	talloc_set_destructor(h, traverse_destructor);
+
+	/*
+	  setup a packet queue between the child and the parent. This
+	  copes with all the async and packet boundary issues
+	*/
+	h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_handler, h);
+	if (h->queue == NULL) {
+		/* the destructor closes the pipe and kills the child */
+		talloc_free(h);
+		return NULL;
+	}
+
+	h->start_time = timeval_current();
+
+	return h;
+}
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index d14ce336861..3b836e6a1a9 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -157,6 +157,7 @@ struct ctdb_status {
 	uint32_t total_calls;
 	uint32_t pending_calls;
 	uint32_t lockwait_calls;
+	uint32_t traverse_calls;
 	uint32_t pending_lockwait_calls;
 	uint32_t __last_counter; /* hack for control_status_all */
 	uint32_t max_hop_count;
diff --git a/ctdb/tools/ctdb_control.c b/ctdb/tools/ctdb_control.c
index dee21528a59..0efe24a8f73 100644
--- a/ctdb/tools/ctdb_control.c
+++ b/ctdb/tools/ctdb_control.c
@@ -110,6 +110,7 @@ static void show_status(struct ctdb_status *s)
 	printf(" total_calls %u\n", s->total_calls);
 	printf(" pending_calls %u\n", s->pending_calls);
 	printf(" lockwait_calls %u\n", s->lockwait_calls);
+	printf(" traverse_calls %u\n", s->traverse_calls);
 	printf(" pending_lockwait_calls %u\n", s->pending_lockwait_calls);
 	printf(" max_hop_count %u\n", s->max_hop_count);
 	printf(" max_call_latency %.6f sec\n", s->max_call_latency);

From de0f848556894784276037973fc6488273defb7c Mon Sep 17 00:00:00 2001
From: Andrew Tridgell
Date: Thu, 3 May 2007 13:44:27 +1000
Subject: [PATCH 2/2] don't zero beyond packet header unnecessarily

(This used to be ctdb commit 4cf88ca2ce81db8fe10b0dfedb81d99a2bd93328)
---
 ctdb/common/ctdb_daemon.c | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/ctdb/common/ctdb_daemon.c b/ctdb/common/ctdb_daemon.c
index 5d619e0a907..55af203c581 100644
--- a/ctdb/common/ctdb_daemon.c
+++ b/ctdb/common/ctdb_daemon.c
@@ -727,7 +727,9 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
 {
 	int size;
 	struct ctdb_req_header *hdr;
-	size = ((length+1)+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
+
+	length = MAX(length, slength);
+	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
 
 	hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
 	if (hdr == NULL) {
@@ -736,9 +738,9 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
 		return NULL;
 	}
 	talloc_set_name_const(hdr, type);
-	memset(hdr, 0, size);
+	memset(hdr, 0, slength);
+	hdr->length = length;
 	hdr->operation = operation;
-	hdr->length = size;
 	hdr->ctdb_magic = CTDB_MAGIC;
 	hdr->ctdb_version = CTDB_VERSION;
 	hdr->srcnode = ctdb->vnn;
@@ -761,7 +763,10 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
 {
 	int size;
 	struct ctdb_req_header *hdr;
-	size = ((length+1)+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
+
+	length = MAX(length, slength);
+	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
+
 	hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
 	if (hdr == NULL) {
 		DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
@@ -769,9 +774,9 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
 		return NULL;
 	}
 	talloc_set_name_const(hdr, type);
-	memset(hdr, 0, size);
+	memset(hdr, 0, slength);
+	hdr->length = length;
 	hdr->operation = operation;
-	hdr->length = size;
 	hdr->ctdb_magic = CTDB_MAGIC;
 	hdr->ctdb_version = CTDB_VERSION;
 	hdr->generation = ctdb->vnn_map->generation;