mirror of
https://github.com/samba-team/samba.git
synced 2025-02-28 01:58:17 +03:00
- fully separate the client version of ctdb_call from the daemon
version. The client version is different enough that this is worthwhile - enable local shortcut for client version of ctdb_call - add idr_find_type(), with better error reporting in case of type mismatch (This used to be ctdb commit 2388094c5f7b6ce003e86b05620c06217d63b49c)
This commit is contained in:
parent
b79e29c779
commit
d0af75d1fa
@ -47,9 +47,9 @@
|
||||
/*
|
||||
local version of ctdb_call
|
||||
*/
|
||||
static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
|
||||
struct ctdb_ltdb_header *header, TDB_DATA *data,
|
||||
uint32_t caller)
|
||||
int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
|
||||
struct ctdb_ltdb_header *header, TDB_DATA *data,
|
||||
uint32_t caller)
|
||||
{
|
||||
struct ctdb_call_info *c;
|
||||
struct ctdb_registered_call *fn;
|
||||
@ -426,17 +426,12 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
|
||||
struct ctdb_call_state *state;
|
||||
|
||||
state = idr_find(ctdb->idr, hdr->reqid);
|
||||
state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
|
||||
if (state == NULL) {
|
||||
DEBUG(0, ("reqid %d not found\n", hdr->reqid));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!talloc_get_type(state, struct ctdb_call_state)) {
|
||||
DEBUG(0,("ctdb idr type error at %s\n", __location__));
|
||||
return;
|
||||
}
|
||||
|
||||
state->call.reply_data.dptr = c->data;
|
||||
state->call.reply_data.dsize = c->datalen;
|
||||
state->call.status = c->status;
|
||||
@ -463,16 +458,11 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
struct ctdb_db_context *ctdb_db;
|
||||
TDB_DATA data;
|
||||
|
||||
state = idr_find(ctdb->idr, hdr->reqid);
|
||||
state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
|
||||
if (state == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!talloc_get_type(state, struct ctdb_call_state)) {
|
||||
DEBUG(0,("ctdb idr type error at %s\n", __location__));
|
||||
return;
|
||||
}
|
||||
|
||||
ctdb_db = state->ctdb_db;
|
||||
|
||||
data.dptr = c->data;
|
||||
@ -508,14 +498,9 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
|
||||
struct ctdb_call_state *state;
|
||||
|
||||
state = idr_find(ctdb->idr, hdr->reqid);
|
||||
state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
|
||||
if (state == NULL) return;
|
||||
|
||||
if (!talloc_get_type(state, struct ctdb_call_state)) {
|
||||
DEBUG(0,("ctdb idr type error at %s\n", __location__));
|
||||
return;
|
||||
}
|
||||
|
||||
talloc_steal(state, c);
|
||||
|
||||
state->state = CTDB_CALL_ERROR;
|
||||
@ -538,14 +523,9 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
|
||||
struct ctdb_call_state *state;
|
||||
|
||||
state = idr_find(ctdb->idr, hdr->reqid);
|
||||
state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
|
||||
if (state == NULL) return;
|
||||
|
||||
if (!talloc_get_type(state, struct ctdb_call_state)) {
|
||||
DEBUG(0,("ctdb idr type error at %s\n", __location__));
|
||||
return;
|
||||
}
|
||||
|
||||
talloc_steal(state, c);
|
||||
|
||||
/* don't allow for too many redirects */
|
||||
|
@ -74,24 +74,54 @@ void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hd
|
||||
struct ctdb_reply_fetch_lock *r = (struct ctdb_reply_fetch_lock *)hdr;
|
||||
struct ctdb_fetch_lock_state *state;
|
||||
|
||||
state = idr_find(ctdb->idr, hdr->reqid);
|
||||
state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_fetch_lock_state);
|
||||
if (state == NULL) {
|
||||
DEBUG(0, ("reqid %d not found at %s\n", hdr->reqid,
|
||||
__location__));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!talloc_get_type(state, struct ctdb_fetch_lock_state)) {
|
||||
DEBUG(0, ("ctdb idr type error at %s, it's a %s\n",
|
||||
__location__, talloc_get_name(state)));
|
||||
return;
|
||||
}
|
||||
|
||||
state->r = talloc_steal(state, r);
|
||||
|
||||
state->state = CTDB_FETCH_LOCK_DONE;
|
||||
}
|
||||
|
||||
/*
|
||||
state of a in-progress ctdb call in client
|
||||
*/
|
||||
struct ctdb_client_call_state {
|
||||
enum call_state state;
|
||||
uint32_t reqid;
|
||||
struct ctdb_db_context *ctdb_db;
|
||||
struct ctdb_call call;
|
||||
};
|
||||
|
||||
/*
|
||||
called when a CTDB_REPLY_CALL packet comes in in the client
|
||||
|
||||
This packet comes in response to a CTDB_REQ_CALL request packet. It
|
||||
contains any reply data from the call
|
||||
*/
|
||||
static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
{
|
||||
struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
|
||||
struct ctdb_client_call_state *state;
|
||||
|
||||
state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
|
||||
if (state == NULL) {
|
||||
DEBUG(0, ("reqid %d not found\n", hdr->reqid));
|
||||
return;
|
||||
}
|
||||
|
||||
state->call.reply_data.dptr = c->data;
|
||||
state->call.reply_data.dsize = c->datalen;
|
||||
state->call.status = c->status;
|
||||
|
||||
talloc_steal(state, c);
|
||||
|
||||
state->state = CTDB_CALL_DONE;
|
||||
}
|
||||
|
||||
/*
|
||||
this is called in the client, when data comes in from the daemon
|
||||
*/
|
||||
@ -135,7 +165,7 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
|
||||
|
||||
switch (hdr->operation) {
|
||||
case CTDB_REPLY_CALL:
|
||||
ctdb_reply_call(ctdb, hdr);
|
||||
ctdb_client_reply_call(ctdb, hdr);
|
||||
break;
|
||||
|
||||
case CTDB_REQ_MESSAGE:
|
||||
@ -194,37 +224,26 @@ struct ctdb_record_handle {
|
||||
struct ctdb_ltdb_header header;
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
make a recv call to the local ctdb daemon - called from client context
|
||||
|
||||
This is called when the program wants to wait for a ctdb_call to complete and get the
|
||||
results. This call will block unless the call has already completed.
|
||||
*/
|
||||
int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
|
||||
int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
|
||||
{
|
||||
struct ctdb_record_handle *rec;
|
||||
|
||||
while (state->state < CTDB_CALL_DONE) {
|
||||
event_loop_once(state->node->ctdb->ev);
|
||||
event_loop_once(state->ctdb_db->ctdb->ev);
|
||||
}
|
||||
if (state->state != CTDB_CALL_DONE) {
|
||||
ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
|
||||
DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
|
||||
talloc_free(state);
|
||||
return -1;
|
||||
}
|
||||
|
||||
rec = state->fetch_private;
|
||||
|
||||
/* ugly hack to manage forced migration */
|
||||
if (rec != NULL) {
|
||||
rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr);
|
||||
rec->data->dsize = state->call.reply_data.dsize;
|
||||
talloc_free(state);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (state->call.reply_data.dsize) {
|
||||
call->reply_data.dptr = talloc_memdup(state->node->ctdb,
|
||||
call->reply_data.dptr = talloc_memdup(state->ctdb_db,
|
||||
state->call.reply_data.dptr,
|
||||
state->call.reply_data.dsize);
|
||||
call->reply_data.dsize = state->call.reply_data.dsize;
|
||||
@ -244,13 +263,41 @@ int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
|
||||
/*
|
||||
destroy a ctdb_call in client
|
||||
*/
|
||||
static int ctdb_client_call_destructor(struct ctdb_call_state *state)
|
||||
static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
|
||||
{
|
||||
idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
|
||||
idr_remove(state->ctdb_db->ctdb->idr, state->reqid);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
construct an event driven local ctdb_call
|
||||
|
||||
this is used so that locally processed ctdb_call requests are processed
|
||||
in an event driven manner
|
||||
*/
|
||||
static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
|
||||
struct ctdb_call *call,
|
||||
struct ctdb_ltdb_header *header,
|
||||
TDB_DATA *data)
|
||||
{
|
||||
struct ctdb_client_call_state *state;
|
||||
struct ctdb_context *ctdb = ctdb_db->ctdb;
|
||||
int ret;
|
||||
|
||||
state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
|
||||
CTDB_NO_MEMORY_NULL(ctdb, state);
|
||||
|
||||
talloc_steal(state, data->dptr);
|
||||
|
||||
state->state = CTDB_CALL_DONE;
|
||||
state->call = *call;
|
||||
state->ctdb_db = ctdb_db;
|
||||
|
||||
ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
|
||||
talloc_steal(state, state->call.reply_data.dptr);
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
/*
|
||||
make a ctdb call to the local daemon - async send. Called from client context.
|
||||
@ -258,83 +305,85 @@ static int ctdb_client_call_destructor(struct ctdb_call_state *state)
|
||||
This constructs a ctdb_call request and queues it for processing.
|
||||
This call never blocks.
|
||||
*/
|
||||
struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
|
||||
struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
|
||||
struct ctdb_call *call)
|
||||
{
|
||||
struct ctdb_call_state *state;
|
||||
struct ctdb_client_call_state *state;
|
||||
struct ctdb_context *ctdb = ctdb_db->ctdb;
|
||||
struct ctdb_ltdb_header header;
|
||||
TDB_DATA data;
|
||||
int ret;
|
||||
size_t len;
|
||||
struct ctdb_req_call *c;
|
||||
|
||||
/* if the domain socket is not yet open, open it */
|
||||
if (ctdb->daemon.sd==-1) {
|
||||
ux_socket_connect(ctdb);
|
||||
}
|
||||
|
||||
ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
|
||||
ret = ctdb_ltdb_lock(ctdb_db, call->key);
|
||||
if (ret != 0) {
|
||||
DEBUG(0,(__location__ " Failed to get chainlock\n"));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
|
||||
if (ret != 0) {
|
||||
ctdb_ltdb_unlock(ctdb_db, call->key);
|
||||
DEBUG(0,(__location__ " Failed to fetch record\n"));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
|
||||
state = ctdb_call_local_send(ctdb_db, call, &header, &data);
|
||||
state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
|
||||
talloc_free(data.dptr);
|
||||
ctdb_ltdb_unlock(ctdb_db, call->key);
|
||||
return state;
|
||||
}
|
||||
#endif
|
||||
|
||||
state = talloc_zero(ctdb_db, struct ctdb_call_state);
|
||||
ctdb_ltdb_unlock(ctdb_db, call->key);
|
||||
talloc_free(data.dptr);
|
||||
|
||||
state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
|
||||
if (state == NULL) {
|
||||
DEBUG(0, (__location__ " failed to allocate state\n"));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
talloc_steal(state, data.dptr);
|
||||
|
||||
len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
|
||||
state->c = ctdbd_allocate_pkt(state, len);
|
||||
if (state->c == NULL) {
|
||||
c = ctdbd_allocate_pkt(state, len);
|
||||
if (c == NULL) {
|
||||
DEBUG(0, (__location__ " failed to allocate packet\n"));
|
||||
return NULL;
|
||||
}
|
||||
talloc_set_name_const(state->c, "ctdbd req_call packet");
|
||||
talloc_set_name_const(c, "ctdb client req_call packet");
|
||||
memset(c, 0, offsetof(struct ctdb_req_call, data));
|
||||
|
||||
state->c->hdr.length = len;
|
||||
state->c->hdr.ctdb_magic = CTDB_MAGIC;
|
||||
state->c->hdr.ctdb_version = CTDB_VERSION;
|
||||
state->c->hdr.operation = CTDB_REQ_CALL;
|
||||
state->c->hdr.destnode = header.dmaster;
|
||||
state->c->hdr.srcnode = ctdb->vnn;
|
||||
c->hdr.length = len;
|
||||
c->hdr.ctdb_magic = CTDB_MAGIC;
|
||||
c->hdr.ctdb_version = CTDB_VERSION;
|
||||
c->hdr.operation = CTDB_REQ_CALL;
|
||||
/* this limits us to 16k outstanding messages - not unreasonable */
|
||||
state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
|
||||
state->c->flags = call->flags;
|
||||
state->c->db_id = ctdb_db->db_id;
|
||||
state->c->callid = call->call_id;
|
||||
state->c->keylen = call->key.dsize;
|
||||
state->c->calldatalen = call->call_data.dsize;
|
||||
memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
|
||||
memcpy(&state->c->data[call->key.dsize],
|
||||
c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
|
||||
c->flags = call->flags;
|
||||
c->db_id = ctdb_db->db_id;
|
||||
c->callid = call->call_id;
|
||||
c->keylen = call->key.dsize;
|
||||
c->calldatalen = call->call_data.dsize;
|
||||
memcpy(&c->data[0], call->key.dptr, call->key.dsize);
|
||||
memcpy(&c->data[call->key.dsize],
|
||||
call->call_data.dptr, call->call_data.dsize);
|
||||
state->call = *call;
|
||||
state->call.call_data.dptr = &state->c->data[call->key.dsize];
|
||||
state->call.key.dptr = &state->c->data[0];
|
||||
state->call.call_data.dptr = &c->data[call->key.dsize];
|
||||
state->call.key.dptr = &c->data[0];
|
||||
|
||||
state->node = ctdb->nodes[header.dmaster];
|
||||
state->state = CTDB_CALL_WAIT;
|
||||
state->header = header;
|
||||
state->ctdb_db = ctdb_db;
|
||||
state->reqid = c->hdr.reqid;
|
||||
|
||||
talloc_set_destructor(state, ctdb_client_call_destructor);
|
||||
|
||||
ctdb_client_queue_pkt(ctdb, &state->c->hdr);
|
||||
|
||||
/*XXX set up timeout to cleanup if server doesnt respond
|
||||
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
|
||||
ctdb_call_timeout, state);
|
||||
*/
|
||||
ctdb_client_queue_pkt(ctdb, &c->hdr);
|
||||
|
||||
return state;
|
||||
}
|
||||
@ -345,7 +394,7 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
|
||||
*/
|
||||
int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
|
||||
{
|
||||
struct ctdb_call_state *state;
|
||||
struct ctdb_client_call_state *state;
|
||||
|
||||
state = ctdb_call_send(ctdb_db, call);
|
||||
return ctdb_call_recv(state, call);
|
||||
|
@ -103,3 +103,17 @@ uint32_t ctdb_hash(const TDB_DATA *key)
|
||||
return (1103515243 * value + 12345);
|
||||
}
|
||||
|
||||
/*
|
||||
a type checking varient of idr_find
|
||||
*/
|
||||
void *_idr_find_type(struct idr_context *idp, int id, const char *type, const char *location)
|
||||
{
|
||||
void *p = idr_find(idp, id);
|
||||
if (p && talloc_check_name(p, type) == NULL) {
|
||||
DEBUG(0,("%s idr_find_type expected type %s but got %s\n",
|
||||
location, type, talloc_get_name(p)));
|
||||
return NULL;
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
||||
|
@ -167,8 +167,8 @@ int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
|
||||
|
||||
|
||||
int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call);
|
||||
struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call);
|
||||
int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call);
|
||||
struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call);
|
||||
int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call);
|
||||
|
||||
/* send a ctdb message */
|
||||
int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
|
||||
|
@ -459,4 +459,11 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
|
||||
|
||||
void ctdb_request_finished(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
|
||||
|
||||
int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
|
||||
struct ctdb_ltdb_header *header, TDB_DATA *data,
|
||||
uint32_t caller);
|
||||
|
||||
void *_idr_find_type(struct idr_context *idp, int id, const char *type, const char *location);
|
||||
#define idr_find_type(idp, id, type) (type *)_idr_find_type(idp, id, #type, __location__)
|
||||
|
||||
#endif
|
||||
|
@ -3,7 +3,7 @@
|
||||
killall -q ctdb_bench
|
||||
|
||||
echo "Trying 2 nodes"
|
||||
bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* &
|
||||
bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.1:9001 $*
|
||||
$VALGRIND bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* &
|
||||
$VALGRIND bin/ctdb_bench --nlist tests/nodes.txt --listen 127.0.0.1:9001 $*
|
||||
wait
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user