1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

ctdb-client: Use sock_client abstraction for eventd client

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>

Autobuild-User(master): Martin Schwenke <martins@samba.org>
Autobuild-Date(master): Fri Sep  1 12:49:27 CEST 2017 on sn-devel-144
This commit is contained in:
Amitay Isaacs 2017-06-29 16:25:57 +10:00 committed by Martin Schwenke
parent dcc1eaf542
commit dccd9630fb

View File

@ -18,7 +18,6 @@
*/
#include "replace.h"
#include "system/filesys.h"
#include "system/network.h"
#include <talloc.h>
@ -28,27 +27,85 @@
#include "lib/util/tevent_unix.h"
#include "common/logging.h"
#include "common/reqid.h"
#include "common/comm.h"
#include "common/sock_client.h"
#include "protocol/protocol_api.h"
#include "client/client_event.h"
struct ctdb_event_context {
struct reqid_context *idr;
struct comm_context *comm;
int fd;
ctdb_client_callback_func_t callback;
void *private_data;
struct sock_client_context *sockc;
};
static int ctdb_event_connect(struct ctdb_event_context *eclient,
struct tevent_context *ev,
const char *sockpath);
static int ctdb_event_msg_request_push(void *request_data, uint32_t reqid,
TALLOC_CTX *mem_ctx,
uint8_t **buf, size_t *buflen,
void *private_data)
{
struct ctdb_event_request *request =
(struct ctdb_event_request *)request_data;
int ret;
sock_packet_header_set_reqid(&request->header, reqid);
*buflen = ctdb_event_request_len(request);
*buf = talloc_size(mem_ctx, *buflen);
if (*buf == NULL) {
return ENOMEM;
}
ret = ctdb_event_request_push(request, *buf, buflen);
if (ret != 0) {
return ret;
}
return 0;
}
static int ctdb_event_msg_reply_pull(uint8_t *buf, size_t buflen,
TALLOC_CTX *mem_ctx, void **reply_data,
void *private_data)
{
struct ctdb_event_reply *reply;
int ret;
reply = talloc_zero(mem_ctx, struct ctdb_event_reply);
if (reply == NULL) {
return ENOMEM;
}
ret = ctdb_event_reply_pull(buf, buflen, reply, reply);
if (ret != 0) {
talloc_free(reply);
return ret;
}
*reply_data = reply;
return 0;
}
static int ctdb_event_msg_reply_reqid(uint8_t *buf, size_t buflen,
uint32_t *reqid, void *private_data)
{
struct sock_packet_header header;
size_t np;
int ret;
ret = sock_packet_header_pull(buf, buflen, &header, &np);
if (ret != 0) {
return ret;
}
*reqid = header.reqid;
return 0;
}
struct sock_client_proto_funcs event_proto_funcs = {
.request_push = ctdb_event_msg_request_push,
.reply_pull = ctdb_event_msg_reply_pull,
.reply_reqid = ctdb_event_msg_reply_reqid,
};
static int ctdb_event_context_destructor(struct ctdb_event_context *eclient);
int ctdb_event_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
const char *sockpath, struct ctdb_event_context **out)
@ -62,267 +119,57 @@ int ctdb_event_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
return ENOMEM;
}
ret = reqid_init(eclient, INT_MAX-200, &eclient->idr);
if (ret != 0) {
DEBUG(DEBUG_ERR, ("reqid_init() failed, ret=%d\n", ret));
talloc_free(eclient);
return ret;
}
eclient->fd = -1;
ret = ctdb_event_connect(eclient, ev, sockpath);
ret = sock_client_setup(eclient, ev, sockpath,
&event_proto_funcs, eclient,
&eclient->sockc);
if (ret != 0) {
talloc_free(eclient);
return ret;
}
talloc_set_destructor(eclient, ctdb_event_context_destructor);
*out = eclient;
return 0;
}
static int ctdb_event_context_destructor(struct ctdb_event_context *eclient)
{
if (eclient->fd != -1) {
close(eclient->fd);
eclient->fd = -1;
}
return 0;
}
static void event_read_handler(uint8_t *buf, size_t buflen,
void *private_data);
static void event_dead_handler(void *private_data);
static int ctdb_event_connect(struct ctdb_event_context *eclient,
struct tevent_context *ev, const char *sockpath)
{
struct sockaddr_un addr;
size_t len;
int fd, ret;
if (sockpath == NULL) {
DEBUG(DEBUG_ERR, ("socket path cannot be NULL\n"));
return EINVAL;
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
if (len >= sizeof(addr.sun_path)) {
DEBUG(DEBUG_ERR, ("socket path too long, len=%zu\n",
strlen(sockpath)));
return ENAMETOOLONG;
}
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd == -1) {
ret = errno;
DEBUG(DEBUG_ERR, ("socket() failed, errno=%d\n", ret));
return ret;
}
ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
if (ret == -1) {
ret = errno;
DEBUG(DEBUG_ERR, ("connect() failed, errno=%d\n", ret));
close(fd);
return ret;
}
eclient->fd = fd;
ret = comm_setup(eclient, ev, fd, event_read_handler, eclient,
event_dead_handler, eclient, &eclient->comm);
if (ret != 0) {
DEBUG(DEBUG_ERR, ("comm_setup() failed, ret=%d\n", ret));
close(fd);
eclient->fd = -1;
return ret;
}
return 0;
}
static void ctdb_event_msg_reply(struct ctdb_event_context *eclient,
uint8_t *buf, size_t buflen);
static void event_read_handler(uint8_t *buf, size_t buflen,
void *private_data)
{
struct ctdb_event_context *eclient = talloc_get_type_abort(
private_data, struct ctdb_event_context);
ctdb_event_msg_reply(eclient, buf, buflen);
}
static void event_dead_handler(void *private_data)
{
struct ctdb_event_context *eclient = talloc_get_type_abort(
private_data, struct ctdb_event_context);
ctdb_client_callback_func_t callback = eclient->callback;
void *callback_data = eclient->private_data;
talloc_free(eclient);
if (callback != NULL) {
callback(callback_data);
return;
}
DEBUG(DEBUG_NOTICE, ("connection to daemon closed, exiting\n"));
exit(1);
}
void ctdb_event_set_disconnect_callback(struct ctdb_event_context *eclient,
ctdb_client_callback_func_t callback,
void *private_data)
{
eclient->callback = callback;
eclient->private_data = private_data;
sock_client_set_disconnect_callback(eclient->sockc,
callback, private_data);
}
/*
* Handle eventd_request and eventd_reply
*/
struct ctdb_event_msg_state {
struct ctdb_event_context *eclient;
uint32_t reqid;
struct tevent_req *req;
struct ctdb_event_reply *reply;
};
static int ctdb_event_msg_state_destructor(struct ctdb_event_msg_state *state);
static void ctdb_event_msg_done(struct tevent_req *subreq);
struct tevent_req *ctdb_event_msg_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct ctdb_event_context *eclient,
struct ctdb_event_request *request)
{
struct tevent_req *req, *subreq;
struct ctdb_event_msg_state *state;
uint8_t *buf;
size_t buflen;
int ret;
req = tevent_req_create(mem_ctx, &state, struct ctdb_event_msg_state);
if (req == NULL) {
return NULL;
}
state->eclient = eclient;
state->reqid = reqid_new(eclient->idr, state);
if (state->reqid == REQID_INVALID) {
talloc_free(req);
return NULL;
}
state->req = req;
talloc_set_destructor(state, ctdb_event_msg_state_destructor);
sock_packet_header_set_reqid(&request->header, state->reqid);
buflen = ctdb_event_request_len(request);
buf = talloc_size(state, buflen);
if (tevent_req_nomem(buf, req)) {
return tevent_req_post(req, ev);
}
ret = ctdb_event_request_push(request, buf, &buflen);
if (ret != 0) {
tevent_req_error(req, ret);
return tevent_req_post(req, ev);
}
subreq = comm_write_send(state, ev, eclient->comm, buf, buflen);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, ctdb_event_msg_done, req);
struct tevent_req *req;
req = sock_client_msg_send(mem_ctx, ev, eclient->sockc,
tevent_timeval_zero(), request);
return req;
}
static int ctdb_event_msg_state_destructor(struct ctdb_event_msg_state *state)
{
reqid_remove(state->eclient->idr, state->reqid);
return 0;
}
static void ctdb_event_msg_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
int ret;
bool status;
status = comm_write_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (! status) {
tevent_req_error(req, ret);
return;
}
/* Wait for the reply or timeout */
}
static void ctdb_event_msg_reply(struct ctdb_event_context *eclient,
uint8_t *buf, size_t buflen)
{
struct ctdb_event_reply *reply;
struct ctdb_event_msg_state *state;
int ret;
reply = talloc_zero(eclient, struct ctdb_event_reply);
if (reply == NULL) {
D_WARNING("memory allocation error\n");
return;
}
ret = ctdb_event_reply_pull(buf, buflen, reply, reply);
if (ret != 0) {
D_WARNING("Invalid packet received, ret=%d\n", ret);
return;
}
state = reqid_find(eclient->idr, reply->header.reqid,
struct ctdb_event_msg_state);
if (state == NULL) {
return;
}
if (reply->header.reqid != state->reqid) {
return;
}
state->reply = talloc_steal(state, reply);
tevent_req_done(state->req);
}
bool ctdb_event_msg_recv(struct tevent_req *req, int *perr,
TALLOC_CTX *mem_ctx,
struct ctdb_event_reply **reply)
{
struct ctdb_event_msg_state *state = tevent_req_data(
req, struct ctdb_event_msg_state);
int ret;
void *reply_data;
bool status;
if (tevent_req_is_unix_error(req, &ret)) {
if (perr != NULL) {
*perr = ret;
}
return false;
status = sock_client_msg_recv(req, perr, mem_ctx, &reply_data);
if (status && reply != NULL) {
*reply = talloc_get_type_abort(
reply_data, struct ctdb_event_reply);
}
if (reply != NULL) {
*reply = talloc_steal(mem_ctx, state->reply);
}
return true;
return status;
}
/*