features/bitrot: cleanup, v2

This patch uses "cleanup, v1" infrastrcuture to cleanup scrubber
(data structures, threads, timers, etc..) on brick disconnection.
Signer is not cleaned up yet: probably would be done as part of
another patch.

Change-Id: I78a92b8a7f02b2f39078aa9a5a6b101fc499fd70
BUG: 1231619
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Reviewed-on: http://review.gluster.org/11148
Tested-by: NetBSD Build System <jenkins@build.gluster.org>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Raghavendra Bhat <raghavendra@redhat.com>
This commit is contained in:
Venky Shankar
2015-06-04 08:50:48 +05:30
committed by Raghavendra Bhat
parent 367049879e
commit 6ab37f0cb4
8 changed files with 214 additions and 67 deletions

View File

@ -218,14 +218,20 @@ void gf_tw_add_timer (struct tvec_base *base, struct gf_tw_timer_list *timer)
/** /**
* Remove a timer from the timer wheel * Remove a timer from the timer wheel
*/ */
void gf_tw_del_timer (struct tvec_base *base, struct gf_tw_timer_list *timer) int gf_tw_del_timer (struct tvec_base *base, struct gf_tw_timer_list *timer)
{ {
int ret = 0;
pthread_spin_lock (&base->lock); pthread_spin_lock (&base->lock);
{ {
if (timer_pending (timer)) if (timer_pending (timer)) {
ret = 1;
__gf_tw_detach_timer (timer); __gf_tw_detach_timer (timer);
}
} }
pthread_spin_unlock (&base->lock); pthread_spin_unlock (&base->lock);
return ret;
} }
int gf_tw_mod_timer_pending (struct tvec_base *base, int gf_tw_mod_timer_pending (struct tvec_base *base,

View File

@ -66,7 +66,7 @@ struct gf_tw_timer_list {
struct tvec_base *gf_tw_init_timers (); struct tvec_base *gf_tw_init_timers ();
int gf_tw_cleanup_timers (struct tvec_base *); int gf_tw_cleanup_timers (struct tvec_base *);
void gf_tw_add_timer (struct tvec_base *, struct gf_tw_timer_list *); void gf_tw_add_timer (struct tvec_base *, struct gf_tw_timer_list *);
void gf_tw_del_timer (struct tvec_base *, struct gf_tw_timer_list *); int gf_tw_del_timer (struct tvec_base *, struct gf_tw_timer_list *);
int gf_tw_mod_timer_pending (struct tvec_base *, int gf_tw_mod_timer_pending (struct tvec_base *,
struct gf_tw_timer_list *, unsigned long); struct gf_tw_timer_list *, unsigned long);

View File

@ -4019,3 +4019,14 @@ out:
return ret; return ret;
} }
void
_mask_cancellation (void)
{
(void) pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
}
void
_unmask_cancellation (void)
{
(void) pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
}

View File

@ -760,4 +760,7 @@ gf_nread (int fd, void *buf, size_t count);
ssize_t ssize_t
gf_nwrite (int fd, const void *buf, size_t count); gf_nwrite (int fd, const void *buf, size_t count);
void _mask_cancellation (void);
void _unmask_cancellation (void);
#endif /* _COMMON_UTILS_H */ #endif /* _COMMON_UTILS_H */

View File

@ -387,6 +387,14 @@ br_scrubber_scrub_begin (xlator_t *this, struct br_fsscan_entry *fsentry)
return ret; return ret;
} }
static void
_br_lock_cleaner (void *arg)
{
pthread_mutex_t *mutex = arg;
pthread_mutex_unlock (mutex);
}
static void static void
wait_for_scrubbing (xlator_t *this, struct br_scanfs *fsscan) wait_for_scrubbing (xlator_t *this, struct br_scanfs *fsscan)
{ {
@ -396,8 +404,10 @@ wait_for_scrubbing (xlator_t *this, struct br_scanfs *fsscan)
priv = this->private; priv = this->private;
fsscrub = &priv->fsscrub; fsscrub = &priv->fsscrub;
pthread_cleanup_push (_br_lock_cleaner, &fsscan->waitlock);
pthread_mutex_lock (&fsscan->waitlock); pthread_mutex_lock (&fsscan->waitlock);
{ {
pthread_cleanup_push (_br_lock_cleaner, &fsscrub->mutex);
pthread_mutex_lock (&fsscrub->mutex); pthread_mutex_lock (&fsscrub->mutex);
{ {
list_replace_init (&fsscan->queued, &fsscan->ready); list_replace_init (&fsscan->queued, &fsscan->ready);
@ -406,12 +416,14 @@ wait_for_scrubbing (xlator_t *this, struct br_scanfs *fsscan)
pthread_cond_broadcast (&fsscrub->cond); pthread_cond_broadcast (&fsscrub->cond);
} }
pthread_mutex_unlock (&fsscrub->mutex); pthread_mutex_unlock (&fsscrub->mutex);
pthread_cleanup_pop (0);
while (fsscan->entries != 0) while (fsscan->entries != 0)
pthread_cond_wait pthread_cond_wait
(&fsscan->waitcond, &fsscan->waitlock); (&fsscan->waitcond, &fsscan->waitlock);
} }
pthread_mutex_unlock (&fsscan->waitlock); pthread_mutex_unlock (&fsscan->waitlock);
pthread_cleanup_pop (0);
} }
static inline void static inline void
@ -460,6 +472,8 @@ br_fsscanner_handle_entry (xlator_t *subvol,
this = child->this; this = child->this;
fsscan = &child->fsscan; fsscan = &child->fsscan;
_mask_cancellation ();
fsentry = GF_CALLOC (1, sizeof (*fsentry), gf_br_mt_br_fsscan_entry_t); fsentry = GF_CALLOC (1, sizeof (*fsentry), gf_br_mt_br_fsscan_entry_t);
if (!fsentry) if (!fsentry)
goto error_return; goto error_return;
@ -494,6 +508,8 @@ br_fsscanner_handle_entry (xlator_t *subvol,
} }
UNLOCK (&fsscan->entrylock); UNLOCK (&fsscan->entrylock);
_unmask_cancellation ();
if (scrub) if (scrub)
wait_for_scrubbing (this, fsscan); wait_for_scrubbing (this, fsscan);
@ -530,6 +546,7 @@ br_fsscanner_log_time (xlator_t *this, br_child_t *child, const char *sfx)
static void static void
br_fsscanner_wait_until_kicked (struct br_scanfs *fsscan) br_fsscanner_wait_until_kicked (struct br_scanfs *fsscan)
{ {
pthread_cleanup_push (_br_lock_cleaner, &fsscan->wakelock);
pthread_mutex_lock (&fsscan->wakelock); pthread_mutex_lock (&fsscan->wakelock);
{ {
while (!fsscan->kick) while (!fsscan->kick)
@ -538,6 +555,7 @@ br_fsscanner_wait_until_kicked (struct br_scanfs *fsscan)
fsscan->kick = _gf_false; fsscan->kick = _gf_false;
} }
pthread_mutex_unlock (&fsscan->wakelock); pthread_mutex_unlock (&fsscan->wakelock);
pthread_cleanup_pop (0);
} }
void * void *
@ -773,13 +791,6 @@ br_scrubber_calc_scale (xlator_t *this,
} }
static void
br_scrubber_cleanup_handler (void *arg)
{
struct br_scrubber *fsscrub = arg;
pthread_mutex_unlock (&fsscrub->mutex);
}
static inline br_child_t * static inline br_child_t *
_br_scrubber_get_next_child (struct br_scrubber *fsscrub) _br_scrubber_get_next_child (struct br_scrubber *fsscrub)
{ {
@ -839,7 +850,7 @@ static void
br_scrubber_pick_entry (struct br_scrubber *fsscrub, br_scrubber_pick_entry (struct br_scrubber *fsscrub,
struct br_fsscan_entry **fsentry) struct br_fsscan_entry **fsentry)
{ {
pthread_cleanup_push (br_scrubber_cleanup_handler, fsscrub); pthread_cleanup_push (_br_lock_cleaner, &fsscrub->mutex);
pthread_mutex_lock (&fsscrub->mutex); pthread_mutex_lock (&fsscrub->mutex);
{ {

View File

@ -1098,7 +1098,7 @@ br_set_child_state (br_child_t *child, br_child_state_t state)
* Also, we register to the changelog library to subscribe for event * Also, we register to the changelog library to subscribe for event
* notifications. * notifications.
*/ */
static inline int32_t static int32_t
br_enact_signer (xlator_t *this, br_child_t *child, br_stub_init_t *stub) br_enact_signer (xlator_t *this, br_child_t *child, br_stub_init_t *stub)
{ {
int32_t ret = 0; int32_t ret = 0;
@ -1140,32 +1140,16 @@ br_enact_signer (xlator_t *this, br_child_t *child, br_stub_init_t *stub)
return -1; return -1;
} }
static inline int32_t static int32_t
br_enact_scrubber (xlator_t *this, br_child_t *child) br_launch_scrubber (xlator_t *this, br_child_t *child,
struct br_scanfs *fsscan, struct br_scrubber *fsscrub)
{ {
int32_t ret = 0; int32_t ret = -1;
br_private_t *priv = NULL; br_private_t *priv = NULL;
struct br_scanfs *fsscan = NULL;
struct br_scrubber *fsscrub = NULL;
priv = this->private; priv = this->private;
fsscan = &child->fsscan;
fsscrub = &priv->fsscrub;
LOCK_INIT (&fsscan->entrylock);
pthread_mutex_init (&fsscan->waitlock, NULL);
pthread_cond_init (&fsscan->waitcond, NULL);
fsscan->entries = 0;
INIT_LIST_HEAD (&fsscan->queued);
INIT_LIST_HEAD (&fsscan->ready);
/* init scheduler related variables */
fsscan->kick = _gf_false; fsscan->kick = _gf_false;
pthread_mutex_init (&fsscan->wakelock, NULL);
pthread_cond_init (&fsscan->wakecond, NULL);
ret = gf_thread_create (&child->thread, NULL, br_fsscanner, child); ret = gf_thread_create (&child->thread, NULL, br_fsscanner, child);
if (ret != 0) { if (ret != 0) {
gf_msg (this->name, GF_LOG_ALERT, 0, BRB_MSG_SPAWN_FAILED, gf_msg (this->name, GF_LOG_ALERT, 0, BRB_MSG_SPAWN_FAILED,
@ -1181,7 +1165,7 @@ br_enact_scrubber (xlator_t *this, br_child_t *child)
} }
pthread_mutex_unlock (&priv->lock); pthread_mutex_unlock (&priv->lock);
if (ret) if (ret)
goto error_return; goto cleanup_thread;
/** /**
* Everything has been setup.. add this subvolume to scrubbers * Everything has been setup.. add this subvolume to scrubbers
@ -1196,6 +1180,50 @@ br_enact_scrubber (xlator_t *this, br_child_t *child)
return 0; return 0;
cleanup_thread:
(void) gf_thread_cleanup_xint (child->thread);
error_return:
return -1;
}
static int32_t
br_enact_scrubber (xlator_t *this, br_child_t *child)
{
int32_t ret = 0;
br_private_t *priv = NULL;
struct br_scanfs *fsscan = NULL;
struct br_scrubber *fsscrub = NULL;
priv = this->private;
fsscan = &child->fsscan;
fsscrub = &priv->fsscrub;
/**
* if this child already witnesses a successfull connection earlier
* there's no need to initialize mutexes, condvars, etc..
*/
if (_br_child_witnessed_connection (child))
return br_launch_scrubber (this, child, fsscan, fsscrub);
LOCK_INIT (&fsscan->entrylock);
pthread_mutex_init (&fsscan->waitlock, NULL);
pthread_cond_init (&fsscan->waitcond, NULL);
fsscan->entries = 0;
INIT_LIST_HEAD (&fsscan->queued);
INIT_LIST_HEAD (&fsscan->ready);
/* init scheduler related variables */
pthread_mutex_init (&fsscan->wakelock, NULL);
pthread_cond_init (&fsscan->wakecond, NULL);
ret = br_launch_scrubber (this, child, fsscan, fsscrub);
if (ret)
goto error_return;
return 0;
error_return: error_return:
LOCK_DESTROY (&fsscan->entrylock); LOCK_DESTROY (&fsscan->entrylock);
pthread_mutex_destroy (&fsscan->waitlock); pthread_mutex_destroy (&fsscan->waitlock);
@ -1218,6 +1246,7 @@ br_child_enaction (xlator_t *this, br_child_t *child, br_stub_init_t *stub)
ret = br_enact_signer (this, child, stub); ret = br_enact_signer (this, child, stub);
if (!ret) { if (!ret) {
child->witnessed = 1;
_br_set_child_state (child, BR_CHILD_STATE_CONNECTED); _br_set_child_state (child, BR_CHILD_STATE_CONNECTED);
gf_log (this->name, GF_LOG_INFO, gf_log (this->name, GF_LOG_INFO,
"Connected to brick %s..", child->brick_path); "Connected to brick %s..", child->brick_path);
@ -1299,6 +1328,100 @@ br_brick_connect (xlator_t *this, br_child_t *child)
return ret; return ret;
} }
/* TODO: cleanup signer */
static int32_t
br_cleanup_signer (xlator_t *this, br_child_t *child)
{
return 0;
}
static int32_t
br_cleanup_scrubber (xlator_t *this, br_child_t *child)
{
int32_t ret = 0;
br_private_t *priv = NULL;
struct br_scanfs *fsscan = NULL;
struct br_scrubber *fsscrub = NULL;
priv = this->private;
fsscan = &child->fsscan;
fsscrub = &priv->fsscrub;
/**
* 0x0: child (brick) goes out of rotation
*
* This is fully safe w.r.t. entries for this child being actively
* scrubbed. Each of the scrubber thread(s) would finish scrubbing
* the entry (probably failing due to disconnection) and either
* putting the entry back into the queue or continuing further.
* Either way, pending entries for this child's queue need not be
* drained; entries just sit there in the queued/ready list to be
* consumed later upon re-connection.
*/
pthread_mutex_lock (&fsscrub->mutex);
{
list_del_init (&child->list);
}
pthread_mutex_unlock (&fsscrub->mutex);
/**
* 0x1: cleanup scanner thread
*
* The pending timer needs to be removed _after_ cleaning up the
* filesystem scanner (scheduling the next scrub time is not a
* cancellation point).
*/
ret = gf_thread_cleanup_xint (child->thread);
if (ret)
gf_log (this->name, GF_LOG_ERROR,
"Error cleaning up scanner thread");
/**
* 0x2: free()up resources
*/
if (fsscan->timer) {
(void) gf_tw_del_timer (priv->timer_wheel, fsscan->timer);
GF_FREE (fsscan->timer);
fsscan->timer = NULL;
}
gf_log (this->name, GF_LOG_INFO,
"Cleaned up scrubber for brick [%s]", child->brick_path);
return 0;
}
/**
* OK.. this child has made it's mind to go down the drain. So,
* let's clean up what it touched. (NOTE: there's no need to clean
* the inode table, it's just reused taking care of stale inodes)
*/
int32_t
br_brick_disconnect (xlator_t *this, br_child_t *child)
{
int32_t ret = 0;
br_private_t *priv = this->private;
LOCK (&child->lock);
{
if (!_br_is_child_connected (child))
goto unblock;
/* child is on death row.. */
_br_set_child_state (child, BR_CHILD_STATE_DISCONNECTED);
if (priv->iamscrubber)
ret = br_cleanup_scrubber (this, child);
else
ret = br_cleanup_signer (this, child);
}
unblock:
UNLOCK (&child->lock);
return ret;
}
/** /**
* This function is executed in a separate thread. The thread gets the * This function is executed in a separate thread. The thread gets the
* brick from where CHILD_UP has received from the queue and gets the * brick from where CHILD_UP has received from the queue and gets the
@ -1419,7 +1542,7 @@ notify (xlator_t *this, int32_t event, void *data, ...)
{ {
child = &priv->children[idx]; child = &priv->children[idx];
if (child->child_up == 1) if (child->child_up == 1)
goto unblock; goto unblock_0;
priv->up_children++; priv->up_children++;
child->child_up = 1; child->child_up = 1;
@ -1430,7 +1553,7 @@ notify (xlator_t *this, int32_t event, void *data, ...)
_br_qchild_event (this, child, br_brick_connect); _br_qchild_event (this, child, br_brick_connect);
pthread_cond_signal (&priv->cond); pthread_cond_signal (&priv->cond);
} }
unblock: unblock_0:
pthread_mutex_unlock (&priv->lock); pthread_mutex_unlock (&priv->lock);
if (priv->up_children == priv->child_count) if (priv->up_children == priv->child_count)
@ -1447,11 +1570,17 @@ notify (xlator_t *this, int32_t event, void *data, ...)
pthread_mutex_lock (&priv->lock); pthread_mutex_lock (&priv->lock);
{ {
if (priv->children[idx].child_up == 1) { child = &priv->children[idx];
priv->children[idx].child_up = 0; if (child->child_up == 0)
priv->up_children--; goto unblock_1;
}
child->child_up = 0;
priv->up_children--;
_br_qchild_event (this, child, br_brick_disconnect);
pthread_cond_signal (&priv->cond);
} }
unblock_1:
pthread_mutex_unlock (&priv->lock); pthread_mutex_unlock (&priv->lock);
if (priv->up_children == 0) if (priv->up_children == 0)
@ -1644,6 +1773,7 @@ br_init_children (xlator_t *this, br_private_t *priv)
child = &priv->children[i]; child = &priv->children[i];
LOCK_INIT (&child->lock); LOCK_INIT (&child->lock);
child->witnessed = 0;
br_set_child_state (child, BR_CHILD_STATE_DISCONNECTED); br_set_child_state (child, BR_CHILD_STATE_DISCONNECTED);
child->this = this; child->this = this;

View File

@ -80,8 +80,10 @@ typedef enum br_child_state {
} br_child_state_t; } br_child_state_t;
struct br_child { struct br_child {
gf_lock_t lock; gf_lock_t lock; /* protects child state */
br_child_state_t c_state; char witnessed; /* witnessed at least one succesfull
connection */
br_child_state_t c_state; /* current state of this child */
char child_up; /* Indicates whether this child is char child_up; /* Indicates whether this child is
up or not */ up or not */
@ -231,4 +233,10 @@ _br_child_failed_conn (br_child_t *child)
return (child->c_state == BR_CHILD_STATE_CONNFAILED); return (child->c_state == BR_CHILD_STATE_CONNFAILED);
} }
static inline int
_br_child_witnessed_connection (br_child_t *child)
{
return (child->witnessed == 1);
}
#endif /* __BIT_ROT_H__ */ #endif /* __BIT_ROT_H__ */

View File

@ -22,28 +22,6 @@
#include "changelog-rpc-common.h" #include "changelog-rpc-common.h"
#include <pthread.h> #include <pthread.h>
static inline void
__mask_cancellation (xlator_t *this)
{
int ret = 0;
ret = pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL);
if (ret)
gf_log (this->name, GF_LOG_WARNING,
"failed to disable thread cancellation");
}
static inline void
__unmask_cancellation (xlator_t *this)
{
int ret = 0;
ret = pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
if (ret)
gf_log (this->name, GF_LOG_WARNING,
"failed to enable thread cancellation");
}
static void static void
changelog_cleanup_free_mutex (void *arg_mutex) changelog_cleanup_free_mutex (void *arg_mutex)
{ {
@ -1333,7 +1311,7 @@ changelog_rollover (void *data)
continue; continue;
} }
__mask_cancellation (this); _mask_cancellation ();
LOCK (&priv->lock); LOCK (&priv->lock);
{ {
@ -1343,7 +1321,7 @@ changelog_rollover (void *data)
} }
UNLOCK (&priv->lock); UNLOCK (&priv->lock);
__unmask_cancellation (this); _unmask_cancellation ();
} }
return NULL; return NULL;
@ -1371,14 +1349,14 @@ changelog_fsync_thread (void *data)
if (ret) if (ret)
continue; continue;
__mask_cancellation (this); _mask_cancellation ();
ret = changelog_inject_single_event (this, priv, &cld); ret = changelog_inject_single_event (this, priv, &cld);
if (ret) if (ret)
gf_log (this->name, GF_LOG_ERROR, gf_log (this->name, GF_LOG_ERROR,
"failed to inject fsync event"); "failed to inject fsync event");
__unmask_cancellation (this); _unmask_cancellation ();
} }
return NULL; return NULL;