MINOR: peers: Split resync process function to separate running/stopping states
The function responsible to deal with resynchro between all peers is now split in two subfunctions. The first one is used when HAProxy is running while the other one is used in soft-stop case. This patch is required to be able to refactor locking mechanism of the peers.
This commit is contained in:
parent
98583c4256
commit
4078893049
525
src/peers.c
525
src/peers.c
@ -3403,6 +3403,277 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
|
||||
{
|
||||
struct peer *ps;
|
||||
struct shared_table *st;
|
||||
|
||||
/* Acquire lock for all peers of the section */
|
||||
for (ps = peers->remote; ps; ps = ps->next)
|
||||
HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
|
||||
|
||||
/* resync timeout set to TICK_ETERNITY means we just start
|
||||
* a new process and timer was not initialized.
|
||||
* We must arm this timer to switch to a request to a remote
|
||||
* node if incoming connection from old local process never
|
||||
* comes.
|
||||
*/
|
||||
if (peers->resync_timeout == TICK_ETERNITY)
|
||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||
|
||||
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
|
||||
(!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
|
||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
||||
/* Resync from local peer needed
|
||||
no peer was assigned for the lesson
|
||||
and no old local peer found
|
||||
or resync timeout expire */
|
||||
|
||||
/* flag no more resync from local, to try resync from remotes */
|
||||
peers->flags |= PEERS_F_RESYNC_LOCAL;
|
||||
peers->flags |= PEERS_F_RESYNC_LOCALTIMEOUT;
|
||||
|
||||
/* reschedule a resync */
|
||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||
}
|
||||
|
||||
/* For each session */
|
||||
for (ps = peers->remote; ps; ps = ps->next) {
|
||||
/* For each remote peers */
|
||||
if (!ps->local) {
|
||||
if (!ps->appctx) {
|
||||
/* no active peer connection */
|
||||
if (ps->statuscode == 0 ||
|
||||
((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
|
||||
tick_is_expired(ps->reconnect, now_ms))) {
|
||||
/* connection never tried
|
||||
* or previous peer connection established with success
|
||||
* or previous peer connection failed while connecting
|
||||
* and reconnection timer is expired */
|
||||
|
||||
/* retry a connect */
|
||||
ps->appctx = peer_session_create(peers, ps);
|
||||
}
|
||||
else if (!tick_is_expired(ps->reconnect, now_ms)) {
|
||||
/* If previous session failed during connection
|
||||
* but reconnection timer is not expired */
|
||||
|
||||
/* reschedule task for reconnect */
|
||||
task->expire = tick_first(task->expire, ps->reconnect);
|
||||
}
|
||||
/* else do nothing */
|
||||
} /* !ps->appctx */
|
||||
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
|
||||
/* current peer connection is active and established */
|
||||
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
|
||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
||||
!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
|
||||
/* Resync from a remote is needed
|
||||
* and no peer was assigned for lesson
|
||||
* and current peer may be up2date */
|
||||
|
||||
/* assign peer for the lesson */
|
||||
ps->flags |= PEER_F_LEARN_ASSIGN;
|
||||
peers->flags |= PEERS_F_RESYNC_ASSIGN;
|
||||
peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
|
||||
|
||||
/* wake up peer handler to handle a request of resync */
|
||||
appctx_wakeup(ps->appctx);
|
||||
}
|
||||
else {
|
||||
int update_to_push = 0;
|
||||
|
||||
/* Awake session if there is data to push */
|
||||
for (st = ps->tables; st ; st = st->next) {
|
||||
if (st->last_pushed != st->table->localupdate) {
|
||||
/* wake up the peer handler to push local updates */
|
||||
update_to_push = 1;
|
||||
/* There is no need to send a heartbeat message
|
||||
* when some updates must be pushed. The remote
|
||||
* peer will consider <ps> peer as alive when it will
|
||||
* receive these updates.
|
||||
*/
|
||||
ps->flags &= ~PEER_F_HEARTBEAT;
|
||||
/* Re-schedule another one later. */
|
||||
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
/* Refresh reconnect if necessary */
|
||||
if (tick_is_expired(ps->reconnect, now_ms))
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
||||
/* We are going to send updates, let's ensure we will
|
||||
* come back to send heartbeat messages or to reconnect.
|
||||
*/
|
||||
task->expire = tick_first(ps->reconnect, ps->heartbeat);
|
||||
appctx_wakeup(ps->appctx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* When there are updates to send we do not reconnect
|
||||
* and do not send heartbeat message either.
|
||||
*/
|
||||
if (!update_to_push) {
|
||||
if (tick_is_expired(ps->reconnect, now_ms)) {
|
||||
if (ps->flags & PEER_F_ALIVE) {
|
||||
/* This peer was alive during a 'reconnect' period.
|
||||
* Flag it as not alive again for the next period.
|
||||
*/
|
||||
ps->flags &= ~PEER_F_ALIVE;
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
||||
}
|
||||
else {
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
||||
ps->heartbeat = TICK_ETERNITY;
|
||||
peer_session_forceshutdown(ps);
|
||||
ps->no_hbt++;
|
||||
}
|
||||
}
|
||||
else if (tick_is_expired(ps->heartbeat, now_ms)) {
|
||||
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
ps->flags |= PEER_F_HEARTBEAT;
|
||||
appctx_wakeup(ps->appctx);
|
||||
}
|
||||
task->expire = tick_first(ps->reconnect, ps->heartbeat);
|
||||
}
|
||||
}
|
||||
/* else do nothing */
|
||||
} /* SUCCESSCODE */
|
||||
} /* !ps->peer->local */
|
||||
} /* for */
|
||||
|
||||
/* Resync from remotes expired: consider resync is finished */
|
||||
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
|
||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
||||
tick_is_expired(peers->resync_timeout, now_ms)) {
|
||||
/* Resync from remote peer needed
|
||||
* no peer was assigned for the lesson
|
||||
* and resync timeout expire */
|
||||
|
||||
/* flag no more resync from remote, consider resync is finished */
|
||||
peers->flags |= PEERS_F_RESYNC_REMOTE;
|
||||
peers->flags |= PEERS_F_RESYNC_REMOTETIMEOUT;
|
||||
}
|
||||
|
||||
if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
|
||||
/* Resync not finished*/
|
||||
/* reschedule task to resync timeout if not expired, to ended resync if needed */
|
||||
if (!tick_is_expired(peers->resync_timeout, now_ms))
|
||||
task->expire = tick_first(task->expire, peers->resync_timeout);
|
||||
}
|
||||
|
||||
/* Release lock for all peers of the section */
|
||||
for (ps = peers->remote; ps; ps = ps->next)
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
|
||||
}
|
||||
|
||||
static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state)
|
||||
{
|
||||
struct peer *ps;
|
||||
struct shared_table *st;
|
||||
|
||||
/* Acquire lock for all peers of the section */
|
||||
for (ps = peers->remote; ps; ps = ps->next)
|
||||
HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
|
||||
|
||||
if (state & TASK_WOKEN_SIGNAL) {
|
||||
/* We've just received the signal */
|
||||
if (!(peers->flags & PEERS_F_DONOTSTOP)) {
|
||||
/* add DO NOT STOP flag if not present */
|
||||
_HA_ATOMIC_INC(&jobs);
|
||||
peers->flags |= PEERS_F_DONOTSTOP;
|
||||
|
||||
/* disconnect all connected peers to process a local sync
|
||||
* this must be done only the first time we are switching
|
||||
* in stopping state
|
||||
*/
|
||||
for (ps = peers->remote; ps; ps = ps->next) {
|
||||
/* we're killing a connection, we must apply a random delay before
|
||||
* retrying otherwise the other end will do the same and we can loop
|
||||
* for a while.
|
||||
*/
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
||||
if (ps->appctx) {
|
||||
peer_session_forceshutdown(ps);
|
||||
}
|
||||
}
|
||||
|
||||
/* Set resync timeout for the local peer and request a immediate reconnect */
|
||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||
peers->local->reconnect = now_ms;
|
||||
}
|
||||
}
|
||||
|
||||
ps = peers->local;
|
||||
if (ps->flags & PEER_F_TEACH_COMPLETE) {
|
||||
if (peers->flags & PEERS_F_DONOTSTOP) {
|
||||
/* resync of new process was complete, current process can die now */
|
||||
_HA_ATOMIC_DEC(&jobs);
|
||||
peers->flags &= ~PEERS_F_DONOTSTOP;
|
||||
for (st = ps->tables; st ; st = st->next)
|
||||
HA_ATOMIC_DEC(&st->table->refcnt);
|
||||
}
|
||||
}
|
||||
else if (!ps->appctx) {
|
||||
/* Re-arm resync timeout if necessary */
|
||||
if (!tick_isset(peers->resync_timeout))
|
||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||
|
||||
/* If there's no active peer connection */
|
||||
if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
|
||||
!tick_is_expired(peers->resync_timeout, now_ms) &&
|
||||
(ps->statuscode == 0 ||
|
||||
ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_TRYAGAIN)) {
|
||||
/* The resync is finished for the local peer and
|
||||
* the resync timeout is not expired and
|
||||
* connection never tried
|
||||
* or previous peer connection was successfully established
|
||||
* or previous tcp connect succeeded but init state incomplete
|
||||
* or during previous connect, peer replies a try again statuscode */
|
||||
|
||||
if (!tick_is_expired(ps->reconnect, now_ms)) {
|
||||
/* reconnection timer is not expired. reschedule task for reconnect */
|
||||
task->expire = tick_first(task->expire, ps->reconnect);
|
||||
}
|
||||
else {
|
||||
/* connect to the local peer if we must push a local sync */
|
||||
if (peers->flags & PEERS_F_DONOTSTOP) {
|
||||
peer_session_create(peers, ps);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* Other error cases */
|
||||
if (peers->flags & PEERS_F_DONOTSTOP) {
|
||||
/* unable to resync new process, current process can die now */
|
||||
_HA_ATOMIC_DEC(&jobs);
|
||||
peers->flags &= ~PEERS_F_DONOTSTOP;
|
||||
for (st = ps->tables; st ; st = st->next)
|
||||
HA_ATOMIC_DEC(&st->table->refcnt);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
|
||||
/* Reset resync timeout during a resync */
|
||||
peers->resync_timeout = TICK_ETERNITY;
|
||||
|
||||
/* current peer connection is active and established
|
||||
* wake up all peer handlers to push remaining local updates */
|
||||
for (st = ps->tables; st ; st = st->next) {
|
||||
if (st->last_pushed != st->table->localupdate) {
|
||||
appctx_wakeup(ps->appctx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Release lock for all peers of the section */
|
||||
for (ps = peers->remote; ps; ps = ps->next)
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Task processing function to manage re-connect, peer session
|
||||
* tasks wakeup on local update and heartbeat. Let's keep it exported so that it
|
||||
@ -3411,267 +3682,19 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
||||
struct task *process_peer_sync(struct task * task, void *context, unsigned int state)
|
||||
{
|
||||
struct peers *peers = context;
|
||||
struct peer *ps;
|
||||
struct shared_table *st;
|
||||
|
||||
task->expire = TICK_ETERNITY;
|
||||
|
||||
/* Acquire lock for all peers of the section */
|
||||
for (ps = peers->remote; ps; ps = ps->next)
|
||||
HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
|
||||
|
||||
if (!stopping) {
|
||||
/* Normal case (not soft stop)*/
|
||||
__process_running_peer_sync(task, peers, state);
|
||||
|
||||
/* resync timeout set to TICK_ETERNITY means we just start
|
||||
* a new process and timer was not initialized.
|
||||
* We must arm this timer to switch to a request to a remote
|
||||
* node if incoming connection from old local process never
|
||||
* comes.
|
||||
*/
|
||||
if (peers->resync_timeout == TICK_ETERNITY)
|
||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||
|
||||
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
|
||||
(!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
|
||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
||||
/* Resync from local peer needed
|
||||
no peer was assigned for the lesson
|
||||
and no old local peer found
|
||||
or resync timeout expire */
|
||||
|
||||
/* flag no more resync from local, to try resync from remotes */
|
||||
peers->flags |= PEERS_F_RESYNC_LOCAL;
|
||||
peers->flags |= PEERS_F_RESYNC_LOCALTIMEOUT;
|
||||
|
||||
/* reschedule a resync */
|
||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||
}
|
||||
|
||||
/* For each session */
|
||||
for (ps = peers->remote; ps; ps = ps->next) {
|
||||
/* For each remote peers */
|
||||
if (!ps->local) {
|
||||
if (!ps->appctx) {
|
||||
/* no active peer connection */
|
||||
if (ps->statuscode == 0 ||
|
||||
((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
|
||||
tick_is_expired(ps->reconnect, now_ms))) {
|
||||
/* connection never tried
|
||||
* or previous peer connection established with success
|
||||
* or previous peer connection failed while connecting
|
||||
* and reconnection timer is expired */
|
||||
|
||||
/* retry a connect */
|
||||
ps->appctx = peer_session_create(peers, ps);
|
||||
}
|
||||
else if (!tick_is_expired(ps->reconnect, now_ms)) {
|
||||
/* If previous session failed during connection
|
||||
* but reconnection timer is not expired */
|
||||
|
||||
/* reschedule task for reconnect */
|
||||
task->expire = tick_first(task->expire, ps->reconnect);
|
||||
}
|
||||
/* else do nothing */
|
||||
} /* !ps->appctx */
|
||||
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
|
||||
/* current peer connection is active and established */
|
||||
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
|
||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
||||
!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
|
||||
/* Resync from a remote is needed
|
||||
* and no peer was assigned for lesson
|
||||
* and current peer may be up2date */
|
||||
|
||||
/* assign peer for the lesson */
|
||||
ps->flags |= PEER_F_LEARN_ASSIGN;
|
||||
peers->flags |= PEERS_F_RESYNC_ASSIGN;
|
||||
peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
|
||||
|
||||
/* wake up peer handler to handle a request of resync */
|
||||
appctx_wakeup(ps->appctx);
|
||||
}
|
||||
else {
|
||||
int update_to_push = 0;
|
||||
|
||||
/* Awake session if there is data to push */
|
||||
for (st = ps->tables; st ; st = st->next) {
|
||||
if (st->last_pushed != st->table->localupdate) {
|
||||
/* wake up the peer handler to push local updates */
|
||||
update_to_push = 1;
|
||||
/* There is no need to send a heartbeat message
|
||||
* when some updates must be pushed. The remote
|
||||
* peer will consider <ps> peer as alive when it will
|
||||
* receive these updates.
|
||||
*/
|
||||
ps->flags &= ~PEER_F_HEARTBEAT;
|
||||
/* Re-schedule another one later. */
|
||||
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
/* Refresh reconnect if necessary */
|
||||
if (tick_is_expired(ps->reconnect, now_ms))
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
||||
/* We are going to send updates, let's ensure we will
|
||||
* come back to send heartbeat messages or to reconnect.
|
||||
*/
|
||||
task->expire = tick_first(ps->reconnect, ps->heartbeat);
|
||||
appctx_wakeup(ps->appctx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* When there are updates to send we do not reconnect
|
||||
* and do not send heartbeat message either.
|
||||
*/
|
||||
if (!update_to_push) {
|
||||
if (tick_is_expired(ps->reconnect, now_ms)) {
|
||||
if (ps->flags & PEER_F_ALIVE) {
|
||||
/* This peer was alive during a 'reconnect' period.
|
||||
* Flag it as not alive again for the next period.
|
||||
*/
|
||||
ps->flags &= ~PEER_F_ALIVE;
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
||||
}
|
||||
else {
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
||||
ps->heartbeat = TICK_ETERNITY;
|
||||
peer_session_forceshutdown(ps);
|
||||
ps->no_hbt++;
|
||||
}
|
||||
}
|
||||
else if (tick_is_expired(ps->heartbeat, now_ms)) {
|
||||
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
ps->flags |= PEER_F_HEARTBEAT;
|
||||
appctx_wakeup(ps->appctx);
|
||||
}
|
||||
task->expire = tick_first(ps->reconnect, ps->heartbeat);
|
||||
}
|
||||
}
|
||||
/* else do nothing */
|
||||
} /* SUCCESSCODE */
|
||||
} /* !ps->peer->local */
|
||||
} /* for */
|
||||
|
||||
/* Resync from remotes expired: consider resync is finished */
|
||||
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
|
||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
||||
tick_is_expired(peers->resync_timeout, now_ms)) {
|
||||
/* Resync from remote peer needed
|
||||
* no peer was assigned for the lesson
|
||||
* and resync timeout expire */
|
||||
|
||||
/* flag no more resync from remote, consider resync is finished */
|
||||
peers->flags |= PEERS_F_RESYNC_REMOTE;
|
||||
peers->flags |= PEERS_F_RESYNC_REMOTETIMEOUT;
|
||||
}
|
||||
|
||||
if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
|
||||
/* Resync not finished*/
|
||||
/* reschedule task to resync timeout if not expired, to ended resync if needed */
|
||||
if (!tick_is_expired(peers->resync_timeout, now_ms))
|
||||
task->expire = tick_first(task->expire, peers->resync_timeout);
|
||||
}
|
||||
} /* !stopping */
|
||||
}
|
||||
else {
|
||||
/* soft stop case */
|
||||
if (state & TASK_WOKEN_SIGNAL) {
|
||||
/* We've just received the signal */
|
||||
if (!(peers->flags & PEERS_F_DONOTSTOP)) {
|
||||
/* add DO NOT STOP flag if not present */
|
||||
_HA_ATOMIC_INC(&jobs);
|
||||
peers->flags |= PEERS_F_DONOTSTOP;
|
||||
|
||||
/* disconnect all connected peers to process a local sync
|
||||
* this must be done only the first time we are switching
|
||||
* in stopping state
|
||||
*/
|
||||
for (ps = peers->remote; ps; ps = ps->next) {
|
||||
/* we're killing a connection, we must apply a random delay before
|
||||
* retrying otherwise the other end will do the same and we can loop
|
||||
* for a while.
|
||||
*/
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
||||
if (ps->appctx) {
|
||||
peer_session_forceshutdown(ps);
|
||||
}
|
||||
}
|
||||
|
||||
/* Set resync timeout for the local peer and request a immediate reconnect */
|
||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||
peers->local->reconnect = now_ms;
|
||||
}
|
||||
}
|
||||
|
||||
ps = peers->local;
|
||||
if (ps->flags & PEER_F_TEACH_COMPLETE) {
|
||||
if (peers->flags & PEERS_F_DONOTSTOP) {
|
||||
/* resync of new process was complete, current process can die now */
|
||||
_HA_ATOMIC_DEC(&jobs);
|
||||
peers->flags &= ~PEERS_F_DONOTSTOP;
|
||||
for (st = ps->tables; st ; st = st->next)
|
||||
HA_ATOMIC_DEC(&st->table->refcnt);
|
||||
}
|
||||
}
|
||||
else if (!ps->appctx) {
|
||||
/* Re-arm resync timeout if necessary */
|
||||
if (!tick_isset(peers->resync_timeout))
|
||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||
|
||||
/* If there's no active peer connection */
|
||||
if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
|
||||
!tick_is_expired(peers->resync_timeout, now_ms) &&
|
||||
(ps->statuscode == 0 ||
|
||||
ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_TRYAGAIN)) {
|
||||
/* The resync is finished for the local peer and
|
||||
* the resync timeout is not expired and
|
||||
* connection never tried
|
||||
* or previous peer connection was successfully established
|
||||
* or previous tcp connect succeeded but init state incomplete
|
||||
* or during previous connect, peer replies a try again statuscode */
|
||||
|
||||
if (!tick_is_expired(ps->reconnect, now_ms)) {
|
||||
/* reconnection timer is not expired. reschedule task for reconnect */
|
||||
task->expire = tick_first(task->expire, ps->reconnect);
|
||||
}
|
||||
else {
|
||||
/* connect to the local peer if we must push a local sync */
|
||||
if (peers->flags & PEERS_F_DONOTSTOP) {
|
||||
peer_session_create(peers, ps);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* Other error cases */
|
||||
if (peers->flags & PEERS_F_DONOTSTOP) {
|
||||
/* unable to resync new process, current process can die now */
|
||||
_HA_ATOMIC_DEC(&jobs);
|
||||
peers->flags &= ~PEERS_F_DONOTSTOP;
|
||||
for (st = ps->tables; st ; st = st->next)
|
||||
HA_ATOMIC_DEC(&st->table->refcnt);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
|
||||
/* Reset resync timeout during a resync */
|
||||
peers->resync_timeout = TICK_ETERNITY;
|
||||
|
||||
/* current peer connection is active and established
|
||||
* wake up all peer handlers to push remaining local updates */
|
||||
for (st = ps->tables; st ; st = st->next) {
|
||||
if (st->last_pushed != st->table->localupdate) {
|
||||
appctx_wakeup(ps->appctx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
__process_stopping_peer_sync(task, peers, state);
|
||||
} /* stopping */
|
||||
|
||||
/* Release lock for all peers of the section */
|
||||
for (ps = peers->remote; ps; ps = ps->next)
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
|
||||
|
||||
/* Wakeup for re-connect */
|
||||
return task;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user