diff --git a/source3/include/ctdbd_conn.h b/source3/include/ctdbd_conn.h
index 7ae2ec40139..161d8608b1c 100644
--- a/source3/include/ctdbd_conn.h
+++ b/source3/include/ctdbd_conn.h
@@ -108,6 +108,24 @@ struct ctdb_req_header;
 void ctdbd_prep_hdr_next_reqid(
 	struct ctdbd_connection *conn, struct ctdb_req_header *hdr);
 
+/*
+ * Async ctdb_request. iov[0] must start with an initialized
+ * struct ctdb_req_header, its reqid is used to find the reply.
+ *
+ * ctdbd_req_recv() returns 0 and moves the reply to mem_ctx, or
+ * returns a unix errno on failure.
+ */
+struct tevent_req *ctdbd_req_send(
+	TALLOC_CTX *mem_ctx,
+	struct tevent_context *ev,
+	struct ctdbd_connection *conn,
+	struct iovec *iov,
+	size_t num_iov);
+int ctdbd_req_recv(
+	struct tevent_req *req,
+	TALLOC_CTX *mem_ctx,
+	struct ctdb_req_header **reply);
+
 struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
 				    struct tevent_context *ev,
 				    struct ctdbd_connection *conn,
diff --git a/source3/lib/ctdbd_conn.c b/source3/lib/ctdbd_conn.c
index 20f47aae173..c6555914ff6 100644
--- a/source3/lib/ctdbd_conn.c
+++ b/source3/lib/ctdbd_conn.c
@@ -36,6 +36,7 @@
 #include "lib/util/sys_rw.h"
 #include "lib/util/blocking.h"
 #include "ctdb/include/ctdb_protocol.h"
+#include "lib/async_req/async_sock.h"
 
 /* paths to these include files come from --with-ctdb= in configure */
 
@@ -85,6 +86,14 @@ struct ctdbd_connection {
 	 * Outgoing queue for writev_send of asynchronous ctdb requests
 	 */
 	struct tevent_queue *outgoing;
+	/*
+	 * Requests sent via ctdbd_req_send() that still wait for a reply
+	 */
+	struct tevent_req **pending;
+	/*
+	 * The one outstanding packet read, a talloc child of "pending"
+	 */
+	struct tevent_req *read_req;
 };
 
 static void ctdbd_async_socket_handler(struct tevent_context *ev,
@@ -99,7 +108,8 @@ static bool ctdbd_conn_has_async_sends(struct ctdbd_connection *conn)
 
 static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
 {
-	return (conn->fde != NULL);
+	size_t len = talloc_array_length(conn->pending);
+	return ((len != 0) || (conn->fde != NULL));
 }
 
 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
@@ -1850,6 +1860,356 @@ void ctdbd_prep_hdr_next_reqid(
 	};
 }
 
+/*
+ * Read one ctdb packet from fd. The first 4 bytes are taken as the
+ * packet length, see ctdbd_pkt_read_more().
+ */
+struct ctdbd_pkt_read_state {
+	uint8_t *pkt;
+};
+
+static ssize_t ctdbd_pkt_read_more(
+	uint8_t *buf, size_t buflen, void *private_data);
+static void ctdbd_pkt_read_done(struct tevent_req *subreq);
+
+static struct tevent_req *ctdbd_pkt_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd)
+{
+	struct tevent_req *req = NULL, *subreq = NULL;
+	struct ctdbd_pkt_read_state *state = NULL;
+
+	req = tevent_req_create(mem_ctx, &state, struct ctdbd_pkt_read_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	subreq = read_packet_send(state, ev, fd, 4, ctdbd_pkt_read_more, NULL);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdbd_pkt_read_done, req);
+	return req;
+}
+
+/*
+ * Callback for read_packet_send(): once the 4-byte length word is
+ * there, return how many bytes are still missing. A length smaller
+ * than struct ctdb_req_header is an error.
+ */
+static ssize_t ctdbd_pkt_read_more(
+	uint8_t *buf, size_t buflen, void *private_data)
+{
+	uint32_t msglen;
+	if (buflen < 4) {
+		return -1;
+	}
+	if (buflen > 4) {
+		return 0; /* Been here, done */
+	}
+	memcpy(&msglen, buf, 4);
+
+	if (msglen < sizeof(struct ctdb_req_header)) {
+		return -1;
+	}
+	return msglen - sizeof(msglen);
+}
+
+static void ctdbd_pkt_read_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct ctdbd_pkt_read_state *state = tevent_req_data(
+		req, struct ctdbd_pkt_read_state);
+	ssize_t nread;
+	int err;
+
+	nread = read_packet_recv(subreq, state, &state->pkt, &err);
+	TALLOC_FREE(subreq);
+	if (nread == -1) {
+		tevent_req_error(req, err);
+		return;
+	}
+	tevent_req_done(req);
+}
+
+static int ctdbd_pkt_read_recv(
+	struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pkt)
+{
+	struct ctdbd_pkt_read_state *state = tevent_req_data(
+		req, struct ctdbd_pkt_read_state);
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		return err;
+	}
+	*pkt = talloc_move(mem_ctx, &state->pkt);
+	tevent_req_received(req);
+	return 0;
+}
+
+static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn);
+static void ctdbd_conn_received(struct tevent_req *subreq);
+
+struct ctdbd_req_state {
+	struct ctdbd_connection *conn;
+	struct tevent_context *ev;
+	uint32_t reqid;
+	struct ctdb_req_header *reply;
+};
+
+/*
+ * Remove req from conn->pending. Removing the last entry frees the
+ * array, and with it conn->read_req.
+ */
+static void ctdbd_req_unset_pending(struct tevent_req *req)
+{
+	struct ctdbd_req_state *state = tevent_req_data(
+		req, struct ctdbd_req_state);
+	struct ctdbd_connection *conn = state->conn;
+	size_t num_pending = talloc_array_length(conn->pending);
+	size_t i, num_after;
+
+	tevent_req_set_cleanup_fn(req, NULL);
+
+	if (num_pending == 1) {
+		/*
+		 * conn->read_req is a child of conn->pending
+		 */
+		TALLOC_FREE(conn->pending);
+		conn->read_req = NULL;
+		return;
+	}
+
+	for (i=0; i<num_pending; i++) {
+		if (req == conn->pending[i]) {
+			break;
+		}
+	}
+	if (i == num_pending) {
+		/*
+		 * Something's seriously broken. Just returning here is the
+		 * right thing nevertheless, the point of this routine is to
+		 * remove ourselves from conn->pending.
+		 */
+		return;
+	}
+
+	num_after = num_pending - i - 1;
+	if (num_after > 0) {
+		memmove(&conn->pending[i],
+			&conn->pending[i] + 1,
+			sizeof(*conn->pending) * num_after);
+	}
+	conn->pending = talloc_realloc(
+		NULL, conn->pending, struct tevent_req *, num_pending - 1);
+}
+
+static void ctdbd_req_cleanup(
+	struct tevent_req *req, enum tevent_req_state req_state)
+{
+	ctdbd_req_unset_pending(req);
+}
+
+/*
+ * Add req to conn->pending and make sure a packet read is running.
+ */
+static bool ctdbd_req_set_pending(struct tevent_req *req)
+{
+	struct ctdbd_req_state *state = tevent_req_data(
+		req, struct ctdbd_req_state);
+	struct ctdbd_connection *conn = state->conn;
+	struct tevent_req **pending = NULL;
+	size_t num_pending = talloc_array_length(conn->pending);
+	bool ok;
+
+	pending = talloc_realloc(
+		conn, conn->pending, struct tevent_req *, num_pending + 1);
+	if (pending == NULL) {
+		return false;
+	}
+	pending[num_pending] = req;
+	conn->pending = pending;
+
+	tevent_req_set_cleanup_fn(req, ctdbd_req_cleanup);
+
+	ok = ctdbd_conn_receive_next(conn);
+	if (!ok) {
+		ctdbd_req_unset_pending(req);
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Start reading the next packet, unless a read is already running
+ * or no request is pending. Returns false only if the read request
+ * could not be created.
+ */
+static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn)
+{
+	size_t num_pending = talloc_array_length(conn->pending);
+	struct tevent_req *req = NULL;
+	struct ctdbd_req_state *state = NULL;
+
+	if (conn->read_req != NULL) {
+		return true;
+	}
+	if (num_pending == 0) {
+		/*
+		 * done for now
+		 */
+		return true;
+	}
+
+	req = conn->pending[0];
+	state = tevent_req_data(req, struct ctdbd_req_state);
+
+	conn->read_req = ctdbd_pkt_read_send(
+		conn->pending, state->ev, conn->fd);
+	if (conn->read_req == NULL) {
+		return false;
+	}
+	tevent_req_set_callback(conn->read_req, ctdbd_conn_received, conn);
+	return true;
+}
+
+/*
+ * A packet has arrived. Complete the pending request with the
+ * matching reqid, if there is one, then continue reading.
+ */
+static void ctdbd_conn_received(struct tevent_req *subreq)
+{
+	struct ctdbd_connection *conn = tevent_req_callback_data(
+		subreq, struct ctdbd_connection);
+	TALLOC_CTX *frame = talloc_stackframe();
+	uint8_t *pkt = NULL;
+	int ret;
+	struct ctdb_req_header *hdr = NULL;
+	uint32_t reqid;
+	struct tevent_req *req = NULL;
+	struct ctdbd_req_state *state = NULL;
+	size_t i, num_pending;
+	bool ok;
+
+	SMB_ASSERT(subreq == conn->read_req);
+	conn->read_req = NULL;
+
+	ret = ctdbd_pkt_read_recv(subreq, frame, &pkt);
+	TALLOC_FREE(subreq);
+	if (ret != 0) {
+		cluster_fatal("ctdbd_pkt_read failed\n");
+	}
+
+	hdr = (struct ctdb_req_header *)pkt;
+	reqid = hdr->reqid;
+	num_pending = talloc_array_length(conn->pending);
+
+	for (i=0; i<num_pending; i++) {
+		req = conn->pending[i];
+		state = tevent_req_data(req, struct ctdbd_req_state);
+		if (state->reqid == reqid) {
+			break;
+		}
+	}
+
+	if (i < num_pending) {
+		state->reply = talloc_move(state, &hdr);
+		tevent_req_defer_callback(req, state->ev);
+		tevent_req_done(req);
+	}
+	/*
+	 * A packet without a matching request is dropped with the
+	 * frame. Keep reading in either case, other requests may
+	 * still wait for their reply.
+	 */
+	TALLOC_FREE(frame);
+
+	ok = ctdbd_conn_receive_next(conn);
+	if (!ok) {
+		cluster_fatal("ctdbd_conn_receive_next failed\n");
+	}
+}
+
+static void ctdbd_req_written(struct tevent_req *subreq);
+
+struct tevent_req *ctdbd_req_send(
+	TALLOC_CTX *mem_ctx,
+	struct tevent_context *ev,
+	struct ctdbd_connection *conn,
+	struct iovec *iov,
+	size_t num_iov)
+{
+	struct tevent_req *req = NULL, *subreq = NULL;
+	struct ctdbd_req_state *state = NULL;
+	struct ctdb_req_header *hdr = NULL;
+	bool ok;
+
+	req = tevent_req_create(mem_ctx, &state, struct ctdbd_req_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->conn = conn;
+	state->ev = ev;
+
+	if ((num_iov == 0) ||
+	    (iov[0].iov_len < sizeof(struct ctdb_req_header))) {
+		tevent_req_error(req, EINVAL);
+		return tevent_req_post(req, ev);
+	}
+	hdr = iov[0].iov_base;
+	state->reqid = hdr->reqid;
+
+	ok = ctdbd_req_set_pending(req);
+	if (!ok) {
+		tevent_req_oom(req);
+		return tevent_req_post(req, ev);
+	}
+
+	subreq = writev_send(
+		state, ev, conn->outgoing, conn->fd, false, iov, num_iov);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, ctdbd_req_written, req);
+
+	return req;
+}
+
+static void ctdbd_req_written(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	ssize_t nwritten;
+	int err;
+
+	nwritten = writev_recv(subreq, &err);
+	TALLOC_FREE(subreq);
+	if (nwritten == -1) {
+		tevent_req_error(req, err);
+		return;
+	}
+	/*
+	 * Nothing more to do, ctdbd_conn_received() completes req
+	 */
+}
+
+int ctdbd_req_recv(
+	struct tevent_req *req,
+	TALLOC_CTX *mem_ctx,
+	struct ctdb_req_header **reply)
+{
+	struct ctdbd_req_state *state = tevent_req_data(
+		req, struct ctdbd_req_state);
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		return err;
+	}
+	*reply = talloc_move(mem_ctx, &state->reply);
+	tevent_req_received(req);
+	return 0;
+}
+
 struct ctdbd_parse_state {
 	struct tevent_context *ev;
 	struct ctdbd_connection *conn;