From ddbe31e5aa866b29ee04368e080f61acb9a98fa8 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Mon, 14 Jan 2008 15:11:10 +1100 Subject: [PATCH] fixed the bug that caused tdbtorture to fail It was an error in the new transaction code (This used to be ctdb commit 27f0dfdfb93d92859de3cbbd3874cfb38c13a169) --- ctdb/lib/tdb/common/tdb.c | 47 +++++++++++++++++++ ctdb/lib/tdb/common/transaction.c | 75 +++++++++++++++++++++++++++++++ ctdb/lib/tdb/include/tdb.h | 1 + 3 files changed, 123 insertions(+) diff --git a/ctdb/lib/tdb/common/tdb.c b/ctdb/lib/tdb/common/tdb.c index bf3abb71ac9..fd4e1cc8af2 100644 --- a/ctdb/lib/tdb/common/tdb.c +++ b/ctdb/lib/tdb/common/tdb.c @@ -715,6 +715,11 @@ int tdb_wipe_all(struct tdb_context *tdb) goto failed; } + if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &offset) == -1) { + TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write recovery head\n")); + goto failed; + } + /* add all the rest of the file to the freelist */ data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size)) - sizeof(struct list_struct); if (data_len > 0) { @@ -738,3 +743,45 @@ failed: tdb_unlockall(tdb); return -1; } + + +/* + validate the integrity of all tdb hash chains. 
Useful when debugging
+  The walk starts at chain -1 (presumably the freelist head - confirm
+  against TDB_HASH_TOP). For every record, the last tdb_off_t inside it
+  must equal sizeof(struct list_struct) + rec_len. Returns 0 when all
+  chains pass, or -1 after logging the first read failure or mismatch.
+ */
+int tdb_validate(struct tdb_context *tdb)
+{
+	int chain;
+
+	for (chain = -1; chain < (int)tdb->header.hash_size; chain++) {
+		struct list_struct rec;
+		tdb_off_t cur, tailer;
+		uint32_t depth;
+
+		if (tdb_ofs_read(tdb, TDB_HASH_TOP(chain), &cur) == -1) {
+			TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_validate: failed ofs_read at top of hash %d\n", chain));
+			return -1;
+		}
+		for (depth = 0; cur != 0; cur = rec.next, depth++) {
+			if (tdb_rec_read(tdb, cur, &rec) == -1) {
+				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_validate: failed rec_read h=%d rec_ptr=%u count=%u\n", chain, cur, depth));
+				return -1;
+			}
+			/* fetch the length word stored at the very end of the record */
+			if (tdb_ofs_read(tdb, cur + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t), &tailer) == -1) {
+				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_validate: failed ofs_read h=%d rec_ptr=%u count=%u\n", chain, cur, depth));
+				return -1;
+			}
+			if (tailer != rec.rec_len + sizeof(rec)) {
+				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_validate: failed size check size=%u h=%d rec_ptr=%u count=%u\n", tailer, chain, cur, depth));
+				return -1;
+			}
+		}
+	}
+	return 0;
+}
+
+
diff --git a/ctdb/lib/tdb/common/transaction.c b/ctdb/lib/tdb/common/transaction.c
index 2da09face02..0ecfb9b7ff6 100644
--- a/ctdb/lib/tdb/common/transaction.c
+++ b/ctdb/lib/tdb/common/transaction.c
@@ -87,6 +87,7 @@
 */
 
 
+
 /*
   hold the context of any current transaction
 */
@@ -280,6 +281,63 @@ fail:
 	return -1;
 }
 
+
+/*
+  write while in a transaction - this variant never expands the transaction blocks, it only
+  updates existing blocks.
This means it cannot change the recovery size
+  (a write that reaches past last_block_size inside the last held block
+  does raise last_block_size, but no block is ever allocated here).
+  Ranges falling in blocks the transaction does not hold are skipped,
+  and a NULL buf zero-fills instead of copying. Returns 0; -1 is only
+  propagated from the recursive per-block call.
+*/
+static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
+				      const void *buf, tdb_len_t len)
+{
+	uint32_t blk;
+
+	/* break it up into block sized chunks */
+	while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
+		tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
+		if (transaction_write_existing(tdb, off, buf, len2) != 0) {
+			return -1;
+		}
+		len -= len2;
+		off += len2;
+		if (buf != NULL) {
+			buf = (const void *)(len2 + (const char *)buf);
+		}
+	}
+
+	if (len == 0) {
+		return 0;
+	}
+
+	blk = off / tdb->transaction->block_size;
+	off = off % tdb->transaction->block_size;
+
+	/* block not present in the transaction: nothing to update */
+	if (tdb->transaction->num_blocks <= blk ||
+	    tdb->transaction->blocks[blk] == NULL) {
+		return 0;
+	}
+
+	/* overwrite part of an existing block */
+	if (buf == NULL) {
+		memset(tdb->transaction->blocks[blk] + off, 0, len);
+	} else {
+		memcpy(tdb->transaction->blocks[blk] + off, buf, len);
+	}
+	if (blk == tdb->transaction->num_blocks-1) {
+		if (len + off > tdb->transaction->last_block_size) {
+			tdb->transaction->last_block_size = len + off;
+		}
+	}
+
+	return 0;
+}
+
+
 /*
   accelerated hash chain head search, using the cached hash heads
 */
@@ -629,6 +687,10 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
 		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
 		return -1;
 	}
+	if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
+		TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
+		return -1;
+	}
 
 	return 0;
 }
@@ -726,6 +788,12 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
 		tdb->ecode = TDB_ERR_IO;
 		return -1;
 	}
+	if
(transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) { + TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n")); + free(data); + tdb->ecode = TDB_ERR_IO; + return -1; + } /* as we don't have ordered writes, we have to sync the recovery data before we update the magic to indicate that the recovery @@ -747,6 +815,11 @@ static int transaction_setup_recovery(struct tdb_context *tdb, tdb->ecode = TDB_ERR_IO; return -1; } + if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) { + TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n")); + tdb->ecode = TDB_ERR_IO; + return -1; + } /* ensure the recovery magic marker is on disk */ if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) { @@ -778,6 +851,7 @@ int tdb_transaction_commit(struct tdb_context *tdb) return -1; } + if (tdb->transaction->nesting != 0) { tdb->transaction->nesting--; return 0; @@ -916,6 +990,7 @@ int tdb_transaction_commit(struct tdb_context *tdb) /* use a transaction cancel to free memory and remove the transaction locks */ tdb_transaction_cancel(tdb); + return 0; } diff --git a/ctdb/lib/tdb/include/tdb.h b/ctdb/lib/tdb/include/tdb.h index 371381049e9..0058d55793e 100644 --- a/ctdb/lib/tdb/include/tdb.h +++ b/ctdb/lib/tdb/include/tdb.h @@ -157,6 +157,7 @@ int tdb_printfreelist(struct tdb_context *tdb); int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries); int tdb_wipe_all(struct tdb_context *tdb); int tdb_freelist_size(struct tdb_context *tdb); +int tdb_validate(struct tdb_context *tdb); extern TDB_DATA tdb_null;