BUG/MAJOR: peers: Update peers section state from a thread-safe manner
It is the main part of this series. In the peer applet, only the peer flags are updated. It is now the responsibility of the resync process function to check changes on each peer to update the peers section state accordingly. Concretly, changes on the connection state (accepted, connected, released or renewed) are first reported at the peer level and then handled in __process_peer_state() function. In the same manner, when the learn status of a peer changes, the peers section state is no longer updated immediately. The resync task is woken up to deal with this changes. Thanks to these changes, the peers should be now really thread-safe. This patch relies on the following ones: * BUG/MINOR: peers: Report a resync was explicitly requested from a thread-safe manner * MINOR: peers: Add functions to commit peer changes from the resync task * MINOR: peers: sligthly adapt part processing the stopping signal * MINOR: peers: Add flags to report the peer state to the resync task * MINOR: peers: Add 2 peer flags about the peer learn status * MINOR: peers: Split resync process function to separate running/stopping states No bug was reported about the thread-safety of peers. Only a performance issue was encountered with a huge number of peers (> 50). So there is no reason to backport all these patches further than 2.9.
This commit is contained in:
parent
ef066fa186
commit
9425aeaffb
247
src/peers.c
247
src/peers.c
@ -1069,21 +1069,11 @@ void __peer_session_deinit(struct peer *peer)
|
|||||||
/* Re-init current table pointers to force announcement on re-connect */
|
/* Re-init current table pointers to force announcement on re-connect */
|
||||||
peer->remote_table = peer->last_local_table = peer->stop_local_table = NULL;
|
peer->remote_table = peer->last_local_table = peer->stop_local_table = NULL;
|
||||||
peer->appctx = NULL;
|
peer->appctx = NULL;
|
||||||
if (peer->flags & PEER_F_LEARN_ASSIGN) {
|
|
||||||
/* unassign current peer for learning */
|
|
||||||
peer->flags &= ~(PEER_F_LEARN_ASSIGN);
|
|
||||||
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
|
||||||
|
|
||||||
if (peer->local)
|
/* Mark peer as released */
|
||||||
peers->flags |= PEERS_F_RESYNC_LOCALABORT;
|
peer->flags &= PEER_STATE_RESET;
|
||||||
else
|
peer->flags |= PEER_F_ST_RELEASED;
|
||||||
peers->flags |= PEERS_F_RESYNC_REMOTEABORT;
|
|
||||||
/* reschedule a resync */
|
|
||||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
|
|
||||||
}
|
|
||||||
/* reset teaching and learning flags to 0 */
|
|
||||||
peer->flags &= PEER_TEACH_RESET;
|
|
||||||
peer->flags &= PEER_LEARN_RESET;
|
|
||||||
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2512,64 +2502,19 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||||||
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
|
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
|
||||||
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
|
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
|
||||||
NULL, &msg_head[1], peers->local->id, peer->id);
|
NULL, &msg_head[1], peers->local->id, peer->id);
|
||||||
if (peer->flags & PEER_F_LEARN_ASSIGN) {
|
if (peer->flags & PEER_F_LEARN_PROCESS) {
|
||||||
int commit_a_finish = 1;
|
peer->flags &= ~PEER_F_LEARN_PROCESS;
|
||||||
|
peer->flags |= PEER_F_LEARN_FINISHED;
|
||||||
peer->flags &= ~PEER_F_LEARN_ASSIGN;
|
|
||||||
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
|
||||||
if (peer->srv->shard) {
|
|
||||||
struct peer *ps;
|
|
||||||
|
|
||||||
peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL;
|
|
||||||
peer->flags |= PEER_F_LEARN_NOTUP2DATE;
|
|
||||||
for (ps = peers->remote; ps; ps = ps->next) {
|
|
||||||
if (ps->srv->shard == peer->srv->shard) {
|
|
||||||
/* flag all peers from same shard
|
|
||||||
* notup2date to disable request
|
|
||||||
* of a resync frm them
|
|
||||||
*/
|
|
||||||
ps->flags |= PEER_F_LEARN_NOTUP2DATE;
|
|
||||||
}
|
|
||||||
else if (ps->srv->shard && !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
|
|
||||||
/* it remains some other shards not requested
|
|
||||||
* we don't commit a resync finish to request
|
|
||||||
* the other shards
|
|
||||||
*/
|
|
||||||
commit_a_finish = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!commit_a_finish) {
|
|
||||||
/* it remains some shard to request, we schedule a new request
|
|
||||||
*/
|
|
||||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
|
||||||
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (commit_a_finish) {
|
|
||||||
peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
|
|
||||||
if (peer->local)
|
|
||||||
peers->flags |= PEERS_F_RESYNC_LOCALFINISHED;
|
|
||||||
else
|
|
||||||
peers->flags |= PEERS_F_RESYNC_REMOTEFINISHED;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
peer->confirm++;
|
peer->confirm++;
|
||||||
}
|
}
|
||||||
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
|
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
|
||||||
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
|
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
|
||||||
NULL, &msg_head[1], peers->local->id, peer->id);
|
NULL, &msg_head[1], peers->local->id, peer->id);
|
||||||
if (peer->flags & PEER_F_LEARN_ASSIGN) {
|
if (peer->flags & PEER_F_LEARN_PROCESS) {
|
||||||
peer->flags &= ~PEER_F_LEARN_ASSIGN;
|
peer->flags &= ~PEER_F_LEARN_PROCESS;
|
||||||
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
peer->flags |= (PEER_F_LEARN_FINISHED|PEER_F_LEARN_NOTUP2DATE);
|
||||||
|
|
||||||
if (peer->local)
|
|
||||||
peers->flags |= PEERS_F_RESYNC_LOCALPARTIAL;
|
|
||||||
else
|
|
||||||
peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL;
|
|
||||||
peer->flags |= PEER_F_LEARN_NOTUP2DATE;
|
|
||||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
|
||||||
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
||||||
}
|
}
|
||||||
peer->confirm++;
|
peer->confirm++;
|
||||||
@ -2667,15 +2612,12 @@ static inline int peer_send_msgs(struct appctx *appctx,
|
|||||||
int repl;
|
int repl;
|
||||||
|
|
||||||
/* Need to request a resync */
|
/* Need to request a resync */
|
||||||
if ((peer->flags & PEER_F_LEARN_ASSIGN) &&
|
if ((peer->flags & (PEER_F_LEARN_ASSIGN|PEER_F_LEARN_PROCESS|PEER_F_LEARN_FINISHED)) == PEER_F_LEARN_ASSIGN) {
|
||||||
(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
|
||||||
!(peers->flags & PEERS_F_RESYNC_PROCESS)) {
|
|
||||||
|
|
||||||
repl = peer_send_resync_reqmsg(appctx, peer, peers);
|
repl = peer_send_resync_reqmsg(appctx, peer, peers);
|
||||||
if (repl <= 0)
|
if (repl <= 0)
|
||||||
return repl;
|
return repl;
|
||||||
|
|
||||||
peers->flags |= PEERS_F_RESYNC_PROCESS;
|
peer->flags |= PEER_F_LEARN_PROCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Nothing to read, now we start to write */
|
/* Nothing to read, now we start to write */
|
||||||
@ -2906,6 +2848,9 @@ static inline void init_accepted_peer(struct peer *peer, struct peers *peers)
|
|||||||
/* Init confirm counter */
|
/* Init confirm counter */
|
||||||
peer->confirm = 0;
|
peer->confirm = 0;
|
||||||
|
|
||||||
|
peer->flags &= PEER_STATE_RESET;
|
||||||
|
peer->flags |= PEER_F_ST_ACCEPTED;
|
||||||
|
|
||||||
/* Init cursors */
|
/* Init cursors */
|
||||||
for (st = peer->tables; st ; st = st->next) {
|
for (st = peer->tables; st ; st = st->next) {
|
||||||
uint commitid, updateid;
|
uint commitid, updateid;
|
||||||
@ -2937,30 +2882,6 @@ static inline void init_accepted_peer(struct peer *peer, struct peers *peers)
|
|||||||
|
|
||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
|
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* reset teaching and learning flags to 0 */
|
|
||||||
peer->flags &= PEER_TEACH_RESET;
|
|
||||||
peer->flags &= PEER_LEARN_RESET;
|
|
||||||
|
|
||||||
/* if current peer is local */
|
|
||||||
if (peer->local) {
|
|
||||||
/* if current host need resyncfrom local and no process assigned */
|
|
||||||
if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL &&
|
|
||||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
|
||||||
/* assign local peer for a lesson, consider lesson already requested */
|
|
||||||
peer->flags |= PEER_F_LEARN_ASSIGN;
|
|
||||||
peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
|
||||||
peers->flags |= PEERS_F_RESYNC_LOCALASSIGN;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
|
|
||||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
|
||||||
/* assign peer for a lesson */
|
|
||||||
peer->flags |= PEER_F_LEARN_ASSIGN;
|
|
||||||
peers->flags |= PEERS_F_RESYNC_ASSIGN;
|
|
||||||
peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -3003,28 +2924,14 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
|
|||||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
|
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Awake main task */
|
||||||
|
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
||||||
|
|
||||||
/* Init confirm counter */
|
/* Init confirm counter */
|
||||||
peer->confirm = 0;
|
peer->confirm = 0;
|
||||||
|
|
||||||
/* reset teaching and learning flags to 0 */
|
peer->flags &= PEER_STATE_RESET;
|
||||||
peer->flags &= PEER_TEACH_RESET;
|
peer->flags |= PEER_F_ST_CONNECTED;
|
||||||
peer->flags &= PEER_LEARN_RESET;
|
|
||||||
|
|
||||||
/* If current peer is local */
|
|
||||||
if (peer->local) {
|
|
||||||
/* flag to start to teach lesson */
|
|
||||||
peer->flags |= PEER_F_TEACH_PROCESS;
|
|
||||||
}
|
|
||||||
else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
|
|
||||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
|
||||||
/* If peer is remote and resync from remote is needed,
|
|
||||||
and no peer currently assigned */
|
|
||||||
|
|
||||||
/* assign peer for a lesson */
|
|
||||||
peer->flags |= PEER_F_LEARN_ASSIGN;
|
|
||||||
peers->flags |= PEERS_F_RESYNC_ASSIGN;
|
|
||||||
peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -3108,6 +3015,11 @@ switchstate:
|
|||||||
*/
|
*/
|
||||||
curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
||||||
peer_session_forceshutdown(curpeer);
|
peer_session_forceshutdown(curpeer);
|
||||||
|
|
||||||
|
/* old peer connection was replaced by a new one. */
|
||||||
|
curpeer->flags &= PEER_STATE_RESET;
|
||||||
|
curpeer->flags |= PEER_F_ST_RENEWED;
|
||||||
|
|
||||||
curpeer->heartbeat = TICK_ETERNITY;
|
curpeer->heartbeat = TICK_ETERNITY;
|
||||||
curpeer->coll++;
|
curpeer->coll++;
|
||||||
}
|
}
|
||||||
@ -3413,12 +3325,121 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||||||
|
|
||||||
static void __process_peer_learn_status(struct peers *peers, struct peer *peer)
|
static void __process_peer_learn_status(struct peers *peers, struct peer *peer)
|
||||||
{
|
{
|
||||||
|
struct peer *ps;
|
||||||
|
|
||||||
|
if (peer->flags & PEER_F_LEARN_PROCESS)
|
||||||
|
peers->flags |= PEERS_F_RESYNC_PROCESS;
|
||||||
|
|
||||||
|
if (!(peer->flags & PEER_F_LEARN_FINISHED))
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (peer->flags & PEER_F_LEARN_NOTUP2DATE) {
|
||||||
|
/* Partial resync */
|
||||||
|
peers->flags |= (peer->local ? PEERS_F_RESYNC_LOCALPARTIAL : PEERS_F_RESYNC_REMOTEPARTIAL);
|
||||||
|
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
/* Full resync */
|
||||||
|
int commit_a_finish = 1;
|
||||||
|
|
||||||
|
if (peer->srv->shard) {
|
||||||
|
peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL;
|
||||||
|
peer->flags |= PEER_F_LEARN_NOTUP2DATE;
|
||||||
|
for (ps = peers->remote; ps; ps = ps->next) {
|
||||||
|
if (ps->srv->shard && ps != peer) {
|
||||||
|
if (ps->srv->shard == peer->srv->shard) {
|
||||||
|
/* flag all peers from same shard
|
||||||
|
* notup2date to disable request
|
||||||
|
* of a resync frm them
|
||||||
|
*/
|
||||||
|
ps->flags |= PEER_F_LEARN_NOTUP2DATE;
|
||||||
|
}
|
||||||
|
else if (!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
|
||||||
|
/* it remains some other shards not requested
|
||||||
|
* we don't commit a resync finish to request
|
||||||
|
* the other shards
|
||||||
|
*/
|
||||||
|
commit_a_finish = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!commit_a_finish) {
|
||||||
|
/* it remains some shard to request, we schedule a new request */
|
||||||
|
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (commit_a_finish) {
|
||||||
|
peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
|
||||||
|
peers->flags |= (peer->local ? PEERS_F_RESYNC_LOCALFINISHED : PEERS_F_RESYNC_REMOTEFINISHED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
peer->flags &= ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_PROCESS|PEER_F_LEARN_FINISHED);
|
||||||
|
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void __process_peer_state(struct peers *peers, struct peer *peer)
|
static void __process_peer_state(struct peers *peers, struct peer *peer)
|
||||||
{
|
{
|
||||||
if (peer->flags & PEER_F_RESYNC_REQUESTED)
|
if (peer->flags & PEER_F_RESYNC_REQUESTED)
|
||||||
peers->flags |= PEERS_F_RESYNC_REQUESTED;
|
peers->flags |= PEERS_F_RESYNC_REQUESTED;
|
||||||
|
|
||||||
|
/* Check peer state. Order is important */
|
||||||
|
if (peer->flags & (PEER_F_ST_RELEASED|PEER_F_ST_RENEWED)) {
|
||||||
|
if (peer->flags & PEER_F_LEARN_ASSIGN) {
|
||||||
|
/* unassign current peer for learning */
|
||||||
|
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
||||||
|
peers->flags |= (peer->local ? PEERS_F_RESYNC_LOCALABORT : PEERS_F_RESYNC_REMOTEABORT);
|
||||||
|
/* reschedule a resync */
|
||||||
|
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
|
||||||
|
}
|
||||||
|
peer->flags &= PEER_TEACH_RESET;
|
||||||
|
peer->flags &= PEER_LEARN_RESET;
|
||||||
|
}
|
||||||
|
if (peer->flags & (PEER_F_ST_ACCEPTED|PEER_F_ST_RENEWED)) {
|
||||||
|
peer->flags &= PEER_TEACH_RESET;
|
||||||
|
peer->flags &= PEER_LEARN_RESET;
|
||||||
|
|
||||||
|
/* if current peer is local */
|
||||||
|
if (peer->local) {
|
||||||
|
/* if current host need resyncfrom local and no process assigned */
|
||||||
|
if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL &&
|
||||||
|
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
||||||
|
/* assign local peer for a lesson, consider lesson already requested */
|
||||||
|
peer->flags |= (PEER_F_LEARN_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
||||||
|
peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
||||||
|
peers->flags |= PEERS_F_RESYNC_LOCALASSIGN;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
|
||||||
|
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
||||||
|
/* assign peer for a lesson */
|
||||||
|
peer->flags |= PEER_F_LEARN_ASSIGN;
|
||||||
|
peers->flags |= PEERS_F_RESYNC_ASSIGN;
|
||||||
|
peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (peer->flags & PEER_F_ST_CONNECTED) {
|
||||||
|
peer->flags &= PEER_TEACH_RESET;
|
||||||
|
peer->flags &= PEER_LEARN_RESET;
|
||||||
|
|
||||||
|
/* If current peer is local */
|
||||||
|
if (peer->local) {
|
||||||
|
/* flag to start to teach lesson */
|
||||||
|
peer->flags |= PEER_F_TEACH_PROCESS;
|
||||||
|
}
|
||||||
|
else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
|
||||||
|
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
||||||
|
/* If peer is remote and resync from remote is needed,
|
||||||
|
and no peer currently assigned */
|
||||||
|
|
||||||
|
/* assign peer for a lesson */
|
||||||
|
peer->flags |= PEER_F_LEARN_ASSIGN;
|
||||||
|
peers->flags |= PEERS_F_RESYNC_ASSIGN;
|
||||||
|
peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
peer->flags &= PEER_STATE_RESET;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
|
static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user