1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-11 05:18:09 +03:00
samba-mirror/ctdb/common/comm.c
Amitay Isaacs 1543eedb8f ctdb-common: Add communication endpoint abstraction
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
2015-10-07 14:53:29 +02:00

405 lines
8.8 KiB
C

/*
Communication endpoint implementation
Copyright (C) Amitay Isaacs 2015
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include "system/network.h"
#include "system/filesys.h"
#include <talloc.h>
#include <tdb.h>
#include "lib/util/tevent_unix.h"
#include "pkt_read.h"
#include "pkt_write.h"
#include "comm.h"
static bool set_nonblocking(int fd)
{
int v;
v = fcntl(fd, F_GETFL, 0);
if (v == -1) {
return false;
}
if (fcntl(fd, F_SETFL, v | O_NONBLOCK) == -1) {
return false;
}
return true;
}
/*
* Communication endpoint around a socket
*/
#define SMALL_PKT_SIZE 1024
struct comm_context {
int fd;
comm_read_handler_fn read_handler;
void *read_private_data;
comm_dead_handler_fn dead_handler;
void *dead_private_data;
uint8_t small_pkt[SMALL_PKT_SIZE];
struct tevent_req *read_req, *write_req;
struct tevent_fd *fde;
struct tevent_queue *queue;
};
static void comm_fd_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags, void *private_data);
static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct comm_context *comm,
uint8_t *buf, size_t buflen);
static void comm_read_failed(struct tevent_req *req);
int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
comm_read_handler_fn read_handler, void *read_private_data,
comm_dead_handler_fn dead_handler, void *dead_private_data,
struct comm_context **result)
{
struct comm_context *comm;
if (fd < 0) {
return EINVAL;
}
if (dead_handler == NULL) {
return EINVAL;
}
/* Socket queue relies on non-blocking sockets. */
if (!set_nonblocking(fd)) {
return EIO;
}
comm = talloc_zero(mem_ctx, struct comm_context);
if (comm == NULL) {
return ENOMEM;
}
comm->fd = fd;
comm->read_handler = read_handler;
comm->read_private_data = read_private_data;
comm->dead_handler = dead_handler;
comm->dead_private_data = dead_private_data;
comm->queue = tevent_queue_create(comm, "comm write queue");
if (comm->queue == NULL) {
goto fail;
}
/* Set up to write packets */
comm->fde = tevent_add_fd(ev, comm, fd, TEVENT_FD_READ,
comm_fd_handler, comm);
if (comm->fde == NULL) {
goto fail;
}
/* Set up to read packets */
if (read_handler != NULL) {
struct tevent_req *req;
req = comm_read_send(comm, ev, comm, comm->small_pkt,
SMALL_PKT_SIZE);
if (req == NULL) {
goto fail;
}
tevent_req_set_callback(req, comm_read_failed, comm);
comm->read_req = req;
}
*result = comm;
return 0;
fail:
talloc_free(comm);
return ENOMEM;
}
/*
* Read packets
*/
struct comm_read_state {
struct tevent_context *ev;
struct comm_context *comm;
uint8_t *buf;
size_t buflen;
struct tevent_req *subreq;
};
static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data);
static void comm_read_done(struct tevent_req *subreq);
static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct comm_context *comm,
uint8_t *buf, size_t buflen)
{
struct tevent_req *req, *subreq;
struct comm_read_state *state;
req = tevent_req_create(mem_ctx, &state, struct comm_read_state);
if (req == NULL) {
return NULL;
}
state->ev = ev;
state->comm = comm;
state->buf = buf;
state->buflen = buflen;
subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
state->buf, state->buflen,
comm_read_more, NULL);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
state->subreq = subreq;
tevent_req_set_callback(subreq, comm_read_done, req);
return req;
}
static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data)
{
uint32_t packet_len;
if (buflen < sizeof(uint32_t)) {
return sizeof(uint32_t) - buflen;
}
packet_len = *(uint32_t *)buf;
return packet_len - buflen;
}
static void comm_read_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct comm_read_state *state = tevent_req_data(
req, struct comm_read_state);
struct comm_context *comm = state->comm;
ssize_t nread;
uint8_t *buf;
bool free_buf;
int err = 0;
nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
TALLOC_FREE(subreq);
state->subreq = NULL;
if (nread == -1) {
tevent_req_error(req, err);
return;
}
comm->read_handler(buf, nread, comm->read_private_data);
if (free_buf) {
talloc_free(buf);
}
subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t),
state->buf, state->buflen,
comm_read_more, NULL);
if (tevent_req_nomem(subreq, req)) {
return;
}
state->subreq = subreq;
tevent_req_set_callback(subreq, comm_read_done, req);
}
static void comm_read_recv(struct tevent_req *req, int *perr)
{
int err;
if (tevent_req_is_unix_error(req, &err)) {
if (perr != NULL) {
*perr = err;
}
}
}
static void comm_read_failed(struct tevent_req *req)
{
struct comm_context *comm = tevent_req_callback_data(
req, struct comm_context);
comm_read_recv(req, NULL);
TALLOC_FREE(req);
comm->read_req = NULL;
if (comm->dead_handler != NULL) {
comm->dead_handler(comm->dead_private_data);
}
}
/*
* Write packets
*/
struct comm_write_state {
struct tevent_context *ev;
struct comm_context *comm;
struct tevent_req *subreq;
uint8_t *buf;
size_t buflen, nwritten;
};
static void comm_write_trigger(struct tevent_req *req, void *private_data);
static void comm_write_done(struct tevent_req *subreq);
struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct comm_context *comm,
uint8_t *buf, size_t buflen)
{
struct tevent_req *req;
struct comm_write_state *state;
req = tevent_req_create(mem_ctx, &state, struct comm_write_state);
if (req == NULL) {
return NULL;
}
state->ev = ev;
state->comm = comm;
state->buf = buf;
state->buflen = buflen;
if (!tevent_queue_add_entry(comm->queue, ev, req,
comm_write_trigger, NULL)) {
talloc_free(req);
return NULL;
}
return req;
}
static void comm_write_trigger(struct tevent_req *req, void *private_data)
{
struct comm_write_state *state = tevent_req_data(
req, struct comm_write_state);
struct comm_context *comm = state->comm;
struct tevent_req *subreq;
comm->write_req = req;
subreq = pkt_write_send(state, state->ev, comm->fd,
state->buf, state->buflen);
if (tevent_req_nomem(subreq, req)) {
return;
}
state->subreq = subreq;
tevent_req_set_callback(subreq, comm_write_done, req);
TEVENT_FD_WRITEABLE(comm->fde);
}
static void comm_write_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct comm_write_state *state = tevent_req_data(
req, struct comm_write_state);
struct comm_context *comm = state->comm;
ssize_t nwritten;
int err = 0;
TEVENT_FD_NOT_WRITEABLE(comm->fde);
nwritten = pkt_write_recv(subreq, &err);
TALLOC_FREE(subreq);
state->subreq = NULL;
comm->write_req = NULL;
if (nwritten == -1) {
if (err == EPIPE) {
comm->dead_handler(comm->dead_private_data);
}
tevent_req_error(req, err);
return;
}
state->nwritten = nwritten;
tevent_req_done(req);
}
bool comm_write_recv(struct tevent_req *req, int *perr)
{
struct comm_write_state *state = tevent_req_data(
req, struct comm_write_state);
int err;
if (tevent_req_is_unix_error(req, &err)) {
if (perr != NULL) {
*perr = err;
}
return false;
}
if (state->nwritten != state->buflen) {
*perr = EIO;
return false;
}
*perr = 0;
return true;
}
static void comm_fd_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags, void *private_data)
{
struct comm_context *comm = talloc_get_type_abort(
private_data, struct comm_context);
if (flags & TEVENT_FD_READ) {
struct comm_read_state *read_state;
if (comm->read_req == NULL) {
/* This should never happen */
abort();
}
read_state = tevent_req_data(comm->read_req,
struct comm_read_state);
pkt_read_handler(ev, fde, flags, read_state->subreq);
}
if (flags & TEVENT_FD_WRITE) {
struct comm_write_state *write_state;
if (comm->write_req == NULL) {
/* This should never happen */
abort();
}
write_state = tevent_req_data(comm->write_req,
struct comm_write_state);
pkt_write_handler(ev, fde, flags, write_state->subreq);
}
}