MEDIUM: mworker: seamless reload use the internal sockpairs

With the master worker, the seamless reload was still requiring an
external stats socket to the previous process, which is a pain to
configure.

This patch implements a way to use the internal socketpair between the
master and the workers to transfer the sockets during the reload.
This way, the master will always try to transfer the socket, even
without any configuration.

The master will still reload with the -x argument, followed by the
sockpair@ syntax. ( ex -x sockpair@4 ). Which use the FD of internal CLI
to the worker.
This commit is contained in:
William Lallemand 2021-11-24 18:45:37 +01:00
parent 82d5f013f9
commit 2be557f7cb
5 changed files with 63 additions and 60 deletions

View File

@ -13755,8 +13755,8 @@ defer-accept
expose-fd listeners expose-fd listeners
This option is only usable with the stats socket. It gives your stats socket This option is only usable with the stats socket. It gives your stats socket
the capability to pass listeners FD to another HAProxy process. the capability to pass listeners FD to another HAProxy process.
During a reload with the master-worker mode, the process is automatically In master-worker mode, this is not required anymore, the listeners will be
reexecuted adding -x and one of the stats socket with this option. passed using the internal socketpairs between the master and the workers.
See also "-x" in the management guide. See also "-x" in the management guide.
force-sslv3 force-sslv3

View File

@ -327,6 +327,9 @@ list of options is :
bind new ones. This is useful to avoid missing any new connection when bind new ones. This is useful to avoid missing any new connection when
reloading the configuration on Linux. The capability must be enable on the reloading the configuration on Linux. The capability must be enable on the
stats socket using "expose-fd listeners" in your configuration. stats socket using "expose-fd listeners" in your configuration.
In master-worker mode, the master will use this option upon a reload with
the "sockpair@" syntax, which allows the master to connect directly to a
worker without using stats socket declared in the configuration.
A safe way to start HAProxy from an init file consists in forcing the daemon A safe way to start HAProxy from an init file consists in forcing the daemon
mode, storing existing pids to a pid file and using this pid file to notify mode, storing existing pids to a pid file and using this pid file to notify

View File

@ -2906,6 +2906,7 @@ int mworker_cli_sockpair_new(struct mworker_proc *mworker_proc, int proc)
bind_conf->level &= ~ACCESS_LVL_MASK; bind_conf->level &= ~ACCESS_LVL_MASK;
bind_conf->level |= ACCESS_LVL_ADMIN; /* TODO: need to lower the rights with a CLI keyword*/ bind_conf->level |= ACCESS_LVL_ADMIN; /* TODO: need to lower the rights with a CLI keyword*/
bind_conf->level |= ACCESS_FD_LISTENERS;
if (!memprintf(&path, "sockpair@%d", mworker_proc->ipc_fd[1])) { if (!memprintf(&path, "sockpair@%d", mworker_proc->ipc_fd[1])) {
ha_alert("Cannot allocate listener.\n"); ha_alert("Cannot allocate listener.\n");

View File

@ -230,8 +230,6 @@ static int oldpids_sig; /* use USR1 or TERM */
/* Path to the unix socket we use to retrieve listener sockets from the old process */ /* Path to the unix socket we use to retrieve listener sockets from the old process */
static const char *old_unixsocket; static const char *old_unixsocket;
static char *cur_unixsocket = NULL;
int atexit_flag = 0; int atexit_flag = 0;
int nb_oldpids = 0; int nb_oldpids = 0;
@ -651,41 +649,6 @@ int delete_oldpid(int pid)
} }
static void get_cur_unixsocket()
{
/* if -x was used, try to update the stat socket if not available anymore */
if (global.cli_fe) {
struct bind_conf *bind_conf;
/* pass through all stats socket */
list_for_each_entry(bind_conf, &global.cli_fe->conf.bind, by_fe) {
struct listener *l;
list_for_each_entry(l, &bind_conf->listeners, by_bind) {
if (l->rx.addr.ss_family == AF_UNIX &&
(bind_conf->level & ACCESS_FD_LISTENERS)) {
const struct sockaddr_un *un;
un = (struct sockaddr_un *)&l->rx.addr;
/* priority to old_unixsocket */
if (!cur_unixsocket) {
cur_unixsocket = strdup(un->sun_path);
} else {
if (old_unixsocket && strcmp(un->sun_path, old_unixsocket) == 0) {
free(cur_unixsocket);
cur_unixsocket = strdup(old_unixsocket);
return;
}
}
}
}
}
}
if (!cur_unixsocket && old_unixsocket)
cur_unixsocket = strdup(old_unixsocket);
}
/* /*
* When called, this function reexec haproxy with -sf followed by current * When called, this function reexec haproxy with -sf followed by current
* children PIDs and possibly old children PIDs if they didn't leave yet. * children PIDs and possibly old children PIDs if they didn't leave yet.
@ -699,6 +662,7 @@ static void mworker_reexec()
char *msg = NULL; char *msg = NULL;
struct rlimit limit; struct rlimit limit;
struct per_thread_deinit_fct *ptdf; struct per_thread_deinit_fct *ptdf;
struct mworker_proc *current_child = NULL;
mworker_block_signals(); mworker_block_signals();
#if defined(USE_SYSTEMD) #if defined(USE_SYSTEMD)
@ -763,6 +727,9 @@ static void mworker_reexec()
next_argv[next_argc++] = "-sf"; next_argv[next_argc++] = "-sf";
list_for_each_entry(child, &proc_list, list) { list_for_each_entry(child, &proc_list, list) {
if (!(child->options & PROC_O_LEAVING) && (child->options & PROC_O_TYPE_WORKER))
current_child = child;
if (!(child->options & (PROC_O_TYPE_WORKER|PROC_O_TYPE_PROG)) || child->pid <= -1 ) if (!(child->options & (PROC_O_TYPE_WORKER|PROC_O_TYPE_PROG)) || child->pid <= -1 )
continue; continue;
if ((next_argv[next_argc++] = memprintf(&msg, "%d", child->pid)) == NULL) if ((next_argv[next_argc++] = memprintf(&msg, "%d", child->pid)) == NULL)
@ -770,10 +737,17 @@ static void mworker_reexec()
msg = NULL; msg = NULL;
} }
} }
/* add the -x option with the stat socket */
if (cur_unixsocket) {
next_argv[next_argc++] = "-x"; if (getenv("HAPROXY_MWORKER_WAIT_ONLY") == NULL) {
next_argv[next_argc++] = (char *)cur_unixsocket;
if (current_child) {
/* add the -x option with the socketpair of the current worker */
next_argv[next_argc++] = "-x";
if ((next_argv[next_argc++] = memprintf(&msg, "sockpair@%d", current_child->ipc_fd[0])) == NULL)
goto alloc_error;
msg = NULL;
}
} }
/* copy the previous options */ /* copy the previous options */
@ -3009,7 +2983,6 @@ int main(int argc, char **argv)
} }
} }
} }
get_cur_unixsocket();
/* We will loop at most 100 times with 10 ms delay each time. /* We will loop at most 100 times with 10 ms delay each time.
* That's at most 1 second. We only send a signal to old pids * That's at most 1 second. We only send a signal to old pids

View File

@ -31,6 +31,7 @@
#include <haproxy/listener.h> #include <haproxy/listener.h>
#include <haproxy/log.h> #include <haproxy/log.h>
#include <haproxy/namespace.h> #include <haproxy/namespace.h>
#include <haproxy/proto_sockpair.h>
#include <haproxy/sock.h> #include <haproxy/sock.h>
#include <haproxy/sock_inet.h> #include <haproxy/sock_inet.h>
#include <haproxy/tools.h> #include <haproxy/tools.h>
@ -285,6 +286,47 @@ int sock_get_old_sockets(const char *unixsocket)
int cur_fd = 0; int cur_fd = 0;
size_t maxoff = 0, curoff = 0; size_t maxoff = 0, curoff = 0;
if (strncmp("sockpair@", unixsocket, strlen("sockpair@")) == 0) {
/* sockpair for master-worker usage */
int sv[2];
int dst_fd;
dst_fd = strtoll(unixsocket + strlen("sockpair@"), NULL, 0);
if (socketpair(PF_UNIX, SOCK_STREAM, 0, sv) == -1) {
ha_warning("socketpair(): Cannot create socketpair. Giving up.\n");
}
if (send_fd_uxst(dst_fd, sv[0]) == -1) {
ha_alert("socketpair: cannot transfer socket.\n");
close(sv[0]);
close(sv[1]);
goto out;
}
close(sv[0]); /* we don't need this side anymore */
sock = sv[1];
} else {
/* Unix socket */
sock = socket(PF_UNIX, SOCK_STREAM, 0);
if (sock < 0) {
ha_warning("Failed to connect to the old process socket '%s'\n", unixsocket);
goto out;
}
strncpy(addr.sun_path, unixsocket, sizeof(addr.sun_path) - 1);
addr.sun_path[sizeof(addr.sun_path) - 1] = 0;
addr.sun_family = PF_UNIX;
ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
if (ret < 0) {
ha_warning("Failed to connect to the old process socket '%s'\n", unixsocket);
goto out;
}
}
memset(&msghdr, 0, sizeof(msghdr)); memset(&msghdr, 0, sizeof(msghdr));
cmsgbuf = malloc(CMSG_SPACE(sizeof(int)) * MAX_SEND_FD); cmsgbuf = malloc(CMSG_SPACE(sizeof(int)) * MAX_SEND_FD);
if (!cmsgbuf) { if (!cmsgbuf) {
@ -292,22 +334,6 @@ int sock_get_old_sockets(const char *unixsocket)
goto out; goto out;
} }
sock = socket(PF_UNIX, SOCK_STREAM, 0);
if (sock < 0) {
ha_warning("Failed to connect to the old process socket '%s'\n", unixsocket);
goto out;
}
strncpy(addr.sun_path, unixsocket, sizeof(addr.sun_path) - 1);
addr.sun_path[sizeof(addr.sun_path) - 1] = 0;
addr.sun_family = PF_UNIX;
ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
if (ret < 0) {
ha_warning("Failed to connect to the old process socket '%s'\n", unixsocket);
goto out;
}
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (void *)&tv, sizeof(tv)); setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (void *)&tv, sizeof(tv));
iov.iov_base = &fd_nb; iov.iov_base = &fd_nb;
iov.iov_len = sizeof(fd_nb); iov.iov_len = sizeof(fd_nb);