1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-28 07:21:54 +03:00

all persistent databases now do all stores via automatic transactions

(This used to be commit 76fbe56e82)
This commit is contained in:
Andrew Tridgell 2008-08-07 19:14:16 +10:00 committed by Michael Adam
parent 4d76ed4f38
commit 5031f2a6e2

View File

@ -34,6 +34,7 @@ struct db_ctdb_transaction_handle {
};
struct db_ctdb_ctx {
struct db_context *db;
struct tdb_wrap *wtdb;
uint32 db_id;
struct db_ctdb_transaction_handle *transaction;
@ -400,6 +401,40 @@ static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ct
return result;
}
static int db_ctdb_record_destructor(struct db_record *rec)
{
struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
rec->private_data, struct db_ctdb_transaction_handle);
h->ctx->db->transaction_cancel(h->ctx->db);
return 0;
}
/*
auto-create a transaction for persistent databases
*/
static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
TALLOC_CTX *mem_ctx,
TDB_DATA key)
{
int res;
struct db_record *rec;
res = db_ctdb_transaction_start(ctx->db);
if (res == -1) {
return NULL;
}
rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
if (rec == NULL) {
ctx->db->transaction_cancel(ctx->db);
return NULL;
}
/* destroy this transaction when we release the lock */
talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
return rec;
}
/*
stores a record inside a transaction
@ -681,130 +716,6 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
}
/* for persistent databases the store is a bit different. We have to
ask the ctdb daemon to push the record to all nodes after the
store */
static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
{
struct db_ctdb_rec *crec;
struct db_record *record;
TDB_DATA cdata;
int ret;
NTSTATUS status;
uint32_t count;
int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
(count < max_retries) && !NT_STATUS_IS_OK(status);
count++)
{
if (count > 0) {
/* retry */
/*
* There is a hack here: We use rec as a memory
* context and re-use it as the record struct ptr.
* We don't free the record data allocated
* in each turn. So all gets freed when the caller
* releases the original record. This is because
* we don't get the record passed in by reference
* in the first place and the caller relies on
* having to free the record himself.
*/
record = fetch_locked_internal(crec->ctdb_ctx,
rec,
rec->key,
true /* persistent */);
if (record == NULL) {
DEBUG(5, ("fetch_locked_internal failed.\n"));
status = NT_STATUS_NO_MEMORY;
break;
}
}
crec = talloc_get_type_abort(record->private_data,
struct db_ctdb_rec);
cdata.dsize = sizeof(crec->header) + data.dsize;
if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
return NT_STATUS_NO_MEMORY;
}
crec->header.rsn++;
memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
status = ctdbd_start_persistent_update(
messaging_ctdbd_connection(),
crec->ctdb_ctx->db_id,
rec->key,
cdata);
if (NT_STATUS_IS_OK(status)) {
ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
cdata, TDB_REPLACE);
status = (ret == 0)
? NT_STATUS_OK
: tdb_error_to_ntstatus(
crec->ctdb_ctx->wtdb->tdb);
}
/*
* release the lock *now* in order to prevent deadlocks.
*
* There is a tradeoff: Usually, the record is still locked
* after db->store operation. This lock is usually released
* via the talloc destructor with the TALLOC_FREE to
* the record. So we have two choices:
*
* - Either re-lock the record after the call to persistent_store
* or cancel_persistent update and this way not changing any
* assumptions callers may have about the state, but possibly
* introducing new race conditions.
*
* - Or don't lock the record again but just remove the
* talloc_destructor. This is less racy but assumes that
* the lock is always released via TALLOC_FREE of the record.
*
* I choose the first variant for now since it seems less racy.
* We can't guarantee that we succeed in getting the lock
* anyways. The only real danger here is that a caller
* performs multiple store operations after a fetch_locked()
* which is currently not the case.
*/
tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
talloc_set_destructor(record, NULL);
/* now tell ctdbd to update this record on all other nodes */
if (NT_STATUS_IS_OK(status)) {
status = ctdbd_persistent_store(
messaging_ctdbd_connection(),
crec->ctdb_ctx->db_id,
rec->key,
cdata);
} else {
ctdbd_cancel_persistent_update(
messaging_ctdbd_connection(),
crec->ctdb_ctx->db_id,
rec->key,
cdata);
break;
}
SAFE_FREE(cdata.dptr);
} /* retry-loop */
if (!NT_STATUS_IS_OK(status)) {
DEBUG(5, ("ctdbd_persistent_store failed after "
"%d retries with error %s - giving up.\n",
count, nt_errstr(status)));
}
SAFE_FREE(cdata.dptr);
return status;
}
static NTSTATUS db_ctdb_delete(struct db_record *rec)
{
@ -821,21 +732,6 @@ static NTSTATUS db_ctdb_delete(struct db_record *rec)
}
static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
{
TDB_DATA data;
/*
* We have to store the header with empty data. TODO: Fix the
* tdb-level cleanup
*/
ZERO_STRUCT(data);
return db_ctdb_store_persistent(rec, data, 0);
}
static int db_ctdb_record_destr(struct db_record* data)
{
struct db_ctdb_rec *crec = talloc_get_type_abort(
@ -867,10 +763,6 @@ static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
TDB_DATA ctdb_data;
int migrate_attempts = 0;
if (ctx->transaction != NULL) {
return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
}
if (!(result = talloc(mem_ctx, struct db_record))) {
DEBUG(0, ("talloc failed\n"));
return NULL;
@ -913,13 +805,8 @@ again:
return NULL;
}
if (persistent) {
result->store = db_ctdb_store_persistent;
result->delete_rec = db_ctdb_delete_persistent;
} else {
result->store = db_ctdb_store;
result->delete_rec = db_ctdb_delete;
}
result->store = db_ctdb_store;
result->delete_rec = db_ctdb_delete;
talloc_set_destructor(result, db_ctdb_record_destr);
ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
@ -988,6 +875,14 @@ static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
struct db_ctdb_ctx);
if (ctx->transaction != NULL) {
return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
}
if (db->persistent) {
return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
}
return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
}
@ -1210,6 +1105,7 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
}
db_ctdb->transaction = NULL;
db_ctdb->db = result;
if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));