1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-08 04:58:40 +03:00

ctdb-common: Add trivial FD monitoring abstraction

Signed-off-by: Martin Schwenke <martin@meltin.net>
Reviewed-by: Amitay Isaacs <amitay@gmail.com>
This commit is contained in:
Martin Schwenke 2022-02-01 11:44:48 +11:00 committed by Martin Schwenke
parent f9467cdf3b
commit 8d04235f46
3 changed files with 831 additions and 2 deletions

602
ctdb/common/tmon.c Normal file
View File

@ -0,0 +1,602 @@
/*
Trivial FD monitoring
Copyright (C) Martin Schwenke & Amitay Isaacs, DataDirect Networks 2022
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include <ctype.h>
#include "lib/util/blocking.h"
#include "lib/util/sys_rw.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/util.h"
#include "lib/util/smb_strtox.h"
#include "lib/async_req/async_sock.h"
#include "common/tmon.h"
enum tmon_message_type {
TMON_MSG_EXIT = 1,
TMON_MSG_ERRNO,
TMON_MSG_PING,
TMON_MSG_ASCII,
TMON_MSG_CUSTOM,
};
struct tmon_pkt {
enum tmon_message_type type;
uint16_t val;
};
struct tmon_buf {
uint8_t data[4];
};
static void tmon_packet_push(struct tmon_pkt *pkt,
struct tmon_buf *buf)
{
uint16_t type_n, val_n;
type_n = htons(pkt->type);
val_n = htons(pkt->val);
memcpy(&buf->data[0], &type_n, 2);
memcpy(&buf->data[2], &val_n, 2);
}
static void tmon_packet_pull(struct tmon_buf *buf,
struct tmon_pkt *pkt)
{
uint16_t type_n, val_n;
memcpy(&type_n, &buf->data[0], 2);
memcpy(&val_n, &buf->data[2], 2);
pkt->type = ntohs(type_n);
pkt->val = ntohs(val_n);
}
static int tmon_packet_write(int fd, struct tmon_pkt *pkt)
{
struct tmon_buf buf;
ssize_t n;
tmon_packet_push(pkt, &buf);
n = sys_write(fd, &buf.data[0], sizeof(buf.data));
if (n == -1) {
return errno;
}
return 0;
}
bool tmon_set_exit(struct tmon_pkt *pkt)
{
*pkt = (struct tmon_pkt) {
.type = TMON_MSG_EXIT,
};
return true;
}
bool tmon_set_errno(struct tmon_pkt *pkt, int err)
{
if (err < 0 && err > UINT16_MAX) {
return false;
}
*pkt = (struct tmon_pkt) {
.type = TMON_MSG_ERRNO,
.val = (uint16_t)err,
};
return true;
}
bool tmon_set_ping(struct tmon_pkt *pkt)
{
*pkt = (struct tmon_pkt) {
.type = TMON_MSG_PING,
};
return true;
}
bool tmon_set_ascii(struct tmon_pkt *pkt, char c)
{
if (!isascii(c)) {
return false;
}
*pkt = (struct tmon_pkt) {
.type = TMON_MSG_ASCII,
.val = (uint16_t)c,
};
return true;
}
bool tmon_set_custom(struct tmon_pkt *pkt, uint16_t val)
{
*pkt = (struct tmon_pkt) {
.type = TMON_MSG_CUSTOM,
.val = val,
};
return true;
}
static bool tmon_parse_exit(struct tmon_pkt *pkt)
{
if (pkt->type != TMON_MSG_EXIT) {
return false;
}
if (pkt->val != 0) {
return false;
}
return true;
}
static bool tmon_parse_errno(struct tmon_pkt *pkt, int *err)
{
if (pkt->type != TMON_MSG_ERRNO) {
return false;
}
*err= (int)pkt->val;
return true;
}
bool tmon_parse_ping(struct tmon_pkt *pkt)
{
if (pkt->type != TMON_MSG_PING) {
return false;
}
if (pkt->val != 0) {
return false;
}
return true;
}
bool tmon_parse_ascii(struct tmon_pkt *pkt, char *c)
{
if (pkt->type != TMON_MSG_ASCII) {
return false;
}
if (!isascii((int)pkt->val)) {
return false;
}
*c = (char)pkt->val;
return true;
}
bool tmon_parse_custom(struct tmon_pkt *pkt, uint16_t *val)
{
if (pkt->type != TMON_MSG_CUSTOM) {
return false;
}
*val = pkt->val;
return true;
}
struct tmon_state {
int fd;
int direction;
struct tevent_context *ev;
bool monitor_close;
unsigned long write_interval;
unsigned long read_timeout;
struct tmon_actions actions;
struct tevent_timer *timer;
void *private_data;
};
static void tmon_readable(struct tevent_req *subreq);
static bool tmon_set_timeout(struct tevent_req *req,
struct tevent_context *ev);
static void tmon_timedout(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval now,
void *private_data);
static void tmon_write_loop(struct tevent_req *subreq);
struct tevent_req *tmon_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
int fd,
int direction,
unsigned long read_timeout,
unsigned long write_interval,
struct tmon_actions *actions,
void *private_data)
{
struct tevent_req *req, *subreq;
struct tmon_state *state;
bool status;
req = tevent_req_create(mem_ctx, &state, struct tmon_state);
if (req == NULL) {
return NULL;
}
if (actions != NULL) {
/* If FD isn't readable then read actions are invalid */
if (!(direction & TMON_FD_READ) &&
(actions->timeout_callback != NULL ||
actions->read_callback != NULL ||
read_timeout != 0)) {
tevent_req_error(req, EINVAL);
return tevent_req_post(req, ev);
}
/* If FD isn't writeable then write actions are invalid */
if (!(direction & TMON_FD_WRITE) &&
(actions->write_callback != NULL ||
write_interval != 0)) {
tevent_req_error(req, EINVAL);
return tevent_req_post(req, ev);
}
/* Can't specify write interval without a callback */
if (state->write_interval != 0 &&
state->actions.write_callback == NULL) {
tevent_req_error(req, EINVAL);
return tevent_req_post(req, ev);
}
}
state->fd = fd;
state->direction = direction;
state->ev = ev;
state->write_interval = write_interval;
state->read_timeout = read_timeout;
state->private_data = private_data;
if (actions != NULL) {
state->actions = *actions;
}
status = set_close_on_exec(fd);
if (!status) {
tevent_req_error(req, errno);
return tevent_req_post(req, ev);
}
if (direction & TMON_FD_READ) {
subreq = wait_for_read_send(state, ev, fd, true);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, tmon_readable, req);
}
if (state->read_timeout != 0) {
status = tmon_set_timeout(req, state->ev);
if (!status) {
tevent_req_error(req, ENOMEM);
return tevent_req_post(req, ev);
}
}
if (state->write_interval != 0) {
subreq = tevent_wakeup_send(
state,
state->ev,
tevent_timeval_current_ofs(state->write_interval, 0));
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, state->ev);
}
tevent_req_set_callback(subreq, tmon_write_loop, req);
}
return req;
}
static void tmon_readable(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct tmon_state *state = tevent_req_data( req, struct tmon_state);
struct tmon_buf buf;
struct tmon_pkt pkt;
ssize_t nread;
bool status;
int err;
int ret;
status = wait_for_read_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (!status) {
if (ret == EPIPE && state->actions.close_callback != NULL) {
ret = state->actions.close_callback(state->private_data);
if (ret == TMON_STATUS_EXIT) {
ret = 0;
}
}
if (ret == 0) {
tevent_req_done(req);
} else {
tevent_req_error(req, ret);
}
return;
}
nread = sys_read(state->fd, buf.data, sizeof(buf.data));
if (nread == -1) {
tevent_req_error(req, errno);
return;
}
if (nread == 0) {
/* Can't happen, treat like EPIPE, above */
tevent_req_error(req, EPIPE);
return;
}
if (nread != sizeof(buf.data)) {
tevent_req_error(req, EPROTO);
return;
}
tmon_packet_pull(&buf, &pkt);
switch (pkt.type) {
case TMON_MSG_EXIT:
status = tmon_parse_exit(&pkt);
if (!status) {
tevent_req_error(req, EPROTO);
return;
}
tevent_req_done(req);
return;
case TMON_MSG_ERRNO:
status = tmon_parse_errno(&pkt, &err);
if (!status) {
err = EPROTO;
}
tevent_req_error(req, err);
return;
default:
break;
}
if (state->actions.read_callback == NULL) {
/* Shouldn't happen, other end should not write */
tevent_req_error(req, EIO);
return;
}
ret = state->actions.read_callback(state->private_data, &pkt);
if (ret == TMON_STATUS_EXIT) {
tevent_req_done(req);
return;
}
if (ret != 0) {
tevent_req_error(req, ret);
return;
}
subreq = wait_for_read_send(state, state->ev, state->fd, true);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, tmon_readable, req);
/* Reset read timeout */
if (state->read_timeout != 0) {
status = tmon_set_timeout(req, state->ev);
if (!status) {
tevent_req_error(req, ENOMEM);
return;
}
}
}
static bool tmon_set_timeout(struct tevent_req *req,
struct tevent_context *ev)
{
struct tmon_state *state = tevent_req_data(
req, struct tmon_state);
struct timeval endtime =
tevent_timeval_current_ofs(state->read_timeout, 0);
TALLOC_FREE(state->timer);
state->timer = tevent_add_timer(ev, req, endtime, tmon_timedout, req);
if (tevent_req_nomem(state->timer, req)) {
return false;
}
return true;
}
static void tmon_timedout(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval now,
void *private_data)
{
struct tevent_req *req = talloc_get_type_abort(
private_data, struct tevent_req);
struct tmon_state *state = tevent_req_data(req, struct tmon_state);
int ret;
TALLOC_FREE(state->timer);
if (state->actions.timeout_callback != NULL) {
ret = state->actions.timeout_callback(state->private_data);
if (ret == TMON_STATUS_EXIT) {
ret = 0;
}
} else {
ret = ETIMEDOUT;
}
if (ret == 0) {
tevent_req_done(req);
} else {
tevent_req_error(req, ret);
}
}
static void tmon_write_loop(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct tmon_state *state = tevent_req_data(
req, struct tmon_state);
struct tmon_pkt pkt;
int ret;
bool status;
status = tevent_wakeup_recv(subreq);
TALLOC_FREE(subreq);
if (!status) {
/* Ignore error */
}
ret = state->actions.write_callback(state->private_data, &pkt);
if (ret == TMON_STATUS_EXIT) {
tevent_req_done(req);
return;
}
if (ret == TMON_STATUS_SKIP) {
goto done;
}
if (ret != 0) {
tevent_req_error(req, ret);
return;
}
status = tmon_write(req, &pkt);
if (!status) {
return;
}
done:
subreq = tevent_wakeup_send(
state,
state->ev,
tevent_timeval_current_ofs(state->write_interval, 0));
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, tmon_write_loop, req);
}
bool tmon_write(struct tevent_req *req, struct tmon_pkt *pkt)
{
struct tmon_state *state = tevent_req_data(
req, struct tmon_state);
int ret;
if (state->fd == -1) {
return false;
}
if (!(state->direction & TMON_FD_WRITE)) {
tevent_req_error(req, EINVAL);
return false;
}
ret = tmon_packet_write(state->fd, pkt);
if (ret != 0) {
if (ret == EPIPE && state->actions.close_callback != NULL) {
ret = state->actions.close_callback(state->private_data);
if (ret == TMON_STATUS_EXIT) {
ret = 0;
}
}
if (ret == 0) {
tevent_req_done(req);
} else {
tevent_req_error(req, ret);
}
state->fd = -1;
return false;
}
return true;
}
bool tmon_recv(struct tevent_req *req, int *perr)
{
if (tevent_req_is_unix_error(req, perr)) {
return false;
}
return true;
}
static int ping_writer(void *private_data, struct tmon_pkt *pkt)
{
tmon_set_ping(pkt);
return 0;
}
static int ping_reader(void *private_data, struct tmon_pkt *pkt)
{
bool status;
/* Only expect pings */
status = tmon_parse_ping(pkt);
if (!status) {
return EPROTO;
}
return 0;
}
struct tevent_req *tmon_ping_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
int fd,
int direction,
unsigned long timeout,
unsigned long interval)
{
struct tevent_req *req;
struct tmon_actions actions = {
.write_callback = NULL,
};
if ((direction & TMON_FD_WRITE) && interval != 0) {
actions.write_callback = ping_writer;
}
if ((direction & TMON_FD_READ) && timeout != 0) {
actions.read_callback = ping_reader;
}
req = tmon_send(mem_ctx,
ev,
fd,
direction,
timeout,
interval,
&actions,
NULL);
return req;
}
bool tmon_ping_recv(struct tevent_req *req, int *perr)
{
bool status;
status = tmon_recv(req, perr);
return status;
}

218
ctdb/common/tmon.h Normal file
View File

@ -0,0 +1,218 @@
/*
Trivial FD monitoring
Copyright (C) Martin Schwenke & Amitay Isaacs, DataDirect Networks 2022
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __CTDB_TMON_H__
#define __CTDB_TMON_H__
#include <talloc.h>
#include <tevent.h>
/**
* @file tmon.h
*
* @brief Interprocess file descriptor (pipe and socketpair) monitoring
*
* Assumes 2 processes connected by a pipe(2) or a socketpair(2). A
* simple protocol is defined to allow sending various types of status
* information. When a pipe(2) is used the reader can monitor for
* close and read packets, while the sender can write packets. When a
* socketpair(2) is used then both ends can monitor for close, and
* read and write packets. A read timeout can be specified,
* terminating the computation if no packets are received.
*
* A simplified interface is provided to monitor for close and allow
* sending/monitoring of one-way ping packets. A ping timeout occurs
* when one end is expecting pings but none are received during the
* timeout interval - no response is sent to pings, they merely reset
* a timer on the receiving end.
*/
struct tmon_pkt;
struct tmon_actions {
int (*write_callback)(void *private_data, struct tmon_pkt *pkt);
int (*timeout_callback)(void *private_data);
int (*read_callback)(void *private_data, struct tmon_pkt *pkt);
int (*close_callback)(void *private_data);
};
/*
* Return value from write_callback() and read_callback() to cause the
* computation to exit successfully. For consistency this can also be
* used with timeout_callback() and close_callback().
*/
#define TMON_STATUS_EXIT (-1)
/* Return value from write_callback() to skip write */
#define TMON_STATUS_SKIP (-2)
/* For direction, below */
#define TMON_FD_READ 0x1
#define TMON_FD_WRITE 0x2
#define TMON_FD_BOTH (TMON_FD_READ | TMON_FD_WRITE)
/**
* @brief Async computation to start FD monitoring
*
* @param[in] mem_ctx Talloc memory context
* @param[in] ev Tevent context
* @param[in] fd File descriptor for "this" end of pipe/socketpair
* @param[in] direction Read, write or both - for sanity checking
* @param[in] read_timeout Seconds to trigger timeout when no packets received
* @param[in] write_interval Seconds to trigger write_callback
* @param[in] actions struct containing callbacks
* @param[in] private_data Passed to callbacks
* @return new tevent request or NULL on failure
*
* @note read_timeout implies monitor_close
*
* @note The computation will complete when:
*
* - The writing end closes (e.g. writer process terminates) - EPIPE
* - read_timeout is non-zero and timeout occurs - ETIMEDOUT
* - Packets received with no read_callback defined - EIO
* - Invalid or unexpected packet received - EPROTO
* - File descriptor readable but no bytes to read - error: EPIPE
* - Invalid combination of direction, callbacks, timeouts: EINVAL
* - An unexpected error occurs - other
*
* @note action callbacks return an int that can be used to trigger
* other errors or override an error. For example:
*
* - write_callback() can return non-zero errno, causing an error
* - close_callback() can return zero, overriding the default EPIPE error
* - timeout_callback() can return something other than ETIMEDOUT
* - read_callback() can return EPROTO for unexpected packet types
*
* Reading of exit and errno packets is handled internally (read
* callback is never called). Write callback can return special
* value TMON_STATUS_SKIP to avoid sending any data.
*/
struct tevent_req *tmon_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
int fd,
int direction,
unsigned long read_timeout,
unsigned long write_interval,
struct tmon_actions *actions,
void *private_data);
/**
* @brief Async computation to end FD monitoring
*
* @param[in] req Tevent request
* @param[out] perr errno in case of failure
* @return true on success, false on failure
*/
bool tmon_recv(struct tevent_req *req, int *perr);
/**
* @brief Fill in an exit packet
*
* @param[in,out] pkt An exit packet
* @return true on success, false on failure
*/
bool tmon_set_exit(struct tmon_pkt *pkt);
/**
* @brief Fill in an errno packet
*
* @param[in,out] pkt An errno packet
* @param[in] err An errno to send in packet
* @return true on success, false on failure
*/
bool tmon_set_errno(struct tmon_pkt *pkt, int err);
/**
* @brief Fill in a ping packet
*
* @param[in,out] pkt A ping packet
* @return true on success, false on failure
*/
bool tmon_set_ping(struct tmon_pkt *pkt);
/**
* @brief Fill in an ASCII packet
*
* @param[in,out] pkt An ASCII packet
* @param[in] c An ASCII character to send in packet
* @return true on success, false on failure
*/
bool tmon_set_ascii(struct tmon_pkt *pkt, char c);
/**
* @brief Fill in a custom packet
*
* @param[in,out] pkt A custom packet
* @param[in] val A uint16_t to send in a custom packet
* @return true on success, false on failure
*/
bool tmon_set_custom(struct tmon_pkt *pkt, uint16_t val);
/**
* @brief Validate a ping packet
*
* @param[in] pkt A ping packet
* @return true on success, false on failure
*/
bool tmon_parse_ping(struct tmon_pkt *pkt);
/**
* @brief Validate ASCII packet and parse out character
*
* @param[in] pkt An ASCII packet
* @param[out] c An ASCII character value from packet
* @return true on success, false on failure
*/
bool tmon_parse_ascii(struct tmon_pkt *pkt, char *c);
/**
* @brief Validate custom packet and parse out value
*
* @param[in] pkt A custom packet
* @param[out] val A uint16_t value from packet
* @return true on success, false on failure
*/
bool tmon_parse_custom(struct tmon_pkt *pkt, uint16_t *val);
/**
* @brief Write a packet
*
* @param[in] req Tevent request created by tmon_send
* @param[in] pkt Packet to write
* @return true on sucess, false on failure
*/
bool tmon_write(struct tevent_req *req, struct tmon_pkt *pkt);
/**
* @brief Async computation to start ping monitoring
*
* @param[in] mem_ctx Talloc memory context
* @param[in] ev Tevent context
* @param[in] fd File descriptor for "this" end of pipe/socketpair
* @param[in] direction Read, write or both - for sanity checking
* @param[in] timeout Timeout for pings on receiving end
* @param[in] interval Send a ping packet every interval seconds
*/
struct tevent_req *tmon_ping_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
int fd,
int direction,
unsigned long timeout,
unsigned long interval);
bool tmon_ping_recv(struct tevent_req *req, int *perr);
#endif /* __CTDB_TMON_H__ */

View File

@ -441,10 +441,19 @@ def build(bld):
run_proc.c
sock_client.c
srvid.c
tmon.c
tunable.c
'''),
deps='''samba-util sys_rw tevent-util
replace talloc tevent tdb popt''')
deps='''samba-util
LIBASYNC_REQ
sys_rw
tevent-util
replace
talloc
tevent
tdb
popt
''')
bld.SAMBA_SUBSYSTEM('ctdb-logging-conf',
source='common/logging_conf.c',