mirror of
https://github.com/samba-team/samba.git
synced 2025-03-19 18:50:24 +03:00
merge from tridge
(This used to be ctdb commit 45081eadb89fdaf8c831e161ae7df816beda934b)
This commit is contained in:
commit
9f0c8b566c
@ -712,8 +712,6 @@ struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db,
|
||||
*/
|
||||
int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
|
||||
{
|
||||
struct ctdb_fetch_handle *rec;
|
||||
|
||||
while (state->state < CTDB_CALL_DONE) {
|
||||
event_loop_once(state->node->ctdb->ev);
|
||||
}
|
||||
@ -723,18 +721,6 @@ int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
|
||||
return -1;
|
||||
}
|
||||
|
||||
rec = state->fetch_private;
|
||||
|
||||
/* ugly hack to manage forced migration */
|
||||
if (rec != NULL) {
|
||||
rec->header = state->header;
|
||||
rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr);
|
||||
rec->data->dsize = state->call.reply_data.dsize;
|
||||
talloc_set_name_const(rec->data->dptr, __location__);
|
||||
talloc_free(state);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (state->call.reply_data.dsize) {
|
||||
call->reply_data.dptr = talloc_memdup(state->node->ctdb,
|
||||
state->call.reply_data.dptr,
|
||||
|
@ -48,44 +48,6 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb,
|
||||
ctdb->num_connected = r->num_connected;
|
||||
}
|
||||
|
||||
enum fetch_lock_state { CTDB_FETCH_LOCK_WAIT, CTDB_FETCH_LOCK_DONE, CTDB_FETCH_LOCK_ERROR };
|
||||
|
||||
/*
|
||||
state of a in-progress ctdb call
|
||||
*/
|
||||
struct ctdb_fetch_lock_state {
|
||||
enum fetch_lock_state state;
|
||||
struct ctdb_db_context *ctdb_db;
|
||||
struct ctdb_reply_fetch_lock *r;
|
||||
struct ctdb_req_fetch_lock *req;
|
||||
struct ctdb_ltdb_header header;
|
||||
};
|
||||
|
||||
|
||||
|
||||
/*
|
||||
called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon
|
||||
|
||||
This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It
|
||||
contains any reply data from the call
|
||||
*/
|
||||
void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
{
|
||||
struct ctdb_reply_fetch_lock *r = (struct ctdb_reply_fetch_lock *)hdr;
|
||||
struct ctdb_fetch_lock_state *state;
|
||||
|
||||
state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_fetch_lock_state);
|
||||
if (state == NULL) {
|
||||
DEBUG(0, ("reqid %d not found at %s\n", hdr->reqid,
|
||||
__location__));
|
||||
return;
|
||||
}
|
||||
|
||||
state->r = talloc_steal(state, r);
|
||||
|
||||
state->state = CTDB_FETCH_LOCK_DONE;
|
||||
}
|
||||
|
||||
/*
|
||||
state of a in-progress ctdb call in client
|
||||
*/
|
||||
@ -176,10 +138,6 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
|
||||
ctdb_reply_connect_wait(ctdb, hdr);
|
||||
break;
|
||||
|
||||
case CTDB_REPLY_FETCH_LOCK:
|
||||
ctdb_reply_fetch_lock(ctdb, hdr);
|
||||
break;
|
||||
|
||||
default:
|
||||
DEBUG(0,("bogus operation code:%d\n",hdr->operation));
|
||||
}
|
||||
@ -505,85 +463,6 @@ void ctdb_connect_wait(struct ctdb_context *ctdb)
|
||||
ctdb_daemon_connect_wait(ctdb);
|
||||
}
|
||||
|
||||
static int ctdb_fetch_lock_destructor(struct ctdb_fetch_lock_state *state)
|
||||
{
|
||||
idr_remove(state->ctdb_db->ctdb->idr, state->req->hdr.reqid);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
send a fetch lock request from the client to the daemon. This just asks for
|
||||
a record migration to the current node
|
||||
*/
|
||||
static struct ctdb_fetch_lock_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db,
|
||||
TALLOC_CTX *mem_ctx,
|
||||
TDB_DATA key)
|
||||
{
|
||||
struct ctdb_fetch_lock_state *state;
|
||||
struct ctdb_context *ctdb = ctdb_db->ctdb;
|
||||
struct ctdb_req_fetch_lock *req;
|
||||
int len, res;
|
||||
|
||||
/* if the domain socket is not yet open, open it */
|
||||
if (ctdb->daemon.sd==-1) {
|
||||
ux_socket_connect(ctdb);
|
||||
}
|
||||
|
||||
state = talloc_zero(ctdb_db, struct ctdb_fetch_lock_state);
|
||||
if (state == NULL) {
|
||||
DEBUG(0, (__location__ " failed to allocate state\n"));
|
||||
return NULL;
|
||||
}
|
||||
state->state = CTDB_FETCH_LOCK_WAIT;
|
||||
state->ctdb_db = ctdb_db;
|
||||
len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize;
|
||||
state->req = req = ctdbd_allocate_pkt(state, len);
|
||||
if (req == NULL) {
|
||||
DEBUG(0, (__location__ " failed to allocate packet\n"));
|
||||
return NULL;
|
||||
}
|
||||
ZERO_STRUCT(*req);
|
||||
talloc_set_name_const(req, "ctdbd req_fetch_lock packet");
|
||||
|
||||
req->hdr.length = len;
|
||||
req->hdr.ctdb_magic = CTDB_MAGIC;
|
||||
req->hdr.ctdb_version = CTDB_VERSION;
|
||||
req->hdr.operation = CTDB_REQ_FETCH_LOCK;
|
||||
req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
|
||||
req->db_id = ctdb_db->db_id;
|
||||
req->keylen = key.dsize;
|
||||
memcpy(&req->key[0], key.dptr, key.dsize);
|
||||
|
||||
talloc_set_destructor(state, ctdb_fetch_lock_destructor);
|
||||
|
||||
res = ctdb_client_queue_pkt(ctdb, &req->hdr);
|
||||
if (res != 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
make a recv call to the local ctdb daemon - called from client context
|
||||
|
||||
This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the
|
||||
results. This call will block unless the call has already completed.
|
||||
*/
|
||||
int ctdb_client_fetch_lock_recv(struct ctdb_fetch_lock_state *state)
|
||||
{
|
||||
while (state->state < CTDB_FETCH_LOCK_DONE) {
|
||||
event_loop_once(state->ctdb_db->ctdb->ev);
|
||||
}
|
||||
if (state->state != CTDB_FETCH_LOCK_DONE) {
|
||||
talloc_free(state);
|
||||
return -1;
|
||||
}
|
||||
talloc_free(state);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
cancel a ctdb_fetch_lock operation, releasing the lock
|
||||
*/
|
||||
@ -593,6 +472,19 @@ static int fetch_lock_destructor(struct ctdb_record_handle *h)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
force the migration of a record to this node
|
||||
*/
|
||||
static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
|
||||
{
|
||||
struct ctdb_call call;
|
||||
ZERO_STRUCT(call);
|
||||
call.call_id = CTDB_NULL_FUNC;
|
||||
call.key = key;
|
||||
call.flags = CTDB_IMMEDIATE_MIGRATION;
|
||||
return ctdb_call(ctdb_db, &call);
|
||||
}
|
||||
|
||||
/*
|
||||
get a lock on a record, and return the records data. Blocks until it gets the lock
|
||||
*/
|
||||
@ -601,7 +493,6 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL
|
||||
{
|
||||
int ret;
|
||||
struct ctdb_record_handle *h;
|
||||
struct ctdb_fetch_lock_state *state;
|
||||
|
||||
/*
|
||||
procedure is as follows:
|
||||
@ -654,13 +545,10 @@ again:
|
||||
DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
|
||||
|
||||
if (h->header.dmaster != ctdb_db->ctdb->vnn) {
|
||||
/* we're not the dmaster - ask the ctdb daemon to make us dmaster */
|
||||
state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key);
|
||||
ctdb_ltdb_unlock(ctdb_db, key);
|
||||
DEBUG(4,("ctdb_fetch_lock: done fetch_lock_send\n"));
|
||||
ret = ctdb_client_fetch_lock_recv(state);
|
||||
ret = ctdb_client_force_migration(ctdb_db, key);
|
||||
if (ret != 0) {
|
||||
DEBUG(4,("ctdb_fetch_lock: fetch_lock_recv failed\n"));
|
||||
DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
|
||||
talloc_free(h);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -122,41 +122,6 @@ static void daemon_request_register_message_handler(struct ctdb_client *client,
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
send a fetch lock request (actually a ctdb_call()) to a remote node
|
||||
*/
|
||||
static struct ctdb_call_state *ctdb_daemon_fetch_lock_send(struct ctdb_db_context *ctdb_db,
|
||||
TALLOC_CTX *mem_ctx,
|
||||
TDB_DATA key, struct ctdb_ltdb_header *header,
|
||||
TDB_DATA *data)
|
||||
{
|
||||
struct ctdb_call *call;
|
||||
struct ctdb_fetch_handle *rec;
|
||||
struct ctdb_call_state *state;
|
||||
|
||||
rec = talloc(mem_ctx, struct ctdb_fetch_handle);
|
||||
CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
|
||||
|
||||
|
||||
call = talloc(rec, struct ctdb_call);
|
||||
ZERO_STRUCT(*call);
|
||||
call->call_id = CTDB_FETCH_FUNC;
|
||||
call->key = key;
|
||||
call->flags = CTDB_IMMEDIATE_MIGRATION;
|
||||
|
||||
rec->ctdb_db = ctdb_db;
|
||||
rec->key = key;
|
||||
rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
|
||||
rec->data = data;
|
||||
|
||||
state = ctdb_daemon_call_send_remote(ctdb_db, call, header);
|
||||
state->fetch_private = rec;
|
||||
talloc_steal(state, rec);
|
||||
talloc_steal(mem_ctx, state);
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
/*
|
||||
called when the daemon gets a shutdown request from a client
|
||||
*/
|
||||
@ -211,96 +176,6 @@ static void daemon_request_shutdown(struct ctdb_client *client,
|
||||
}
|
||||
|
||||
|
||||
struct client_fetch_lock_data {
|
||||
struct ctdb_client *client;
|
||||
struct ctdb_req_fetch_lock *f;
|
||||
};
|
||||
|
||||
/*
|
||||
send a fetch lock error reply to the client
|
||||
*/
|
||||
static void daemon_fetch_lock_reply(struct ctdb_client *client,
|
||||
struct ctdb_req_fetch_lock *f,
|
||||
int state)
|
||||
{
|
||||
struct ctdb_reply_fetch_lock r;
|
||||
|
||||
ZERO_STRUCT(r);
|
||||
r.hdr.length = sizeof(r);
|
||||
r.hdr.ctdb_magic = CTDB_MAGIC;
|
||||
r.hdr.ctdb_version = CTDB_VERSION;
|
||||
r.hdr.operation = CTDB_REPLY_FETCH_LOCK;
|
||||
r.hdr.reqid = f->hdr.reqid;
|
||||
r.state = state;
|
||||
|
||||
ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
|
||||
}
|
||||
|
||||
/*
|
||||
called when a remote fetch lock finishes
|
||||
*/
|
||||
static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
|
||||
{
|
||||
struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data,
|
||||
struct client_fetch_lock_data);
|
||||
struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client);
|
||||
|
||||
daemon_fetch_lock_reply(client, data->f, 0);
|
||||
talloc_free(state);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
called when the daemon gets a fetch lock request from a client
|
||||
*/
|
||||
static void daemon_request_fetch_lock(struct ctdb_client *client,
|
||||
struct ctdb_req_fetch_lock *f)
|
||||
{
|
||||
struct ctdb_call_state *state;
|
||||
TDB_DATA key, *data;
|
||||
struct ctdb_db_context *ctdb_db;
|
||||
struct client_fetch_lock_data *fl_data;
|
||||
struct ctdb_ltdb_header header;
|
||||
int ret;
|
||||
|
||||
ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
|
||||
if (ctdb_db == NULL) {
|
||||
daemon_fetch_lock_reply(client, f, -1);
|
||||
return;
|
||||
}
|
||||
|
||||
key.dsize = f->keylen;
|
||||
key.dptr = &f->key[0];
|
||||
|
||||
/* XXX - needs non-blocking lock here */
|
||||
|
||||
ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, NULL);
|
||||
if (ret != 0) {
|
||||
daemon_fetch_lock_reply(client, f, -1);
|
||||
return;
|
||||
}
|
||||
|
||||
if (header.dmaster == ctdb_db->ctdb->vnn) {
|
||||
/* we already are the dmaster */
|
||||
daemon_fetch_lock_reply(client, f, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
data = talloc(client, TDB_DATA);
|
||||
data->dptr = NULL;
|
||||
data->dsize = 0;
|
||||
|
||||
state = ctdb_daemon_fetch_lock_send(ctdb_db, client, key, &header, data);
|
||||
talloc_steal(state, data);
|
||||
|
||||
fl_data = talloc(state, struct client_fetch_lock_data);
|
||||
fl_data->client = client;
|
||||
fl_data->f = talloc_steal(fl_data, f);
|
||||
|
||||
state->async.fn = daemon_fetch_lock_complete;
|
||||
state->async.private_data = fl_data;
|
||||
}
|
||||
|
||||
/*
|
||||
called when the daemon gets a connect wait request from a client
|
||||
@ -510,10 +385,6 @@ static void daemon_incoming_packet(struct ctdb_client *client, void *data, size_
|
||||
daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
|
||||
break;
|
||||
|
||||
case CTDB_REQ_FETCH_LOCK:
|
||||
daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
|
||||
break;
|
||||
|
||||
case CTDB_REQ_SHUTDOWN:
|
||||
daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
|
||||
break;
|
||||
|
@ -45,9 +45,8 @@ struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *na
|
||||
/*
|
||||
this is the dummy null procedure that all databases support
|
||||
*/
|
||||
static int ctdb_fetch_func(struct ctdb_call_info *call)
|
||||
static int ctdb_null_func(struct ctdb_call_info *call)
|
||||
{
|
||||
call->reply_data = &call->record_data;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -105,9 +104,10 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name,
|
||||
|
||||
|
||||
/*
|
||||
all databases support the "fetch" function. we need this in order to do forced migration of records
|
||||
all databases support the "null" function. we need this in
|
||||
order to do forced migration of records
|
||||
*/
|
||||
ret = ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
|
||||
ret = ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
|
||||
if (ret != 0) {
|
||||
talloc_free(ctdb_db);
|
||||
return NULL;
|
||||
|
@ -30,8 +30,8 @@
|
||||
#define CTDB_DS_ALIGNMENT 8
|
||||
|
||||
|
||||
#define CTDB_NULL_FUNC 0xf0000001
|
||||
|
||||
#define CTDB_FETCH_FUNC 0xf0000001
|
||||
/*
|
||||
an installed ctdb remote call
|
||||
*/
|
||||
@ -193,7 +193,6 @@ struct ctdb_call_state {
|
||||
struct ctdb_call call;
|
||||
int redirect_count;
|
||||
struct ctdb_ltdb_header header;
|
||||
void *fetch_private;
|
||||
struct {
|
||||
void (*fn)(struct ctdb_call_state *);
|
||||
void *private_data;
|
||||
@ -226,9 +225,7 @@ enum ctdb_operation {
|
||||
CTDB_REQ_REGISTER = 1000,
|
||||
CTDB_REQ_CONNECT_WAIT = 1001,
|
||||
CTDB_REPLY_CONNECT_WAIT = 1002,
|
||||
CTDB_REQ_FETCH_LOCK = 1003,
|
||||
CTDB_REPLY_FETCH_LOCK = 1004,
|
||||
CTDB_REQ_SHUTDOWN = 1005
|
||||
CTDB_REQ_SHUTDOWN = 1003
|
||||
};
|
||||
|
||||
#define CTDB_MAGIC 0x43544442 /* CTDB */
|
||||
|
Loading…
x
Reference in New Issue
Block a user