/* haproxy/src/proto_tcp.c */

/*
* AF_INET/AF_INET6 SOCK_STREAM protocol layer (tcp)
*
* Copyright 2000-2013 Willy Tarreau <w@1wt.eu>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version
* 2 of the License, or (at your option) any later version.
*
*/
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <haproxy/api.h>
#include <haproxy/arg.h>
#include <haproxy/connection.h>
#include <haproxy/errors.h>
#include <haproxy/fd.h>
#include <haproxy/global.h>
#include <haproxy/list.h>
#include <haproxy/listener.h>
#include <haproxy/log.h>
#include <haproxy/namespace.h>
#include <haproxy/port_range.h>
#include <haproxy/proto_tcp.h>
#include <haproxy/protocol.h>
#include <haproxy/proxy-t.h>
#include <haproxy/sock.h>
#include <haproxy/sock_inet.h>
#include <haproxy/tools.h>
/* Forward declarations of the callbacks that are referenced by the
 * proto_tcpv4 and proto_tcpv6 protocol descriptors before being defined.
 */
static int tcp_bind_listener(struct listener *listener, char *errmsg, int errlen);
static int tcp_suspend_receiver(struct receiver *rx);
static int tcp_resume_receiver(struct receiver *rx);
static void tcp_enable_listener(struct listener *listener);
static void tcp_disable_listener(struct listener *listener);
static void tcpv4_add_listener(struct listener *listener, int port);
static void tcpv6_add_listener(struct listener *listener, int port);
/* Protocol descriptor for TCP over IPv4.
 * Note: must not be declared <const> as its list will be overwritten.
 */
static struct protocol proto_tcpv4 = {
	.name           = "tcpv4",

	/* address family and socket parameters */
	.fam            = &proto_fam_inet4,
	.sock_domain    = AF_INET,
	.sock_type      = SOCK_STREAM,
	.sock_prot      = IPPROTO_TCP,
	.ctrl_type      = SOCK_STREAM,

	/* listener-level callbacks */
	.add            = tcpv4_add_listener,
	.listen         = tcp_bind_listener,
	.enable         = tcp_enable_listener,
	.disable        = tcp_disable_listener,
	.unbind         = default_unbind_listener,
	.suspend        = default_suspend_listener,
	.resume         = default_resume_listener,

	/* receiver-level callbacks */
	.rx_enable      = sock_enable,
	.rx_disable     = sock_disable,
	.rx_unbind      = sock_unbind,
	.rx_suspend     = tcp_suspend_receiver,
	.rx_resume      = tcp_resume_receiver,
	.rx_listening   = sock_accept_conn,

	/* connection-level callbacks */
	.accept         = &listener_accept,
	.connect        = tcp_connect_server,

	/* the receivers list starts empty (head pointing to itself) */
	.receivers      = LIST_HEAD_INIT(proto_tcpv4.receivers),
	.nb_receivers   = 0,
};

/* hands &proto_tcpv4 to protocol_register() through the STG_REGISTER initcall */
INITCALL1(STG_REGISTER, protocol_register, &proto_tcpv4);
/* Protocol descriptor for TCP over IPv6.
 * Note: must not be declared <const> as its list will be overwritten.
 */
static struct protocol proto_tcpv6 = {
	.name           = "tcpv6",

	/* address family and socket parameters */
	.fam            = &proto_fam_inet6,
	.sock_domain    = AF_INET6,
	.sock_type      = SOCK_STREAM,
	.sock_prot      = IPPROTO_TCP,
	.ctrl_type      = SOCK_STREAM,

	/* listener-level callbacks */
	.add            = tcpv6_add_listener,
	.listen         = tcp_bind_listener,
	.enable         = tcp_enable_listener,
	.disable        = tcp_disable_listener,
	.unbind         = default_unbind_listener,
	.suspend        = default_suspend_listener,
	.resume         = default_resume_listener,

	/* receiver-level callbacks */
	.rx_enable      = sock_enable,
	.rx_disable     = sock_disable,
	.rx_unbind      = sock_unbind,
	.rx_suspend     = tcp_suspend_receiver,
	.rx_resume      = tcp_resume_receiver,
	.rx_listening   = sock_accept_conn,

	/* connection-level callbacks */
	.accept         = &listener_accept,
	.connect        = tcp_connect_server,

	/* the receivers list starts empty (head pointing to itself) */
	.receivers      = LIST_HEAD_INIT(proto_tcpv6.receivers),
	.nb_receivers   = 0,
};

/* hands &proto_tcpv6 to protocol_register() through the STG_REGISTER initcall */
INITCALL1(STG_REGISTER, protocol_register, &proto_tcpv6);
/* Binds socket <fd> to the IPv4/IPv6 address <local>, unless <flags> is
 * non-zero, in which case an attempt is made to bind to (parts of) <remote>
 * instead. <flags> is a 2-bit field :
 *   - 0 : ignore <remote> entirely (it is not dereferenced and may be NULL)
 *   - 1 : take the address from <remote>
 *   - 2 : take the port from <remote>
 *   - 3 : take both
 *
 * Foreign binding relies on sock_inet4_make_foreign()/sock_inet6_make_foreign()
 * (the "linux_tproxy" method: we directly bind to the foreign address). Once
 * one of them fails for a family, it is not attempted again for that family
 * (the state is kept in the static THREAD_LOCAL variables below).
 *
 * SO_REUSEADDR is always set on the socket before binding (best effort, the
 * result is not checked).
 *
 * Returns :
 *   - 0 when everything went fine
 *   - 1 if binding to the local address failed
 *   - 2 if binding to the foreign address failed, or if <flags> was set but
 *       foreign binding was not possible
 */
int tcp_bind_socket(int fd, int flags, struct sockaddr_storage *local, struct sockaddr_storage *remote)
{
	static THREAD_LOCAL int ip_transp_working = 1;
	static THREAD_LOCAL int ip6_transp_working = 1;
	struct sockaddr_storage foreign_addr;
	int can_bind_foreign = 0;

	if (flags) {
		/* Foreign binding was requested: first try to make the socket
		 * able to use a foreign address, based on the local family,
		 * and remember a failure so that it is not retried.
		 */
		switch (local->ss_family) {
		case AF_INET:
			if (ip_transp_working) {
				if (sock_inet4_make_foreign(fd))
					can_bind_foreign = 1;
				else
					ip_transp_working = 0;
			}
			break;
		case AF_INET6:
			if (ip6_transp_working) {
				if (sock_inet6_make_foreign(fd))
					can_bind_foreign = 1;
				else
					ip6_transp_working = 0;
			}
			break;
		}

		/* Then build the address to bind to from the requested parts
		 * of <remote>; the parts not requested are left to zero.
		 */
		memset(&foreign_addr, 0, sizeof(foreign_addr));
		foreign_addr.ss_family = remote->ss_family;

		if (remote->ss_family == AF_INET) {
			struct sockaddr_in *to4 = (struct sockaddr_in *)&foreign_addr;
			struct sockaddr_in *from4 = (struct sockaddr_in *)remote;

			if (flags & 1)
				to4->sin_addr = from4->sin_addr;
			if (flags & 2)
				to4->sin_port = from4->sin_port;
		}
		else if (remote->ss_family == AF_INET6) {
			struct sockaddr_in6 *to6 = (struct sockaddr_in6 *)&foreign_addr;
			struct sockaddr_in6 *from6 = (struct sockaddr_in6 *)remote;

			if (flags & 1)
				to6->sin6_addr = from6->sin6_addr;
			if (flags & 2)
				to6->sin6_port = from6->sin6_port;
		}
		else {
			/* we don't want to try to bind to an unknown address family */
			can_bind_foreign = 0;
		}
	}

	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

	if (can_bind_foreign) {
		if (is_inet_addr(&foreign_addr) &&
		    bind(fd, (struct sockaddr *)&foreign_addr, get_addr_len(&foreign_addr)) < 0)
			return 2;
	}
	else if (is_inet_addr(local) &&
	         bind(fd, (struct sockaddr *)local, get_addr_len(local)) < 0) {
		return 1;
	}

	/* a foreign address was requested but could not be used */
	if (flags && !can_bind_foreign)
		return 2;

	return 0;
}
/*
* This function initiates a TCP connection establishment to the target assigned
* to connection <conn> using (si->{target,dst}). A source address may be
* pointed to by conn->src in case of transparent proxying. Normal source
* bind addresses are still determined locally (due to the possible need of a
* source port). conn->target may point either to a valid server or to a backend,
* depending on conn->target. Only OBJ_TYPE_PROXY and OBJ_TYPE_SERVER are
* supported. The <data> parameter is a boolean indicating whether there are data
* waiting for being sent or not, in order to adjust data write polling and on
* some platforms, the ability to avoid an empty initial ACK. The <flags> argument
* allows the caller to force using a delayed ACK when establishing the connection
* - 0 = no delayed ACK unless data are advertised and backend has tcp-smart-connect
* - CONNECT_DELACK_SMART_CONNECT = delayed ACK if backend has tcp-smart-connect, regardless of data
* - CONNECT_DELACK_ALWAYS = delayed ACK regardless of backend options
*
* Note that a pending send_proxy message accounts for data.
*
* It can return one of :
* - SF_ERR_NONE if everything's OK
* - SF_ERR_SRVTO if there are no more servers
* - SF_ERR_SRVCL if the connection was refused by the server
* - SF_ERR_PRXCOND if the connection has been limited by the proxy (maxconn)
* - SF_ERR_RESOURCE if a system resource is lacking (eg: fd limits, ports, ...)
* - SF_ERR_INTERNAL for any other purely internal errors
* Additionally, in the case of SF_ERR_RESOURCE, an emergency log will be emitted.
*
* The connection's fd is inserted only when SF_ERR_NONE is returned, otherwise
* it's invalid and the caller has nothing to do.
*/
int tcp_connect_server(struct connection *conn, int flags)
{
int fd;
struct server *srv;
struct proxy *be;
struct conn_src *src;
int use_fastopen = 0;
struct sockaddr_storage *addr;
conn->flags |= CO_FL_WAIT_L4_CONN; /* connection in progress */
switch (obj_type(conn->target)) {
case OBJ_TYPE_PROXY:
be = objt_proxy(conn->target);
srv = NULL;
break;
case OBJ_TYPE_SERVER:
srv = objt_server(conn->target);
be = srv->proxy;
/* Make sure we check that we have data before activating
* TFO, or we could trigger a kernel issue whereby after
* a successful connect() == 0, any subsequent connect()
* will return EINPROGRESS instead of EISCONN.
*/
use_fastopen = (srv->flags & SRV_F_FASTOPEN) &&
((flags & (CONNECT_CAN_USE_TFO | CONNECT_HAS_DATA)) ==
(CONNECT_CAN_USE_TFO | CONNECT_HAS_DATA));
break;
default:
conn->flags |= CO_FL_ERROR;
return SF_ERR_INTERNAL;
}
if (!conn->dst) {
conn->flags |= CO_FL_ERROR;
return SF_ERR_INTERNAL;
}
fd = conn->handle.fd = sock_create_server_socket(conn);
MAJOR: namespace: add Linux network namespace support This patch makes it possible to create binds and servers in separate namespaces. This can be used to proxy between multiple completely independent virtual networks (with possibly overlapping IP addresses) and a non-namespace-aware proxy implementation that supports the proxy protocol (v2). The setup is something like this: net1 on VLAN 1 (namespace 1) -\ net2 on VLAN 2 (namespace 2) -- haproxy ==== proxy (namespace 0) net3 on VLAN 3 (namespace 3) -/ The proxy is configured to make server connections through haproxy and sending the expected source/target addresses to haproxy using the proxy protocol. The network namespace setup on the haproxy node is something like this: = 8< = $ cat setup.sh ip netns add 1 ip link add link eth1 type vlan id 1 ip link set eth1.1 netns 1 ip netns exec 1 ip addr add 192.168.91.2/24 dev eth1.1 ip netns exec 1 ip link set eth1.$id up ... = 8< = = 8< = $ cat haproxy.cfg frontend clients bind 127.0.0.1:50022 namespace 1 transparent default_backend scb backend server mode tcp server server1 192.168.122.4:2222 namespace 2 send-proxy-v2 = 8< = A bind line creates the listener in the specified namespace, and connections originating from that listener also have their network namespace set to that of the listener. A server line either forces the connection to be made in a specified namespace or may use the namespace from the client-side connection if that was set. For more documentation please read the documentation included in the patch itself. Signed-off-by: KOVACS Tamas <ktamas@balabit.com> Signed-off-by: Sarkozi Laszlo <laszlo.sarkozi@balabit.com> Signed-off-by: KOVACS Krisztian <hidden@balabit.com>
2014-11-17 17:11:45 +03:00
if (fd == -1) {
qfprintf(stderr, "Cannot get a server socket.\n");
if (errno == ENFILE) {
conn->err_code = CO_ER_SYS_FDLIM;
send_log(be, LOG_EMERG,
"Proxy %s reached system FD limit (maxsock=%d). Please check system tunables.\n",
be->id, global.maxsock);
}
else if (errno == EMFILE) {
conn->err_code = CO_ER_PROC_FDLIM;
send_log(be, LOG_EMERG,
"Proxy %s reached process FD limit (maxsock=%d). Please check 'ulimit-n' and restart.\n",
be->id, global.maxsock);
}
else if (errno == ENOBUFS || errno == ENOMEM) {
conn->err_code = CO_ER_SYS_MEMLIM;
send_log(be, LOG_EMERG,
"Proxy %s reached system memory limit (maxsock=%d). Please check system tunables.\n",
be->id, global.maxsock);
}
else if (errno == EAFNOSUPPORT || errno == EPROTONOSUPPORT) {
conn->err_code = CO_ER_NOPROTO;
}
else
conn->err_code = CO_ER_SOCK_ERR;
/* this is a resource error */
conn->flags |= CO_FL_ERROR;
return SF_ERR_RESOURCE;
}
if (fd >= global.maxsock) {
/* do not log anything there, it's a normal condition when this option
* is used to serialize connections to a server !
*/
ha_alert("socket(): not enough free sockets. Raise -n argument. Giving up.\n");
close(fd);
conn->err_code = CO_ER_CONF_FDLIM;
conn->flags |= CO_FL_ERROR;
return SF_ERR_PRXCOND; /* it is a configuration limit */
}
if ((fcntl(fd, F_SETFL, O_NONBLOCK)==-1) ||
(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) == -1)) {
qfprintf(stderr,"Cannot set client socket to non blocking mode.\n");
close(fd);
conn->err_code = CO_ER_SOCK_ERR;
conn->flags |= CO_FL_ERROR;
return SF_ERR_INTERNAL;
}
if (master == 1 && (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)) {
ha_alert("Cannot set CLOEXEC on client socket.\n");
close(fd);
conn->err_code = CO_ER_SOCK_ERR;
conn->flags |= CO_FL_ERROR;
return SF_ERR_INTERNAL;
}
if (be->options & PR_O_TCP_SRV_KA) {
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
#ifdef TCP_KEEPCNT
if (be->srvtcpka_cnt)
setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &be->srvtcpka_cnt, sizeof(be->srvtcpka_cnt));
#endif
#ifdef TCP_KEEPIDLE
if (be->srvtcpka_idle)
setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &be->srvtcpka_idle, sizeof(be->srvtcpka_idle));
#endif
#ifdef TCP_KEEPINTVL
if (be->srvtcpka_intvl)
setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &be->srvtcpka_intvl, sizeof(be->srvtcpka_intvl));
#endif
}
/* allow specific binding :
* - server-specific at first
* - proxy-specific next
*/
if (srv && srv->conn_src.opts & CO_SRC_BIND)
src = &srv->conn_src;
else if (be->conn_src.opts & CO_SRC_BIND)
src = &be->conn_src;
else
src = NULL;
if (src) {
int ret, flags = 0;
if (conn->src && is_inet_addr(conn->src)) {
switch (src->opts & CO_SRC_TPROXY_MASK) {
case CO_SRC_TPROXY_CLI:
conn_set_private(conn);
/* fall through */
case CO_SRC_TPROXY_ADDR:
flags = 3;
break;
case CO_SRC_TPROXY_CIP:
case CO_SRC_TPROXY_DYN:
conn_set_private(conn);
flags = 1;
break;
}
}
#ifdef SO_BINDTODEVICE
/* Note: this might fail if not CAP_NET_RAW */
if (src->iface_name)
setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, src->iface_name, src->iface_len + 1);
#endif
if (src->sport_range) {
int attempts = 10; /* should be more than enough to find a spare port */
struct sockaddr_storage sa;
ret = 1;
memcpy(&sa, &src->source_addr, sizeof(sa));
do {
/* note: in case of retry, we may have to release a previously
* allocated port, hence this loop's construct.
*/
port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port);
fdinfo[fd].port_range = NULL;
if (!attempts)
break;
attempts--;
fdinfo[fd].local_port = port_range_alloc_port(src->sport_range);
if (!fdinfo[fd].local_port) {
conn->err_code = CO_ER_PORT_RANGE;
break;
}
fdinfo[fd].port_range = src->sport_range;
set_host_port(&sa, fdinfo[fd].local_port);
ret = tcp_bind_socket(fd, flags, &sa, conn->src);
if (ret != 0)
conn->err_code = CO_ER_CANT_BIND;
} while (ret != 0); /* binding NOK */
}
else {
#ifdef IP_BIND_ADDRESS_NO_PORT
static THREAD_LOCAL int bind_address_no_port = 1;
setsockopt(fd, SOL_IP, IP_BIND_ADDRESS_NO_PORT, (const void *) &bind_address_no_port, sizeof(int));
#endif
ret = tcp_bind_socket(fd, flags, &src->source_addr, conn->src);
if (ret != 0)
conn->err_code = CO_ER_CANT_BIND;
}
if (unlikely(ret != 0)) {
port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port);
fdinfo[fd].port_range = NULL;
close(fd);
if (ret == 1) {
ha_alert("Cannot bind to source address before connect() for backend %s. Aborting.\n",
be->id);
send_log(be, LOG_EMERG,
"Cannot bind to source address before connect() for backend %s.\n",
be->id);
} else {
ha_alert("Cannot bind to tproxy source address before connect() for backend %s. Aborting.\n",
be->id);
send_log(be, LOG_EMERG,
"Cannot bind to tproxy source address before connect() for backend %s.\n",
be->id);
}
conn->flags |= CO_FL_ERROR;
return SF_ERR_RESOURCE;
}
}
#if defined(TCP_QUICKACK)
/* disabling tcp quick ack now allows the first request to leave the
* machine with the first ACK. We only do this if there are pending
* data in the buffer.
*/
if (flags & (CONNECT_DELACK_ALWAYS) ||
((flags & CONNECT_DELACK_SMART_CONNECT ||
(flags & CONNECT_HAS_DATA) || conn->send_proxy_ofs) &&
(be->options2 & PR_O2_SMARTCON)))
setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &zero, sizeof(zero));
#endif
#ifdef TCP_USER_TIMEOUT
/* there is not much more we can do here when it fails, it's still minor */
if (srv && srv->tcp_ut)
setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &srv->tcp_ut, sizeof(srv->tcp_ut));
#endif
if (use_fastopen) {
#if defined(TCP_FASTOPEN_CONNECT)
setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &one, sizeof(one));
#endif
}
if (global.tune.server_sndbuf)
setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &global.tune.server_sndbuf, sizeof(global.tune.server_sndbuf));
if (global.tune.server_rcvbuf)
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &global.tune.server_rcvbuf, sizeof(global.tune.server_rcvbuf));
addr = (conn->flags & CO_FL_SOCKS4) ? &srv->socks4_addr : conn->dst;
if (connect(fd, (const struct sockaddr *)addr, get_addr_len(addr)) == -1) {
if (errno == EINPROGRESS || errno == EALREADY) {
/* common case, let's wait for connect status */
conn->flags |= CO_FL_WAIT_L4_CONN;
}
else if (errno == EISCONN) {
/* should normally not happen but if so, indicates that it's OK */
conn->flags &= ~CO_FL_WAIT_L4_CONN;
}
else if (errno == EAGAIN || errno == EADDRINUSE || errno == EADDRNOTAVAIL) {
char *msg;
if (errno == EAGAIN || errno == EADDRNOTAVAIL) {
msg = "no free ports";
conn->err_code = CO_ER_FREE_PORTS;
}
else {
msg = "local address already in use";
conn->err_code = CO_ER_ADDR_INUSE;
}
qfprintf(stderr,"Connect() failed for backend %s: %s.\n", be->id, msg);
port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port);
fdinfo[fd].port_range = NULL;
close(fd);
send_log(be, LOG_ERR, "Connect() failed for backend %s: %s.\n", be->id, msg);
conn->flags |= CO_FL_ERROR;
return SF_ERR_RESOURCE;
} else if (errno == ETIMEDOUT) {
//qfprintf(stderr,"Connect(): ETIMEDOUT");
port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port);
fdinfo[fd].port_range = NULL;
close(fd);
conn->err_code = CO_ER_SOCK_ERR;
conn->flags |= CO_FL_ERROR;
return SF_ERR_SRVTO;
} else {
// (errno == ECONNREFUSED || errno == ENETUNREACH || errno == EACCES || errno == EPERM)
//qfprintf(stderr,"Connect(): %d", errno);
port_range_release_port(fdinfo[fd].port_range, fdinfo[fd].local_port);
fdinfo[fd].port_range = NULL;
close(fd);
conn->err_code = CO_ER_SOCK_ERR;
conn->flags |= CO_FL_ERROR;
return SF_ERR_SRVCL;
}
}
else {
/* connect() == 0, this is great! */
conn->flags &= ~CO_FL_WAIT_L4_CONN;
}
conn->flags |= CO_FL_ADDR_TO_SET;
MAJOR: connection: add two new flags to indicate readiness of control/transport Currently the control and transport layers of a connection are supposed to be initialized when their respective pointers are not NULL. This will not work anymore when we plan to reuse connections, because there is an asymmetry between the accept() side and the connect() side : - on accept() side, the fd is set first, then the ctrl layer then the transport layer ; upon error, they must be undone in the reverse order, then the FD must be closed. The FD must not be deleted if the control layer was not yet initialized ; - on the connect() side, the fd is set last and there is no reliable way to know if it has been initialized or not. In practice it's initialized to -1 first but this is hackish and supposes that local FDs only will be used forever. Also, there are even less solutions for keeping trace of the transport layer's state. Also it is possible to support delayed close() when something (eg: logs) tracks some information requiring the transport and/or control layers, making it even more difficult to clean them. So the proposed solution is to add two flags to the connection : - CO_FL_CTRL_READY is set when the control layer is initialized (fd_insert) and cleared after it's released (fd_delete). - CO_FL_XPRT_READY is set when the control layer is initialized (xprt->init) and cleared after it's released (xprt->close). The functions have been adapted to rely on this and not on the pointers anymore. conn_xprt_close() was unused and dangerous : it did not close the control layer (eg: the socket itself) but still marks the transport layer as closed, preventing any future call to conn_full_close() from finishing the job. The problem comes from conn_full_close() in fact. It needs to close the xprt and ctrl layers independantly. After that we're still having an issue : we don't know based on ->ctrl alone whether the fd was registered or not. 
For this we use the two new flags CO_FL_XPRT_READY and CO_FL_CTRL_READY. We now rely on this and not on conn->xprt nor conn->ctrl anymore to decide what remains to be done on the connection. In order not to miss some flag assignments, we introduce conn_ctrl_init() to initialize the control layer, register the fd using fd_insert() and set the flag, and conn_ctrl_close() which unregisters the fd and removes the flag, but only if the transport layer was closed. Similarly, at the transport layer, conn_xprt_init() calls ->init and sets the flag, while conn_xprt_close() checks the flag, calls ->close and clears the flag, regardless xprt_ctx or xprt_st. This also ensures that the ->init and the ->close functions are called only once each and in the correct order. Note that conn_xprt_close() does nothing if the transport layer is still tracked. conn_full_close() now simply calls conn_xprt_close() then conn_full_close() in turn, which do nothing if CO_FL_XPRT_TRACKED is set. In order to handle the error path, we also provide conn_force_close() which ignores CO_FL_XPRT_TRACKED and closes the transport and the control layers in turns. All relevant instances of fd_delete() have been replaced with conn_force_close(). Now we always know what state the connection is in and we can expect to split its initialization.
2013-10-21 18:30:56 +04:00
conn_ctrl_init(conn); /* registers the FD */
fdtab[fd].linger_risk = 1; /* close hard if needed */
if (conn->flags & CO_FL_WAIT_L4_CONN) {
fd_want_send(fd);
fd_cant_send(fd);
MINOR: connection: avoid a useless recvfrom() on outgoing connections When a connect() doesn't immediately succeed (i.e. most of the times), fd_cant_send() is called to enable polling. But given that we don't mark that we cannot receive either, we end up performing a failed recvfrom() immediately when the connect() is finally confirmed, as indicated in issue #253. This patch simply adds fd_cant_recv() as well so that we're only notified once the recv path is ready. The reason it was not there is purely historic, as in the past when there was the fd cache, doing it would have caused a pending recv request to be placed into the fd cache, hence a useless recvfrom() upon success (i.e. what happens now). Without this patch, forwarding 100k connections does this: % time seconds usecs/call calls errors syscall ------ ----------- ----------- --------- --------- ---------------- 17.51 0.704229 7 100000 100000 connect 16.75 0.673875 3 200000 sendto 16.24 0.653222 3 200036 close 10.82 0.435082 1 300000 100000 recvfrom 10.37 0.417266 1 300012 setsockopt 7.12 0.286511 1 199954 epoll_ctl 6.80 0.273447 2 100000 shutdown 5.34 0.214942 2 100005 socket 4.65 0.187137 1 105002 5002 accept4 3.35 0.134757 1 100004 fcntl 0.61 0.024585 4 5858 epoll_wait With the patch: % time seconds usecs/call calls errors syscall ------ ----------- ----------- --------- --------- ---------------- 18.04 0.697365 6 100000 100000 connect 17.40 0.672471 3 200000 sendto 17.03 0.658134 3 200036 close 10.57 0.408459 1 300012 setsockopt 7.69 0.297270 1 200000 recvfrom 7.32 0.282934 1 199922 epoll_ctl 7.09 0.274027 2 100000 shutdown 5.59 0.216041 2 100005 socket 4.87 0.188352 1 104697 4697 accept4 3.35 0.129641 1 100004 fcntl 0.65 0.024959 4 5337 1 epoll_wait Note the total disappearance of 1/3 of failed recvfrom() *without* adding any extra syscall anywhere else. 
The trace of an HTTP health check is now totally clean, with no useless syscall at all anymore: 09:14:21.959255 connect(9, {sa_family=AF_INET, sin_port=htons(8000), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress) 09:14:21.959292 epoll_ctl(4, EPOLL_CTL_ADD, 9, {EPOLLIN|EPOLLOUT|EPOLLRDHUP, {u32=9, u64=9}}) = 0 09:14:21.959315 epoll_wait(4, [{EPOLLOUT, {u32=9, u64=9}}], 200, 1000) = 1 09:14:21.959376 sendto(9, "OPTIONS / HTTP/1.0\r\ncontent-leng"..., 41, MSG_DONTWAIT|MSG_NOSIGNAL, NULL, 0) = 41 09:14:21.959436 epoll_wait(4, [{EPOLLOUT, {u32=9, u64=9}}], 200, 1000) = 1 09:14:21.959456 epoll_ctl(4, EPOLL_CTL_MOD, 9, {EPOLLIN|EPOLLRDHUP, {u32=9, u64=9}}) = 0 09:14:21.959512 epoll_wait(4, [{EPOLLIN|EPOLLRDHUP, {u32=9, u64=9}}], 200, 1000) = 1 09:14:21.959548 recvfrom(9, "HTTP/1.0 200\r\nContent-length: 0\r"..., 16320, 0, NULL, NULL) = 126 09:14:21.959570 close(9) = 0 With the edge-triggered poller, it gets even better: 09:29:15.776201 connect(9, {sa_family=AF_INET, sin_port=htons(8000), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress) 09:29:15.776256 epoll_ctl(4, EPOLL_CTL_ADD, 9, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=9, u64=9}}) = 0 09:29:15.776287 epoll_wait(4, [{EPOLLOUT, {u32=9, u64=9}}], 200, 1000) = 1 09:29:15.776320 sendto(9, "OPTIONS / HTTP/1.0\r\ncontent-leng"..., 41, MSG_DONTWAIT|MSG_NOSIGNAL, NULL, 0) = 41 09:29:15.776374 epoll_wait(4, [{EPOLLIN|EPOLLOUT|EPOLLRDHUP, {u32=9, u64=9}}], 200, 1000) = 1 09:29:15.776406 recvfrom(9, "HTTP/1.0 200\r\nContent-length: 0\r"..., 16320, 0, NULL, NULL) = 126 09:29:15.776434 close(9) = 0 It could make sense to backport this patch to 2.2 and maybe 2.1 after it has been sufficiently checked for absence of side effects in 2.3-dev, as some people had reported an extra overhead like in issue #168.
2020-07-31 09:59:09 +03:00
fd_cant_recv(fd);
}
MEDIUM: connection: enable reading only once the connection is confirmed In order to address the absurd polling sequence described in issue #253, let's make sure we disable receiving on a connection until it's established. Previously with bottom-top I/Os, we were almost certain that a connection was ready when the first I/O was confirmed. Now we can enter various functions, including process_stream(), which will attempt to read something, will fail, and will then subscribe. But we don't want them to try to receive if we know the connection didn't complete. The first prerequisite for this is to mark the connection as not ready for receiving until it's validated. But we don't want to mark it as not ready for sending because we know that attempting I/Os later is extremely likely to work without polling. Once the connection is confirmed we re-enable recv readiness. In order for this event to be taken into account, the call to tcp_connect_probe() was moved earlier, between the attempt to send() and the attempt to recv(). This way if tcp_connect_probe() enables reading, we have a chance to immediately fall back to this and read the possibly pending data. Now the trace looks like the following. 
It's far from being perfect but we've already saved one recvfrom() and one epollctl(): epoll_wait(3, [], 200, 0) = 0 socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) = 7 fcntl(7, F_SETFL, O_RDONLY|O_NONBLOCK) = 0 setsockopt(7, SOL_TCP, TCP_NODELAY, [1], 4) = 0 connect(7, {sa_family=AF_INET, sin_port=htons(8000), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress) epoll_ctl(3, EPOLL_CTL_ADD, 7, {EPOLLIN|EPOLLOUT|EPOLLRDHUP, {u32=7, u64=7}}) = 0 epoll_wait(3, [{EPOLLOUT, {u32=7, u64=7}}], 200, 1000) = 1 connect(7, {sa_family=AF_INET, sin_port=htons(8000), sin_addr=inet_addr("127.0.0.1")}, 16) = 0 getsockopt(7, SOL_SOCKET, SO_ERROR, [0], [4]) = 0 sendto(7, "OPTIONS / HTTP/1.0\r\n\r\n", 22, MSG_DONTWAIT|MSG_NOSIGNAL, NULL, 0) = 22 epoll_ctl(3, EPOLL_CTL_MOD, 7, {EPOLLIN|EPOLLRDHUP, {u32=7, u64=7}}) = 0 epoll_wait(3, [{EPOLLIN|EPOLLRDHUP, {u32=7, u64=7}}], 200, 1000) = 1 getsockopt(7, SOL_SOCKET, SO_ERROR, [0], [4]) = 0 getsockopt(7, SOL_SOCKET, SO_ERROR, [0], [4]) = 0 recvfrom(7, "HTTP/1.0 200\r\nContent-length: 0\r\nX-req: size=22, time=0 ms\r\nX-rsp: id=dummy, code=200, cache=1, size=0, time=0 ms (0 real)\r\n\r\n", 16384, 0, NULL, NULL) = 126 close(7) = 0
2019-09-05 18:05:05 +03:00
REORG: connection: rename the data layer the "transport layer" While working on the changes required to make the health checks use the new connections, it started to become obvious that some naming was not logical at all in the connections. Specifically, it is not logical to call the "data layer" the layer which is in charge for all the handshake and which does not yet provide a data layer once established until a session has allocated all the required buffers. In fact, it's more a transport layer, which makes much more sense. The transport layer offers a medium on which data can transit, and it offers the functions to move these data when the upper layer requests this. And it is the upper layer which iterates over the transport layer's functions to move data which should be called the data layer. The use case where it's obvious is with embryonic sessions : an incoming SSL connection is accepted. Only the connection is allocated, not the buffers nor stream interface, etc... The connection handles the SSL handshake by itself. Once this handshake is complete, we can't use the data functions because the buffers and stream interface are not there yet. Hence we have to first call a specific function to complete the session initialization, after which we'll be able to use the data functions. This clearly proves that SSL here is only a transport layer and that the stream interface constitutes the data layer. A similar change will be performed to rename app_cb => data, but the two could not be in the same commit for obvious reasons.
2012-10-03 02:19:48 +04:00
if (conn_xprt_init(conn) < 0) {
conn_full_close(conn);
conn->flags |= CO_FL_ERROR;
return SF_ERR_RESOURCE;
}
return SF_ERR_NONE; /* connection is OK */
}
/* This function tries to bind a TCPv4/v6 listener. It may return a warning or
* an error message in <errmsg> if the message is at most <errlen> bytes long
* (including '\0'). Note that <errmsg> may be NULL if <errlen> is also zero.
* The return value is composed from ERR_ABORT, ERR_WARN,
* ERR_ALERT, ERR_RETRYABLE and ERR_FATAL. ERR_NONE indicates that everything
* was alright and that no message was returned. ERR_RETRYABLE means that an
* error occurred but that it may vanish after a retry (eg: port in use), and
* ERR_FATAL indicates a non-fixable error. ERR_WARN and ERR_ALERT do not alter
* the meaning of the error, but just indicate that a message is present which
* should be displayed with the respective level. Last, ERR_ABORT indicates
* that it's pointless to try to start other listeners. No error message is
* returned if errlen is NULL.
*/
int tcp_bind_listener(struct listener *listener, char *errmsg, int errlen)
{
	const char *reason = NULL; /* last warning/error text, NULL if none */
	int status = ERR_NONE;     /* accumulated ERR_* bits returned to the caller */
	int already_listening;
	int sock;

	/* ensure we never return garbage */
	if (errlen)
		*errmsg = 0;

	/* only listeners in LI_ASSIGNED state are handled, others are
	 * considered already bound.
	 */
	if (listener->state != LI_ASSIGNED)
		return ERR_NONE;

	/* the receiver must have been bound beforehand */
	if (!(listener->rx.flags & RX_F_BOUND)) {
		reason = "receiving socket not bound";
		goto report;
	}

	sock = listener->rx.fd;

	/* SO_LINGER: either force the "nolinger" setting, or reset whatever
	 * was found on the socket when it matches the condition below.
	 */
	if (listener->options & LI_O_NOLINGER) {
		setsockopt(sock, SOL_SOCKET, SO_LINGER, &nolinger, sizeof(struct linger));
	}
	else {
		struct linger cur_linger;
		socklen_t cur_len = sizeof(cur_linger);

		if (getsockopt(sock, SOL_SOCKET, SO_LINGER, &cur_linger, &cur_len) == 0 &&
		    (cur_linger.l_onoff == 1 || cur_linger.l_linger == 0)) {
			cur_linger.l_onoff = 0;
			cur_linger.l_linger = 0;
			setsockopt(sock, SOL_SOCKET, SO_LINGER, &cur_linger,
			           sizeof(cur_linger));
		}
	}

#if defined(TCP_MAXSEG)
	if (listener->maxseg > 0) {
		/* an explicit MSS was configured for this listener */
		if (setsockopt(sock, IPPROTO_TCP, TCP_MAXSEG,
		               &listener->maxseg, sizeof(listener->maxseg)) == -1) {
			reason = "cannot set MSS";
			status |= ERR_WARN;
		}
	}
	else {
		/* we may want to try to restore the default MSS if the socket was inherited */
		int wanted_mss;
		int cur_mss = -1;
		socklen_t mss_len = sizeof(cur_mss);

		if (listener->rx.addr.ss_family == AF_INET)
			wanted_mss = sock_inet_tcp_maxseg_default;
		else
			wanted_mss = sock_inet6_tcp_maxseg_default;

		/* on failure cur_mss keeps its -1 value and thus differs from
		 * any positive default.
		 */
		getsockopt(sock, IPPROTO_TCP, TCP_MAXSEG, &cur_mss, &mss_len);

		if (wanted_mss > 0 && cur_mss != wanted_mss) {
			if (setsockopt(sock, IPPROTO_TCP, TCP_MAXSEG,
			               &wanted_mss, sizeof(wanted_mss)) == -1) {
				reason = "cannot set MSS";
				status |= ERR_WARN;
			}
		}
	}
#endif

#if defined(TCP_USER_TIMEOUT)
	if (!listener->tcp_ut) {
		/* no timeout configured: reset it, result deliberately ignored */
		setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT, &zero,
		           sizeof(zero));
	}
	else if (setsockopt(sock, IPPROTO_TCP, TCP_USER_TIMEOUT,
	                    &listener->tcp_ut, sizeof(listener->tcp_ut)) == -1) {
		reason = "cannot set TCP User Timeout";
		status |= ERR_WARN;
	}
#endif

#if defined(TCP_DEFER_ACCEPT)
	if (!(listener->options & LI_O_DEF_ACCEPT)) {
		/* not requested: reset it, result deliberately ignored */
		setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &zero,
		           sizeof(zero));
	}
	else {
		/* defer accept by up to one second */
		int accept_delay = 1;

		if (setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT,
		               &accept_delay, sizeof(accept_delay)) == -1) {
			reason = "cannot enable DEFER_ACCEPT";
			status |= ERR_WARN;
		}
	}
#endif

#if defined(TCP_FASTOPEN)
	if (listener->options & LI_O_TCP_FO) {
		/* TFO needs a queue length, let's use the configured backlog */
		int tfo_qlen = listener_backlog(listener);

		if (setsockopt(sock, IPPROTO_TCP, TCP_FASTOPEN,
		               &tfo_qlen, sizeof(tfo_qlen)) == -1) {
			reason = "cannot enable TCP_FASTOPEN";
			status |= ERR_WARN;
		}
	}
	else {
		int cur_qlen;
		socklen_t qlen_len = sizeof(cur_qlen);

		/* Only disable fast open if it was enabled, we don't want
		 * the kernel to create a fast open queue if there's none.
		 */
		if (getsockopt(sock, IPPROTO_TCP, TCP_FASTOPEN, &cur_qlen, &qlen_len) == 0 &&
		    cur_qlen != 0 &&
		    setsockopt(sock, IPPROTO_TCP, TCP_FASTOPEN, &zero, sizeof(zero)) == -1) {
			reason = "cannot disable TCP_FASTOPEN";
			status |= ERR_WARN;
		}
	}
#endif

	/* only listen if not already done by external process */
	already_listening = sock_accept_conn(&listener->rx) > 0;
	if (!already_listening &&
	    listen(sock, listener_backlog(listener)) == -1) {
		status |= ERR_RETRYABLE | ERR_ALERT;
		reason = "cannot listen to socket";
		close(sock);
		goto report;
	}

#if defined(TCP_QUICKACK)
	/* applied once the socket is listening; result deliberately ignored */
	if (listener->options & LI_O_NOQUICKACK) {
		setsockopt(sock, IPPROTO_TCP, TCP_QUICKACK, &zero, sizeof(zero));
	}
	else {
		setsockopt(sock, IPPROTO_TCP, TCP_QUICKACK, &one, sizeof(one));
	}
#endif

	/* the socket is ready */
	listener_set_state(listener, LI_LISTEN);

 report:
	/* format "<reason> [<addr>:<port>]" when a message buffer was provided */
	if (reason && errlen) {
		char addr_txt[INET6_ADDRSTRLEN];

		addr_to_str(&listener->rx.addr, addr_txt, sizeof(addr_txt));
		snprintf(errmsg, errlen, "%s [%s:%d]", reason, addr_txt,
		         get_host_port(&listener->rx.addr));
	}
	return status;
}
/* Registers <listener> among the tcpv4 receivers and assigns it port <port>.
 * Only listeners still in LI_INIT state are processed; they are switched to
 * LI_ASSIGNED, attached to proto_tcpv4 and accounted for in its receivers
 * counter. Listeners in any other state are left untouched.
 *
 * Must be called with proto_lock held.
 */
static void tcpv4_add_listener(struct listener *listener, int port)
{
	struct sockaddr_in *sin = (struct sockaddr_in *)&listener->rx.addr;

	if (listener->state != LI_INIT)
		return;

	listener_set_state(listener, LI_ASSIGNED);
	listener->rx.proto = &proto_tcpv4;

	/* the port is stored in network byte order */
	sin->sin_port = htons(port);

	LIST_ADDQ(&proto_tcpv4.receivers, &listener->rx.proto_list);
	proto_tcpv4.nb_receivers++;
}
/* Add <listener> to the list of tcpv6 listeners, on port <port>. The
 * listener's state is automatically updated from LI_INIT to LI_ASSIGNED.
 * The number of receivers for the protocol is updated. Listeners which are
 * not in LI_INIT state are left untouched.
 *
 * Must be called with proto_lock held.
 */
static void tcpv6_add_listener(struct listener *listener, int port)
{
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&listener->rx.addr;

	if (listener->state != LI_INIT)
		return;

	listener_set_state(listener, LI_ASSIGNED);
	listener->rx.proto = &proto_tcpv6;

	/* This is an IPv6 address, so the port must be written through the
	 * IPv6 socket address layout (sin6_port) and not through the IPv4 one,
	 * instead of relying on both fields happening to share the same
	 * offset. The port is stored in network byte order.
	 */
	sin6->sin6_port = htons(port);

	LIST_ADDQ(&proto_tcpv6.receivers, &listener->rx.proto_list);
	proto_tcpv6.nb_receivers++;
}
/* Subscribes listener <l>'s file descriptor to receive events so that incoming
 * connections get reported again. The receiver must still be valid. Nothing
 * is done as long as fd_updt is not set, which is the case in early boot.
 */
static void tcp_enable_listener(struct listener *l)
{
	if (!fd_updt)
		return;

	fd_want_recv(l->rx.fd);
}
/* Unsubscribes listener <l>'s file descriptor from receive events so that
 * incoming connections are no longer reported. The receiver must still be
 * valid. Nothing is done as long as fd_updt is not set, which is the case in
 * early boot.
 */
static void tcp_disable_listener(struct listener *l)
{
	if (!fd_updt)
		return;

	fd_stop_recv(l->rx.fd);
}
/* Suspends receiver <rx>. The pause is attempted by connect()ing the socket to
 * an AF_UNSPEC address. Returns < 0 in case of failure, 0 if the receiver was
 * totally stopped, or > 0 if correctly suspended.
 */
static int tcp_suspend_receiver(struct receiver *rx)
{
	const struct sockaddr unspec = { .sa_family = AF_UNSPEC };
	int accepting;

	if (connect(rx->fd, &unspec, sizeof(unspec)) >= 0) {
		/* paused by us */
		fd_stop_recv(rx->fd);
		return 1;
	}

	/* The connect() above failed. It might be because we're dealing with
	 * a socket that is shared with other processes doing the same. Let's
	 * check if it's still accepting connections.
	 */
	accepting = sock_accept_conn(rx);
	if (accepting > 0) {
		/* still listening, that's not good */
		return -1;
	}

	/* unrecoverable (< 0, reported as 0) or paused by another process
	 * (== 0, reported as 1).
	 */
	fd_stop_recv(rx->fd);
	return accepting == 0;
}
/* Resumes receiver <rx> by calling listen() again on its socket with the
 * backlog of the listener embedding it. Returns < 0 in case of failure, 0 if
 * the receiver has no valid file descriptor (fd < 0), or > 0 if correctly
 * resumed.
 */
static int tcp_resume_receiver(struct receiver *rx)
{
	/* the receiver is embedded in its listener as the <rx> member */
	struct listener *l = LIST_ELEM(rx, struct listener *, rx);

	if (rx->fd < 0)
		return 0;

	if (listen(rx->fd, listener_backlog(l)) != 0)
		return -1;

	fd_want_recv(l->rx.fd);
	return 1;
}
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/