1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-10 01:18:15 +03:00
samba-mirror/ctdb/common/tmon.c

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

603 lines
12 KiB
C
Raw Normal View History

/*
Trivial FD monitoring
Copyright (C) Martin Schwenke & Amitay Isaacs, DataDirect Networks 2022
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include <ctype.h>
#include "lib/util/blocking.h"
#include "lib/util/sys_rw.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/util.h"
#include "lib/util/smb_strtox.h"
#include "lib/async_req/async_sock.h"
#include "common/tmon.h"
/*
 * Packet types.  The numeric value is what goes on the wire (it is
 * converted with htons() in tmon_packet_push()), so existing values
 * must not be renumbered.
 */
enum tmon_message_type {
/* Peer asks for the monitor request to finish successfully */
TMON_MSG_EXIT = 1,
/* Peer reports an error; the packet value carries the errno */
TMON_MSG_ERRNO,
/* Keep-alive with no payload (value must be 0) */
TMON_MSG_PING,
/* Single ASCII character carried in the packet value */
TMON_MSG_ASCII,
/* Arbitrary 16-bit value interpreted by the user's callbacks */
TMON_MSG_CUSTOM,
};
/* In-memory (host byte order) form of a single monitoring packet */
struct tmon_pkt {
/* What kind of message this is */
enum tmon_message_type type;
/* Payload; meaning depends on type, 0 for EXIT and PING */
uint16_t val;
};
/*
 * Wire form of a packet: bytes 0-1 hold the type and bytes 2-3 hold the
 * value, both in network byte order (see tmon_packet_push()).
 */
struct tmon_buf {
uint8_t data[4];
};
/*
 * Serialise a packet into its 4-byte wire form: the type followed by
 * the value, each as a 16-bit integer in network byte order.
 */
static void tmon_packet_push(struct tmon_pkt *pkt,
			     struct tmon_buf *buf)
{
	uint16_t wire_type = htons(pkt->type);
	uint16_t wire_val = htons(pkt->val);

	memcpy(&buf->data[0], &wire_type, sizeof(wire_type));
	memcpy(&buf->data[2], &wire_val, sizeof(wire_val));
}
/*
 * Deserialise a 4-byte wire buffer into a packet, converting the type
 * and the value from network to host byte order.  No validation of the
 * type is done here.
 */
static void tmon_packet_pull(struct tmon_buf *buf,
			     struct tmon_pkt *pkt)
{
	uint16_t wire_type;
	uint16_t wire_val;

	memcpy(&wire_type, &buf->data[0], sizeof(wire_type));
	memcpy(&wire_val, &buf->data[2], sizeof(wire_val));

	pkt->type = ntohs(wire_type);
	pkt->val = ntohs(wire_val);
}
/*
 * Serialise a packet and write it to fd.
 *
 * Returns 0 if the whole packet was written, the errno from the failed
 * write if sys_write() returned -1, or EIO if only part of the packet
 * was written.  A partial packet can not be parsed by the reader, so it
 * must not be reported as success.
 */
static int tmon_packet_write(int fd, struct tmon_pkt *pkt)
{
	struct tmon_buf buf;
	ssize_t n;

	tmon_packet_push(pkt, &buf);

	n = sys_write(fd, &buf.data[0], sizeof(buf.data));
	if (n == -1) {
		return errno;
	}
	if ((size_t)n != sizeof(buf.data)) {
		/* Short write: the peer now has a truncated packet */
		return EIO;
	}

	return 0;
}
/*
 * Fill in pkt as an EXIT message (value 0).  Always returns true.
 */
bool tmon_set_exit(struct tmon_pkt *pkt)
{
	pkt->type = TMON_MSG_EXIT;
	pkt->val = 0;

	return true;
}
/*
 * Fill in pkt as an ERRNO message carrying err.
 *
 * err must be positive and fit in 16 bits; otherwise pkt is left
 * untouched and false is returned.
 */
bool tmon_set_errno(struct tmon_pkt *pkt, int err)
{
	bool in_range = (err > 0 && err <= UINT16_MAX);

	if (in_range) {
		pkt->type = TMON_MSG_ERRNO;
		pkt->val = (uint16_t)err;
	}

	return in_range;
}
/*
 * Fill in pkt as a PING message (value 0).  Always returns true.
 */
bool tmon_set_ping(struct tmon_pkt *pkt)
{
	pkt->type = TMON_MSG_PING;
	pkt->val = 0;

	return true;
}
/*
 * Fill in pkt as an ASCII message carrying the character c.
 *
 * Returns false, leaving pkt untouched, if c is not an ASCII character.
 */
bool tmon_set_ascii(struct tmon_pkt *pkt, char c)
{
	if (isascii(c)) {
		pkt->type = TMON_MSG_ASCII;
		pkt->val = (uint16_t)c;
		return true;
	}

	return false;
}
/*
 * Fill in pkt as a CUSTOM message carrying val.  Any 16-bit value is
 * accepted, so this always returns true.
 */
bool tmon_set_custom(struct tmon_pkt *pkt, uint16_t val)
{
	pkt->type = TMON_MSG_CUSTOM;
	pkt->val = val;

	return true;
}
/*
 * Check that pkt is a well-formed EXIT message: correct type and a
 * value of 0.
 */
static bool tmon_parse_exit(struct tmon_pkt *pkt)
{
	return pkt->type == TMON_MSG_EXIT && pkt->val == 0;
}
/*
 * Extract the error number from an ERRNO message.
 *
 * Returns false if pkt is not an ERRNO message or if it carries 0.
 * tmon_set_errno() never produces 0, and a 0 here would end up being
 * passed by tmon_readable() to tevent_req_error(), which treats 0 as
 * "no error" and does not complete the request.  Rejecting it makes
 * the reader fail with EPROTO instead of stalling.
 *
 * On success *err is set to the (positive) error number.
 */
static bool tmon_parse_errno(struct tmon_pkt *pkt, int *err)
{
	if (pkt->type != TMON_MSG_ERRNO) {
		return false;
	}
	if (pkt->val == 0) {
		return false;
	}

	*err = (int)pkt->val;

	return true;
}
/*
 * Check that pkt is a well-formed PING message: correct type and a
 * value of 0.
 */
bool tmon_parse_ping(struct tmon_pkt *pkt)
{
	return pkt->type == TMON_MSG_PING && pkt->val == 0;
}
/*
 * Extract the character from an ASCII message.
 *
 * Returns false, leaving *c untouched, if pkt is not an ASCII message
 * or its value is not an ASCII character.
 */
bool tmon_parse_ascii(struct tmon_pkt *pkt, char *c)
{
	bool ok = (pkt->type == TMON_MSG_ASCII) && isascii((int)pkt->val);

	if (ok) {
		*c = (char)pkt->val;
	}

	return ok;
}
/*
 * Extract the value from a CUSTOM message.
 *
 * Returns false, leaving *val untouched, if pkt is not a CUSTOM
 * message.
 */
bool tmon_parse_custom(struct tmon_pkt *pkt, uint16_t *val)
{
	bool is_custom = (pkt->type == TMON_MSG_CUSTOM);

	if (is_custom) {
		*val = pkt->val;
	}

	return is_custom;
}
/* Per-request state for a tmon_send() request */
struct tmon_state {
/* Monitored file descriptor; tmon_write() sets it to -1 after a failed write */
int fd;
/* Bitmask of TMON_FD_READ and/or TMON_FD_WRITE */
int direction;
/* Event context used for sub-requests and the read timeout timer */
struct tevent_context *ev;
/* NOTE(review): not referenced anywhere in the code visible here */
bool monitor_close;
/* Seconds between write_callback invocations; 0 disables the write loop */
unsigned long write_interval;
/* Seconds allowed between packets from the peer; 0 disables the timeout */
unsigned long read_timeout;
/* Copy of the caller's callbacks (all unset if none were given) */
struct tmon_actions actions;
/* Pending read timeout timer, replaced by tmon_set_timeout() */
struct tevent_timer *timer;
/* Opaque pointer handed to every callback in actions */
void *private_data;
};
/* Forward declarations for handlers that tmon_send() registers */

/* Completion handler for the wait-for-read sub-request */
static void tmon_readable(struct tevent_req *subreq);
/* (Re)arm the read timeout timer for req */
static bool tmon_set_timeout(struct tevent_req *req,
struct tevent_context *ev);
/* Timer handler run when the read timeout expires */
static void tmon_timedout(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval now,
void *private_data);
/* Completion handler for the periodic write wakeup */
static void tmon_write_loop(struct tevent_req *subreq);
/*
 * Start monitoring a file descriptor.
 *
 * mem_ctx        - talloc parent for the request
 * ev             - event context driving the request
 * fd             - file descriptor to monitor; it is marked
 *                  close-on-exec
 * direction      - bitmask of TMON_FD_READ and/or TMON_FD_WRITE
 * read_timeout   - seconds to wait for a packet before timing out,
 *                  0 for no timeout
 * write_interval - seconds between calls to the write callback,
 *                  0 for no periodic writes
 * actions        - optional callbacks, copied into the request
 * private_data   - opaque pointer passed to every callback
 *
 * Returns NULL if the request can not be allocated.  Otherwise a
 * request is returned; invalid argument combinations complete it
 * immediately with EINVAL:
 *  - read callbacks or a read timeout without TMON_FD_READ
 *  - a write callback or a write interval without TMON_FD_WRITE
 *  - a write interval without a write callback
 */
struct tevent_req *tmon_send(TALLOC_CTX *mem_ctx,
			     struct tevent_context *ev,
			     int fd,
			     int direction,
			     unsigned long read_timeout,
			     unsigned long write_interval,
			     struct tmon_actions *actions,
			     void *private_data)
{
	struct tevent_req *req, *subreq;
	struct tmon_state *state;
	bool status;

	req = tevent_req_create(mem_ctx, &state, struct tmon_state);
	if (req == NULL) {
		return NULL;
	}

	if (actions != NULL) {
		/* If FD isn't readable then read actions are invalid */
		if (!(direction & TMON_FD_READ) &&
		    (actions->timeout_callback != NULL ||
		     actions->read_callback != NULL ||
		     read_timeout != 0)) {
			tevent_req_error(req, EINVAL);
			return tevent_req_post(req, ev);
		}

		/* If FD isn't writeable then write actions are invalid */
		if (!(direction & TMON_FD_WRITE) &&
		    (actions->write_callback != NULL ||
		     write_interval != 0)) {
			tevent_req_error(req, EINVAL);
			return tevent_req_post(req, ev);
		}
	}

	/*
	 * Can't specify write interval without a callback.
	 *
	 * This must look at the arguments: state has not been filled in
	 * from them yet.  It also has to cover actions == NULL, because
	 * the write loop calls the callback unconditionally.
	 */
	if (write_interval != 0 &&
	    (actions == NULL || actions->write_callback == NULL)) {
		tevent_req_error(req, EINVAL);
		return tevent_req_post(req, ev);
	}

	state->fd = fd;
	state->direction = direction;
	state->ev = ev;
	state->write_interval = write_interval;
	state->read_timeout = read_timeout;
	state->private_data = private_data;

	if (actions != NULL) {
		state->actions = *actions;
	}

	status = set_close_on_exec(fd);
	if (!status) {
		tevent_req_error(req, errno);
		return tevent_req_post(req, ev);
	}

	if (direction & TMON_FD_READ) {
		subreq = wait_for_read_send(state, ev, fd, true);
		if (tevent_req_nomem(subreq, req)) {
			return tevent_req_post(req, ev);
		}
		tevent_req_set_callback(subreq, tmon_readable, req);
	}

	if (state->read_timeout != 0) {
		status = tmon_set_timeout(req, state->ev);
		if (!status) {
			tevent_req_error(req, ENOMEM);
			return tevent_req_post(req, ev);
		}
	}

	if (state->write_interval != 0) {
		subreq = tevent_wakeup_send(
			state,
			state->ev,
			tevent_timeval_current_ofs(state->write_interval, 0));
		if (tevent_req_nomem(subreq, req)) {
			return tevent_req_post(req, state->ev);
		}
		tevent_req_set_callback(subreq, tmon_write_loop, req);
	}

	return req;
}
/*
 * Completion handler for the wait-for-read sub-request.
 *
 * Reads exactly one packet from the monitored descriptor.  EXIT and
 * ERRNO packets are handled here and finish the request; every other
 * packet is handed to the user's read callback.  If the callback
 * returns 0 the read is re-armed and the read timeout is restarted.
 */
static void tmon_readable(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct tmon_state *state = tevent_req_data( req, struct tmon_state);
struct tmon_buf buf;
struct tmon_pkt pkt;
ssize_t nread;
bool status;
int err;
int ret;
status = wait_for_read_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (!status) {
/*
 * Waiting failed and ret holds the error.  For EPIPE the close
 * callback, if any, decides the outcome: its return value replaces
 * ret, with TMON_STATUS_EXIT meaning "finish successfully".
 */
if (ret == EPIPE && state->actions.close_callback != NULL) {
ret = state->actions.close_callback(state->private_data);
if (ret == TMON_STATUS_EXIT) {
ret = 0;
}
}
if (ret == 0) {
tevent_req_done(req);
} else {
tevent_req_error(req, ret);
}
return;
}
nread = sys_read(state->fd, buf.data, sizeof(buf.data));
if (nread == -1) {
tevent_req_error(req, errno);
return;
}
if (nread == 0) {
/* Can't happen, treat like EPIPE, above */
tevent_req_error(req, EPIPE);
return;
}
/* Anything other than one complete packet is a protocol error */
if (nread != sizeof(buf.data)) {
tevent_req_error(req, EPROTO);
return;
}
tmon_packet_pull(&buf, &pkt);
/* EXIT and ERRNO are handled internally and always end the request */
switch (pkt.type) {
case TMON_MSG_EXIT:
status = tmon_parse_exit(&pkt);
if (!status) {
tevent_req_error(req, EPROTO);
return;
}
tevent_req_done(req);
return;
case TMON_MSG_ERRNO:
status = tmon_parse_errno(&pkt, &err);
if (!status) {
err = EPROTO;
}
tevent_req_error(req, err);
return;
default:
break;
}
/* All remaining packet types need a user callback to interpret them */
if (state->actions.read_callback == NULL) {
/* Shouldn't happen, other end should not write */
tevent_req_error(req, EIO);
return;
}
ret = state->actions.read_callback(state->private_data, &pkt);
if (ret == TMON_STATUS_EXIT) {
tevent_req_done(req);
return;
}
if (ret != 0) {
tevent_req_error(req, ret);
return;
}
/* Packet accepted: wait for the next one */
subreq = wait_for_read_send(state, state->ev, state->fd, true);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, tmon_readable, req);
/* Reset read timeout */
if (state->read_timeout != 0) {
status = tmon_set_timeout(req, state->ev);
if (!status) {
tevent_req_error(req, ENOMEM);
return;
}
}
}
/*
 * (Re)arm the read timeout for req.
 *
 * Any pending timer is dropped and a new one is scheduled
 * state->read_timeout seconds from now, with tmon_timedout() as its
 * handler.
 *
 * Returns false if the timer can not be allocated.  The request is
 * deliberately NOT completed here: every caller in this file reports
 * the failure itself with tevent_req_error(req, ENOMEM), and completing
 * the request here as well (as tevent_req_nomem() would) finishes it
 * twice and runs the caller's completion callback twice.
 */
static bool tmon_set_timeout(struct tevent_req *req,
			     struct tevent_context *ev)
{
	struct tmon_state *state = tevent_req_data(
		req, struct tmon_state);
	struct timeval endtime =
		tevent_timeval_current_ofs(state->read_timeout, 0);

	TALLOC_FREE(state->timer);

	state->timer = tevent_add_timer(ev, req, endtime, tmon_timedout, req);
	if (state->timer == NULL) {
		return false;
	}

	return true;
}
/*
 * Timer handler: no packet arrived within the read timeout.
 *
 * Without a timeout callback the request fails with ETIMEDOUT.  With
 * one, its return value decides: 0 or TMON_STATUS_EXIT finish the
 * request successfully, anything else is reported as the error.
 */
static void tmon_timedout(struct tevent_context *ev,
			  struct tevent_timer *te,
			  struct timeval now,
			  void *private_data)
{
	struct tevent_req *req = talloc_get_type_abort(
		private_data, struct tevent_req);
	struct tmon_state *state = tevent_req_data(req, struct tmon_state);
	int result = ETIMEDOUT;

	TALLOC_FREE(state->timer);

	if (state->actions.timeout_callback != NULL) {
		result = state->actions.timeout_callback(state->private_data);
		if (result == TMON_STATUS_EXIT) {
			result = 0;
		}
	}

	if (result != 0) {
		tevent_req_error(req, result);
		return;
	}

	tevent_req_done(req);
}
/*
 * Completion handler for the periodic write wakeup.
 *
 * Asks the write callback for a packet and acts on its return value:
 *   TMON_STATUS_EXIT - finish the request successfully
 *   TMON_STATUS_SKIP - write nothing this time round
 *   0                - write the packet the callback filled in
 *   anything else    - fail the request with that error
 * Unless the request finished, the next wakeup is scheduled
 * state->write_interval seconds from now.
 */
static void tmon_write_loop(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct tmon_state *state = tevent_req_data(
		req, struct tmon_state);
	struct tmon_pkt pkt;
	int ret;
	bool status;

	status = tevent_wakeup_recv(subreq);
	TALLOC_FREE(subreq);
	if (!status) {
		/* Ignore error */
	}

	/*
	 * Never call through a NULL pointer: a write interval configured
	 * without a write callback is an invalid setup, so fail the
	 * request instead of crashing.
	 */
	if (state->actions.write_callback == NULL) {
		tevent_req_error(req, EINVAL);
		return;
	}

	ret = state->actions.write_callback(state->private_data, &pkt);
	if (ret == TMON_STATUS_EXIT) {
		tevent_req_done(req);
		return;
	}

	if (ret != TMON_STATUS_SKIP) {
		if (ret != 0) {
			tevent_req_error(req, ret);
			return;
		}

		status = tmon_write(req, &pkt);
		if (!status) {
			/* tmon_write() has already completed the request */
			return;
		}
	}

	subreq = tevent_wakeup_send(
		state,
		state->ev,
		tevent_timeval_current_ofs(state->write_interval, 0));
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, tmon_write_loop, req);
}
/*
 * Write a packet to the monitored file descriptor.
 *
 * Returns true if the packet was written.  Returns false if:
 *  - a previous write already failed (fd is -1); nothing is done
 *  - the request was not set up with TMON_FD_WRITE; the request fails
 *    with EINVAL
 *  - the write fails; the request is completed.  For EPIPE the close
 *    callback, if any, decides the outcome (0 or TMON_STATUS_EXIT mean
 *    success); otherwise the write error is reported.
 *
 * After this returns false because of a failed write, req may already
 * have been freed by its completion callback and must not be touched.
 */
bool tmon_write(struct tevent_req *req, struct tmon_pkt *pkt)
{
	struct tmon_state *state = tevent_req_data(
		req, struct tmon_state);
	int ret;

	if (state->fd == -1) {
		return false;
	}

	if (!(state->direction & TMON_FD_WRITE)) {
		tevent_req_error(req, EINVAL);
		return false;
	}

	ret = tmon_packet_write(state->fd, pkt);
	if (ret == 0) {
		return true;
	}

	if (ret == EPIPE && state->actions.close_callback != NULL) {
		ret = state->actions.close_callback(state->private_data);
		if (ret == TMON_STATUS_EXIT) {
			ret = 0;
		}
	}

	/*
	 * Mark the descriptor unusable BEFORE completing the request:
	 * completing it runs the caller's callback, which is allowed to
	 * free req (and state with it), so state must not be written to
	 * afterwards.
	 */
	state->fd = -1;

	if (ret == 0) {
		tevent_req_done(req);
	} else {
		tevent_req_error(req, ret);
	}

	return false;
}
/*
 * Collect the result of a tmon_send() request.
 *
 * Returns true on success.  On failure returns false, with the error
 * reported through perr by tevent_req_is_unix_error().
 */
bool tmon_recv(struct tevent_req *req, int *perr)
{
	bool failed = tevent_req_is_unix_error(req, perr);

	return !failed;
}
/*
 * Write callback for tmon_ping_send(): always send a PING.
 * private_data is unused.
 */
static int ping_writer(void *private_data, struct tmon_pkt *pkt)
{
	(void)tmon_set_ping(pkt);

	return 0;
}
/*
 * Read callback for tmon_ping_send(): accept well-formed PING packets
 * and report anything else as EPROTO.  private_data is unused.
 */
static int ping_reader(void *private_data, struct tmon_pkt *pkt)
{
	/* Only expect pings */
	return tmon_parse_ping(pkt) ? 0 : EPROTO;
}
/*
 * Start a ping-only monitor on fd.
 *
 * A PING is written every interval seconds when direction includes
 * TMON_FD_WRITE and interval is non-zero.  Incoming packets are
 * required to be PINGs when direction includes TMON_FD_READ and
 * timeout is non-zero.  Everything else is handled by tmon_send(),
 * which is called with no private data.
 */
struct tevent_req *tmon_ping_send(TALLOC_CTX *mem_ctx,
				  struct tevent_context *ev,
				  int fd,
				  int direction,
				  unsigned long timeout,
				  unsigned long interval)
{
	bool send_pings = (direction & TMON_FD_WRITE) && interval != 0;
	bool expect_pings = (direction & TMON_FD_READ) && timeout != 0;
	struct tmon_actions actions = {
		.write_callback = NULL,
	};

	if (send_pings) {
		actions.write_callback = ping_writer;
	}
	if (expect_pings) {
		actions.read_callback = ping_reader;
	}

	return tmon_send(mem_ctx,
			 ev,
			 fd,
			 direction,
			 timeout,
			 interval,
			 &actions,
			 NULL);
}
/*
 * Collect the result of a tmon_ping_send() request; identical to
 * tmon_recv().
 */
bool tmon_ping_recv(struct tevent_req *req, int *perr)
{
	return tmon_recv(req, perr);
}