1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

ctdb-daemon: Defer all calls when processing dmaster packets

When CTDB receives DMASTER_REQUEST or DMASTER_REPLY packet, the specified
record needs to be updated as soon as possible to avoid inconsistent
dmaster information between nodes.  During this time, queue up all calls
for that record and process them only after dmaster request/reply has
been processed.

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
This commit is contained in:
Amitay Isaacs 2014-08-15 15:20:36 +10:00 committed by Martin Schwenke
parent deb7bb89b3
commit ef59f2e6bb
3 changed files with 136 additions and 1 deletions

View File

@ -594,6 +594,7 @@ struct ctdb_db_context {
so we can avoid sending duplicate fetch requests
*/
struct trbt_tree *deferred_fetch;
struct trbt_tree *defer_dmaster;
struct ctdb_db_statistics statistics;

View File

@ -409,7 +409,125 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
}
}
struct dmaster_defer_call {
struct dmaster_defer_call *next, *prev;
struct ctdb_context *ctdb;
struct ctdb_req_header *hdr;
};
struct dmaster_defer_queue {
struct dmaster_defer_call *deferred_calls;
};
static void dmaster_defer_reprocess(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval t,
void *private_data)
{
struct dmaster_defer_call *call = talloc_get_type(
private_data, struct dmaster_defer_call);
ctdb_input_pkt(call->ctdb, call->hdr);
talloc_free(call);
}
static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
{
while (ddq->deferred_calls != NULL) {
struct dmaster_defer_call *call = ddq->deferred_calls;
DLIST_REMOVE(ddq->deferred_calls, call);
talloc_steal(call->ctdb, call);
tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
dmaster_defer_reprocess, call);
}
return 0;
}
static void *insert_ddq_callback(void *parm, void *data)
{
if (data) {
talloc_free(data);
}
return parm;
}
/**
* This function is used to reigster a key in database that needs to be updated.
* Any requests for that key should get deferred till this is completed.
*/
static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
struct ctdb_req_header *hdr,
TDB_DATA key)
{
uint32_t *k;
struct dmaster_defer_queue *ddq;
k = ctdb_key_to_idkey(hdr, key);
if (k == NULL) {
DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
return -1;
}
/* Already exists */
ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
if (ddq != NULL) {
talloc_free(k);
return 0;
}
ddq = talloc(hdr, struct dmaster_defer_queue);
if (ddq == NULL) {
DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
talloc_free(k);
return -1;
}
ddq->deferred_calls = NULL;
trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
insert_ddq_callback, ddq);
talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
talloc_free(k);
return 0;
}
static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
struct ctdb_req_header *hdr,
TDB_DATA key)
{
struct dmaster_defer_queue *ddq;
struct dmaster_defer_call *call;
uint32_t *k;
k = ctdb_key_to_idkey(hdr, key);
if (k == NULL) {
DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
return -1;
}
ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
if (ddq == NULL) {
talloc_free(k);
return -1;
}
talloc_free(k);
call = talloc(ddq, struct dmaster_defer_call);
if (call == NULL) {
DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
return -1;
}
call->ctdb = ctdb_db->ctdb;
call->hdr = talloc_steal(call, hdr);
DLIST_ADD_END(ddq->deferred_calls, call, NULL);
return 0;
}
/*
called when a CTDB_REQ_DMASTER packet comes in
@ -446,6 +564,8 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
return;
}
dmaster_defer_setup(ctdb_db, hdr, key);
/* fetch the current record */
ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
ctdb_call_input_pkt, ctdb, false);
@ -760,6 +880,10 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
}
}
if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
talloc_free(call);
return;
}
/* determine if we are the dmaster for this key. This also
fetches the record data (if any), thus avoiding a 2nd fetch of the data
@ -1110,6 +1234,8 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
sizeof(record_flags));
}
dmaster_defer_setup(ctdb_db, hdr, key);
ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
ctdb_call_input_pkt, ctdb, false);
if (ret == -2) {

View File

@ -955,6 +955,14 @@ again:
return -1;
}
ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
if (ctdb_db->defer_dmaster == NULL) {
DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
ctdb_db->db_name));
talloc_free(ctdb_db);
return -1;
}
DLIST_ADD(ctdb->db_list, ctdb_db);
/* setting this can help some high churn databases */