/* ctdb database library Utility functions to read/write blobs of data from a file descriptor and handle the case where we might need multiple read/writes to get all the data. Copyright (C) Andrew Tridgell 2006 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, see . */ #include "includes.h" #include "lib/tdb/include/tdb.h" #include "lib/tevent/tevent.h" #include "lib/util/dlinklist.h" #include "system/network.h" #include "system/filesys.h" #include "../include/ctdb_private.h" #include "../include/ctdb_client.h" #include /* structures for packet queueing - see common/ctdb_io.c */ struct ctdb_partial { uint8_t *data; uint32_t length; }; struct ctdb_queue_pkt { struct ctdb_queue_pkt *next, *prev; uint8_t *data; uint32_t length; uint32_t full_length; }; struct ctdb_queue { struct ctdb_context *ctdb; struct ctdb_partial partial; /* partial input packet */ struct ctdb_queue_pkt *out_queue, *out_queue_tail; uint32_t out_queue_length; struct fd_event *fde; int fd; size_t alignment; void *private_data; ctdb_queue_cb_fn_t callback; bool *destroyed; const char *name; }; int ctdb_queue_length(struct ctdb_queue *queue) { return queue->out_queue_length; } static void dump_packet(unsigned char *data, size_t len) { size_t i; char *p = talloc_array(NULL, char, len*3 + 1); if (!p) { DEBUG(DEBUG_CRIT,("Packet talloc fail")); return; } for (i = 0; i < len; i++) sprintf(p + i*3, " %02x", data[i]); DEBUG(DEBUG_CRIT,("Contents: %s\n", p)); talloc_free(p); } /* called when an incoming connection is readable This function MUST be safe for reentry via the queue callback! */ static void queue_io_read(struct ctdb_queue *queue) { int num_ready = 0; uint32_t sz_bytes_req; uint32_t pkt_size; uint32_t pkt_bytes_remaining; uint32_t to_read; ssize_t nread; uint8_t *data; if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) { return; } if (num_ready == 0) { /* the descriptor has been closed */ goto failed; } if (queue->partial.data == NULL) { /* starting fresh, allocate buf for size bytes */ sz_bytes_req = sizeof(pkt_size); queue->partial.data = talloc_size(queue, sz_bytes_req); if (queue->partial.data == NULL) { DEBUG(DEBUG_ERR,("read error alloc failed for %u\n", sz_bytes_req)); goto failed; } } else if (queue->partial.length < sizeof(pkt_size)) { /* yet to find out the packet length */ sz_bytes_req = sizeof(pkt_size) - queue->partial.length; } else { /* partial packet, length known, full buf allocated */ sz_bytes_req = 0; } data = queue->partial.data; if (sz_bytes_req > 0) { to_read = MIN(sz_bytes_req, num_ready); nread = read(queue->fd, data + queue->partial.length, to_read); if (nread <= 0) { DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread)); goto failed; } queue->partial.length += nread; if (nread < sz_bytes_req) { /* not enough to know the length */ DEBUG(DEBUG_DEBUG,("Partial packet length read\n")); return; } /* size now known, allocate buffer for the full packet */ queue->partial.data = talloc_realloc_size(queue, data, *(uint32_t *)data); if (queue->partial.data == NULL) { DEBUG(DEBUG_ERR,("read error alloc failed for %u\n", *(uint32_t *)data)); goto failed; } data = queue->partial.data; num_ready -= nread; } pkt_size = *(uint32_t *)data; if (pkt_size == 0) { DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n")); goto failed; } pkt_bytes_remaining = pkt_size - queue->partial.length; to_read = MIN(pkt_bytes_remaining, num_ready); nread = read(queue->fd, data + queue->partial.length, to_read); if (nread <= 0) { DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread)); goto failed; } queue->partial.length += nread; if (queue->partial.length < pkt_size) { DEBUG(DEBUG_DEBUG,("Partial packet data read\n")); return; } queue->partial.data = NULL; queue->partial.length = 0; /* it is the responsibility of the callback to free 'data' */ queue->callback(data, pkt_size, queue->private_data); return; failed: queue->callback(NULL, 0, queue->private_data); } /* used when an event triggers a dead queue */ static void queue_dead(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data) { struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); queue->callback(NULL, 0, queue->private_data); } /* called when an incoming connection is writeable */ static void queue_io_write(struct ctdb_queue *queue) { while (queue->out_queue) { struct ctdb_queue_pkt *pkt = queue->out_queue; ssize_t n; if (queue->ctdb->flags & CTDB_FLAG_TORTURE) { n = write(queue->fd, pkt->data, 1); } else { n = write(queue->fd, pkt->data, pkt->length); } if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { if (pkt->length != pkt->full_length) { /* partial packet sent - we have to drop it */ DLIST_REMOVE(queue->out_queue, pkt); queue->out_queue_length--; talloc_free(pkt); } talloc_free(queue->fde); queue->fde = NULL; queue->fd = -1; event_add_timed(queue->ctdb->ev, queue, timeval_zero(), queue_dead, queue); return; } if (n <= 0) return; if (n != pkt->length) { pkt->length -= n; pkt->data += n; return; } DLIST_REMOVE(queue->out_queue, pkt); queue->out_queue_length--; talloc_free(pkt); } EVENT_FD_NOT_WRITEABLE(queue->fde); } /* called when an incoming connection is readable or writeable */ static void queue_io_handler(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private_data) { struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue); if (flags & EVENT_FD_READ) { queue_io_read(queue); } else { queue_io_write(queue); } } /* queue a packet for sending */ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) { struct ctdb_queue_pkt *pkt; uint32_t length2, full_length; if (queue->alignment) { /* enforce the length and alignment rules from the tcp packet allocator */ length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); *(uint32_t *)data = length2; } else { length2 = length; } if (length2 != length) { memset(data+length, 0, length2-length); } full_length = length2; /* if the queue is empty then try an immediate write, avoiding queue overhead. This relies on non-blocking sockets */ if (queue->out_queue == NULL && queue->fd != -1 && !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) { ssize_t n = write(queue->fd, data, length2); if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { talloc_free(queue->fde); queue->fde = NULL; queue->fd = -1; event_add_timed(queue->ctdb->ev, queue, timeval_zero(), queue_dead, queue); /* yes, we report success, as the dead node is handled via a separate event */ return 0; } if (n > 0) { data += n; length2 -= n; } if (length2 == 0) return 0; } pkt = talloc(queue, struct ctdb_queue_pkt); CTDB_NO_MEMORY(queue->ctdb, pkt); pkt->data = talloc_memdup(pkt, data, length2); CTDB_NO_MEMORY(queue->ctdb, pkt->data); pkt->length = length2; pkt->full_length = full_length; if (queue->out_queue == NULL && queue->fd != -1) { EVENT_FD_WRITEABLE(queue->fde); } DLIST_ADD_END(queue->out_queue, pkt, NULL); queue->out_queue_length++; if (queue->ctdb->tunable.verbose_memory_names != 0) { struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data; switch (hdr->operation) { case CTDB_REQ_CONTROL: { struct ctdb_req_control *c = (struct ctdb_req_control *)hdr; talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u", queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen); break; } case CTDB_REQ_MESSAGE: { struct ctdb_req_message *m = (struct ctdb_req_message *)hdr; talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u", queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen); break; } default: talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u", queue->name, (unsigned)hdr->operation, (unsigned)hdr->length, (unsigned)hdr->srcnode, (unsigned)hdr->destnode); break; } } return 0; } /* setup the fd used by the queue */ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd) { queue->fd = fd; talloc_free(queue->fde); queue->fde = NULL; if (fd != -1) { queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, queue_io_handler, queue); if (queue->fde == NULL) { return -1; } tevent_fd_set_auto_close(queue->fde); if (queue->out_queue) { EVENT_FD_WRITEABLE(queue->fde); } } return 0; } /* If someone sets up this pointer, they want to know if the queue is freed */ static int queue_destructor(struct ctdb_queue *queue) { if (queue->destroyed != NULL) *queue->destroyed = true; return 0; } /* setup a packet queue on a socket */ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, int fd, int alignment, ctdb_queue_cb_fn_t callback, void *private_data, const char *fmt, ...) { struct ctdb_queue *queue; va_list ap; queue = talloc_zero(mem_ctx, struct ctdb_queue); CTDB_NO_MEMORY_NULL(ctdb, queue); va_start(ap, fmt); queue->name = talloc_vasprintf(mem_ctx, fmt, ap); va_end(ap); CTDB_NO_MEMORY_NULL(ctdb, queue->name); queue->ctdb = ctdb; queue->fd = fd; queue->alignment = alignment; queue->private_data = private_data; queue->callback = callback; if (fd != -1) { if (ctdb_queue_set_fd(queue, fd) != 0) { talloc_free(queue); return NULL; } } talloc_set_destructor(queue, queue_destructor); return queue; }