mirror of
https://github.com/samba-team/samba.git
synced 2024-12-22 13:34:15 +03:00
ctdb-daemon: Drop implementation of RECEIVE_RECORDS control
BUG: https://bugzilla.samba.org/show_bug.cgi?id=13641 Signed-off-by: Amitay Isaacs <amitay@gmail.com> Reviewed-by: Martin Schwenke <martin@meltin.net>
This commit is contained in:
parent
e15cdc652d
commit
d18385ea2a
@ -821,8 +821,6 @@ int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
|
||||
|
||||
int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb,
|
||||
TDB_DATA indata, TDB_DATA *outdata);
|
||||
int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
|
||||
TDB_DATA indata, TDB_DATA *outdata);
|
||||
|
||||
int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb,
|
||||
TDB_DATA *outdata);
|
||||
|
@ -650,7 +650,7 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
|
||||
return ctdb_control_reload_public_ips(ctdb, c, async_reply);
|
||||
|
||||
case CTDB_CONTROL_RECEIVE_RECORDS:
|
||||
return ctdb_control_receive_records(ctdb, indata, outdata);
|
||||
return control_not_implemented("RECEIVE_RECORDS", NULL);
|
||||
|
||||
case CTDB_CONTROL_DB_DETACH:
|
||||
return ctdb_control_db_detach(ctdb, indata, client_id);
|
||||
|
@ -1330,205 +1330,6 @@ int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA inda
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store a record as part of the vacuum process:
|
||||
* This is called from the RECEIVE_RECORD control which
|
||||
* the lmaster uses to send the current empty copy
|
||||
* to all nodes for storing, before it lets the other
|
||||
* nodes delete the records in the second phase with
|
||||
* the TRY_DELETE_RECORDS control.
|
||||
*
|
||||
* Only store if we are not lmaster or dmaster, and our
|
||||
* rsn is <= the provided rsn. Use non-blocking locks.
|
||||
*
|
||||
* return 0 if the record was successfully stored.
|
||||
* return !0 if the record still exists in the tdb after returning.
|
||||
*/
|
||||
static int store_tdb_record(struct ctdb_context *ctdb,
|
||||
struct ctdb_db_context *ctdb_db,
|
||||
struct ctdb_rec_data_old *rec)
|
||||
{
|
||||
TDB_DATA key, data, data2;
|
||||
struct ctdb_ltdb_header *hdr, *hdr2;
|
||||
int ret;
|
||||
|
||||
key.dsize = rec->keylen;
|
||||
key.dptr = &rec->data[0];
|
||||
data.dsize = rec->datalen;
|
||||
data.dptr = &rec->data[rec->keylen];
|
||||
|
||||
if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
|
||||
DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
|
||||
"where we are lmaster\n"));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
|
||||
DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
|
||||
return -1;
|
||||
}
|
||||
|
||||
hdr = (struct ctdb_ltdb_header *)data.dptr;
|
||||
|
||||
/* use a non-blocking lock */
|
||||
if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
|
||||
DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
|
||||
return -1;
|
||||
}
|
||||
|
||||
data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
|
||||
if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
|
||||
if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
|
||||
DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
|
||||
ret = -1;
|
||||
goto done;
|
||||
}
|
||||
DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
|
||||
ret = 0;
|
||||
goto done;
|
||||
}
|
||||
|
||||
hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
|
||||
|
||||
if (hdr2->rsn > hdr->rsn) {
|
||||
DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
|
||||
"rsn=%llu - called with rsn=%llu\n",
|
||||
(unsigned long long)hdr2->rsn,
|
||||
(unsigned long long)hdr->rsn));
|
||||
ret = -1;
|
||||
goto done;
|
||||
}
|
||||
|
||||
/* do not allow vacuuming of records that have readonly flags set. */
|
||||
if (hdr->flags & CTDB_REC_RO_FLAGS) {
|
||||
DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
|
||||
"flags set\n"));
|
||||
ret = -1;
|
||||
goto done;
|
||||
}
|
||||
if (hdr2->flags & CTDB_REC_RO_FLAGS) {
|
||||
DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
|
||||
"flags set\n"));
|
||||
ret = -1;
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (hdr2->dmaster == ctdb->pnn) {
|
||||
DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
|
||||
"where we are the dmaster\n"));
|
||||
ret = -1;
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
|
||||
DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
|
||||
ret = -1;
|
||||
goto done;
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
|
||||
done:
|
||||
tdb_chainunlock(ctdb_db->ltdb->tdb, key);
|
||||
free(data2.dptr);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Try to store all these records as part of the vacuuming process
|
||||
* and return the records we failed to store.
|
||||
*/
|
||||
int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
|
||||
TDB_DATA indata, TDB_DATA *outdata)
|
||||
{
|
||||
struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
|
||||
struct ctdb_db_context *ctdb_db;
|
||||
int i;
|
||||
struct ctdb_rec_data_old *rec;
|
||||
struct ctdb_marshall_buffer *records;
|
||||
|
||||
if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
|
||||
DEBUG(DEBUG_ERR,
|
||||
(__location__ " invalid data in receive_records\n"));
|
||||
return -1;
|
||||
}
|
||||
|
||||
ctdb_db = find_ctdb_db(ctdb, reply->db_id);
|
||||
if (!ctdb_db) {
|
||||
DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
|
||||
reply->db_id));
|
||||
return -1;
|
||||
}
|
||||
|
||||
DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
|
||||
"dbid 0x%x\n", reply->count, reply->db_id));
|
||||
|
||||
/* create a blob to send back the records we could not store */
|
||||
records = (struct ctdb_marshall_buffer *)
|
||||
talloc_zero_size(outdata,
|
||||
offsetof(struct ctdb_marshall_buffer, data));
|
||||
if (records == NULL) {
|
||||
DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
|
||||
return -1;
|
||||
}
|
||||
records->db_id = ctdb_db->db_id;
|
||||
|
||||
rec = (struct ctdb_rec_data_old *)&reply->data[0];
|
||||
for (i=0; i<reply->count; i++) {
|
||||
TDB_DATA key, data;
|
||||
|
||||
key.dptr = &rec->data[0];
|
||||
key.dsize = rec->keylen;
|
||||
data.dptr = &rec->data[key.dsize];
|
||||
data.dsize = rec->datalen;
|
||||
|
||||
if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
|
||||
DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
|
||||
"in indata\n"));
|
||||
talloc_free(records);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* If we can not store the record we must add it to the reply
|
||||
* so the lmaster knows it may not purge this record.
|
||||
*/
|
||||
if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
|
||||
size_t old_size;
|
||||
struct ctdb_ltdb_header *hdr;
|
||||
|
||||
hdr = (struct ctdb_ltdb_header *)data.dptr;
|
||||
data.dptr += sizeof(*hdr);
|
||||
data.dsize -= sizeof(*hdr);
|
||||
|
||||
DEBUG(DEBUG_INFO, (__location__ " Failed to store "
|
||||
"record with hash 0x%08x in vacuum "
|
||||
"via RECEIVE_RECORDS\n",
|
||||
ctdb_hash(&key)));
|
||||
|
||||
old_size = talloc_get_size(records);
|
||||
records = talloc_realloc_size(outdata, records,
|
||||
old_size + rec->length);
|
||||
if (records == NULL) {
|
||||
DEBUG(DEBUG_ERR, (__location__ " Failed to "
|
||||
"expand\n"));
|
||||
return -1;
|
||||
}
|
||||
records->count++;
|
||||
memcpy(old_size+(uint8_t *)records, rec, rec->length);
|
||||
}
|
||||
|
||||
rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
|
||||
}
|
||||
|
||||
*outdata = ctdb_marshall_finish(records);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
report capabilities
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user