cluster/dht: Refactor rebalance code

Created init and cleanup functions for certain
functionality in order to improve readability.
Removed unused code.

Change-Id: Ia6a2f4ab64923b6ea8e10487227fb5621eec1488
updates: bz#1586363
Signed-off-by: N Balachandran <nbalacha@redhat.com>
This commit is contained in:
N Balachandran 2018-06-06 12:57:50 +05:30
parent 9647f0c64b
commit 5702ff3012

View File

@ -4369,6 +4369,71 @@ gf_tier_wait_fix_lookup (gf_defrag_info_t *defrag) {
}
/******************Tier background Fix layout functions END********************/
/*
 * Determine the local subvolumes for this rebalance process and log them
 * together with their node uuids.
 *
 * Unless the command is GF_DEFRAG_CMD_START_TIER, the
 * GF_REBAL_FIND_LOCAL_SUBVOL key is requested first; on -ENODATA (or for
 * the tier command) GF_REBAL_OLD_FIND_LOCAL_SUBVOL is requested instead.
 *
 * Returns 0 on success and -1 if the getxattr fails.
 *
 * NOTE(review): presumably the getxattr is what fills conf->local_subvols
 * and conf->local_nodeuuids - confirm against the getxattr handler.
 * NOTE(review): the dict handed back by syncop_getxattr is never read or
 * released here; if the caller owns a reference this leaks it - confirm.
 */
int
dht_init_local_subvols_and_nodeuuids (xlator_t *this, dht_conf_t *conf,
                                      loc_t *loc)
{
        dict_t            *xattr       = NULL;
        gf_defrag_info_t  *defrag_info = conf->defrag;
        uuid_t            *node_uuid   = NULL;
        int                op_ret      = -1;
        int                subvol_idx  = 0;
        int                uuid_idx    = 0;

        if (defrag_info->cmd != GF_DEFRAG_CMD_START_TIER) {
                /* Find local subvolumes using the current key */
                op_ret = syncop_getxattr (this, loc, &xattr,
                                          GF_REBAL_FIND_LOCAL_SUBVOL,
                                          NULL, NULL);
                if (op_ret == 0)
                        goto log_subvols;

                if (op_ret != -ENODATA) {
                        gf_msg (this->name, GF_LOG_ERROR, -op_ret, 0, "local "
                                "subvolume determination failed with error: %d",
                                -op_ret);
                        return -1;
                }
        }

        /* Tier start, or the current key returned -ENODATA: use the old key */
        op_ret = syncop_getxattr (this, loc, &xattr,
                                  GF_REBAL_OLD_FIND_LOCAL_SUBVOL,
                                  NULL, NULL);
        if (op_ret != 0) {
                gf_msg (this->name, GF_LOG_ERROR, -op_ret, 0, "local "
                        "subvolume determination failed with error: %d",
                        -op_ret);
                return -1;
        }

log_subvols:
        subvol_idx = 0;
        while (subvol_idx < conf->local_subvols_cnt) {
                gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: "
                        "%s", conf->local_subvols[subvol_idx]->name);

                uuid_idx = 0;
                while (uuid_idx < conf->local_nodeuuids[subvol_idx].count) {
                        node_uuid = &(conf->local_nodeuuids[subvol_idx]
                                          .elements[uuid_idx].uuid);
                        gf_msg (this->name, GF_LOG_INFO, 0, 0,
                                "node uuid : %s",
                                uuid_utoa(*node_uuid));
                        uuid_idx++;
                }
                subvol_idx++;
        }

        return 0;
}
/* Functions for the rebalance estimates feature */
uint64_t
gf_defrag_subvol_file_size (xlator_t *this, loc_t *root_loc)
@ -4387,24 +4452,6 @@ gf_defrag_subvol_file_size (xlator_t *this, loc_t *root_loc)
return ((buf.f_blocks - buf.f_bfree) * buf.f_frsize);
}
/*
 * Return (f_files - f_ffree) as reported by syncop_statfs for root_loc on
 * the given subvolume.
 *
 * Returns 0 when 'this' is NULL or when the statfs call fails, so callers
 * cannot tell a failure apart from a genuine zero.
 */
uint64_t
gf_defrag_subvol_file_cnt (xlator_t *this, loc_t *root_loc)
{
        struct statvfs  fs_stats = {0,};
        int             op_ret   = -1;

        if (this == NULL)
                return 0;

        op_ret = syncop_statfs (this, root_loc, &fs_stats, NULL, NULL);
        if (op_ret != 0)
                return 0;

        return (fs_stats.f_files - fs_stats.f_ffree);
}
uint64_t
gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc)
{
@ -4420,7 +4467,7 @@ gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc)
for (i = 0 ; i < conf->local_subvols_cnt; i++) {
size_files = gf_defrag_subvol_file_size (conf->local_subvols[i],
root_loc);
root_loc);
total_size += size_files;
gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: %s,"
"cnt = %"PRIu64, conf->local_subvols[i]->name,
@ -4434,88 +4481,6 @@ gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc)
}
/*
 * Estimate the total number of files on all local subvolumes.
 *
 * The per-subvolume counts from gf_defrag_subvol_file_cnt are summed (and
 * logged individually), the sum is halved to discount the .glusterfs
 * contents, and 10000 is added when the halved value exceeds 20000 (the
 * reason for that padding is not documented in this code).
 *
 * Returns the adjusted estimate, or 0 if this->private is NULL.
 */
uint64_t
gf_defrag_total_file_cnt (xlator_t *this, loc_t *root_loc)
{
        dht_conf_t  *conf       = this->private;
        uint64_t     subvol_cnt = 0;
        uint64_t     estimate   = 0;
        int          idx        = 0;

        if (conf == NULL)
                return 0;

        while (idx < conf->local_subvols_cnt) {
                subvol_cnt = gf_defrag_subvol_file_cnt (conf->local_subvols[idx],
                                                        root_loc);
                estimate += subvol_cnt;
                gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: %s,"
                        "cnt = %"PRIu64, conf->local_subvols[idx]->name,
                        subvol_cnt);
                idx++;
        }

        /* Halve the count to negate the .glusterfs contents; a better way
         * of working this out is still needed. */
        estimate /= 2;
        if (estimate > 20000) {
                estimate += 10000;
        }

        gf_msg (this->name, GF_LOG_INFO, 0, 0,
                "Total number of files = %"PRIu64, estimate);

        return estimate;
}
/*
 * Issue the getxattr that determines the local subvolumes.
 *
 * Unless the command is GF_DEFRAG_CMD_START_TIER, the
 * GF_REBAL_FIND_LOCAL_SUBVOL key is requested first; on -ENODATA (or for
 * the tier command) GF_REBAL_OLD_FIND_LOCAL_SUBVOL is requested instead.
 *
 * Returns 0 on success and -1 if the getxattr fails.
 *
 * NOTE(review): the dict handed back by syncop_getxattr is never read or
 * released here; if the caller owns a reference this leaks it - confirm.
 */
int
dht_get_local_subvols_and_nodeuuids (xlator_t *this, dht_conf_t *conf,
                                     loc_t *loc)
{
        dict_t            *xattr       = NULL;
        gf_defrag_info_t  *defrag_info = conf->defrag;
        int                op_ret      = -1;

        if (defrag_info->cmd != GF_DEFRAG_CMD_START_TIER) {
                /* Find local subvolumes using the current key */
                op_ret = syncop_getxattr (this, loc, &xattr,
                                          GF_REBAL_FIND_LOCAL_SUBVOL,
                                          NULL, NULL);
                if (op_ret == 0)
                        return 0;

                if (op_ret != -ENODATA) {
                        gf_msg (this->name, GF_LOG_ERROR, -op_ret, 0, "local "
                                "subvolume determination failed with error: %d",
                                -op_ret);
                        return -1;
                }
        }

        /* Tier start, or the current key returned -ENODATA: use the old key */
        op_ret = syncop_getxattr (this, loc, &xattr,
                                  GF_REBAL_OLD_FIND_LOCAL_SUBVOL,
                                  NULL, NULL);
        if (op_ret != 0) {
                gf_msg (this->name, GF_LOG_ERROR, -op_ret, 0, "local "
                        "subvolume determination failed with error: %d",
                        -op_ret);
                return -1;
        }

        return 0;
}
static void*
dht_file_counter_thread (void *args)
{
@ -4572,6 +4537,176 @@ dht_file_counter_thread (void *args)
return NULL;
}
/*
 * Stop the file counter thread started for the rebalance estimates.
 *
 * Broadcasts on defrag->fc_wakeup_cond (under defrag->fc_mutex) and then
 * joins filecnt_thread.
 *
 * Returns 0 on success and -1 if pthread_join fails. 'this' is not used.
 */
int
gf_defrag_estimates_cleanup (xlator_t *this, gf_defrag_info_t *defrag,
                             pthread_t filecnt_thread)
{
        int join_err = 0;

        /* Wake up the file counter thread. By this point the defrag status
         * is no longer GF_DEFRAG_STATUS_STARTED, so the thread leaves its
         * loop once woken.
         */
        pthread_mutex_lock (&defrag->fc_mutex);
        pthread_cond_broadcast (&defrag->fc_wakeup_cond);
        pthread_mutex_unlock (&defrag->fc_mutex);

        join_err = pthread_join (filecnt_thread, NULL);
        if (join_err == 0)
                return 0;

        gf_msg ("dht", GF_LOG_ERROR, join_err, 0,
                "file_counter_thread: pthread_join failed.");
        return -1;
}
/*
 * Set up the rebalance time estimates: record the total data size in
 * g_totalsize and start the file counter thread.
 *
 * this           - the dht xlator; this->private must be a valid dht_conf_t
 * loc            - loc passed to gf_defrag_total_file_size
 * filecnt_thread - out: handle of the file counter thread; only valid
 *                  when 0 is returned
 *
 * Returns 0 only if the total size is non-zero AND the thread was created,
 * -1 otherwise. Callers use the return value to decide whether
 * filecnt_thread must later be joined, so a failed thread creation must
 * not be reported as success.
 */
int
gf_defrag_estimates_init (xlator_t *this, loc_t *loc,
                          pthread_t *filecnt_thread)
{
        int                ret    = -1;
        dht_conf_t        *conf   = NULL;
        gf_defrag_info_t  *defrag = NULL;

        conf = this->private;
        defrag = conf->defrag;

        g_totalsize = gf_defrag_total_file_size (this, loc);
        if (!g_totalsize) {
                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get "
                        "the total data size. Unable to estimate "
                        "time to complete rebalance.");
                goto out;
        }

        ret = gf_thread_create (filecnt_thread, NULL,
                                &dht_file_counter_thread,
                                (void *)defrag, "dhtfcnt");
        if (ret) {
                gf_msg (this->name, GF_LOG_ERROR, ret, 0, "Failed to "
                        "create the file counter thread ");
                ret = -1;
                /* Bail out here: falling through would reset ret to 0 and
                 * make the caller join a thread that was never created. */
                goto out;
        }

        ret = 0;
out:
        return ret;
}
/* Init and cleanup functions for parallel file migration*/
/*
 * Allocate the migration queue and spawn the migration worker threads.
 *
 * this         - xlator whose name is used for logging
 * defrag       - rebalance state; receives the queue and the thread count
 * tid_array    - out: array of worker thread ids (NULL if not allocated)
 * thread_index - out: number of workers successfully created
 *
 * Both out parameters are written on every path, including failures, so
 * the caller can hand them to the cleanup function unconditionally.
 *
 * Returns 0 on success, -1 if defrag is NULL, an allocation fails or a
 * thread cannot be created.
 */
int
gf_defrag_parallel_migration_init (xlator_t *this, gf_defrag_info_t *defrag,
                                   pthread_t **tid_array, int *thread_index)
{
        char        name_buf[GF_THREAD_NAMEMAX] = {0,};
        pthread_t  *workers  = NULL;
        int         nworkers = 0;
        int         spawned  = 0;
        int         op_ret   = -1;

        if (defrag == NULL)
                goto done;

        /* The global entry queue must exist before any worker starts */
        defrag->queue = GF_CALLOC (1, sizeof (struct dht_container),
                                   gf_dht_mt_container_t);
        if (defrag->queue == NULL) {
                gf_msg (this->name, GF_LOG_ERROR, ENOMEM, 0,
                        "Failed to initialise migration queue");
                op_ret = -1;
                goto done;
        }

        INIT_LIST_HEAD (&(defrag->queue[0].list));

        nworkers = MAX (MAX_REBAL_THREADS, 4);

        gf_msg_debug (this->name, 0, "thread_spawn_count: %d",
                      nworkers);

        workers = GF_CALLOC (nworkers, sizeof (pthread_t),
                             gf_common_mt_pthread_t);
        if (workers == NULL) {
                gf_msg (this->name, GF_LOG_ERROR, ENOMEM, 0,
                        "Failed to create migration threads");
                op_ret = -1;
                goto done;
        }

        defrag->current_thread_count = nworkers;

        /* Spawn the workers; names are dhtmig1, dhtmig2, ... */
        for (spawned = 0; spawned < nworkers; spawned++) {
                snprintf (name_buf, sizeof(name_buf),
                          "%s%d", "dhtmig", spawned + 1);

                op_ret = gf_thread_create (&(workers[spawned]), NULL,
                                           &gf_defrag_task, (void *)defrag,
                                           name_buf);
                if (op_ret != 0) {
                        gf_msg ("DHT", GF_LOG_ERROR, op_ret, 0,
                                "Thread[%d] creation failed. ",
                                spawned);
                        op_ret = -1;
                        /* 'spawned' still counts only the threads that
                         * were actually created */
                        goto done;
                }

                gf_log ("DHT", GF_LOG_INFO, "Thread[%d] "
                        "creation successful", spawned);
        }

        op_ret = 0;
done:
        *thread_index = spawned;
        *tid_array = workers;

        return op_ret;
}
/*
 * Stop the migration worker threads and release what
 * gf_defrag_parallel_migration_init set up.
 *
 * defrag       - rebalance state owning the queue and the condition
 *                variables the workers wait on
 * tid_array    - array of worker thread ids (may be NULL); freed here
 * thread_index - number of valid entries in tid_array
 *
 * Sets defrag->crawl_done, wakes the workers, joins them, then frees the
 * thread id array and the migration queue. defrag->queue is reset to NULL
 * after it is freed so that no stale pointer is left behind in defrag.
 *
 * Returns 0 on success, -1 if defrag is NULL.
 */
int
gf_defrag_parallel_migration_cleanup (gf_defrag_info_t *defrag,
                                      pthread_t *tid_array, int thread_index)
{
        int ret = -1;
        int i   = 0;

        if (!defrag)
                goto out;

        /* Wake up all migration threads */
        pthread_mutex_lock (&defrag->dfq_mutex);
        {
                defrag->crawl_done = 1;

                pthread_cond_broadcast (&defrag->parallel_migration_cond);
                pthread_cond_broadcast (&defrag->df_wakeup_thread);
        }
        pthread_mutex_unlock (&defrag->dfq_mutex);

        /* Wait for all the threads to complete their task */
        for (i = 0; i < thread_index; i++) {
                pthread_join (tid_array[i], NULL);
        }

        GF_FREE (tid_array);

        /* Cleanup the migration queue */
        if (defrag->queue) {
                gf_dirent_free (defrag->queue[0].df_entry);
                INIT_LIST_HEAD (&(defrag->queue[0].list));
        }

        GF_FREE (defrag->queue);
        /* Do not leave a dangling pointer behind: defrag outlives this
         * call and a second free of the queue would be a double free. */
        defrag->queue = NULL;

        ret = 0;
out:
        return ret;
}
int
@ -4580,28 +4715,22 @@ gf_defrag_start_crawl (void *data)
xlator_t *this = NULL;
dht_conf_t *conf = NULL;
gf_defrag_info_t *defrag = NULL;
int ret = -1;
loc_t loc = {0,};
struct iatt iatt = {0,};
struct iatt parent = {0,};
dict_t *fix_layout = NULL;
dict_t *migrate_data = NULL;
dict_t *status = NULL;
glusterfs_ctx_t *ctx = NULL;
dht_methods_t *methods = NULL;
int i = 0;
int thread_index = 0;
int err = 0;
int thread_spawn_count = 0;
pthread_t *tid = NULL;
char thread_name[GF_THREAD_NAMEMAX] = {0,};
pthread_t filecnt_thread;
gf_boolean_t is_tier_detach = _gf_false;
call_frame_t *statfs_frame = NULL;
xlator_t *old_THIS = NULL;
int j = 0;
int ret = -1;
loc_t loc = {0,};
struct iatt iatt = {0,};
struct iatt parent = {0,};
int thread_index = 0;
pthread_t *tid = NULL;
pthread_t filecnt_thread;
gf_boolean_t is_tier_detach = _gf_false;
gf_boolean_t fc_thread_started = _gf_false;
uuid_t *uuid_ptr = NULL;
this = data;
if (!this)
@ -4717,6 +4846,8 @@ gf_defrag_start_crawl (void *data)
}
if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) {
/* We need to migrate files */
migrate_data = dict_new ();
if (!migrate_data) {
defrag->total_failures++;
@ -4732,102 +4863,32 @@ gf_defrag_start_crawl (void *data)
goto out;
}
ret = dht_get_local_subvols_and_nodeuuids (this, conf, &loc);
ret = dht_init_local_subvols_and_nodeuuids (this, conf, &loc);
if (ret) {
ret = -1;
goto out;
}
for (i = 0 ; i < conf->local_subvols_cnt; i++) {
gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvols "
"are %s", conf->local_subvols[i]->name);
for (j = 0; j < conf->local_nodeuuids[i].count; j++) {
uuid_ptr = &(conf->local_nodeuuids[i].elements[j].uuid);
gf_msg (this->name, GF_LOG_INFO, 0, 0,
"node uuids are %s",
uuid_utoa(*uuid_ptr));
}
}
g_totalsize = gf_defrag_total_file_size (this, &loc);
if (!g_totalsize) {
gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get "
"the total data size. Unable to estimate "
"time to complete rebalance.");
}
g_totalfiles = gf_defrag_total_file_cnt (this, &loc);
if (!g_totalfiles) {
gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get "
"the total number of files. Unable to estimate "
"time to complete rebalance.");
}
ret = gf_thread_create (&filecnt_thread, NULL,
&dht_file_counter_thread,
(void *)defrag, "dhtfcnt");
/* Initialise the structures required for parallel migration */
ret = gf_defrag_parallel_migration_init (this, defrag, &tid,
&thread_index);
if (ret) {
gf_msg (this->name, GF_LOG_ERROR, ret, 0, "Failed to "
"create the file counter thread ");
gf_msg (this->name, GF_LOG_ERROR, 0, 0,
"Aborting rebalance.");
goto out;
}
ret = gf_defrag_estimates_init (this, &loc, &filecnt_thread);
if (ret) {
/* Not a fatal error. Allow the rebalance to proceed*/
ret = 0;
} else {
fc_thread_started = _gf_true;
}
/* Initialize global entry queue */
defrag->queue = GF_CALLOC (1, sizeof (struct dht_container),
gf_dht_mt_container_t);
if (!defrag->queue) {
gf_log (this->name, GF_LOG_ERROR, "No memory for "
"queue");
ret = -1;
goto out;
}
INIT_LIST_HEAD (&(defrag->queue[0].list));
thread_spawn_count = MAX (MAX_REBAL_THREADS, 4);
gf_msg_debug (this->name, 0, "thread_spawn_count: %d",
thread_spawn_count);
tid = GF_CALLOC (thread_spawn_count, sizeof (pthread_t),
gf_common_mt_pthread_t);
if (!tid) {
gf_log (this->name, GF_LOG_ERROR, "Insufficient memory "
"for tid");
ret = -1;
goto out;
}
defrag->current_thread_count = thread_spawn_count;
/*Spawn Threads Here*/
while (thread_index < thread_spawn_count) {
snprintf (thread_name, sizeof(thread_name),
"%s%d", "dhtdf", thread_index + 1);
err = gf_thread_create (&(tid[thread_index]), NULL,
&gf_defrag_task, (void *)defrag,
thread_name);
if (err != 0) {
gf_log ("DHT", GF_LOG_ERROR,
"Thread[%d] creation failed. "
"Aborting Rebalance",
thread_index);
ret = -1;
goto out;
} else {
gf_log ("DHT", GF_LOG_INFO, "Thread[%d] "
"creation successful", thread_index);
}
thread_index++;
}
}
if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) {
/* Fix layout for attach tier */
ret = gf_tier_start_fix_layout (this, &loc, defrag, fix_layout);
@ -4882,23 +4943,6 @@ out:
defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
}
pthread_mutex_lock (&defrag->dfq_mutex);
{
defrag->crawl_done = 1;
pthread_cond_broadcast (
&defrag->parallel_migration_cond);
pthread_cond_broadcast (
&defrag->df_wakeup_thread);
}
pthread_mutex_unlock (&defrag->dfq_mutex);
/*Wait for all the threads to complete their task*/
for (i = 0; i < thread_index; i++) {
pthread_join (tid[i], NULL);
}
GF_FREE (tid);
if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) {
/* Wait for the tier fixlayout to
@ -4913,10 +4957,7 @@ out:
gf_tier_clear_fix_layout (this, &loc, defrag);
}
if (defrag->queue) {
gf_dirent_free (defrag->queue[0].df_entry);
INIT_LIST_HEAD (&(defrag->queue[0].list));
}
gf_defrag_parallel_migration_cleanup (defrag, tid, thread_index);
if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) &&
(defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) {
@ -4924,13 +4965,7 @@ out:
}
if (fc_thread_started) {
pthread_mutex_lock (&defrag->fc_mutex);
{
pthread_cond_broadcast (&defrag->fc_wakeup_cond);
}
pthread_mutex_unlock (&defrag->fc_mutex);
pthread_join (filecnt_thread, NULL);
gf_defrag_estimates_cleanup (this, defrag, filecnt_thread);
}
dht_send_rebalance_event (this, defrag->cmd, defrag->defrag_status);
@ -4947,7 +4982,6 @@ out:
}
UNLOCK (&defrag->lock);
GF_FREE (defrag->queue);
GF_FREE (defrag);
conf->defrag = NULL;
@ -5075,84 +5109,6 @@ out:
}
/*
 * Estimate the total time (in seconds) the rebalance will take, based on
 * the number of files and directories processed so far versus the
 * g_totalfiles estimate.
 *
 * Returns 0 when no estimate is available (g_totalfiles is 0, or no
 * processing rate can be computed yet).
 */
uint64_t
gf_defrag_get_estimates (dht_conf_t *conf)
{
        gf_defrag_info_t *defrag = NULL;
        loc_t loc = {0,};
        double rate_lookedup = 0;
        uint64_t dirs_processed = 0;
        uint64_t files_processed = 0;
        uint64_t total_processed = 0;
        uint64_t tmp_count = 0;
        uint64_t time_to_complete = 0;
        struct timeval now = {0,};
        double elapsed = 0;

        defrag = conf->defrag;

        if (!g_totalfiles)
                goto out;

        gettimeofday (&now, NULL);
        elapsed = now.tv_sec - defrag->start_time.tv_sec;

        /* The counters are read without a lock: taking the lock here was
         * tried, but the status function never seemed to acquire it and
         * the status cli hung.
         */
        dirs_processed = defrag->num_dirs_processed;
        files_processed = defrag->num_files_lookedup;

        total_processed = files_processed + dirs_processed;

        if (total_processed > g_totalfiles) {
                /* More entries processed than estimated: look up the file
                 * count again. Not all newly added files necessarily need
                 * processing, so this may still be off in some cases.
                 */
                dht_build_root_loc (defrag->root_inode, &loc);
                g_totalfiles = gf_defrag_total_file_cnt (defrag->this, &loc);
                if (!g_totalfiles)
                        goto out;
        }

        /* Rate at which entries are looked up. Only divide once at least a
         * second has elapsed: with elapsed == 0 the division yields inf or
         * NaN, and NaN would pass the non-zero check below and then be
         * converted to an integer, which is undefined behaviour. Leaving
         * the rate at 0 routes this case to the "unable to calculate" path.
         */
        if (elapsed > 0)
                rate_lookedup = (total_processed)/elapsed;

        /* Directories are counted once per local subvol in g_totalfiles
         * (the count comes from the inodes on each subvol), but each
         * directory should only be counted once.
         * NOTE(review): this unsigned subtraction can wrap if the product
         * exceeds g_totalfiles - confirm whether that can happen.
         */
        tmp_count = g_totalfiles
                    - (dirs_processed * (conf->local_subvols_cnt - 1));

        if (rate_lookedup) {
                time_to_complete = (tmp_count)/rate_lookedup;
        } else {
                gf_msg (THIS->name, GF_LOG_ERROR, 0, 0,
                        "Unable to calculate estimated time for rebalance");
        }

        gf_log (THIS->name, GF_LOG_INFO,
                "TIME: (count) total_processed=%"PRIu64" tmp_cnt = %"PRIu64","
                "rate_lookedup=%f", total_processed, tmp_count,
                rate_lookedup);

out:
        return time_to_complete;
}
int
gf_defrag_status_get (dht_conf_t *conf, dict_t *dict)
{
@ -5196,18 +5152,6 @@ gf_defrag_status_get (dht_conf_t *conf, dict_t *dict)
if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER)
&& (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED)) {
/*
time_to_complete = gf_defrag_get_estimates (conf);
if (time_to_complete && (time_to_complete > elapsed))
time_left = time_to_complete - elapsed;
gf_log (THIS->name, GF_LOG_INFO,
"TIME: Estimated total time to complete based on"
" count = %"PRIu64 " seconds, seconds left = %"PRIu64"",
time_to_complete, time_left);
*/
time_to_complete = gf_defrag_get_estimates_based_on_size (conf);
if (time_to_complete && (time_to_complete > elapsed))