diff --git a/ctdb/client/ctdb_client.c b/ctdb/client/ctdb_client.c index 2666ca2a31e..3d9c6ed5ffc 100644 --- a/ctdb/client/ctdb_client.c +++ b/ctdb/client/ctdb_client.c @@ -3994,6 +3994,295 @@ static bool g_lock_unlock(TALLOC_CTX *mem_ctx, return true; } + +struct ctdb_transaction_handle { + struct ctdb_db_context *ctdb_db; + struct ctdb_db_context *g_lock_db; + char *lock_name; + uint32_t reqid; + /* + * we store reads and writes done under a transaction: + * - one list stores both reads and writes (m_all) + * - the other just writes (m_write) + */ + struct ctdb_marshall_buffer *m_all; + struct ctdb_marshall_buffer *m_write; +}; + +static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h) +{ + g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid); + ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid); + return 0; +} + + +/** + * start a transaction on a database + */ +struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx) +{ + struct ctdb_transaction_handle *h; + struct ctdb_server_id id; + + h = talloc_zero(mem_ctx, struct ctdb_transaction_handle); + if (h == NULL) { + DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n")); + return NULL; + } + + h->ctdb_db = ctdb_db; + h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", + (unsigned int)ctdb_db->db_id); + if (h->lock_name == NULL) { + DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n")); + talloc_free(h); + return NULL; + } + + h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0), + "g_lock.tdb", false, 0); + if (!h->g_lock_db) { + DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n")); + talloc_free(h); + return NULL; + } + + id.type = SERVER_TYPE_SAMBA; + id.pnn = ctdb_get_pnn(ctdb_db->ctdb); + id.server_id = getpid(); + + if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0), + &id) != 0) { + DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n")); + talloc_free(h); + return NULL; + } + + h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h); + + if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) { + DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n")); + talloc_free(h); + return NULL; + } + + talloc_set_destructor(h, ctdb_transaction_destructor); + return h; +} + +/** + * fetch a record inside a transaction + */ +int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, + TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) +{ + struct ctdb_ltdb_header header; + int ret; + + ZERO_STRUCT(header); + + ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data); + if (ret == -1 && header.dmaster == (uint32_t)-1) { + /* record doesn't exist yet */ + *data = tdb_null; + ret = 0; + } + + if (ret != 0) { + return ret; + } + + h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data); + if (h->m_all == NULL) { + DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); + return -1; + } + + return 0; +} + +/** + * stores a record inside a transaction + */ +int ctdb_transaction_store(struct ctdb_transaction_handle *h, + TDB_DATA key, TDB_DATA data) +{ + TALLOC_CTX *tmp_ctx = talloc_new(h); + struct ctdb_ltdb_header header; + TDB_DATA olddata; + int ret; + + /* we need the header so we can update the RSN */ + ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata); + if (ret == -1 && header.dmaster == (uint32_t)-1) { + /* the record doesn't exist - create one with us as dmaster. + This is only safe because we are in a transaction and this + is a persistent database */ + ZERO_STRUCT(header); + } else if (ret != 0) { + DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n")); + talloc_free(tmp_ctx); + return ret; + } + + if (data.dsize == olddata.dsize && + memcmp(data.dptr, olddata.dptr, data.dsize) == 0 && + header.rsn != 0) { + /* save writing the same data */ + talloc_free(tmp_ctx); + return 0; + } + + header.dmaster = h->ctdb_db->ctdb->pnn; + header.rsn++; + + h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data); + if (h->m_all == NULL) { + DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); + talloc_free(tmp_ctx); + return -1; + } + + h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data); + if (h->m_write == NULL) { + DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); + talloc_free(tmp_ctx); + return -1; + } + + talloc_free(tmp_ctx); + return 0; +} + +static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum) +{ + const char *keyname = CTDB_DB_SEQNUM_KEY; + TDB_DATA key, data; + struct ctdb_ltdb_header header; + int ret; + + key.dptr = (uint8_t *)discard_const(keyname); + key.dsize = strlen(keyname) + 1; + + ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data); + if (ret != 0) { + *seqnum = 0; + return 0; + } + + if (data.dsize != sizeof(*seqnum)) { + DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n", + data.dsize)); + talloc_free(data.dptr); + return -1; + } + + *seqnum = *(uint64_t *)data.dptr; + talloc_free(data.dptr); + + return 0; +} + + +static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h, + uint64_t seqnum) +{ + const char *keyname = CTDB_DB_SEQNUM_KEY; + TDB_DATA key, data; + + key.dptr = (uint8_t *)discard_const(keyname); + key.dsize = strlen(keyname) + 1; + + data.dptr = (uint8_t *)&seqnum; + data.dsize = sizeof(seqnum); + + return ctdb_transaction_store(h, key, data); +} + + +/** + * commit a transaction + */ +int ctdb_transaction_commit(struct ctdb_transaction_handle *h) +{ + int ret; + uint64_t old_seqnum, new_seqnum; + int32_t status; + struct timeval timeout; + + if (h->m_write == NULL) { + /* no changes were made */ + talloc_free(h); + return 0; + } + + ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n")); + ret = -1; + goto done; + } + + new_seqnum = old_seqnum + 1; + ret = ctdb_store_db_seqnum(h, new_seqnum); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n")); + ret = -1; + goto done; + } + +again: + timeout = timeval_current_ofs(3,0); + ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, + h->ctdb_db->db_id, + CTDB_CONTROL_TRANS3_COMMIT, 0, + ctdb_marshall_finish(h->m_write), NULL, NULL, + &status, &timeout, NULL); + if (ret != 0 || status != 0) { + /* + * TRANS3_COMMIT control will only fail if recovery has been + * triggered. Check if the database has been updated or not. + */ + ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n")); + goto done; + } + + if (new_seqnum == old_seqnum) { + /* Database not yet updated, try again */ + goto again; + } + + if (new_seqnum != (old_seqnum + 1)) { + DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n", + (long long unsigned)new_seqnum, + (long long unsigned)old_seqnum)); + ret = -1; + goto done; + } + } + + ret = 0; + +done: + talloc_free(h); + return ret; +} + +/** + * cancel a transaction + */ +int ctdb_transaction_cancel(struct ctdb_transaction_handle *h) +{ + talloc_free(h); + return 0; +} + +#if 0 /** * check whether a transaction is active on a given db on a given node */ @@ -4437,6 +4726,7 @@ again: talloc_free(h); return 0; } +#endif /* recovery daemon ping to main daemon diff --git a/ctdb/include/ctdb_client.h b/ctdb/include/ctdb_client.h index 35ce6d1b26f..ca2f7620e5b 100644 --- a/ctdb/include/ctdb_client.h +++ b/ctdb/include/ctdb_client.h @@ -546,11 +546,6 @@ struct ctdb_client_control_state *ctdb_ctrl_getcapabilities_send(struct ctdb_con int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities); - -int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb, - uint32_t destnode, - uint32_t db_id); - struct ctdb_marshall_buffer *ctdb_marshall_add(TALLOC_CTX *mem_ctx, struct ctdb_marshall_buffer *m, uint64_t db_id,