
lib: Use ctdbd_req_send/recv in ctdb_parse_send/recv

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
Volker Lendecke 2020-03-11 11:03:06 +01:00 committed by Ralph Boehme
parent 7a7d56c562
commit 296114cf47
2 changed files with 13 additions and 341 deletions
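Taken together, the hunks below drop the file-local ctdb_pkt_send_send/recv and ctdb_pkt_recv_send/recv helpers and let ctdbd_parse_send() talk to the shared ctdbd_req_send()/ctdbd_req_recv() pair directly, so the intermediate ctdbd_parse_pkt_send_done() step disappears. A condensed sketch of the new flow, assembled only from the added lines in the diff (surrounding code and error diagnostics elided):

/* In ctdbd_parse_send(): hand the request packet built in state->iov
 * straight to the generic helper; its callback fires once the matching
 * reply (or an error) is available. */
subreq = ctdbd_req_send(
        state, ev, conn, state->iov, ARRAY_SIZE(state->iov));
if (tevent_req_nomem(subreq, req)) {
        return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, ctdbd_parse_done, req);

/* In ctdbd_parse_done(): collect the reply header from the same helper. */
ret = ctdbd_req_recv(subreq, state, &hdr);
TALLOC_FREE(subreq);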

@@ -1360,195 +1360,6 @@ struct ctdb_pkt_send_state {
        size_t packet_len;
};

static void ctdb_pkt_send_cleanup(struct tevent_req *req,
                                  enum tevent_req_state req_state);

/**
 * Asynchronously send a ctdb packet given as iovec array
 *
 * Note: the passed iov array is not const here. Similar
 * functions in samba take a const array and create a copy
 * before calling iov_advance() on the array.
 *
 * This function will modify the iov array! But
 * this is a static function and our only caller
 * ctdb_parse_send/recv is prepared for this to
 * happen!
 **/
static struct tevent_req *ctdb_pkt_send_send(TALLOC_CTX *mem_ctx,
                                             struct tevent_context *ev,
                                             struct ctdbd_connection *conn,
                                             uint32_t reqid,
                                             struct iovec *iov,
                                             int iovcnt,
                                             enum dbwrap_req_state *req_state)
{
        struct tevent_req *req = NULL;
        struct ctdb_pkt_send_state *state = NULL;
        ssize_t nwritten;
        bool ok;

        DBG_DEBUG("sending async ctdb reqid [%" PRIu32 "]\n", reqid);

        req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_send_state);
        if (req == NULL) {
                return NULL;
        }

        *state = (struct ctdb_pkt_send_state) {
                .ev = ev,
                .conn = conn,
                .req = req,
                .reqid = reqid,
                .iov = iov,
                .iovcnt = iovcnt,
                .packet_len = iov_buflen(iov, iovcnt),
        };

        tevent_req_set_cleanup_fn(req, ctdb_pkt_send_cleanup);

        *req_state = DBWRAP_REQ_QUEUED;

        if (ctdbd_conn_has_async_sends(conn)) {
                /*
                 * Can't attempt direct write with messages already queued and
                 * possibly in progress
                 */
                DLIST_ADD_END(conn->send_list, state);
                return req;
        }

        /*
         * Attempt a direct write. If this returns short, schedule the
         * remaining data as an async write, otherwise we're already done.
         */
        nwritten = writev(conn->fd, state->iov, state->iovcnt);
        if ((size_t)nwritten == state->packet_len) {
                DBG_DEBUG("Finished sending reqid [%" PRIu32 "]\n", reqid);

                *req_state = DBWRAP_REQ_DISPATCHED;
                tevent_req_done(req);
                return tevent_req_post(req, ev);
        }

        if (nwritten == -1) {
                if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
                        cluster_fatal("cluster write error\n");
                }
                nwritten = 0;
        }

        DBG_DEBUG("Posting async write of reqid [%" PRIu32"]"
                  "after short write [%zd]\n", reqid, nwritten);

        ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
        if (!ok) {
                *req_state = DBWRAP_REQ_ERROR;
                tevent_req_error(req, EIO);
                return tevent_req_post(req, ev);
        }

        /*
         * As this is the first async write req we post, we must enable
         * fd-writable events.
         */
        TEVENT_FD_WRITEABLE(conn->fde);
        DLIST_ADD_END(conn->send_list, state);
        return req;
}
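The function above (removed by this commit) implements an opportunistic send: if nothing is queued yet it tries a single writev() directly, and only falls back to the fd-writable event machinery when the kernel accepts the packet partially, using iov_advance() to shift the iovec array past the bytes already written. The following stand-alone sketch illustrates that short-write handling; advance_iov() is a simplified, hypothetical stand-in for Samba's iov_advance() helper, not the real implementation.

#include <stdbool.h>
#include <stddef.h>
#include <sys/uio.h>

/* Hypothetical stand-in for iov_advance(): consume 'n' bytes from the
 * front of the iovec array in place, returning false if 'n' runs past
 * the end of the array. */
static bool advance_iov(struct iovec **iov, int *iovcnt, size_t n)
{
        while (n > 0) {
                if (*iovcnt == 0) {
                        return false;
                }
                if (n < (*iov)->iov_len) {
                        (*iov)->iov_base = (char *)(*iov)->iov_base + n;
                        (*iov)->iov_len -= n;
                        return true;
                }
                n -= (*iov)->iov_len;
                *iov += 1;
                *iovcnt -= 1;
        }
        return true;
}

/* Usage pattern mirroring the direct-write path above:
 *
 *        nwritten = writev(fd, iov, iovcnt);
 *        if (nwritten > 0 && !advance_iov(&iov, &iovcnt, nwritten)) {
 *                ... treat as a fatal protocol error ...
 *        }
 *        if (iovcnt > 0) {
 *                ... queue the remainder and wait for fd-writable ...
 *        }
 */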
static int ctdb_pkt_send_state_destructor(struct ctdb_pkt_send_state *state)
{
        struct ctdbd_connection *conn = state->conn;

        if (conn == NULL) {
                return 0;
        }

        if (state->req == NULL) {
                DBG_DEBUG("Removing cancelled reqid [%" PRIu32"]\n",
                          state->reqid);
                state->conn = NULL;
                DLIST_REMOVE(conn->send_list, state);
                return 0;
        }

        DBG_DEBUG("Reparenting cancelled reqid [%" PRIu32"]\n",
                  state->reqid);

        talloc_reparent(state->req, conn, state);
        state->req = NULL;
        return -1;
}
static void ctdb_pkt_send_cleanup(struct tevent_req *req,
                                  enum tevent_req_state req_state)
{
        struct ctdb_pkt_send_state *state = tevent_req_data(
                req, struct ctdb_pkt_send_state);
        struct ctdbd_connection *conn = state->conn;
        size_t missing_len = 0;

        if (conn == NULL) {
                return;
        }

        missing_len = iov_buflen(state->iov, state->iovcnt);
        if (state->packet_len == missing_len) {
                /*
                 * We haven't yet started sending this one, so we can just
                 * remove it from the pending list
                 */
                missing_len = 0;
        }

        if (missing_len != 0) {
                uint8_t *buf = NULL;

                if (req_state != TEVENT_REQ_RECEIVED) {
                        /*
                         * Wait til the req_state is TEVENT_REQ_RECEIVED, as
                         * that will be the final state when the request state
                         * is talloc_free'd from tallloc_req_received(). Which
                         * ensures we only run the following code *ONCE*!
                         */
                        return;
                }

                DBG_DEBUG("Cancelling in-flight reqid [%" PRIu32"]\n",
                          state->reqid);
                /*
                 * A request in progress of being sent. Reparent the iov buffer
                 * so we can continue sending the request. See also the comment
                 * in ctdbd_parse_send() when copying the key buffer.
                 */
                buf = iov_concat(state, state->iov, state->iovcnt);
                if (buf == NULL) {
                        cluster_fatal("iov_concat error\n");
                        return;
                }

                state->iovcnt = 1;
                state->_iov.iov_base = buf;
                state->_iov.iov_len = missing_len;
                state->iov = &state->_iov;

                talloc_set_destructor(state, ctdb_pkt_send_state_destructor);
                return;
        }

        DBG_DEBUG("Removing pending reqid [%" PRIu32"]\n", state->reqid);

        state->conn = NULL;
        DLIST_REMOVE(conn->send_list, state);

        if (!ctdbd_conn_has_async_sends(conn)) {
                DBG_DEBUG("No more sends, disabling fd-writable events\n");
                TEVENT_FD_NOT_WRITEABLE(conn->fde);
        }
}
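The trickiest part of the removed code is the cancellation path above: if the caller abandons a request whose packet is already partially on the wire, the remaining bytes are flattened into a single buffer with iov_concat(), the send state is reparented from the dying tevent request onto the connection, and a talloc destructor takes over the send_list bookkeeping, so the byte stream to ctdbd is never left with half a packet. The following toy program (not Samba code, it only needs libtalloc) shows the reparent-plus-destructor idiom in isolation:

#include <stdio.h>
#include <talloc.h>

struct pending_send {
        const char *name;
};

/* Runs only when the reparented state is finally freed, i.e. when the
 * longer-lived owner goes away, not when the original request does. */
static int pending_send_destructor(struct pending_send *st)
{
        printf("finished with %s\n", st->name);
        return 0;
}

int main(void)
{
        TALLOC_CTX *conn = talloc_new(NULL);   /* long-lived "connection" */
        TALLOC_CTX *req = talloc_new(NULL);    /* short-lived "request" */
        struct pending_send *st = talloc_zero(req, struct pending_send);

        st->name = "reqid 42";

        /* The request is cancelled while its packet is half-written:
         * keep the state alive by handing it to the connection. */
        talloc_reparent(req, conn, st);
        talloc_set_destructor(st, pending_send_destructor);

        talloc_free(req);    /* st survives this */
        talloc_free(conn);   /* destructor fires here */
        return 0;
}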
static int ctdb_pkt_send_handler(struct ctdbd_connection *conn)
{
        struct ctdb_pkt_send_state *state = NULL;
@@ -1606,19 +1417,6 @@ static int ctdb_pkt_send_handler(struct ctdbd_connection *conn)
        return 0;
}
static int ctdb_pkt_send_recv(struct tevent_req *req)
{
        int ret;

        if (tevent_req_is_unix_error(req, &ret)) {
                tevent_req_received(req);
                return ret;
        }

        tevent_req_received(req);
        return 0;
}

struct ctdb_pkt_recv_state {
        struct ctdb_pkt_recv_state *prev, *next;
        struct tevent_context *ev;
@@ -1634,56 +1432,6 @@ struct ctdb_pkt_recv_state {
        struct ctdb_req_header *hdr;
};
static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
                                  enum tevent_req_state req_state);

static struct tevent_req *ctdb_pkt_recv_send(TALLOC_CTX *mem_ctx,
                                             struct tevent_context *ev,
                                             struct ctdbd_connection *conn,
                                             uint32_t reqid)
{
        struct tevent_req *req = NULL;
        struct ctdb_pkt_recv_state *state = NULL;

        req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_recv_state);
        if (req == NULL) {
                return NULL;
        }

        *state = (struct ctdb_pkt_recv_state) {
                .ev = ev,
                .conn = conn,
                .reqid = reqid,
                .req = req,
        };

        tevent_req_set_cleanup_fn(req, ctdb_pkt_recv_cleanup);

        /*
         * fd-readable event is always set for the fde, no need to deal with
         * that here.
         */
        DLIST_ADD_END(conn->recv_list, state);
        DBG_DEBUG("Posted receive reqid [%" PRIu32 "]\n", state->reqid);
        return req;
}
static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
                                  enum tevent_req_state req_state)
{
        struct ctdb_pkt_recv_state *state = tevent_req_data(
                req, struct ctdb_pkt_recv_state);
        struct ctdbd_connection *conn = state->conn;

        if (conn == NULL) {
                return;
        }
        state->conn = NULL;
        DLIST_REMOVE(conn->recv_list, state);
}
static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn)
{
        struct ctdb_pkt_recv_state *state = NULL;
@@ -1793,26 +1541,6 @@ static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn)
        return 0;
}
static int ctdb_pkt_recv_recv(struct tevent_req *req,
                              TALLOC_CTX *mem_ctx,
                              struct ctdb_req_header **_hdr)
{
        struct ctdb_pkt_recv_state *state = tevent_req_data(
                req, struct ctdb_pkt_recv_state);
        int error;

        if (tevent_req_is_unix_error(req, &error)) {
                DBG_ERR("ctdb_read_req failed %s\n", strerror(error));
                tevent_req_received(req);
                return error;
        }

        *_hdr = talloc_move(mem_ctx, &state->hdr);
        tevent_req_received(req);
        return 0;
}
static int ctdbd_connection_destructor(struct ctdbd_connection *c)
{
        TALLOC_FREE(c->fde);
@@ -2188,10 +1916,8 @@ struct ctdbd_parse_state {
                       TDB_DATA data,
                       void *private_data);
        void *private_data;
        enum dbwrap_req_state *req_state;
};
static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq);
static void ctdbd_parse_done(struct tevent_req *subreq);

struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
@@ -2218,13 +1944,14 @@ struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
                return NULL;
        }

        *req_state = DBWRAP_REQ_DISPATCHED;

        *state = (struct ctdbd_parse_state) {
                .ev = ev,
                .conn = conn,
                .reqid = ctdbd_next_reqid(conn),
                .parser = parser,
                .private_data = private_data,
                .req_state = req_state,
        };

        flags = local_copy ? CTDB_WANT_READONLY : 0;
@@ -2264,55 +1991,17 @@ struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
        state->iov[1].iov_base = state->key.dptr;
        state->iov[1].iov_len = state->key.dsize;

        /*
         * Note that ctdb_pkt_send_send()
         * will modify state->iov using
         * iov_advance() without making a copy.
         */
        subreq = ctdb_pkt_send_send(state,
                                    ev,
                                    conn,
                                    state->reqid,
                                    state->iov,
                                    ARRAY_SIZE(state->iov),
                                    req_state);
        subreq = ctdbd_req_send(
                state, ev, conn, state->iov, ARRAY_SIZE(state->iov));
        if (tevent_req_nomem(subreq, req)) {
                *req_state = DBWRAP_REQ_ERROR;
                return tevent_req_post(req, ev);
        }
        tevent_req_set_callback(subreq, ctdbd_parse_pkt_send_done, req);
        tevent_req_set_callback(subreq, ctdbd_parse_done, req);
        return req;
}
static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq)
{
        struct tevent_req *req = tevent_req_callback_data(
                subreq, struct tevent_req);
        struct ctdbd_parse_state *state = tevent_req_data(
                req, struct ctdbd_parse_state);
        int ret;

        ret = ctdb_pkt_send_recv(subreq);
        TALLOC_FREE(subreq);
        if (tevent_req_error(req, ret)) {
                DBG_DEBUG("ctdb_pkt_send_recv failed %s\n", strerror(ret));
                return;
        }

        subreq = ctdb_pkt_recv_send(state,
                                    state->ev,
                                    state->conn,
                                    state->reqid);
        if (tevent_req_nomem(subreq, req)) {
                return;
        }

        *state->req_state = DBWRAP_REQ_DISPATCHED;
        tevent_req_set_callback(subreq, ctdbd_parse_done, req);
        return;
}
static void ctdbd_parse_done(struct tevent_req *subreq)
{
        struct tevent_req *req = tevent_req_callback_data(
@@ -2323,12 +2012,10 @@ static void ctdbd_parse_done(struct tevent_req *subreq)
        struct ctdb_reply_call_old *reply = NULL;
        int ret;

        DBG_DEBUG("async parse request finished\n");

        ret = ctdb_pkt_recv_recv(subreq, state, &hdr);
        ret = ctdbd_req_recv(subreq, state, &hdr);
        TALLOC_FREE(subreq);
        if (tevent_req_error(req, ret)) {
                DBG_ERR("ctdb_pkt_recv_recv returned %s\n", strerror(ret));
                DBG_DEBUG("ctdb_req_recv failed %s\n", strerror(ret));
                return;
        }

        SMB_ASSERT(hdr != NULL);
@@ -2360,14 +2047,5 @@ static void ctdbd_parse_done(struct tevent_req *subreq)
int ctdbd_parse_recv(struct tevent_req *req)
{
        int error;

        if (tevent_req_is_unix_error(req, &error)) {
                DBG_DEBUG("async parse returned %s\n", strerror(error));
                tevent_req_received(req);
                return error;
        }

        tevent_req_received(req);
        return 0;
        return tevent_req_simple_recv_unix(req);
}
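The last hunk replaces the hand-rolled is-unix-error/tevent_req_received() boilerplate in ctdbd_parse_recv() with tevent_req_simple_recv_unix(), which performs exactly that dance (minus the dropped DBG_DEBUG line). For a request pair without a result to hand back, this is the conventional shape of a _recv() function; a minimal sketch, assuming Samba's usual tevent glue header:

#include <talloc.h>
#include <tevent.h>
#include "lib/util/tevent_unix.h"  /* tevent_req_simple_recv_unix(); header path as assumed here */

struct my_op_state {
        int dummy;
};

static struct tevent_req *my_op_send(TALLOC_CTX *mem_ctx,
                                     struct tevent_context *ev)
{
        struct tevent_req *req = NULL;
        struct my_op_state *state = NULL;

        req = tevent_req_create(mem_ctx, &state, struct my_op_state);
        if (req == NULL) {
                return NULL;
        }

        /* Nothing asynchronous happens in this toy example, so finish
         * immediately and let tevent_req_post() deliver the callback
         * through the event loop. */
        tevent_req_done(req);
        return tevent_req_post(req, ev);
}

static int my_op_recv(struct tevent_req *req)
{
        /* Returns 0 on success or the errno set via tevent_req_error(),
         * and calls tevent_req_received() in both cases. */
        return tevent_req_simple_recv_unix(req);
}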

@@ -92,10 +92,11 @@ static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
        }

        become_root();
        ret = ctdbd_init_connection(mem_ctx,
                                    lp_ctdbd_socket(),
                                    lp_ctdb_timeout(),
                                    &ctdb_async_ctx.async_conn);
        ret = ctdbd_init_async_connection(
                mem_ctx,
                lp_ctdbd_socket(),
                lp_ctdb_timeout(),
                &ctdb_async_ctx.async_conn);
        unbecome_root();

        if (ret != 0 || ctdb_async_ctx.async_conn == NULL) {
@@ -103,13 +104,6 @@ static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
                return EIO;
        }

        ret = ctdbd_setup_fde(ctdb_async_ctx.async_conn, ev);
        if (ret != 0) {
                DBG_ERR("ctdbd_setup_fde failed\n");
                TALLOC_FREE(ctdb_async_ctx.async_conn);
                return ret;
        }

        ctdb_async_ctx.initialized = true;
        return 0;
}
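In the second changed file, ctdb_async_ctx_init_internal() no longer pairs ctdbd_init_connection() with a separate ctdbd_setup_fde() call; the removed lines suggest that with ctdbd_init_async_connection() the explicit fde setup (and its error handling) is no longer needed at this point, presumably because the async connection variant covers it or it happens elsewhere. Read with the elided diagnostics omitted, the tail of the function after this commit presumably ends up roughly like this (a reading of the hunks above, not verified against the tree):

        become_root();
        ret = ctdbd_init_async_connection(
                mem_ctx,
                lp_ctdbd_socket(),
                lp_ctdb_timeout(),
                &ctdb_async_ctx.async_conn);
        unbecome_root();

        if (ret != 0 || ctdb_async_ctx.async_conn == NULL) {
                /* (error diagnostic elided in the hunk above) */
                return EIO;
        }

        ctdb_async_ctx.initialized = true;
        return 0;
}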