1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

added new multi-record transaction commit code

(This used to be ctdb commit 9ff3380099fe6f4d39de126db0826971a10ee692)
This commit is contained in:
Andrew Tridgell 2008-07-30 19:57:00 +10:00
parent bc1aed395c
commit 98502135e7
3 changed files with 189 additions and 53 deletions

View File

@ -430,7 +430,6 @@ struct ctdb_db_context {
struct ctdb_registered_call *calls; /* list of registered calls */
uint32_t seqnum;
struct timed_event *te;
uint32_t client_tdb_flags;
};
@ -547,6 +546,9 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS = 0,
CTDB_CONTROL_GET_CAPABILITIES = 80,
CTDB_CONTROL_START_PERSISTENT_UPDATE = 81,
CTDB_CONTROL_CANCEL_PERSISTENT_UPDATE= 82,
CTDB_CONTROL_TRANS2_COMMIT = 83,
CTDB_CONTROL_TRANS2_FINISHED = 84,
CTDB_CONTROL_TRANS2_ERROR = 85,
};
/*
@ -813,8 +815,6 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
TALLOC_CTX *mem_ctx, TDB_DATA *data);
int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
struct ctdb_ltdb_header *header, TDB_DATA data);
int ctdb_ltdb_persistent_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
struct ctdb_ltdb_header *header, TDB_DATA data);
int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata);
@ -1121,6 +1121,11 @@ int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequ
struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
TDB_DATA key, struct ctdb_ltdb_header *, TDB_DATA data);
struct ctdb_rec_data *ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
uint32_t *reqid,
struct ctdb_ltdb_header *header,
TDB_DATA *key, TDB_DATA *data);
int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata);
int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata);
@ -1308,6 +1313,9 @@ int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
struct ctdb_req_control *c, TDB_DATA recdata,
bool *async_reply);
int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata, bool *async_reply);
int32_t ctdb_control_transaction_start(struct ctdb_context *ctdb, uint32_t id);
int32_t ctdb_control_transaction_commit(struct ctdb_context *ctdb, uint32_t id);
@ -1353,4 +1361,9 @@ int ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode);
int32_t ctdb_dump_memory(struct ctdb_context *ctdb, TDB_DATA *outdata);
int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata);
int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
struct ctdb_req_control *c);
int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
struct ctdb_req_control *c);
#endif

View File

@ -400,6 +400,15 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
case CTDB_CONTROL_CANCEL_PERSISTENT_UPDATE:
return ctdb_control_cancel_persistent_update(ctdb, c, indata);
case CTDB_CONTROL_TRANS2_COMMIT:
return ctdb_control_trans2_commit(ctdb, c, indata, async_reply);
case CTDB_CONTROL_TRANS2_ERROR:
return ctdb_control_trans2_error(ctdb, c);
case CTDB_CONTROL_TRANS2_FINISHED:
return ctdb_control_trans2_finished(ctdb, c);
default:
DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
return -1;

View File

@ -50,6 +50,8 @@ static void ctdb_persistent_callback(struct ctdb_context *ctdb,
status, errormsg));
state->status = status;
state->errormsg = errormsg;
DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
}
state->num_pending--;
if (state->num_pending == 0) {
@ -67,19 +69,21 @@ static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed
struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_state");
DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
state->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
talloc_free(state);
}
/*
store a persistent record - called from a ctdb client when it has updated
a record in a persistent database. The client will have the record
store a set of persistent records - called from a ctdb client when it has updated
some records in a persistent database. The client will have the record
locked for the duration of this call. The client is the dmaster when
this call is made
*/
int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata, bool *async_reply)
int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata, bool *async_reply)
{
struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
struct ctdb_persistent_state *state;
@ -89,8 +93,26 @@ int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
return -1;
}
if (client->num_persistent_updates > 0) {
client->num_persistent_updates--;
/* handling num_persistent_updates is a bit strange -
there are 3 cases
1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
They don't expect num_persistent_updates to be used at all
2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
this commit to then decrement it
3) new clients which use TRANS2 commit functions, and
expect this function to increment the counter, and
then have it decremented in ctdb_control_trans2_error
or ctdb_control_trans2_finished
*/
if (c->opcode == CTDB_CONTROL_PERSISTENT_STORE) {
if (client->num_persistent_updates > 0) {
client->num_persistent_updates--;
}
} else {
client->num_persistent_updates++;
}
state = talloc_zero(ctdb, struct ctdb_persistent_state);
@ -147,10 +169,7 @@ int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
struct ctdb_persistent_write_state {
struct ctdb_db_context *ctdb_db;
TDB_DATA key;
TDB_DATA data;
struct ctdb_ltdb_header *header;
struct tdb_context *tdb;
struct ctdb_marshall_buffer *m;
struct ctdb_req_control *c;
};
@ -160,32 +179,65 @@ struct ctdb_persistent_write_state {
*/
static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
{
struct ctdb_ltdb_header oldheader;
int ret;
int ret, i;
struct ctdb_rec_data *rec = NULL;
struct ctdb_marshall_buffer *m = state->m;
/* fetch the old header and ensure the rsn is less than the new rsn */
ret = ctdb_ltdb_fetch(state->ctdb_db, state->key, &oldheader, NULL, NULL);
if (ret != 0) {
DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
state->ctdb_db->db_id));
ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
if (ret == -1) {
DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
state->ctdb_db->db_id));
return -1;
}
if (oldheader.rsn >= state->header->rsn) {
DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
state->ctdb_db->db_id,
(unsigned long long)oldheader.rsn, (unsigned long long)state->header->rsn));
return -1;
for (i=0;i<m->count;i++) {
struct ctdb_ltdb_header oldheader;
struct ctdb_ltdb_header header;
TDB_DATA key, data;
rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
if (rec == NULL) {
DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
i, state->ctdb_db->db_id));
goto failed;
}
/* fetch the old header and ensure the rsn is less than the new rsn */
ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, NULL, NULL);
if (ret != 0) {
DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
state->ctdb_db->db_id));
goto failed;
}
if (oldheader.rsn >= header.rsn) {
DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
state->ctdb_db->db_id,
(unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
goto failed;
}
ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
if (ret != 0) {
DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
state->ctdb_db->db_id));
return -1;
}
}
ret = ctdb_ltdb_persistent_store(state->ctdb_db, state->key, state->header, state->data);
if (ret != 0) {
DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
state->ctdb_db->db_id));
ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
if (ret == -1) {
DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
state->ctdb_db->db_id));
return -1;
}
return 0;
failed:
tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
return -1;
}
@ -357,20 +409,19 @@ int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
struct ctdb_req_control *c, TDB_DATA recdata,
bool *async_reply)
{
struct ctdb_rec_data *rec = (struct ctdb_rec_data *)&recdata.dptr[0];
struct ctdb_db_context *ctdb_db;
uint32_t db_id = rec->reqid;
struct ctdb_persistent_write_state *state;
struct childwrite_handle *handle;
struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_update_record when recovery active\n"));
return -1;
}
ctdb_db = find_ctdb_db(ctdb, db_id);
ctdb_db = find_ctdb_db(ctdb, m->db_id);
if (ctdb_db == NULL) {
DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", db_id));
DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
return -1;
}
@ -379,23 +430,7 @@ int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
state->ctdb_db = ctdb_db;
state->c = c;
state->tdb = ctdb_db->ltdb->tdb;
state->key.dptr = &rec->data[0];
state->key.dsize = rec->keylen;
state->data.dptr = &rec->data[rec->keylen];
state->data.dsize = rec->datalen;
if (state->data.dsize < sizeof(struct ctdb_ltdb_header)) {
DEBUG(DEBUG_CRIT,("Invalid data size %u in ctdb_control_update_record\n",
(unsigned)state->data.dsize));
talloc_free(state);
return -1;
}
state->header = (struct ctdb_ltdb_header *)&state->data.dptr[0];
state->data.dptr += sizeof(struct ctdb_ltdb_header);
state->data.dsize -= sizeof(struct ctdb_ltdb_header);
state->m = m;
/* create a child process to take out a transaction and
write the data.
@ -421,8 +456,49 @@ int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
}
/*
called when a client has finished a local commit in a transaction to
a persistent database
*/
int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
struct ctdb_req_control *c)
{
struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
if (client->num_persistent_updates == 0) {
DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
return -1;
}
client->num_persistent_updates--;
return 0;
}
/*
called when a client gets an error committing its database
during a transaction commit
*/
int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
struct ctdb_req_control *c)
{
struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
if (client->num_persistent_updates == 0) {
DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
return -1;
}
client->num_persistent_updates--;
DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
return 0;
}
/*
backwards compatibility:
start a persistent store operation. passing both the key, header and
data to the daemon. If the client disconnects before it has issued
a persistent_update call to the daemon we trigger a full recovery
@ -445,9 +521,14 @@ int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
return 0;
}
/*
backwards compatibility:
called to tell ctdbd that it is no longer doing a persistent update
*/
int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata)
struct ctdb_req_control *c,
TDB_DATA recdata)
{
struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
@ -462,3 +543,36 @@ int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
return 0;
}
/*
backwards compatibility:
single record varient of ctdb_control_trans2_commit for older clients
*/
int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata, bool *async_reply)
{
struct ctdb_marshall_buffer *m;
struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
TDB_DATA key, data;
if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
rec->keylen + rec->datalen) {
DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
return -1;
}
key.dptr = &rec->data[0];
key.dsize = rec->keylen;
data.dptr = &rec->data[rec->keylen];
data.dsize = rec->datalen;
m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
CTDB_NO_MEMORY(ctdb, m);
return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
}