1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-27 22:50:26 +03:00

STICKY: add prototype code to make records stick to a node to "calm" down if they are found to be very hot and accessed by a lot of clients.

This can improve performance and stop clients from having to chase a rapidly migrating/bouncing record

(This used to be ctdb commit d0d98f7e45e5084b81335b004d50bddc80cdc219)
This commit is contained in:
Ronnie Sahlberg 2012-03-20 16:58:35 +11:00
parent 462cdbc5c4
commit fa3a06246a
10 changed files with 403 additions and 13 deletions

View File

@ -4630,3 +4630,41 @@ int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint
state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
}
/*
set a database to be sticky
*/
struct ctdb_client_control_state *
ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
{
TDB_DATA data;
data.dptr = (uint8_t *)&dbid;
data.dsize = sizeof(dbid);
return ctdb_control_send(ctdb, destnode, 0,
CTDB_CONTROL_SET_DB_STICKY, 0, data,
ctdb, NULL, NULL);
}
int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
{
int ret;
int32_t res;
ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
if (ret != 0 || res != 0) {
DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed ret:%d res:%d\n", ret, res));
return -1;
}
return 0;
}
int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
{
struct ctdb_client_control_state *state;
state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
}

View File

@ -219,6 +219,7 @@ struct ctdb_dbid_map {
uint32_t dbid;
#define CTDB_DB_FLAGS_PERSISTENT 0x01
#define CTDB_DB_FLAGS_READONLY 0x02
#define CTDB_DB_FLAGS_STICKY 0x04
uint8_t flags;
} dbs[1];
};
@ -621,4 +622,9 @@ ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uin
int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state);
int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid);
struct ctdb_client_control_state *
ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid);
int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state);
int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid);
#endif /* _CTDB_CLIENT_H */

View File

@ -127,6 +127,9 @@ struct ctdb_tunable {
uint32_t deferred_rebalance_on_node_add;
uint32_t fetch_collapse;
uint32_t max_lacount;
uint32_t hopcount_make_sticky;
uint32_t sticky_duration;
uint32_t sticky_pindown;
};
/*
@ -501,6 +504,7 @@ struct ctdb_db_context {
uint32_t priority;
bool persistent;
bool readonly; /* Do we support read-only delegations ? */
bool sticky; /* Do we support sticky records ? */
const char *db_name;
const char *db_path;
struct tdb_wrap *ltdb;
@ -518,6 +522,7 @@ struct ctdb_db_context {
struct revokechild_handle *revokechild_active;
struct ctdb_persistent_state *persistent_state;
struct trbt_tree *delete_queue;
struct trbt_tree *sticky_records;
int (*ctdb_ltdb_store_fn)(struct ctdb_db_context *ctdb_db,
TDB_DATA key,
struct ctdb_ltdb_header *header,
@ -1461,4 +1466,6 @@ int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
uint32_t db_id,
TDB_DATA *outdata);
int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db);
#endif

View File

@ -383,6 +383,7 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS = 0,
CTDB_CONTROL_CHECK_SRVIDS = 130,
CTDB_CONTROL_TRAVERSE_START_EXT = 131,
CTDB_CONTROL_GET_DB_STATISTICS = 132,
CTDB_CONTROL_SET_DB_STICKY = 133,
};
/*

View File

@ -27,6 +27,13 @@
#include "system/network.h"
#include "system/filesys.h"
#include "../include/ctdb_private.h"
#include "../common/rb_tree.h"
struct ctdb_sticky_record {
struct ctdb_context *ctdb;
struct ctdb_db_context *ctdb_db;
TDB_CONTEXT *pindown;
};
/*
find the ctdb_db from a db index
@ -122,21 +129,30 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
* going back to the LMASTER is configurable by the tunable
* "MaxRedirectCount".
*/
static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
struct ctdb_db_context *ctdb_db,
TDB_DATA key,
struct ctdb_req_call *c,
struct ctdb_ltdb_header *header)
{
uint32_t lmaster = ctdb_lmaster(ctdb, &key);
c->hdr.destnode = lmaster;
if (ctdb->pnn == lmaster) {
c->hdr.destnode = header->dmaster;
} else if ((c->hopcount % ctdb->tunable.max_redirect_count) == 0) {
c->hdr.destnode = lmaster;
} else {
c->hdr.destnode = header->dmaster;
}
c->hopcount++;
if (c->hopcount%100 == 99) {
DEBUG(DEBUG_WARNING,("High hopcount %d dbid:0x%08x "
"key:0x%08x pnn:%d src:%d lmaster:%d "
"header->dmaster:%d dst:%d\n",
c->hopcount, ctdb_db->db_id, ctdb_hash(&key),
ctdb->pnn, c->hdr.srcnode, lmaster,
header->dmaster, c->hdr.destnode));
}
ctdb_queue_packet(ctdb, &c->hdr);
}
@ -258,6 +274,57 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
talloc_free(r);
}
static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
struct ctdb_sticky_record *sr = talloc_get_type(private_data,
struct ctdb_sticky_record);
DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
if (sr->pindown != NULL) {
talloc_free(sr->pindown);
sr->pindown = NULL;
}
}
static int
ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
{
TALLOC_CTX *tmp_ctx = talloc_new(NULL);
uint32_t *k;
struct ctdb_sticky_record *sr;
k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
if (k == NULL) {
DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
talloc_free(tmp_ctx);
return -1;
}
k[0] = (key.dsize + 3) / 4 + 1;
memcpy(&k[1], key.dptr, key.dsize);
sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
if (sr == NULL) {
talloc_free(tmp_ctx);
return 0;
}
talloc_free(tmp_ctx);
if (sr->pindown == NULL) {
DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
sr->pindown = talloc_new(sr);
if (sr->pindown == NULL) {
DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
return -1;
}
event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
}
return 0;
}
/*
called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
@ -305,6 +372,13 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
return;
}
/* we just became DMASTER and this database is "sticky",
see if the record is flagged as "hot" and set up a pin-down
context to stop migrations for a little while if so
*/
if (ctdb_db->sticky) {
ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
}
if (state == NULL) {
DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
@ -452,6 +526,147 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
}
}
static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
struct ctdb_sticky_record *sr = talloc_get_type(private_data,
struct ctdb_sticky_record);
talloc_free(sr);
}
static void *ctdb_make_sticky_record_callback(void *parm, void *data)
{
if (data) {
DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
talloc_free(data);
}
return parm;
}
static int
ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
{
TALLOC_CTX *tmp_ctx = talloc_new(NULL);
uint32_t *k;
struct ctdb_sticky_record *sr;
k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
if (k == NULL) {
DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
talloc_free(tmp_ctx);
return -1;
}
k[0] = (key.dsize + 3) / 4 + 1;
memcpy(&k[1], key.dptr, key.dsize);
sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
if (sr != NULL) {
talloc_free(tmp_ctx);
return 0;
}
sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
if (sr == NULL) {
talloc_free(tmp_ctx);
DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
return -1;
}
sr->ctdb = ctdb;
sr->ctdb_db = ctdb_db;
sr->pindown = NULL;
DEBUG(DEBUG_ERR,("Make record sticky in db %s\n", ctdb_db->db_name));
trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
talloc_free(tmp_ctx);
return 0;
}
struct pinned_down_requeue_handle {
struct ctdb_context *ctdb;
struct ctdb_req_header *hdr;
};
struct pinned_down_deferred_call {
struct ctdb_context *ctdb;
struct ctdb_req_header *hdr;
};
static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
struct ctdb_context *ctdb = handle->ctdb;
talloc_steal(ctdb, handle->hdr);
ctdb_call_input_pkt(ctdb, handle->hdr);
talloc_free(handle);
}
static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
{
struct ctdb_context *ctdb = pinned_down->ctdb;
struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
handle->ctdb = pinned_down->ctdb;
handle->hdr = pinned_down->hdr;
talloc_steal(handle, handle->hdr);
event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
return 0;
}
static int
ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
{
TALLOC_CTX *tmp_ctx = talloc_new(NULL);
uint32_t *k;
struct ctdb_sticky_record *sr;
struct pinned_down_deferred_call *pinned_down;
k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
if (k == NULL) {
DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
talloc_free(tmp_ctx);
return -1;
}
k[0] = (key.dsize + 3) / 4 + 1;
memcpy(&k[1], key.dptr, key.dsize);
sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
if (sr == NULL) {
talloc_free(tmp_ctx);
return -1;
}
talloc_free(tmp_ctx);
if (sr->pindown == NULL) {
return -1;
}
pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
if (pinned_down == NULL) {
DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
return -1;
}
pinned_down->ctdb = ctdb;
pinned_down->hdr = hdr;
talloc_set_destructor(pinned_down, pinned_down_destructor);
talloc_steal(pinned_down, hdr);
return 0;
}
/*
called when a CTDB_REQ_CALL packet comes in
@ -492,6 +707,18 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
call->reply_data.dptr = NULL;
call->reply_data.dsize = 0;
/* If this record is pinned down we should defer the
request until the pindown times out
*/
if (ctdb_db->sticky) {
if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name));
return;
}
}
/* determine if we are the dmaster for this key. This also
fetches the record data (if any), thus avoiding a 2nd fetch of the data
if the call will be answered locally */
@ -544,7 +771,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
if ((header.dmaster != ctdb->pnn)
&& (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
talloc_free(data.dptr);
ctdb_call_send_redirect(ctdb, call->key, c, &header);
ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
ret = ctdb_ltdb_unlock(ctdb_db, call->key);
if (ret != 0) {
@ -643,6 +870,16 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
/* If this database supports sticky records, then check if the
hopcount is big. If it is it means the record is hot and we
should make it sticky.
*/
if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
DEBUG(DEBUG_ERR, ("Hot record in database %s. Hopcount is %d. Make record sticky for %d seconds\n", ctdb_db->db_name, c->hopcount, ctdb->tunable.sticky_duration));
ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
}
/* if this nodes has done enough consecutive calls on the same record
then give them the record
or if the node requested an immediate migration

View File

@ -150,6 +150,17 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
CHECK_CONTROL_DATA_SIZE(0);
return ctdb_control_reload_nodes_file(ctdb, opcode);
case CTDB_CONTROL_SET_DB_STICKY: {
uint32_t db_id;
struct ctdb_db_context *ctdb_db;
CHECK_CONTROL_DATA_SIZE(sizeof(db_id));
db_id = *(uint32_t *)indata.dptr;
ctdb_db = find_ctdb_db(ctdb, db_id);
if (ctdb_db == NULL) return -1;
return ctdb_set_db_sticky(ctdb, ctdb_db);
}
case CTDB_CONTROL_SETVNNMAP:
return ctdb_control_setvnnmap(ctdb, opcode, indata, outdata);

View File

@ -1467,6 +1467,28 @@ int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
return 0;
}
int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
{
DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
if (ctdb_db->sticky) {
return 0;
}
if (ctdb_db->persistent) {
DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
return -1;
}
ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
ctdb_db->sticky = true;
return 0;
}
int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
uint32_t db_id,
TDB_DATA *outdata)

View File

@ -193,6 +193,9 @@ ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indat
if (ctdb_db->readonly != 0) {
dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
}
if (ctdb_db->sticky != 0) {
dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
}
}
return 0;

View File

@ -75,7 +75,10 @@ static const struct {
{ "RecoverPDBBySeqNum", 0, offsetof(struct ctdb_tunable, recover_pdb_by_seqnum), false },
{ "DeferredRebalanceOnNodeAdd", 300, offsetof(struct ctdb_tunable, deferred_rebalance_on_node_add) },
{ "FetchCollapse", 1, offsetof(struct ctdb_tunable, fetch_collapse) },
{ "MaxLACount", 20, offsetof(struct ctdb_tunable, max_lacount) }
{ "MaxLACount", 20, offsetof(struct ctdb_tunable, max_lacount) },
{ "HopcountMakeSticky", 50, offsetof(struct ctdb_tunable, hopcount_make_sticky) },
{ "StickyDuration", 600, offsetof(struct ctdb_tunable, sticky_duration) },
{ "StickyPindown", 200, offsetof(struct ctdb_tunable, sticky_pindown) }
};
/*

View File

@ -3937,13 +3937,14 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
}
if(options.machinereadable){
printf(":ID:Name:Path:Persistent:Unhealthy:ReadOnly:\n");
printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
for(i=0;i<dbmap->num;i++){
const char *path;
const char *name;
const char *health;
bool persistent;
bool readonly;
bool sticky;
ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
dbmap->dbs[i].dbid, ctdb, &path);
@ -3953,9 +3954,11 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
dbmap->dbs[i].dbid, ctdb, &health);
persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
printf(":0x%08X:%s:%s:%d:%d:%d:\n",
sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
dbmap->dbs[i].dbid, name, path,
!!(persistent), !!(health), !!(readonly));
!!(persistent), !!(sticky),
!!(health), !!(readonly));
}
return 0;
}
@ -3967,15 +3970,18 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
const char *health;
bool persistent;
bool readonly;
bool sticky;
ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
printf("dbid:0x%08x name:%s path:%s%s%s%s\n",
sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
dbmap->dbs[i].dbid, name, path,
persistent?" PERSISTENT":"",
sticky?" STICKY":"",
readonly?" READONLY":"",
health?" UNHEALTHY":"");
}
@ -4010,6 +4016,7 @@ static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char *
const char *health;
bool persistent;
bool readonly;
bool sticky;
ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
if (strcmp(name, db_name) != 0) {
@ -4020,9 +4027,11 @@ static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char *
ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nREADONLY: %s\nHEALTH: %s\n",
sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
dbmap->dbs[i].dbid, name, path,
persistent?"yes":"no",
sticky?"yes":"no",
readonly?"yes":"no",
health?health:"OK");
return 0;
@ -4462,6 +4471,58 @@ static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **a
return 0;
}
/*
set the sticky records capability for a database
*/
static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
{
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
uint32_t db_id;
struct ctdb_dbid_map *dbmap=NULL;
int i, ret;
if (argc < 1) {
usage();
}
if (!strncmp(argv[0], "0x", 2)) {
db_id = strtoul(argv[0] + 2, NULL, 0);
} else {
ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
if (ret != 0) {
DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
talloc_free(tmp_ctx);
return ret;
}
for(i=0;i<dbmap->num;i++){
const char *name;
ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
if(!strcmp(argv[0], name)){
talloc_free(discard_const(name));
break;
}
talloc_free(discard_const(name));
}
if (i == dbmap->num) {
DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
talloc_free(tmp_ctx);
return -1;
}
db_id = dbmap->dbs[i].dbid;
}
ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
if (ret != 0) {
DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
talloc_free(tmp_ctx);
return -1;
}
talloc_free(tmp_ctx);
return 0;
}
/*
set the readonly capability for a database
*/
@ -5541,7 +5602,8 @@ static const struct {
{ "setrecmasterrole", control_setrecmasterrole, false, false, "Set RECMASTER role to on/off", "{on|off}"},
{ "setdbprio", control_setdbprio, false, false, "Set DB priority", "<dbid> <prio:1-3>"},
{ "getdbprio", control_getdbprio, false, false, "Get DB priority", "<dbid>"},
{ "setdbreadonly", control_setdbreadonly, false, false, "Set DB readonly capable", "<dbid>"},
{ "setdbreadonly", control_setdbreadonly, false, false, "Set DB readonly capable", "<dbid>|<name>"},
{ "setdbsticky", control_setdbsticky, false, false, "Set DB sticky-records capable", "<dbid>|<name>"},
{ "msglisten", control_msglisten, false, false, "Listen on a srvid port for messages", "<msg srvid>"},
{ "msgsend", control_msgsend, false, false, "Send a message to srvid", "<srvid> <message>"},
{ "sync", control_ipreallocate, false, false, "wait until ctdbd has synced all state changes" },