glusterd/snapshot : Clean up of old barrier code.

As a new barrier translator is introduced, we dont require
the old barrier code. Hence cleaning thar up.

Change-Id: Ieedca6f33a746898f0d2332fda1f1d4c86fff98f
BUG: 1061685
Signed-off-by: Sachin Pandit <spandit@redhat.com>
Reviewed-on: http://review.gluster.org/7577
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Kaushal M <kaushal@redhat.com>
Reviewed-by: Vijaikumar Mallikarjuna <vmallika@redhat.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
This commit is contained in:
Sachin Pandit 2014-04-28 05:58:41 +05:30 committed by Vijay Bellur
parent e2034a84d2
commit d09b327a27
4 changed files with 0 additions and 478 deletions

View File

@ -1093,327 +1093,3 @@ out:
return ret;
}
int32_t
gf_barrier_transmit (server_conf_t *conf, gf_barrier_payload_t *payload)
{
gf_barrier_t *barrier = NULL;
int32_t ret = -1;
client_t *client = NULL;
gf_boolean_t lk_heal = _gf_false;
call_frame_t *frame = NULL;
server_state_t *state = NULL;
GF_VALIDATE_OR_GOTO ("barrier", conf, out);
GF_VALIDATE_OR_GOTO ("barrier", conf->barrier, out);
GF_VALIDATE_OR_GOTO ("barrier", payload, out);
barrier = conf->barrier;
frame = payload->frame;
if (frame) {
state = CALL_STATE (frame);
frame->local = NULL;
client = frame->root->client;
}
/* currently lk fops are not barrier'ed. This is reflecting code in
* server_submit_reply */
if (client)
lk_heal = ((server_conf_t *) client->this->private)->lk_heal;
ret = rpcsvc_submit_generic (payload->req, &payload->rsp, 1,
payload->payload, payload->payload_count,
payload->iobref);
iobuf_unref (payload->iob);
if (ret == -1) {
gf_log_callingfn ("", GF_LOG_ERROR, "Reply submission failed");
if (frame && client && !lk_heal) {
server_connection_cleanup (frame->this, client,
INTERNAL_LOCKS | POSIX_LOCKS);
} else {
/* TODO: Failure of open(dir), create, inodelk, entrylk
or lk fops send failure must be handled specially. */
}
goto ret;
}
ret = 0;
ret:
if (state) {
free_state (state);
}
if (frame) {
gf_client_unref (client);
STACK_DESTROY (frame->root);
}
if (payload->free_iobref) {
iobref_unref (payload->iobref);
}
out:
return ret;
}
gf_barrier_payload_t *
gf_barrier_dequeue (gf_barrier_t *barrier)
{
gf_barrier_payload_t *payload = NULL;
if (!barrier || list_empty (&barrier->queue))
return NULL;
payload = list_entry (barrier->queue.next,
gf_barrier_payload_t, list);
if (payload) {
list_del_init (&payload->list);
barrier->cur_size--;
}
return payload;
}
void*
gf_barrier_dequeue_start (void *data)
{
server_conf_t *conf = NULL;
gf_barrier_t *barrier = NULL;
gf_barrier_payload_t *payload = NULL;
conf = (server_conf_t *)data;
if (!conf || !conf->barrier)
return NULL;
barrier = conf->barrier;
LOCK (&barrier->lock);
{
while (barrier->cur_size) {
payload = gf_barrier_dequeue (barrier);
if (payload) {
if (gf_barrier_transmit (conf, payload)) {
gf_log ("server", GF_LOG_WARNING,
"Failed to transmit");
}
GF_FREE (payload);
}
}
}
UNLOCK (&barrier->lock);
return NULL;
}
void
gf_barrier_timeout (void *data)
{
server_conf_t *conf = NULL;
gf_barrier_t *barrier = NULL;
gf_boolean_t need_dequeue = _gf_false;
conf = (server_conf_t *)data;
if (!conf || !conf->barrier)
goto out;
barrier = conf->barrier;
gf_log ("", GF_LOG_INFO, "barrier timed-out");
LOCK (&barrier->lock);
{
need_dequeue = barrier->on;
barrier->on = _gf_false;
}
UNLOCK (&barrier->lock);
if (need_dequeue == _gf_true)
gf_barrier_dequeue_start (data);
out:
return;
}
int32_t
gf_barrier_start (xlator_t *this)
{
server_conf_t *conf = NULL;
gf_barrier_t *barrier = NULL;
int32_t ret = -1;
struct timespec time = {0,};
conf = this->private;
GF_VALIDATE_OR_GOTO ("server", this, out);
GF_VALIDATE_OR_GOTO (this->name, conf, out);
GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out);
barrier = conf->barrier;
gf_log (this->name, GF_LOG_INFO, "barrier start called");
LOCK (&barrier->lock);
{
/* if barrier is on, reset timer */
if (barrier->on == _gf_true) {
ret = gf_timer_call_cancel (this->ctx, barrier->timer);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "Failed to "
"unset timer, failing barrier start");
goto unlock;
}
}
barrier->on = _gf_true;
time.tv_sec = barrier->time_out;
time.tv_nsec = 0;
barrier->timer = gf_timer_call_after (this->ctx, time,
gf_barrier_timeout,
(void *)conf);
if (!barrier->timer) {
gf_log (this->name, GF_LOG_ERROR, "Failed to set "
"timer, failing barrier start");
barrier->on = _gf_false;
}
}
unlock:
UNLOCK (&barrier->lock);
ret = 0;
out:
return ret;
}
int32_t
gf_barrier_stop (xlator_t *this)
{
server_conf_t *conf = NULL;
gf_barrier_t *barrier = NULL;
int32_t ret = -1;
gf_boolean_t need_dequeue = _gf_false;
conf = this->private;
GF_VALIDATE_OR_GOTO ("server", this, out);
GF_VALIDATE_OR_GOTO (this->name, conf, out);
GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out);
barrier = conf->barrier;
gf_log (this->name, GF_LOG_INFO, "barrier stop called");
LOCK (&barrier->lock);
{
need_dequeue = barrier->on;
barrier->on = _gf_false;
}
UNLOCK (&barrier->lock);
if (need_dequeue == _gf_true) {
gf_timer_call_cancel (this->ctx, barrier->timer);
ret = gf_thread_create (&conf->barrier_th, NULL,
gf_barrier_dequeue_start,
conf);
if (ret) {
gf_log (this->name, GF_LOG_CRITICAL,
"Failed to start un-barriering");
goto out;
}
}
ret = 0;
out:
return ret;
}
int32_t
gf_barrier_fops_configure (xlator_t *this, gf_barrier_t *barrier, char *str)
{
int32_t ret = -1;
char *dup_str = NULL;
char *str_tok = NULL;
char *save_ptr = NULL;
uint64_t fops = 0;
/* by defaul fsync & flush needs to be barriered */
fops |= 1 << GFS3_OP_FSYNC;
fops |= 1 << GFS3_OP_FLUSH;
if (!str)
goto done;
dup_str = gf_strdup (str);
if (!dup_str)
goto done;
str_tok = strtok_r (dup_str, ",", &save_ptr);
if (!str_tok)
goto done;
fops = 0;
while (str_tok) {
if (!strcmp(str_tok, "writev")) {
fops |= ((uint64_t)1 << GFS3_OP_WRITE);
} else if (!strcmp(str_tok, "fsync")) {
fops |= ((uint64_t)1 << GFS3_OP_FSYNC);
} else if (!strcmp(str_tok, "read")) {
fops |= ((uint64_t)1 << GFS3_OP_READ);
} else if (!strcmp(str_tok, "rename")) {
fops |= ((uint64_t)1 << GFS3_OP_RENAME);
} else if (!strcmp(str_tok, "flush")) {
fops |= ((uint64_t)1 << GFS3_OP_FLUSH);
} else if (!strcmp(str_tok, "ftruncate")) {
fops |= ((uint64_t)1 << GFS3_OP_FTRUNCATE);
} else if (!strcmp(str_tok, "fallocate")) {
fops |= ((uint64_t)1 << GFS3_OP_FALLOCATE);
} else if (!strcmp(str_tok, "rmdir")) {
fops |= ((uint64_t)1 << GFS3_OP_RMDIR);
} else {
gf_log ("barrier", GF_LOG_ERROR,
"Invalid barrier fop %s", str_tok);
}
str_tok = strtok_r (NULL, ",", &save_ptr);
}
done:
LOCK (&barrier->lock);
{
barrier->fops = fops;
}
UNLOCK (&barrier->lock);
ret = 0;
GF_FREE (dup_str);
return ret;
}
void
gf_barrier_enqueue (gf_barrier_t *barrier, gf_barrier_payload_t *payload)
{
list_add_tail (&payload->list, &barrier->queue);
barrier->cur_size++;
}
gf_barrier_payload_t *
gf_barrier_payload (rpcsvc_request_t *req, struct iovec *rsp,
call_frame_t *frame, struct iovec *payload_orig,
int payloadcount, struct iobref *iobref,
struct iobuf *iob, gf_boolean_t free_iobref)
{
gf_barrier_payload_t *payload = NULL;
if (!rsp)
return NULL;
payload = GF_CALLOC (1, sizeof (*payload),1);
if (!payload)
return NULL;
INIT_LIST_HEAD (&payload->list);
payload->req = req;
memcpy (&payload->rsp, rsp, sizeof (struct iovec));
payload->frame = frame;
payload->payload = payload_orig;
payload->payload_count = payloadcount;
payload->iobref = iobref;
payload->iob = iob;
payload->free_iobref = free_iobref;
return payload;
}

View File

@ -29,10 +29,6 @@
#define IS_NOT_ROOT(pathlen) ((pathlen > 2)? 1 : 0)
#define is_fop_barriered(fops, procnum) (fops & ((uint64_t)1 << procnum))
#define barrier_add_to_queue(barrier) (barrier->on || barrier->cur_size)
void free_state (server_state_t *state);
void server_loc_wipe (loc_t *loc);
@ -61,16 +57,4 @@ int auth_set_username_passwd (dict_t *input_params, dict_t *config_params,
struct _client_t *client);
server_ctx_t *server_ctx_get (client_t *client, xlator_t *xlator);
int32_t gf_barrier_start (xlator_t *this);
int32_t gf_barrier_stop (xlator_t *this);
int32_t gf_barrier_fops_configure (xlator_t *this, gf_barrier_t *barrier,
char *str);
void gf_barrier_enqueue (gf_barrier_t *barrier, gf_barrier_payload_t *stub);
gf_barrier_payload_t *
gf_barrier_payload (rpcsvc_request_t *req, struct iovec *rsp,
call_frame_t *frame, struct iovec *payload,
int payloadcount, struct iobref *iobref,
struct iobuf *iob, gf_boolean_t free_iobref);
#endif /* !_SERVER_HELPERS_H */

View File

@ -142,9 +142,6 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,
char new_iobref = 0;
client_t *client = NULL;
gf_boolean_t lk_heal = _gf_false;
server_conf_t *conf = NULL;
gf_barrier_t *barrier = NULL;
gf_barrier_payload_t *stub = NULL;
gf_boolean_t barriered = _gf_false;
GF_VALIDATE_OR_GOTO ("server", req, ret);
@ -153,7 +150,6 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,
state = CALL_STATE (frame);
frame->local = NULL;
client = frame->root->client;
conf = (server_conf_t *) client->this->private;
}
if (client)
@ -176,32 +172,6 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,
iobref_add (iobref, iob);
if (conf)
barrier = conf->barrier;
if (barrier) {
/* todo: write's with fd flags set to O_SYNC and O_DIRECT */
LOCK (&barrier->lock);
{
if (is_fop_barriered (barrier->fops, req->procnum) &&
(barrier_add_to_queue (barrier))) {
stub = gf_barrier_payload (req, &rsp, frame,
payload,
payloadcount, iobref,
iob, new_iobref);
if (stub) {
gf_barrier_enqueue (barrier, stub);
barriered = _gf_true;
} else {
gf_log ("", GF_LOG_ERROR, "Failed to "
" barrier fop %"PRIu64,
((uint64_t)1 << req->procnum));
}
}
}
UNLOCK (&barrier->lock);
if (barriered == _gf_true)
goto out;
}
/* Then, submit the message for transmission. */
ret = rpcsvc_submit_generic (req, &rsp, 1, payload, payloadcount,
iobref);
@ -243,7 +213,6 @@ ret:
if (new_iobref) {
iobref_unref (iobref);
}
out:
return ret;
}
@ -827,8 +796,6 @@ init (xlator_t *this)
server_conf_t *conf = NULL;
rpcsvc_listener_t *listener = NULL;
char *statedump_path = NULL;
gf_barrier_t *barrier = NULL;
char *str = NULL;
GF_VALIDATE_OR_GOTO ("init", this, out);
if (this->children == NULL) {
@ -977,37 +944,6 @@ init (xlator_t *this)
}
}
#endif
/* barrier related */
barrier = GF_CALLOC (1, sizeof (*barrier),1);
if (!barrier) {
gf_log (this->name, GF_LOG_WARNING,
"WARNING: Failed to allocate barrier");
ret = -1;
goto out;
}
LOCK_INIT (&barrier->lock);
INIT_LIST_HEAD (&barrier->queue);
barrier->on = _gf_false;
GF_OPTION_INIT ("barrier-queue-length", barrier->max_size,
int64, out);
GF_OPTION_INIT ("barrier-timeout", barrier->time_out,
uint64, out);
ret = dict_get_str (this->options, "barrier-fops", &str);
if (ret) {
gf_log (this->name, GF_LOG_DEBUG,
"setting barrier fops to default value");
}
ret = gf_barrier_fops_configure (this, barrier, str);
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"invalid barrier fops specified");
goto out;
}
conf->barrier = barrier;
this->private = conf;
ret = 0;
@ -1072,37 +1008,10 @@ notify (xlator_t *this, int32_t event, void *data, ...)
va_end (ap);
switch (event) {
case GF_EVENT_VOLUME_BARRIER_OP:
ret = dict_get_int32 (dict, "barrier", &val);
if (ret) {
gf_log (this->name, GF_LOG_ERROR,
"Wrong BARRIER event");
goto out;
}
/* !val un-barrier, if val, barrier */
if (val) {
ret = gf_barrier_start (this);
if (ret)
gf_log (this->name, GF_LOG_ERROR,
"Barrier start failed");
} else {
ret = gf_barrier_stop (this);
if (ret)
gf_log (this->name, GF_LOG_ERROR,
"Barrier stop failed");
}
ret = dict_set_int32 (output, "barrier-status", ret);
if (ret)
gf_log (this->name, GF_LOG_ERROR,
"Failed to set barrier-status in dict");
break;
/* todo: call default_notify to make other xlators handle it.*/
default:
default_notify (this, event, data);
break;
}
out:
return ret;
}
@ -1226,23 +1135,5 @@ struct volume_options options[] = {
"overrides the auth.allow option. By default, all"
" connections are allowed."
},
{.key = {"barrier-timeout"},
.type = GF_OPTION_TYPE_INT,
.default_value = "60",
.min = 0,
.max = 360,
.description = "Barrier timeout in seconds",
},
{.key = {"barrier-queue-length"},
.type = GF_OPTION_TYPE_INT,
.default_value = "4096",
.min = 0,
.max = 16384,
.description = "Barrier queue length",
},
{.key = {"barrier-fops"},
.type = GF_OPTION_TYPE_STR,
.description = "Allow a comma seperated fop lists",
},
{ .key = {NULL} },
};

View File

@ -28,34 +28,6 @@
#define GF_MAX_SOCKET_WINDOW_SIZE (1 * GF_UNIT_MB)
#define GF_MIN_SOCKET_WINDOW_SIZE (0)
struct _gf_barrier_payload {
rpcsvc_request_t *req;
struct iovec rsp;
call_frame_t *frame;
struct iovec *payload;
struct iobref *iobref;
struct iobuf *iob;
int payload_count;
gf_boolean_t free_iobref;
struct list_head list;
};
typedef struct _gf_barrier_payload gf_barrier_payload_t;
struct _gf_barrier {
gf_lock_t lock;
gf_boolean_t on;
gf_boolean_t force;
size_t cur_size;
int64_t max_size;
uint64_t fops;
gf_timer_t *timer;
uint64_t time_out;
struct list_head queue;
};
typedef struct _gf_barrier gf_barrier_t;
typedef enum {
INTERNAL_LOCKS = 1,
POSIX_LOCKS = 2,
@ -84,7 +56,6 @@ struct server_conf {
struct timespec grace_ts;
dict_t *auth_modules;
pthread_mutex_t mutex;
gf_barrier_t *barrier;
struct list_head xprt_list;
pthread_t barrier_th;
};