mirror of
https://github.com/samba-team/samba.git
synced 2024-12-22 13:34:15 +03:00
initial change to remove store_unlock pdu and use tdb chainlock in the client
(This used to be ctdb commit 87dd265d2c61125ca2fa922cfcf9371a234fff0c)
This commit is contained in:
parent
ee9712620f
commit
481e029768
@ -782,27 +782,3 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL
|
||||
}
|
||||
|
||||
|
||||
int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data)
|
||||
{
|
||||
int ret;
|
||||
struct ctdb_ltdb_header header;
|
||||
struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context);
|
||||
|
||||
if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
|
||||
return ctdb_client_store_unlock(rec, data);
|
||||
}
|
||||
|
||||
/* should be avoided if possible hang header off rec ? */
|
||||
ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL);
|
||||
if (ret) {
|
||||
ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed");
|
||||
talloc_free(rec);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data);
|
||||
|
||||
talloc_free(rec);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -78,33 +78,6 @@ void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hd
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
called in the client when we receive a CTDB_REPLY_STORE_UNLOCK from the daemon
|
||||
|
||||
This packet comes in response to a CTDB_REQ_STORE_UNLOCK request packet. It
|
||||
contains any reply data from the call
|
||||
*/
|
||||
void ctdb_reply_store_unlock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
{
|
||||
struct ctdb_reply_store_unlock *c = (struct ctdb_reply_store_unlock *)hdr;
|
||||
struct ctdb_call_state *state;
|
||||
|
||||
state = idr_find(ctdb->idr, hdr->reqid);
|
||||
if (state == NULL) return;
|
||||
|
||||
state->call.status = c->state;
|
||||
|
||||
talloc_steal(state, c);
|
||||
|
||||
/* get an extra reference here - this prevents the free in ctdb_recv_pkt()
|
||||
from freeing the data */
|
||||
(void)talloc_reference(state, c);
|
||||
|
||||
state->state = CTDB_CALL_DONE;
|
||||
if (state->async.fn) {
|
||||
state->async.fn(state);
|
||||
}
|
||||
}
|
||||
/*
|
||||
this is called in the client, when data comes in from the daemon
|
||||
*/
|
||||
@ -151,10 +124,6 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
|
||||
ctdb_reply_fetch_lock(ctdb, hdr);
|
||||
break;
|
||||
|
||||
case CTDB_REPLY_STORE_UNLOCK:
|
||||
ctdb_reply_store_unlock(ctdb, hdr);
|
||||
break;
|
||||
|
||||
default:
|
||||
printf("bogus operation code:%d\n",hdr->operation);
|
||||
}
|
||||
@ -525,61 +494,6 @@ struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb
|
||||
}
|
||||
|
||||
|
||||
struct ctdb_call_state *ctdb_client_store_unlock_send(
|
||||
struct ctdb_record_handle *rh,
|
||||
TALLOC_CTX *mem_ctx,
|
||||
TDB_DATA data)
|
||||
{
|
||||
struct ctdb_call_state *state;
|
||||
struct ctdb_db_context *ctdb_db = talloc_get_type(rh->ctdb_db, struct ctdb_db_context);
|
||||
struct ctdb_context *ctdb = ctdb_db->ctdb;
|
||||
struct ctdb_req_store_unlock *req;
|
||||
int len, res;
|
||||
|
||||
/* if the domain socket is not yet open, open it */
|
||||
if (ctdb->daemon.sd==-1) {
|
||||
ux_socket_connect(ctdb);
|
||||
}
|
||||
|
||||
state = talloc_zero(ctdb_db, struct ctdb_call_state);
|
||||
if (state == NULL) {
|
||||
printf("failed to allocate state\n");
|
||||
return NULL;
|
||||
}
|
||||
state->state = CTDB_CALL_WAIT;
|
||||
state->ctdb_db = ctdb_db;
|
||||
len = offsetof(struct ctdb_req_store_unlock, data) + rh->key.dsize + data.dsize;
|
||||
state->c = ctdbd_allocate_pkt(ctdb, len);
|
||||
if (state->c == NULL) {
|
||||
printf("failed to allocate packet\n");
|
||||
return NULL;
|
||||
}
|
||||
ZERO_STRUCT(*state->c);
|
||||
talloc_set_name_const(state->c, "ctdbd req_store_unlock packet");
|
||||
talloc_steal(state, state->c);
|
||||
|
||||
req = (struct ctdb_req_store_unlock *)state->c;
|
||||
req->hdr.length = len;
|
||||
req->hdr.ctdb_magic = CTDB_MAGIC;
|
||||
req->hdr.ctdb_version = CTDB_VERSION;
|
||||
req->hdr.operation = CTDB_REQ_STORE_UNLOCK;
|
||||
req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
|
||||
req->db_id = ctdb_db->db_id;
|
||||
req->keylen = rh->key.dsize;
|
||||
req->datalen = data.dsize;
|
||||
memcpy(&req->data[0], rh->key.dptr, rh->key.dsize);
|
||||
memcpy(&req->data[req->keylen], data.dptr, data.dsize);
|
||||
|
||||
res = ctdb_client_queue_pkt(ctdb, &req->hdr);
|
||||
if (res != 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
talloc_free(req);
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
/*
|
||||
make a recv call to the local ctdb daemon - called from client context
|
||||
|
||||
@ -615,25 +529,6 @@ struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *s
|
||||
return rec;
|
||||
}
|
||||
|
||||
/*
|
||||
make a recv call to the local ctdb daemon - called from client context
|
||||
|
||||
This is called when the program wants to wait for a ctdb_store_unlock to complete and get the
|
||||
results. This call will block unless the call has already completed.
|
||||
*/
|
||||
int ctdb_client_store_unlock_recv(struct ctdb_call_state *state, struct ctdb_record_handle *rec)
|
||||
{
|
||||
while (state->state < CTDB_CALL_DONE) {
|
||||
event_loop_once(state->ctdb_db->ctdb->ev);
|
||||
}
|
||||
if (state->state != CTDB_CALL_DONE) {
|
||||
ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
|
||||
}
|
||||
|
||||
talloc_free(state);
|
||||
return state->state;
|
||||
}
|
||||
|
||||
struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db,
|
||||
TALLOC_CTX *mem_ctx,
|
||||
TDB_DATA key,
|
||||
@ -641,22 +536,53 @@ struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_d
|
||||
{
|
||||
struct ctdb_call_state *state;
|
||||
struct ctdb_record_handle *rec;
|
||||
struct ctdb_ltdb_header header;
|
||||
int ret;
|
||||
|
||||
state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key);
|
||||
rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data);
|
||||
ret = ctdb_ltdb_lock(ctdb_db, key);
|
||||
if (ret != 0) {
|
||||
printf("failed to lock ltdb record\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
|
||||
if (ret != 0) {
|
||||
ctdb_ltdb_unlock(ctdb_db, key);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
if (header.dmaster != ctdb->vnn) {
|
||||
state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key);
|
||||
rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data);
|
||||
return rec;
|
||||
}
|
||||
|
||||
return rec;
|
||||
}
|
||||
|
||||
/*
|
||||
a helper function for the client that will store the new data for the
|
||||
record and release the tdb chainlock
|
||||
*/
|
||||
int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data)
|
||||
{
|
||||
struct ctdb_call_state *state;
|
||||
int res;
|
||||
int ret;
|
||||
struct ctdb_ltdb_header header;
|
||||
struct ctdb_db_context *ctdb_db = talloc_get_type(rec->ctdb_db, struct ctdb_db_context);
|
||||
|
||||
state = ctdb_client_store_unlock_send(rec, rec, data);
|
||||
res = ctdb_client_store_unlock_recv(state, rec);
|
||||
/* should be avoided if possible hang header off rec ? */
|
||||
ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL, NULL);
|
||||
if (ret) {
|
||||
ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed");
|
||||
talloc_free(rec);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = ctdb_ltdb_store(ctdb_db, rec->key, &header, data);
|
||||
|
||||
ctdb_ltdb_unlock(ctdb_db, rec->key);
|
||||
|
||||
talloc_free(rec);
|
||||
|
||||
return res;
|
||||
return ret;
|
||||
}
|
||||
|
@ -204,63 +204,6 @@ static void daemon_request_fetch_lock(struct ctdb_client *client,
|
||||
state->async.private_data = fl_data;
|
||||
}
|
||||
|
||||
/*
|
||||
called when the daemon gets a store unlock request from a client
|
||||
|
||||
this would never block?
|
||||
*/
|
||||
static void daemon_request_store_unlock(struct ctdb_client *client,
|
||||
struct ctdb_req_store_unlock *f)
|
||||
{
|
||||
struct ctdb_db_context *ctdb_db;
|
||||
struct ctdb_reply_store_unlock r;
|
||||
uint32_t caller = ctdb_get_vnn(client->ctdb);
|
||||
struct ctdb_ltdb_header header;
|
||||
TDB_DATA key, data;
|
||||
int res;
|
||||
|
||||
ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
|
||||
|
||||
/* write the data to ltdb */
|
||||
key.dsize = f->keylen;
|
||||
key.dptr = &f->data[0];
|
||||
res = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL);
|
||||
if (res) {
|
||||
ctdb_set_error(ctdb_db->ctdb, "Fetch of locally held record failed");
|
||||
res = -1;
|
||||
goto done;
|
||||
}
|
||||
if (header.laccessor != caller) {
|
||||
header.lacount = 0;
|
||||
}
|
||||
header.laccessor = caller;
|
||||
header.lacount++;
|
||||
data.dsize = f->datalen;
|
||||
data.dptr = &f->data[f->keylen];
|
||||
res = ctdb_ltdb_store(ctdb_db, key, &header, data);
|
||||
if ( res != 0) {
|
||||
ctdb_set_error(ctdb_db->ctdb, "ctdb_call tdb_store failed\n");
|
||||
}
|
||||
|
||||
|
||||
done:
|
||||
/* now send the reply */
|
||||
ZERO_STRUCT(r);
|
||||
|
||||
r.hdr.length = sizeof(r);
|
||||
r.hdr.ctdb_magic = CTDB_MAGIC;
|
||||
r.hdr.ctdb_version = CTDB_VERSION;
|
||||
r.hdr.operation = CTDB_REPLY_STORE_UNLOCK;
|
||||
r.hdr.reqid = f->hdr.reqid;
|
||||
r.state = res;
|
||||
|
||||
res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
|
||||
if (res != 0) {
|
||||
printf("Failed to queue a store unlock response\n");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
called when the daemon gets a connect wait request from a client
|
||||
*/
|
||||
@ -421,9 +364,6 @@ static void client_incoming_packet(struct ctdb_client *client, void *data, size_
|
||||
case CTDB_REQ_FETCH_LOCK:
|
||||
daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
|
||||
break;
|
||||
case CTDB_REQ_STORE_UNLOCK:
|
||||
daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr);
|
||||
break;
|
||||
default:
|
||||
printf("daemon: unrecognized operation:%d\n",hdr->operation);
|
||||
}
|
||||
|
@ -238,50 +238,8 @@ void fetch_lock(int fd, uint32_t db_id, TDB_DATA key)
|
||||
|
||||
void store_unlock(int fd, uint32_t db_id, TDB_DATA key, TDB_DATA data)
|
||||
{
|
||||
struct ctdb_req_store_unlock *req;
|
||||
struct ctdb_reply_store_unlock *rep;
|
||||
uint32_t length;
|
||||
int len, cnt, tot;
|
||||
|
||||
len = offsetof(struct ctdb_req_store_unlock, data) + key.dsize + data.dsize;
|
||||
req = malloc(len);
|
||||
|
||||
req->hdr.length = len;
|
||||
req->hdr.ctdb_magic = CTDB_MAGIC;
|
||||
req->hdr.ctdb_version = CTDB_VERSION;
|
||||
req->hdr.operation = CTDB_REQ_STORE_UNLOCK;
|
||||
req->hdr.reqid = 1;
|
||||
req->db_id = db_id;
|
||||
req->keylen = key.dsize;
|
||||
req->datalen = data.dsize;
|
||||
memcpy(&req->data[0], key.dptr, key.dsize);
|
||||
memcpy(&req->data[key.dsize], data.dptr, data.dsize);
|
||||
|
||||
cnt=write(fd, req, len);
|
||||
|
||||
|
||||
/* wait fot the reply */
|
||||
/* read the 4 bytes of length for the pdu */
|
||||
cnt=0;
|
||||
tot=4;
|
||||
while(cnt!=tot){
|
||||
int numread;
|
||||
numread=read(fd, ((char *)&length)+cnt, tot-cnt);
|
||||
if(numread>0){
|
||||
cnt+=numread;
|
||||
}
|
||||
}
|
||||
/* read the rest of the pdu */
|
||||
rep = malloc(length);
|
||||
tot=length;
|
||||
while(cnt!=tot){
|
||||
int numread;
|
||||
numread=read(fd, ((char *)rep)+cnt, tot-cnt);
|
||||
if(numread>0){
|
||||
cnt+=numread;
|
||||
}
|
||||
}
|
||||
printf("store unlock reply: state:%d\n",rep->state);
|
||||
/*XXX write the tdb record and drop the chainlock*/
|
||||
printf("store_unlock example not implemented\n");
|
||||
}
|
||||
|
||||
int main(int argc, const char *argv[])
|
||||
|
@ -172,19 +172,15 @@ int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
|
||||
|
||||
|
||||
/*
|
||||
fetch and lock a ctdb record. Underneath this will force the
|
||||
Fetch a ctdb record from a remote node
|
||||
. Underneath this will force the
|
||||
dmaster for the record to be moved to the local node.
|
||||
|
||||
The lock is released when is talloc_free() is called on the
|
||||
returned ctdb_record_handle.
|
||||
*/
|
||||
struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data);
|
||||
|
||||
/*
|
||||
change the data in a record held with a ctdb_record_handle
|
||||
if the new data is zero length, this implies a delete of the record
|
||||
*/
|
||||
int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data);
|
||||
int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data);
|
||||
|
||||
|
||||
int ctdb_register_message_handler(struct ctdb_context *ctdb,
|
||||
TALLOC_CTX *mem_ctx,
|
||||
@ -194,4 +190,5 @@ int ctdb_register_message_handler(struct ctdb_context *ctdb,
|
||||
|
||||
struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id);
|
||||
|
||||
|
||||
#endif
|
||||
|
@ -218,8 +218,6 @@ enum ctdb_operation {
|
||||
CTDB_REPLY_CONNECT_WAIT = 1002,
|
||||
CTDB_REQ_FETCH_LOCK = 1003,
|
||||
CTDB_REPLY_FETCH_LOCK = 1004,
|
||||
CTDB_REQ_STORE_UNLOCK = 1005,
|
||||
CTDB_REPLY_STORE_UNLOCK = 1006
|
||||
};
|
||||
|
||||
#define CTDB_MAGIC 0x43544442 /* CTDB */
|
||||
@ -317,18 +315,6 @@ struct ctdb_reply_fetch_lock {
|
||||
uint32_t datalen;
|
||||
uint8_t data[1]; /* data[] */
|
||||
};
|
||||
struct ctdb_req_store_unlock {
|
||||
struct ctdb_req_header hdr;
|
||||
uint32_t db_id;
|
||||
uint32_t keylen;
|
||||
uint32_t datalen;
|
||||
uint8_t data[1]; /* key[] and data[] */
|
||||
};
|
||||
|
||||
struct ctdb_reply_store_unlock {
|
||||
struct ctdb_req_header hdr;
|
||||
uint32_t state;
|
||||
};
|
||||
|
||||
/* internal prototypes */
|
||||
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3);
|
||||
@ -448,11 +434,6 @@ struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_d
|
||||
TALLOC_CTX *mem_ctx,
|
||||
TDB_DATA key, TDB_DATA *data);
|
||||
|
||||
/*
|
||||
do a store unlock from a client to the local daemon
|
||||
*/
|
||||
int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data);
|
||||
|
||||
struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
|
||||
TDB_DATA key,
|
||||
void (*callback)(void *), void *private_data);
|
||||
|
@ -88,7 +88,7 @@ static void bench_fetch_1node(struct ctdb_context *ctdb)
|
||||
msg_count, ctdb_get_vnn(ctdb));
|
||||
data.dsize = strlen((const char *)data.dptr)+1;
|
||||
|
||||
ret = ctdb_store_unlock(rec, data);
|
||||
ret = ctdb_client_store_unlock(rec, data);
|
||||
if (ret != 0) {
|
||||
printf("Failed to store record\n");
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ void test1(struct ctdb_db_context *ctdb_db)
|
||||
|
||||
store_data.dptr = discard_const("data to store");
|
||||
store_data.dsize = strlen((const char *)store_data.dptr)+1;
|
||||
ret = ctdb_store_unlock(rh, store_data);
|
||||
ret = ctdb_client_store_unlock(rh, store_data);
|
||||
|
||||
rh = ctdb_fetch_lock(ctdb_db, ctdb_db, key, &data2);
|
||||
/* hopefully data2 will now contain the record written above */
|
||||
@ -71,7 +71,7 @@ void test1(struct ctdb_db_context *ctdb_db)
|
||||
}
|
||||
|
||||
/* just write it back to unlock it */
|
||||
ret = ctdb_store_unlock(rh, store_data);
|
||||
ret = ctdb_client_store_unlock(rh, store_data);
|
||||
}
|
||||
|
||||
void child(int srvid, struct event_context *ev, struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
|
||||
|
Loading…
Reference in New Issue
Block a user