1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-10 01:18:15 +03:00

ctdb-common: Add sock_daemon abstraction

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
This commit is contained in:
Amitay Isaacs 2016-09-03 23:27:23 +10:00 committed by Amitay Isaacs
parent 75a25d1331
commit 7ae530c2ab
5 changed files with 1718 additions and 0 deletions

831
ctdb/common/sock_daemon.c Normal file
View File

@ -0,0 +1,831 @@
/*
A server based on unix domain socket
Copyright (C) Amitay Isaacs 2016
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include "system/filesys.h"
#include "system/network.h"
#include "system/wait.h"
#include <talloc.h>
#include <tevent.h>
#include "lib/async_req/async_sock.h"
#include "lib/util/debug.h"
#include "lib/util/blocking.h"
#include "lib/util/dlinklist.h"
#include "lib/util/tevent_unix.h"
#include "common/logging.h"
#include "common/reqid.h"
#include "common/comm.h"
#include "common/pidfile.h"
#include "common/sock_daemon.h"
struct sock_socket {
struct sock_socket *prev, *next;
const char *sockpath;
struct sock_socket_funcs *funcs;
void *private_data;
int fd;
struct tevent_req *req;
};
struct sock_client {
struct sock_client *prev, *next;
struct tevent_req *req;
struct sock_client_context *client_ctx;
};
struct sock_client_context {
struct tevent_context *ev;
struct sock_socket *sock;
int fd;
struct comm_context *comm;
struct sock_client *client;
};
struct sock_daemon_context {
struct sock_daemon_funcs *funcs;
void *private_data;
struct pidfile_context *pid_ctx;
struct sock_socket *socket_list;
struct tevent_req *req;
};
/*
* Process a single client
*/
static void sock_client_read_handler(uint8_t *buf, size_t buflen,
void *private_data);
static void sock_client_read_done(struct tevent_req *subreq);
static void sock_client_dead_handler(void *private_data);
static int sock_client_context_destructor(
struct sock_client_context *client_ctx);
static int sock_client_context_init(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct sock_socket *sock,
int client_fd,
struct sock_client *client,
struct sock_client_context **result)
{
struct sock_client_context *client_ctx;
int ret;
client_ctx = talloc_zero(mem_ctx, struct sock_client_context);
if (client_ctx == NULL) {
return ENOMEM;
}
client_ctx->ev = ev;
client_ctx->sock = sock;
client_ctx->fd = client_fd;
client_ctx->client = client;
ret = comm_setup(client_ctx, ev, client_fd,
sock_client_read_handler, client_ctx,
sock_client_dead_handler, client_ctx,
&client_ctx->comm);
if (ret != 0) {
talloc_free(client_ctx);
return ret;
}
if (sock->funcs->connect != NULL) {
bool status;
status = sock->funcs->connect(client_ctx, sock->private_data);
if (! status) {
talloc_free(client_ctx);
close(client_fd);
return 0;
}
}
talloc_set_destructor(client_ctx, sock_client_context_destructor);
*result = client_ctx;
return 0;
}
static void sock_client_read_handler(uint8_t *buf, size_t buflen,
void *private_data)
{
struct sock_client_context *client_ctx = talloc_get_type_abort(
private_data, struct sock_client_context);
struct sock_socket *sock = client_ctx->sock;
struct tevent_req *subreq;
subreq = sock->funcs->read_send(client_ctx, client_ctx->ev,
client_ctx, buf, buflen,
sock->private_data);
if (subreq == NULL) {
talloc_free(client_ctx);
return;
}
tevent_req_set_callback(subreq, sock_client_read_done, client_ctx);
}
static void sock_client_read_done(struct tevent_req *subreq)
{
struct sock_client_context *client_ctx = tevent_req_callback_data(
subreq, struct sock_client_context);
struct sock_socket *sock = client_ctx->sock;
int ret;
bool status;
status = sock->funcs->read_recv(subreq, &ret);
if (! status) {
D_ERR("client read failed with ret=%d\n", ret);
talloc_free(client_ctx);
}
}
static void sock_client_dead_handler(void *private_data)
{
struct sock_client_context *client_ctx = talloc_get_type_abort(
private_data, struct sock_client_context);
struct sock_socket *sock = client_ctx->sock;
if (sock->funcs->disconnect != NULL) {
sock->funcs->disconnect(client_ctx, sock->private_data);
}
talloc_free(client_ctx);
}
static int sock_client_context_destructor(
struct sock_client_context *client_ctx)
{
TALLOC_FREE(client_ctx->client);
TALLOC_FREE(client_ctx->comm);
if (client_ctx->fd != -1) {
close(client_ctx->fd);
client_ctx->fd = -1;
}
return 0;
}
/*
* Process a single listening socket
*/
static int socket_setup(const char *sockpath, bool remove_before_use)
{
struct sockaddr_un addr;
size_t len;
int ret, fd;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
if (len >= sizeof(addr.sun_path)) {
D_ERR("socket path too long: %s\n", sockpath);
return -1;
}
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd == -1) {
D_ERR("socket create failed - %s\n", sockpath);
return -1;
}
ret = set_blocking(fd, false);
if (ret != 0) {
D_ERR("socket set nonblocking failed - %s\n", sockpath);
close(fd);
return -1;
}
if (remove_before_use) {
unlink(sockpath);
}
ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
if (ret != 0) {
D_ERR("socket bind failed - %s\n", sockpath);
close(fd);
return -1;
}
ret = listen(fd, 10);
if (ret != 0) {
D_ERR("socket listen failed - %s\n", sockpath);
close(fd);
return -1;
}
return fd;
}
static int sock_socket_destructor(struct sock_socket *sock);
static int sock_socket_init(TALLOC_CTX *mem_ctx, const char *sockpath,
struct sock_socket_funcs *funcs,
void *private_data,
bool remove_before_use,
struct sock_socket **result)
{
struct sock_socket *sock;
if (funcs == NULL) {
return EINVAL;
}
if (funcs->read_send == NULL || funcs->read_recv == NULL) {
return EINVAL;
}
sock = talloc_zero(mem_ctx, struct sock_socket);
if (sock == NULL) {
return ENOMEM;
}
sock->sockpath = sockpath;
sock->funcs = funcs;
sock->private_data = private_data;
sock->fd = socket_setup(sockpath, remove_before_use);
if (sock->fd == -1) {
talloc_free(sock);
return EIO;
}
talloc_set_destructor(sock, sock_socket_destructor);
*result = sock;
return 0;
}
static int sock_socket_destructor(struct sock_socket *sock)
{
if (sock->fd == -1) {
close(sock->fd);
sock->fd = -1;
}
unlink(sock->sockpath);
return 0;
}
struct sock_socket_start_state {
struct tevent_context *ev;
struct sock_socket *sock;
struct sock_client *client_list;
};
static int sock_socket_start_state_destructor(
struct sock_socket_start_state *state);
static void sock_socket_start_new_client(struct tevent_req *subreq);
static int sock_socket_start_client_destructor(struct sock_client *client);
static struct tevent_req *sock_socket_start_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct sock_socket *sock)
{
struct tevent_req *req, *subreq;
struct sock_socket_start_state *state;
req = tevent_req_create(mem_ctx, &state,
struct sock_socket_start_state);
if (req == NULL) {
return NULL;
}
state->ev = ev;
state->sock = sock;
talloc_set_destructor(state, sock_socket_start_state_destructor);
subreq = accept_send(state, ev, sock->fd);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, sock_socket_start_new_client, req);
return req;
}
static int sock_socket_start_state_destructor(
struct sock_socket_start_state *state)
{
struct sock_client *client;
while ((client = state->client_list) != NULL) {
talloc_free(client);
}
return 0;
}
static void sock_socket_start_new_client(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct sock_socket_start_state *state = tevent_req_data(
req, struct sock_socket_start_state);
struct sock_client *client;
int client_fd, ret;
client_fd = accept_recv(subreq, NULL, NULL, &ret);
TALLOC_FREE(subreq);
if (client_fd == -1) {
D_ERR("failed to accept new connection\n");
}
subreq = accept_send(state, state->ev, state->sock->fd);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, sock_socket_start_new_client, req);
if (client_fd == -1) {
return;
}
client = talloc_zero(state, struct sock_client);
if (tevent_req_nomem(client, req)) {
close(client_fd);
return;
}
client->req = req;
ret = sock_client_context_init(client, state->ev, state->sock,
client_fd, client, &client->client_ctx);
if (ret != 0) {
talloc_free(client);
return;
}
talloc_set_destructor(client, sock_socket_start_client_destructor);
DLIST_ADD(state->client_list, client);
}
static int sock_socket_start_client_destructor(struct sock_client *client)
{
struct sock_socket_start_state *state = tevent_req_data(
client->req, struct sock_socket_start_state);
DLIST_REMOVE(state->client_list, client);
TALLOC_FREE(client->client_ctx);
return 0;
}
static bool sock_socket_start_recv(struct tevent_req *req, int *perr)
{
struct sock_socket_start_state *state = tevent_req_data(
req, struct sock_socket_start_state);
int ret;
state->sock->req = NULL;
if (tevent_req_is_unix_error(req, &ret)) {
if (perr != NULL) {
*perr = ret;
}
return false;
}
return true;
}
/*
* Send message to a client
*/
struct sock_socket_write_state {
int status;
};
static void sock_socket_write_done(struct tevent_req *subreq);
struct tevent_req *sock_socket_write_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct sock_client_context *client_ctx,
uint8_t *buf, size_t buflen)
{
struct tevent_req *req, *subreq;
struct sock_socket_write_state *state;
req = tevent_req_create(mem_ctx, &state,
struct sock_socket_write_state);
if (req == NULL) {
return NULL;
}
subreq = comm_write_send(state, ev, client_ctx->comm, buf, buflen);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, sock_socket_write_done, req);
return req;
}
static void sock_socket_write_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct sock_socket_write_state *state = tevent_req_data(
req, struct sock_socket_write_state);
int ret;
bool status;
status = comm_write_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (! status) {
state->status = ret;
return;
}
}
bool sock_socket_write_recv(struct tevent_req *req, int *perr)
{
struct sock_socket_write_state *state = tevent_req_data(
req, struct sock_socket_write_state);
int ret;
if (tevent_req_is_unix_error(req, &ret)) {
if (perr != NULL) {
*perr = ret;
}
return false;
}
if (state->status != 0) {
if (perr != NULL) {
*perr = state->status;
}
return false;
}
if (perr != NULL) {
*perr = 0;
}
return true;
}
/*
* Socket daemon
*/
static int sock_daemon_context_destructor(struct sock_daemon_context *sockd);
int sock_daemon_setup(TALLOC_CTX *mem_ctx, const char *daemon_name,
const char *logging, const char *debug_level,
const char *pidfile,
struct sock_daemon_funcs *funcs,
void *private_data,
struct sock_daemon_context **out)
{
struct sock_daemon_context *sockd;
int ret;
sockd = talloc_zero(mem_ctx, struct sock_daemon_context);
if (sockd == NULL) {
return ENOMEM;
}
sockd->funcs = funcs;
sockd->private_data = private_data;
ret = logging_init(sockd, logging, debug_level, daemon_name);
if (ret != 0) {
fprintf(stderr,
"Failed to initialize logging, logging=%s, debug=%s\n",
logging, debug_level);
return ret;
}
if (pidfile != NULL) {
ret = pidfile_create(sockd, pidfile, &sockd->pid_ctx);
if (ret != 0) {
talloc_free(sockd);
return EEXIST;
}
}
talloc_set_destructor(sockd, sock_daemon_context_destructor);
*out = sockd;
return 0;
}
static int sock_daemon_context_destructor(struct sock_daemon_context *sockd)
{
if (sockd->req != NULL) {
tevent_req_done(sockd->req);
}
return 0;
}
int sock_daemon_add_unix(struct sock_daemon_context *sockd,
const char *sockpath,
struct sock_socket_funcs *funcs,
void *private_data)
{
struct sock_socket *sock;
int ret;
bool remove_before_use = false;
remove_before_use = (sockd->pid_ctx != NULL) ? true : false;
ret = sock_socket_init(sockd, sockpath, funcs, private_data,
remove_before_use, &sock);
if (ret != 0) {
return ret;
}
D_NOTICE("listening on %s\n", sockpath);
DLIST_ADD(sockd->socket_list, sock);
return 0;
}
/*
* Run socket daemon
*/
struct sock_daemon_start_state {
struct tevent_context *ev;
struct sock_daemon_context *sockd;
pid_t pid_watch;
int fd;
};
static void sock_daemon_started(struct tevent_req *subreq);
static void sock_daemon_signal_handler(struct tevent_context *ev,
struct tevent_signal *se,
int signum, int count, void *siginfo,
void *private_data);
static void sock_daemon_socket_fail(struct tevent_req *subreq);
static void sock_daemon_watch_pid(struct tevent_req *subreq);
static void sock_daemon_reconfigure(struct sock_daemon_start_state *state);
static void sock_daemon_shutdown(struct sock_daemon_start_state *state);
struct tevent_req *sock_daemon_run_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct sock_daemon_context *sockd,
pid_t pid_watch)
{
struct tevent_req *req, *subreq;
struct sock_daemon_start_state *state;
struct tevent_signal *se;
struct sock_socket *sock;
req = tevent_req_create(mem_ctx, &state,
struct sock_daemon_start_state);
if (req == NULL) {
return NULL;
}
state->ev = ev;
state->sockd = sockd;
state->pid_watch = pid_watch;
state->fd = -1;
subreq = tevent_wakeup_send(state, ev,
tevent_timeval_current_ofs(0, 0));
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, sock_daemon_started, req);
se = tevent_add_signal(ev, state, SIGHUP, 0,
sock_daemon_signal_handler, req);
if (tevent_req_nomem(se, req)) {
return tevent_req_post(req, ev);
}
se = tevent_add_signal(ev, state, SIGUSR1, 0,
sock_daemon_signal_handler, req);
if (tevent_req_nomem(se, req)) {
return tevent_req_post(req, ev);
}
se = tevent_add_signal(ev, state, SIGINT, 0,
sock_daemon_signal_handler, req);
if (tevent_req_nomem(se, req)) {
return tevent_req_post(req, ev);
}
se = tevent_add_signal(ev, state, SIGTERM, 0,
sock_daemon_signal_handler, req);
if (tevent_req_nomem(se, req)) {
return tevent_req_post(req, ev);
}
for (sock = sockd->socket_list; sock != NULL; sock = sock->next) {
subreq = sock_socket_start_send(state, ev, sock);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, sock_daemon_socket_fail, req);
sock->req = subreq;
}
if (pid_watch > 1) {
subreq = tevent_wakeup_send(state, ev,
tevent_timeval_current_ofs(1,0));
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, sock_daemon_watch_pid, req);
}
sockd->req = req;
return req;
}
static void sock_daemon_started(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct sock_daemon_start_state *state = tevent_req_data(
req, struct sock_daemon_start_state);
struct sock_daemon_context *sockd = state->sockd;
D_NOTICE("daemon started, pid=%u\n", getpid());
if (sockd->funcs != NULL && sockd->funcs->startup != NULL) {
sockd->funcs->startup(sockd->private_data);
}
}
static void sock_daemon_signal_handler(struct tevent_context *ev,
struct tevent_signal *se,
int signum, int count, void *siginfo,
void *private_data)
{
struct tevent_req *req = talloc_get_type_abort(
private_data, struct tevent_req);
struct sock_daemon_start_state *state = tevent_req_data(
req, struct sock_daemon_start_state);
D_NOTICE("Received signal %d\n", signum);
if (signum == SIGHUP || signum == SIGUSR1) {
sock_daemon_reconfigure(state);
return;
}
if (signum == SIGINT || signum == SIGTERM) {
sock_daemon_shutdown(state);
tevent_req_error(req, EINTR);
}
}
static void sock_daemon_reconfigure(struct sock_daemon_start_state *state)
{
struct sock_daemon_context *sockd = state->sockd;
if (sockd->funcs != NULL && sockd->funcs->reconfigure != NULL) {
sockd->funcs->reconfigure(sockd->private_data);
}
}
static void sock_daemon_shutdown(struct sock_daemon_start_state *state)
{
struct sock_daemon_context *sockd = state->sockd;
struct sock_socket *sock;
D_NOTICE("Shutting down\n");
while ((sock = sockd->socket_list) != NULL) {
DLIST_REMOVE(sockd->socket_list, sock);
TALLOC_FREE(sock->req);
TALLOC_FREE(sock);
}
if (sockd->funcs != NULL && sockd->funcs->shutdown != NULL) {
sockd->funcs->shutdown(sockd->private_data);
}
TALLOC_FREE(sockd->pid_ctx);
}
static void sock_daemon_socket_fail(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct sock_daemon_start_state *state = tevent_req_data(
req, struct sock_daemon_start_state);
int ret = 0;
bool status;
status = sock_socket_start_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (! status) {
tevent_req_error(req, ret);
} else {
tevent_req_done(req);
}
sock_daemon_shutdown(state);
}
static void sock_daemon_watch_pid(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct sock_daemon_start_state *state = tevent_req_data(
req, struct sock_daemon_start_state);
int ret;
bool status;
status = tevent_wakeup_recv(subreq);
TALLOC_FREE(subreq);
if (! status) {
tevent_req_error(req, EIO);
return;
}
ret = kill(state->pid_watch, 0);
if (ret == -1) {
if (errno == ESRCH) {
D_ERR("PID %d gone away, exiting\n", state->pid_watch);
sock_daemon_shutdown(state);
tevent_req_error(req, ESRCH);
return;
} else {
D_ERR("Failed to check PID status %d, ret=%d\n",
state->pid_watch, errno);
}
}
subreq = tevent_wakeup_send(state, state->ev,
tevent_timeval_current_ofs(5,0));
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, sock_daemon_watch_pid, req);
}
bool sock_daemon_run_recv(struct tevent_req *req, int *perr)
{
int ret;
if (tevent_req_is_unix_error(req, &ret)) {
if (perr != NULL) {
*perr = ret;
}
return false;
}
return true;
}
int sock_daemon_run(struct tevent_context *ev,
struct sock_daemon_context *sockd,
pid_t pid_watch)
{
struct tevent_req *req;
int ret;
bool status;
req = sock_daemon_run_send(ev, ev, sockd, pid_watch);
if (req == NULL) {
return ENOMEM;
}
tevent_req_poll(req, ev);
status = sock_daemon_run_recv(req, &ret);
sockd->req = NULL;
TALLOC_FREE(req);
if (! status) {
return ret;
}
return 0;
}

204
ctdb/common/sock_daemon.h Normal file
View File

@ -0,0 +1,204 @@
/*
A server based on unix domain socket
Copyright (C) Amitay Isaacs 2016
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __CTDB_SOCK_DAEMON_H__
#define __CTDB_SOCK_DAEMON_H__
#include <talloc.h>
#include <tevent.h>
#include "common/logging.h"
/**
* @file sock_daemon.h
*
* @brief A framework for a server based on unix-domain sockets.
*
* This abstraction allows to build simple servers that communicate using
* unix-domain sockets. It takes care of the common boilerplate.
*/
/**
* @brief The abstract socket daemon context
*/
struct sock_daemon_context;
/**
* @brief The abstract socket client context
*/
struct sock_client_context;
/**
* @brief The callback routines called during daemon life cycle
*
* startup() is called when the daemon starts running
* either via sock_daemon_run() or via sock_daemon_run_send()
* reconfigure() is called when process receives SIGUSR1 or SIGHUP
* shutdown() is called when process receives SIGINT or SIGTERM
*/
struct sock_daemon_funcs {
void (*startup)(void *private_data);
void (*reconfigure)(void *private_data);
void (*shutdown)(void *private_data);
};
/**
* @brief The callback routines called for an unix-domain socket
*
* connect() is called when there is a new connection
*
* @param[in] client The new socket client context
* @param[in] private_data Private data set with the socket
* @retun true if connection should be accepted, false otherwise
*
*
* disconnect() is called when client closes connection
*
* @param[in] client The socket client context
* @param[in] private_data Private data associated with the socket
*
*
* read_send() starts the async computation to process data on the socket
*
* @param[in] mem_ctx Talloc memory context
* @param[in] ev Tevent context
* @param[in] client The socket client context
* @param[in] buf Data received from the client
* @param[in] buflen Length of the data
* @param[i] private_data Private data associatedwith the socket
* @return new tevent reques, or NULL on failure
*
*
* read_recv() ends the async computation to process data on the socket
*
* @param[in] req Tevent request
* @param[out] perr errno in case of failure
* @return true on success, false on failure
*
*/
struct sock_socket_funcs {
bool (*connect)(struct sock_client_context *client,
void *private_data);
void (*disconnect)(struct sock_client_context *client,
void *private_data);
struct tevent_req * (*read_send)(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct sock_client_context *client,
uint8_t *buf, size_t buflen,
void *private_data);
bool (*read_recv)(struct tevent_req *req, int *perr);
};
/**
* @brief Async computation to send data to the client
*
* @param[in] mem_ctx Talloc memory context
* @param[in] ev Tevent context
* @param[in] client The socket client context
* @param[in] buf Data to be sent to the client
* @param[in] buflen Length of the data
* @return new tevent request, or NULL on failure
*/
struct tevent_req *sock_socket_write_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct sock_client_context *client,
uint8_t *buf, size_t buflen);
/**
* @brief Async computation end to send data to client
*
* @param[in] req Tevent request
* @param[out] perr errno in case of failure
* @return true on success, false on failure
*/
bool sock_socket_write_recv(struct tevent_req *req, int *perr);
/**
* @brief Create a new socket daemon
*
* @param[in] mem_ctx Talloc memory context
* @param[in] daemon_name Name of the daemon, used for logging
* @param[in] logging Logging setup string
* @param[in] debug_level Debug level to log at
* @param[in] pidfile PID file to create, NULL if no PID file required
* @param[in] funcs Socket daemon callback routines
* @param[in] private_data Private data associated with callback routines
* @param[out] result New socket daemon context
* @return 0 on success, errno on failure
*/
int sock_daemon_setup(TALLOC_CTX *mem_ctx, const char *daemon_name,
const char *logging, const char *debug_level,
const char *pidfile,
struct sock_daemon_funcs *funcs,
void *private_data,
struct sock_daemon_context **result);
/**
* @brief Create and listen to the unix domain socket
*
* @param[in] sockd Socket daemon context
* @param[in] sockpath Unix domain socket path
* @param[in] funcs socket callback routines
* @param[in] private_data Private data associated with callback routines
* @return 0 on success, errno on failure
*/
int sock_daemon_add_unix(struct sock_daemon_context *sockd,
const char *sockpath,
struct sock_socket_funcs *funcs,
void *private_data);
/**
* @brief Async computation start to run a socket daemon
*
* @param[in] mem_ctx Talloc memory context
* @param[in] ev Tevent context
* @param[in] sockd The socket daemon context
* @param[in] pid_watch PID to watch. If PID goes away, shutdown.
* @return new tevent request, NULL on failure
*/
struct tevent_req *sock_daemon_run_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct sock_daemon_context *sockd,
pid_t pid_watch);
/**
* @brief Async computation end to run a socket daemon
*
* @param[in] req Tevent request
* @param[out] perr errno in case of failure
* @return true on success, false on failure
*/
bool sock_daemon_run_recv(struct tevent_req *req, int *perr);
/**
* @brief Sync way to start a daemon
*
* @param[in] ev Tevent context
* @param[in] sockd The socket daemon context
* @param[in] pid_watch PID to watch. If PID goes away, shutdown.
* @return 0 on success, errno on failure
*
* This call will return only on shutdown of the daemon
*/
int sock_daemon_run(struct tevent_context *ev,
struct sock_daemon_context *sockd,
pid_t pid_watch);
#endif /* __CTDB_SOCK_DAEMON_H__ */

View File

@ -0,0 +1,44 @@
#!/bin/sh
. "${TEST_SCRIPTS_DIR}/unit.sh"
pidfile="${TEST_VAR_DIR}/sock_daemon_test.pid.$$"
sockpath="${TEST_VAR_DIR}/sock_daemon_test.sock.$$"
remove_files ()
{
rm -f "$pidfile"
rm -f "$sockpath"
}
test_cleanup remove_files
result_filter ()
{
_pid="[0-9][0-9]*"
sed -e "s|pid=${_pid}|pid=PID|" \
-e "s|PID ${_pid}|PID PID|" \
-e "s|\[${_pid}\]|[PID]|"
}
ok <<EOF
test1[PID]: listening on $sockpath
test2[PID]: listening on $sockpath
test2[PID]: daemon started, pid=PID
test2[PID]: Received signal 1
test2[PID]: Received signal 10
test2[PID]: Received signal 15
test2[PID]: Shutting down
test3[PID]: listening on $sockpath
test3[PID]: daemon started, pid=PID
test3[PID]: PID PID gone away, exiting
test3[PID]: Shutting down
test4[PID]: daemon started, pid=PID
test5[PID]: listening on $sockpath
test5[PID]: daemon started, pid=PID
test5[PID]: Received signal 15
test5[PID]: Shutting down
EOF
unit_test sock_daemon_test "$pidfile" "$sockpath"

View File

@ -0,0 +1,632 @@
/*
sock daemon tests
Copyright (C) Amitay Isaacs 2016
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include "system/filesys.h"
#include "system/network.h"
#include "system/wait.h"
#include <assert.h>
#include "common/logging.c"
#include "common/pkt_read.c"
#include "common/pkt_write.c"
#include "common/comm.c"
#include "common/pidfile.c"
#include "common/sock_daemon.c"
#include "common/sock_io.c"
static struct tevent_req *dummy_read_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct sock_client_context *client,
uint8_t *buf, size_t buflen,
void *private_data)
{
return NULL;
}
static bool dummy_read_recv(struct tevent_req *req, int *perr)
{
if (perr != NULL) {
*perr = EINVAL;
}
return false;
}
static struct sock_socket_funcs dummy_socket_funcs = {
.read_send = dummy_read_send,
.read_recv = dummy_read_recv,
};
static void test1(TALLOC_CTX *mem_ctx, const char *pidfile,
const char *sockpath)
{
struct sock_daemon_context *sockd;
struct stat st;
int ret;
ret = sock_daemon_setup(mem_ctx, "test1", "file:", "NOTICE", pidfile,
NULL, NULL, &sockd);
assert(ret == 0);
assert(sockd != NULL);
ret = stat(pidfile, &st);
assert(ret == 0);
assert(S_ISREG(st.st_mode));
ret = sock_daemon_add_unix(sockd, sockpath, &dummy_socket_funcs, NULL);
assert(ret == 0);
ret = stat(sockpath, &st);
assert(ret == 0);
assert(S_ISSOCK(st.st_mode));
talloc_free(sockd);
ret = stat(pidfile, &st);
assert(ret == -1);
ret = stat(sockpath, &st);
assert(ret == -1);
}
static void test2_startup(void *private_data)
{
int fd = *(int *)private_data;
int ret = 1;
ssize_t nwritten;
nwritten = write(fd, &ret, sizeof(ret));
assert(nwritten == sizeof(ret));
}
static void test2_reconfigure(void *private_data)
{
int fd = *(int *)private_data;
int ret = 2;
ssize_t nwritten;
nwritten = write(fd, &ret, sizeof(ret));
assert(nwritten == sizeof(ret));
}
static void test2_shutdown(void *private_data)
{
int fd = *(int *)private_data;
int ret = 3;
ssize_t nwritten;
nwritten = write(fd, &ret, sizeof(ret));
assert(nwritten == sizeof(ret));
}
static struct sock_daemon_funcs test2_funcs = {
.startup = test2_startup,
.reconfigure = test2_reconfigure,
.shutdown = test2_shutdown,
};
static void test2(TALLOC_CTX *mem_ctx, const char *pidfile,
const char *sockpath)
{
struct stat st;
int fd[2];
pid_t pid, pid2;
int ret;
ssize_t n;
ret = pipe(fd);
assert(ret == 0);
pid = fork();
assert(pid != -1);
if (pid == 0) {
struct tevent_context *ev;
struct sock_daemon_context *sockd;
close(fd[0]);
ev = tevent_context_init(mem_ctx);
assert(ev != NULL);
ret = sock_daemon_setup(mem_ctx, "test2", "file:", "NOTICE",
pidfile, &test2_funcs, &fd[1], &sockd);
assert(ret == 0);
ret = sock_daemon_add_unix(sockd, sockpath,
&dummy_socket_funcs, NULL);
assert(ret == 0);
ret = sock_daemon_run(ev, sockd, -1);
assert(ret == EINTR);
exit(0);
}
close(fd[1]);
n = read(fd[0], &ret, sizeof(ret));
assert(n == sizeof(ret));
assert(ret == 1);
ret = kill(pid, SIGHUP);
assert(ret == 0);
n = read(fd[0], &ret, sizeof(ret));
assert(n == sizeof(ret));
assert(ret == 2);
ret = kill(pid, SIGUSR1);
assert(ret == 0);
n = read(fd[0], &ret, sizeof(ret));
assert(n == sizeof(ret));
assert(ret == 2);
ret = kill(pid, SIGTERM);
assert(ret == 0);
n = read(fd[0], &ret, sizeof(ret));
assert(n == sizeof(ret));
assert(ret == 3);
pid2 = waitpid(pid, &ret, 0);
assert(pid2 == pid);
assert(WEXITSTATUS(ret) == 0);
close(fd[0]);
ret = stat(pidfile, &st);
assert(ret == -1);
ret = stat(sockpath, &st);
assert(ret == -1);
}
static void test3(TALLOC_CTX *mem_ctx, const char *pidfile,
const char *sockpath)
{
struct stat st;
pid_t pid_watch, pid, pid2;
int ret;
pid_watch = fork();
assert(pid_watch != -1);
if (pid_watch == 0) {
sleep(10);
exit(0);
}
pid = fork();
assert(pid != -1);
if (pid == 0) {
struct tevent_context *ev;
struct sock_daemon_context *sockd;
ev = tevent_context_init(mem_ctx);
assert(ev != NULL);
ret = sock_daemon_setup(mem_ctx, "test3", "file:", "NOTICE",
NULL, NULL, NULL, &sockd);
assert(ret == 0);
ret = sock_daemon_add_unix(sockd, sockpath,
&dummy_socket_funcs, NULL);
assert(ret == 0);
ret = sock_daemon_run(ev, sockd, pid_watch);
assert(ret == ESRCH);
exit(0);
}
pid2 = waitpid(pid_watch, &ret, 0);
assert(pid2 == pid_watch);
assert(WEXITSTATUS(ret) == 0);
pid2 = waitpid(pid, &ret, 0);
assert(pid2 == pid);
assert(WEXITSTATUS(ret) == 0);
ret = stat(pidfile, &st);
assert(ret == -1);
ret = stat(sockpath, &st);
assert(ret == -1);
}
static void test4_handler(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval curtime,
void *private_data)
{
struct sock_daemon_context *sockd = talloc_get_type_abort(
private_data, struct sock_daemon_context);
talloc_free(sockd);
}
static void test4(TALLOC_CTX *mem_ctx, const char *pidfile,
const char *sockpath)
{
struct stat st;
pid_t pid, pid2;
int ret;
pid = fork();
assert(pid != -1);
if (pid == 0) {
struct tevent_context *ev;
struct sock_daemon_context *sockd;
struct tevent_timer *te;
ev = tevent_context_init(mem_ctx);
assert(ev != NULL);
ret = sock_daemon_setup(mem_ctx, "test4", "file:", "NOTICE",
NULL, NULL, NULL, &sockd);
assert(ret == 0);
te = tevent_add_timer(ev, ev, tevent_timeval_current_ofs(10,0),
test4_handler, sockd);
assert(te != NULL);
ret = sock_daemon_run(ev, sockd, -1);
assert(ret == 0);
exit(0);
}
pid2 = waitpid(pid, &ret, 0);
assert(pid2 == pid);
assert(WEXITSTATUS(ret) == 0);
ret = stat(pidfile, &st);
assert(ret == -1);
ret = stat(sockpath, &st);
assert(ret == -1);
}
#define TEST5_MAX_CLIENTS 10
struct test5_pkt {
uint32_t len;
int data;
};
struct test5_client_state {
int id;
int fd;
bool done;
};
static void test5_client_callback(uint8_t *buf, size_t buflen,
void *private_data)
{
struct test5_client_state *state =
(struct test5_client_state *)private_data;
struct test5_pkt *pkt;
ssize_t n;
int ret;
if (buf == NULL) {
assert(buflen == 0);
ret = 0;
} else {
assert(buflen == sizeof(struct test5_pkt));
pkt = (struct test5_pkt *)buf;
assert(pkt->len == sizeof(struct test5_pkt));
ret = pkt->data;
}
assert(state->fd != -1);
n = write(state->fd, (void *)&ret, sizeof(int));
assert(n == sizeof(int));
state->done = true;
}
static int test5_client(const char *sockpath, int id)
{
pid_t pid;
int fd[2];
int ret;
ssize_t n;
ret = pipe(fd);
assert(ret == 0);
pid = fork();
assert(pid != -1);
if (pid == 0) {
struct tevent_context *ev;
struct test5_client_state state;
struct sock_queue *queue;
struct test5_pkt pkt;
int conn;
close(fd[0]);
ev = tevent_context_init(NULL);
assert(ev != NULL);
conn = sock_connect(sockpath);
assert(conn != -1);
state.id = id;
state.fd = fd[1];
state.done = false;
queue = sock_queue_setup(ev, ev, conn,
test5_client_callback, &state);
assert(queue != NULL);
pkt.len = 8;
pkt.data = 0xbaba;
ret = sock_queue_write(queue, (uint8_t *)&pkt,
sizeof(struct test5_pkt));
assert(ret == 0);
while (! state.done) {
tevent_loop_once(ev);
}
close(fd[0]);
state.fd = -1;
sleep(10);
exit(0);
}
close(fd[1]);
ret = 0;
n = read(fd[0], &ret, sizeof(ret));
if (n == 0) {
fprintf(stderr, "client id %d read 0 bytes\n", id);
}
assert(n == 0 || n == sizeof(ret));
close(fd[0]);
return ret;
}
struct test5_server_state {
int num_clients;
};
static bool test5_connect(struct sock_client_context *client,
void *private_data)
{
struct test5_server_state *state =
(struct test5_server_state *)private_data;
if (state->num_clients == TEST5_MAX_CLIENTS) {
return false;
}
state->num_clients += 1;
assert(state->num_clients <= TEST5_MAX_CLIENTS);
return true;
}
static void test5_disconnect(struct sock_client_context *client,
void *private_data)
{
struct test5_server_state *state =
(struct test5_server_state *)private_data;
state->num_clients -= 1;
assert(state->num_clients >= 0);
}
struct test5_read_state {
struct test5_pkt reply;
};
static void test5_read_done(struct tevent_req *subreq);
static struct tevent_req *test5_read_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct sock_client_context *client,
uint8_t *buf, size_t buflen,
void *private_data)
{
struct test5_server_state *server_state =
(struct test5_server_state *)private_data;
struct tevent_req *req, *subreq;
struct test5_read_state *state;
struct test5_pkt *pkt;
req = tevent_req_create(mem_ctx, &state, struct test5_read_state);
assert(req != NULL);
assert(buflen == sizeof(struct test5_pkt));
pkt = (struct test5_pkt *)buf;
assert(pkt->data == 0xbaba);
state->reply.len = sizeof(struct test5_pkt);
state->reply.data = server_state->num_clients;
subreq = sock_socket_write_send(state, ev, client,
(uint8_t *)&state->reply,
state->reply.len);
assert(subreq != NULL);
tevent_req_set_callback(subreq, test5_read_done, req);
return req;
}
static void test5_read_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
int ret;
bool status;
status = sock_socket_write_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (! status) {
tevent_req_error(req, ret);
return;
}
tevent_req_done(req);
}
static bool test5_read_recv(struct tevent_req *req, int *perr)
{
int ret;
if (tevent_req_is_unix_error(req, &ret)) {
if (perr != NULL) {
*perr = ret;
}
return false;
}
return true;
}
static struct sock_socket_funcs test5_client_funcs = {
.connect = test5_connect,
.disconnect = test5_disconnect,
.read_send = test5_read_send,
.read_recv = test5_read_recv,
};
static void test5_startup(void *private_data)
{
int fd = *(int *)private_data;
int ret = 1;
ssize_t nwritten;
nwritten = write(fd, &ret, sizeof(ret));
assert(nwritten == sizeof(ret));
close(fd);
}
static struct sock_daemon_funcs test5_funcs = {
.startup = test5_startup,
};
static void test5(TALLOC_CTX *mem_ctx, const char *pidfile,
const char *sockpath)
{
pid_t pid_server, pid;
int fd[2], ret, i;
ssize_t n;
pid = getpid();
ret = pipe(fd);
assert(ret == 0);
pid_server = fork();
assert(pid_server != -1);
if (pid_server == 0) {
struct tevent_context *ev;
struct sock_daemon_context *sockd;
struct test5_server_state state;
close(fd[0]);
ev = tevent_context_init(mem_ctx);
assert(ev != NULL);
ret = sock_daemon_setup(mem_ctx, "test5", "file:", "NOTICE",
pidfile, &test5_funcs, &fd[1], &sockd);
assert(ret == 0);
state.num_clients = 0;
ret = sock_daemon_add_unix(sockd, sockpath,
&test5_client_funcs, &state);
assert(ret == 0);
ret = sock_daemon_run(ev, sockd, pid);
assert(ret == EINTR);
exit(0);
}
close(fd[1]);
n = read(fd[0], &ret, sizeof(ret));
assert(n == sizeof(ret));
assert(ret == 1);
close(fd[0]);
for (i=0; i<100; i++) {
ret = test5_client(sockpath, i);
if (i < TEST5_MAX_CLIENTS) {
assert(ret == i+1);
} else {
assert(ret == 0);
}
}
for (i=0; i<100; i++) {
pid = wait(&ret);
assert(pid != -1);
}
ret = kill(pid_server, SIGTERM);
assert(ret == 0);
}
int main(int argc, const char **argv)
{
TALLOC_CTX *mem_ctx;
const char *pidfile, *sockpath;
if (argc != 3) {
fprintf(stderr, "%s <pidfile> <sockpath>\n", argv[0]);
exit(1);
}
pidfile = argv[1];
sockpath = argv[2];
mem_ctx = talloc_new(NULL);
assert(mem_ctx != NULL);
test1(mem_ctx, pidfile, sockpath);
test2(mem_ctx, pidfile, sockpath);
test3(mem_ctx, pidfile, sockpath);
test4(mem_ctx, pidfile, sockpath);
test5(mem_ctx, pidfile, sockpath);
return 0;
}

View File

@ -423,6 +423,12 @@ def build(bld):
includes='include',
deps='replace talloc tevent tdb tdb-wrap')
bld.SAMBA_SUBSYSTEM('ctdb-server-util',
source=bld.SUBDIR('common',
'''sock_daemon.c'''),
deps='''samba-util ctdb-util tevent-util
replace talloc tevent''')
bld.SAMBA_SUBSYSTEM('ctdb-ipalloc',
source=bld.SUBDIR('server',
'''ipalloc_deterministic.c
@ -715,6 +721,7 @@ def build(bld):
'protocol_client_test',
'pidfile_test',
'run_proc_test',
'sock_daemon_test',
]
for target in ctdb_unit_tests: