1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-03 13:47:25 +03:00

call: transfer the record flags in the ctdb call packets.

This way, the MIGRATED_WITH_DATA information can be transported
along with the records. This is important for vacuuming to function
properly.

The record flags are appended to the data section of the ctdb_req_dmaster
and ctdb_reply_dmaster structs.

Pair-Programmed-With: Stefan Metzmacher <metze@samba.org>

(This used to be ctdb commit 945187d64cfc7bd30a0c3b0d548cbe582d95dde3)
This commit is contained in:
Michael Adam 2010-12-10 14:02:33 +01:00
parent 2ad1c3f6c7
commit eb1b7d1c05

View File

@ -181,7 +181,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
tmp_ctx = talloc_new(ctdb);
/* send the CTDB_REPLY_DMASTER */
len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize;
len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
struct ctdb_reply_dmaster);
CTDB_NO_MEMORY_FATAL(ctdb, r);
@ -194,6 +194,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
r->db_id = ctdb_db->db_id;
memcpy(&r->data[0], key.dptr, key.dsize);
memcpy(&r->data[key.dsize], data.dptr, data.dsize);
memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
ctdb_queue_packet(ctdb, &r->hdr);
@ -232,7 +233,8 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
return;
}
len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
+ sizeof(uint32_t);
r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
struct ctdb_req_dmaster);
CTDB_NO_MEMORY_FATAL(ctdb, r);
@ -245,6 +247,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
r->datalen = data->dsize;
memcpy(&r->data[0], key->dptr, key->dsize);
memcpy(&r->data[key->dsize], data->dptr, data->dsize);
memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
header->dmaster = c->hdr.srcnode;
if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
@ -262,10 +265,10 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
must be called with the chainlock held. This function releases the chainlock
*/
static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
struct ctdb_req_header *hdr,
TDB_DATA key, TDB_DATA data,
uint64_t rsn)
uint64_t rsn, uint32_t record_flags)
{
struct ctdb_call_state *state;
struct ctdb_context *ctdb = ctdb_db->ctdb;
@ -349,12 +352,19 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
TDB_DATA key, data, data2;
struct ctdb_ltdb_header header;
struct ctdb_db_context *ctdb_db;
uint32_t record_flags = 0;
size_t len;
int ret;
key.dptr = c->data;
key.dsize = c->keylen;
data.dptr = c->data + c->keylen;
data.dsize = c->datalen;
len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
+ sizeof(uint32_t);
if (len <= c->hdr.length) {
record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
}
ctdb_db = find_ctdb_db(ctdb, c->db_id);
if (!ctdb_db) {
@ -411,10 +421,13 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
/* use the rsn from the sending node */
header.rsn = c->rsn;
/* store the record flags from the sending node */
header.flags = record_flags;
/* check if the new dmaster is the lmaster, in which case we
skip the dmaster reply */
if (c->dmaster == ctdb->pnn) {
ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
} else {
ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
@ -587,6 +600,8 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
struct ctdb_db_context *ctdb_db;
TDB_DATA key, data;
uint32_t record_flags = 0;
size_t len;
int ret;
ctdb_db = find_ctdb_db(ctdb, c->db_id);
@ -599,6 +614,11 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
key.dsize = c->keylen;
data.dptr = &c->data[key.dsize];
data.dsize = c->datalen;
len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
+ sizeof(uint32_t);
if (len <= c->hdr.length) {
record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
}
ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
ctdb_call_input_pkt, ctdb, False);
@ -610,7 +630,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
return;
}
ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
}