1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-12 09:18:10 +03:00

lib: Add ctdbd_req_send/recv

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
This commit is contained in:
Volker Lendecke 2020-03-20 13:58:21 +01:00 committed by Ralph Boehme
parent 177de8ec3f
commit e1418589b0
2 changed files with 342 additions and 1 deletions

View File

@ -108,6 +108,21 @@ struct ctdb_req_header;
void ctdbd_prep_hdr_next_reqid(
struct ctdbd_connection *conn, struct ctdb_req_header *hdr);
/*
* Async ctdb_request. iov[0] must start with an initialized
* struct ctdb_req_header
*/
struct tevent_req *ctdbd_req_send(
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct ctdbd_connection *conn,
struct iovec *iov,
size_t num_iov);
int ctdbd_req_recv(
struct tevent_req *req,
TALLOC_CTX *mem_ctx,
struct ctdb_req_header **reply);
struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct ctdbd_connection *conn,

View File

@ -36,6 +36,7 @@
#include "lib/util/sys_rw.h"
#include "lib/util/blocking.h"
#include "ctdb/include/ctdb_protocol.h"
#include "lib/async_req/async_sock.h"
/* paths to these include files come from --with-ctdb= in configure */
@ -85,6 +86,8 @@ struct ctdbd_connection {
* Outgoing queue for writev_send of asynchronous ctdb requests
*/
struct tevent_queue *outgoing;
struct tevent_req **pending;
struct tevent_req *read_req;
};
static void ctdbd_async_socket_handler(struct tevent_context *ev,
@ -99,7 +102,8 @@ static bool ctdbd_conn_has_async_sends(struct ctdbd_connection *conn)
static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
{
return (conn->fde != NULL);
size_t len = talloc_array_length(conn->pending);
return ((len != 0) || (conn->fde != NULL));
}
static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
@ -1850,6 +1854,328 @@ void ctdbd_prep_hdr_next_reqid(
};
}
struct ctdbd_pkt_read_state {
uint8_t *pkt;
};
static ssize_t ctdbd_pkt_read_more(
uint8_t *buf, size_t buflen, void *private_data);
static void ctdbd_pkt_read_done(struct tevent_req *subreq);
static struct tevent_req *ctdbd_pkt_read_send(
TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd)
{
struct tevent_req *req = NULL, *subreq = NULL;
struct ctdbd_pkt_read_state *state = NULL;
req = tevent_req_create(mem_ctx, &state, struct ctdbd_pkt_read_state);
if (req == NULL) {
return NULL;
}
subreq = read_packet_send(state, ev, fd, 4, ctdbd_pkt_read_more, NULL);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, ctdbd_pkt_read_done, req);
return req;
}
static ssize_t ctdbd_pkt_read_more(
uint8_t *buf, size_t buflen, void *private_data)
{
uint32_t msglen;
if (buflen < 4) {
return -1;
}
if (buflen > 4) {
return 0; /* Been here, done */
}
memcpy(&msglen, buf, 4);
if (msglen < sizeof(struct ctdb_req_header)) {
return -1;
}
return msglen - sizeof(msglen);
}
static void ctdbd_pkt_read_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct ctdbd_pkt_read_state *state = tevent_req_data(
req, struct ctdbd_pkt_read_state);
ssize_t nread;
int err;
nread = read_packet_recv(subreq, state, &state->pkt, &err);
TALLOC_FREE(subreq);
if (nread == -1) {
tevent_req_error(req, err);
return;
}
tevent_req_done(req);
}
static int ctdbd_pkt_read_recv(
struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pkt)
{
struct ctdbd_pkt_read_state *state = tevent_req_data(
req, struct ctdbd_pkt_read_state);
int err;
if (tevent_req_is_unix_error(req, &err)) {
return err;
}
*pkt = talloc_move(mem_ctx, &state->pkt);
tevent_req_received(req);
return 0;
}
static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn);
static void ctdbd_conn_received(struct tevent_req *subreq);
struct ctdbd_req_state {
struct ctdbd_connection *conn;
struct tevent_context *ev;
uint32_t reqid;
struct ctdb_req_header *reply;
};
static void ctdbd_req_unset_pending(struct tevent_req *req)
{
struct ctdbd_req_state *state = tevent_req_data(
req, struct ctdbd_req_state);
struct ctdbd_connection *conn = state->conn;
size_t num_pending = talloc_array_length(conn->pending);
size_t i, num_after;
tevent_req_set_cleanup_fn(req, NULL);
if (num_pending == 1) {
/*
* conn->read_req is a child of conn->pending
*/
TALLOC_FREE(conn->pending);
conn->read_req = NULL;
return;
}
for (i=0; i<num_pending; i++) {
if (req == conn->pending[i]) {
break;
}
}
if (i == num_pending) {
/*
* Something's seriously broken. Just returning here is the
* right thing nevertheless, the point of this routine is to
* remove ourselves from conn->pending.
*/
return;
}
num_after = num_pending - i - 1;
if (num_after > 0) {
memmove(&conn->pending[i],
&conn->pending[i] + 1,
sizeof(*conn->pending) * num_after);
}
conn->pending = talloc_realloc(
NULL, conn->pending, struct tevent_req *, num_pending - 1);
}
static void ctdbd_req_cleanup(
struct tevent_req *req, enum tevent_req_state req_state)
{
ctdbd_req_unset_pending(req);
}
static bool ctdbd_req_set_pending(struct tevent_req *req)
{
struct ctdbd_req_state *state = tevent_req_data(
req, struct ctdbd_req_state);
struct ctdbd_connection *conn = state->conn;
struct tevent_req **pending = NULL;
size_t num_pending = talloc_array_length(conn->pending);
bool ok;
pending = talloc_realloc(
conn, conn->pending, struct tevent_req *, num_pending + 1);
if (pending == NULL) {
return false;
}
pending[num_pending] = req;
conn->pending = pending;
tevent_req_set_cleanup_fn(req, ctdbd_req_cleanup);
ok = ctdbd_conn_receive_next(conn);
if (!ok) {
ctdbd_req_unset_pending(req);
return false;
}
return true;
}
static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn)
{
size_t num_pending = talloc_array_length(conn->pending);
struct tevent_req *req = NULL;
struct ctdbd_req_state *state = NULL;
if (conn->read_req != NULL) {
return true;
}
if (num_pending == 0) {
/*
* done for now
*/
return true;
}
req = conn->pending[0];
state = tevent_req_data(req, struct ctdbd_req_state);
conn->read_req = ctdbd_pkt_read_send(
conn->pending, state->ev, conn->fd);
if (conn->read_req == NULL) {
return false;
}
tevent_req_set_callback(conn->read_req, ctdbd_conn_received, conn);
return true;
}
static void ctdbd_conn_received(struct tevent_req *subreq)
{
struct ctdbd_connection *conn = tevent_req_callback_data(
subreq, struct ctdbd_connection);
TALLOC_CTX *frame = talloc_stackframe();
uint8_t *pkt = NULL;
int ret;
struct ctdb_req_header *hdr = NULL;
uint32_t reqid;
struct tevent_req *req = NULL;
struct ctdbd_req_state *state = NULL;
size_t i, num_pending;
bool ok;
SMB_ASSERT(subreq == conn->read_req);
conn->read_req = NULL;
ret = ctdbd_pkt_read_recv(subreq, frame, &pkt);
TALLOC_FREE(subreq);
if (ret != 0) {
cluster_fatal("ctdbd_pkt_read failed\n");
}
hdr = (struct ctdb_req_header *)pkt;
reqid = hdr->reqid;
num_pending = talloc_array_length(conn->pending);
for (i=0; i<num_pending; i++) {
req = conn->pending[i];
state = tevent_req_data(req, struct ctdbd_req_state);
if (state->reqid == reqid) {
break;
}
}
if (i == num_pending) {
/* not found */
TALLOC_FREE(frame);
return;
}
state->reply = talloc_move(state, &hdr);
tevent_req_defer_callback(req, state->ev);
tevent_req_done(req);
TALLOC_FREE(frame);
ok = ctdbd_conn_receive_next(conn);
if (!ok) {
cluster_fatal("ctdbd_conn_receive_next failed\n");
}
}
static void ctdbd_req_written(struct tevent_req *subreq);
struct tevent_req *ctdbd_req_send(
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct ctdbd_connection *conn,
struct iovec *iov,
size_t num_iov)
{
struct tevent_req *req = NULL, *subreq = NULL;
struct ctdbd_req_state *state = NULL;
struct ctdb_req_header *hdr = NULL;
bool ok;
req = tevent_req_create(mem_ctx, &state, struct ctdbd_req_state);
if (req == NULL) {
return NULL;
}
state->conn = conn;
state->ev = ev;
if ((num_iov == 0) ||
(iov[0].iov_len < sizeof(struct ctdb_req_header))) {
tevent_req_error(req, EINVAL);
return tevent_req_post(req, ev);
}
hdr = iov[0].iov_base;
state->reqid = hdr->reqid;
ok = ctdbd_req_set_pending(req);
if (!ok) {
tevent_req_oom(req);
return tevent_req_post(req, ev);
}
subreq = writev_send(
state, ev, conn->outgoing, conn->fd, false, iov, num_iov);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, ctdbd_req_written, req);
return req;
}
static void ctdbd_req_written(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
ssize_t nwritten;
int err;
nwritten = writev_recv(subreq, &err);
TALLOC_FREE(subreq);
if (nwritten == -1) {
tevent_req_error(req, err);
return;
}
}
int ctdbd_req_recv(
struct tevent_req *req,
TALLOC_CTX *mem_ctx,
struct ctdb_req_header **reply)
{
struct ctdbd_req_state *state = tevent_req_data(
req, struct ctdbd_req_state);
int err;
if (tevent_req_is_unix_error(req, &err)) {
return err;
}
*reply = talloc_move(mem_ctx, &state->reply);
tevent_req_received(req);
return 0;
}
struct ctdbd_parse_state {
struct tevent_context *ev;
struct ctdbd_connection *conn;