1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-25 06:04:04 +03:00

added seqnum propogation code to ctdb

(This used to be ctdb commit be2572b1b09eaaa1ea6a726d60f16996f9407d13)
This commit is contained in:
Andrew Tridgell 2007-05-04 22:18:00 +10:00
parent ed3e847785
commit fccc585f5a
6 changed files with 116 additions and 15 deletions

View File

@ -530,6 +530,7 @@ struct ctdb_context *ctdb_init(struct event_context *ev)
ctdb->upcalls = &ctdb_upcalls;
ctdb->idr = idr_init(ctdb);
ctdb->max_lacount = CTDB_DEFAULT_MAX_LACOUNT;
ctdb->seqnum_frequency = CTDB_DEFAULT_SEQNUM_FREQUENCY;
return ctdb;
}

View File

@ -395,15 +395,17 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
case CTDB_CONTROL_DEREGISTER_SRVID:
return daemon_deregister_message_handler(ctdb, client_id, srvid);
case CTDB_CONTROL_ENABLE_SEQNUM: {
uint32_t db_id;
struct ctdb_db_context *ctdb_db;
CHECK_CONTROL_DATA_SIZE(sizeof(db_id));
ctdb_db = find_ctdb_db(ctdb, db_id);
if (!ctdb_db) return -1;
tdb_enable_seqnum(ctdb_db->ltdb->tdb);
return 0;
}
case CTDB_CONTROL_ENABLE_SEQNUM:
CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
return ctdb_ltdb_enable_seqnum(ctdb, *(uint32_t *)indata.dptr);
case CTDB_CONTROL_UPDATE_SEQNUM:
CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
return ctdb_ltdb_update_seqnum(ctdb, *(uint32_t *)indata.dptr, srcnode);
case CTDB_CONTROL_SET_SEQNUM_FREQUENCY:
CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
return ctdb_ltdb_set_seqnum_frequency(ctdb, *(uint32_t *)indata.dptr);
default:
DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));

View File

@ -398,3 +398,78 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
return 0;
}
/*
called when a broadcast seqnum update comes in
*/
int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
{
struct ctdb_db_context *ctdb_db;
if (srcnode == ctdb->vnn) {
/* don't update ourselves! */
return 0;
}
ctdb_db = find_ctdb_db(ctdb, db_id);
if (!ctdb_db) {
DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n"));
return -1;
}
tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
return 0;
}
/*
timer to check for seqnum changes in a ltdb and propogate them
*/
static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
struct timeval t, void *p)
{
struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
struct ctdb_context *ctdb = ctdb_db->ctdb;
uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
if (new_seqnum != ctdb_db->seqnum) {
/* something has changed - propogate it */
TDB_DATA data;
data.dptr = (uint8_t *)&ctdb_db->db_id;
data.dsize = sizeof(uint32_t);
ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0,
CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
data, NULL, NULL);
}
ctdb_db->seqnum = new_seqnum;
/* setup a new timer */
event_add_timed(ctdb->ev, ctdb_db, timeval_current_ofs(ctdb->seqnum_frequency, 0),
ctdb_ltdb_seqnum_check, ctdb_db);
}
/*
enable seqnum handling on this db
*/
int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
{
struct ctdb_db_context *ctdb_db;
ctdb_db = find_ctdb_db(ctdb, db_id);
if (!ctdb_db) {
DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n"));
return -1;
}
event_add_timed(ctdb->ev, ctdb_db, timeval_current_ofs(ctdb->seqnum_frequency, 0),
ctdb_ltdb_seqnum_check, ctdb_db);
tdb_enable_seqnum(ctdb_db->ltdb->tdb);
ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
return 0;
}
/*
enable seqnum handling on this db
*/
int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency)
{
ctdb->seqnum_frequency = frequency;
return 0;
}

View File

@ -37,6 +37,7 @@
#define CTDB_BROADCAST_VNN 0xF0000002
#define CTDB_MAX_REDIRECT_COUNT 3
#define CTDB_DEFAULT_SEQNUM_FREQUENCY 1
/*
an installed ctdb remote call
@ -210,6 +211,7 @@ struct ctdb_context {
struct ctdb_status status;
struct ctdb_vnn_map *vnn_map;
uint32_t num_clients;
uint32_t seqnum_frequency;
};
struct ctdb_db_context {
@ -220,6 +222,7 @@ struct ctdb_db_context {
const char *db_path;
struct tdb_wrap *ltdb;
struct ctdb_registered_call *calls; /* list of registered calls */
uint32_t seqnum;
};
@ -283,6 +286,8 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS,
CTDB_CONTROL_REGISTER_SRVID,
CTDB_CONTROL_DEREGISTER_SRVID,
CTDB_CONTROL_ENABLE_SEQNUM,
CTDB_CONTROL_UPDATE_SEQNUM,
CTDB_CONTROL_SET_SEQNUM_FREQUENCY,
};
@ -689,4 +694,8 @@ int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_i
int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data);
int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid);
int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id);
int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode);
int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency);
#endif

View File

@ -31,10 +31,10 @@
TDB_DATA tdb_null;
/*
increment the tdb sequence number if the tdb has been opened using
non-blocking increment of the tdb sequence number if the tdb has been opened using
the TDB_SEQNUM flag
*/
static void tdb_increment_seqnum(struct tdb_context *tdb)
void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
{
tdb_off_t seqnum=0;
@ -42,16 +42,29 @@ static void tdb_increment_seqnum(struct tdb_context *tdb)
return;
}
if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
return;
}
/* we ignore errors from this, as we have no sane way of
dealing with them.
*/
tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
seqnum++;
tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
}
/*
increment the tdb sequence number if the tdb has been opened using
the TDB_SEQNUM flag
*/
static void tdb_increment_seqnum(struct tdb_context *tdb)
{
if (!(tdb->flags & TDB_SEQNUM)) {
return;
}
if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
return;
}
tdb_increment_seqnum_nonblock(tdb);
tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
}

View File

@ -132,6 +132,7 @@ int tdb_hash_size(struct tdb_context *tdb);
size_t tdb_map_size(struct tdb_context *tdb);
int tdb_get_flags(struct tdb_context *tdb);
void tdb_enable_seqnum(struct tdb_context *tdb);
void tdb_increment_seqnum_nonblock(struct tdb_context *tdb);
/* Low level locking functions: use with care */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key);