MAJOR: connection : Split struct connection into struct connection and struct conn_stream.

All the references to connections in the data path from streams and
stream_interfaces were changed to use conn_streams. Most functions named
"something_conn" were renamed to "something_cs" for this. Sometimes the
connection still is what matters (eg during a connection establishment)
and were not always renamed. The change is significant and minimal at the
same time, and was quite thoroughly tested now. As of this patch, all
accesses to the connection from upper layers go through the pass-through
mux.
This commit is contained in:
Olivier Houchard 2017-09-13 18:30:23 +02:00 committed by Willy Tarreau
parent 7a3f0dfb7b
commit 9aaf778129
17 changed files with 439 additions and 298 deletions

View File

@ -315,25 +315,25 @@ static inline void __conn_xprt_stop_recv(struct connection *c)
c->flags &= ~CO_FL_XPRT_RD_ENA;
}
static inline void __cs_data_want_recv(struct conn_stream *cs)
static inline void __cs_want_recv(struct conn_stream *cs)
{
cs->flags |= CS_FL_DATA_RD_ENA;
}
static inline void __cs_data_stop_recv(struct conn_stream *cs)
static inline void __cs_stop_recv(struct conn_stream *cs)
{
cs->flags &= ~CS_FL_DATA_RD_ENA;
}
static inline void cs_data_want_recv(struct conn_stream *cs)
static inline void cs_want_recv(struct conn_stream *cs)
{
__cs_data_want_recv(cs);
__cs_want_recv(cs);
cs_update_mux_polling(cs);
}
static inline void cs_data_stop_recv(struct conn_stream *cs)
static inline void cs_stop_recv(struct conn_stream *cs)
{
__cs_data_stop_recv(cs);
__cs_stop_recv(cs);
cs_update_mux_polling(cs);
}
@ -366,36 +366,36 @@ static inline void __conn_xprt_stop_both(struct connection *c)
c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA);
}
static inline void __cs_data_want_send(struct conn_stream *cs)
static inline void __cs_want_send(struct conn_stream *cs)
{
cs->flags |= CS_FL_DATA_WR_ENA;
}
static inline void __cs_data_stop_send(struct conn_stream *cs)
static inline void __cs_stop_send(struct conn_stream *cs)
{
cs->flags &= ~CS_FL_DATA_WR_ENA;
}
static inline void cs_data_stop_send(struct conn_stream *cs)
static inline void cs_stop_send(struct conn_stream *cs)
{
__cs_data_stop_send(cs);
__cs_stop_send(cs);
cs_update_mux_polling(cs);
}
static inline void cs_data_want_send(struct conn_stream *cs)
static inline void cs_want_send(struct conn_stream *cs)
{
__cs_data_want_send(cs);
__cs_want_send(cs);
cs_update_mux_polling(cs);
}
static inline void __cs_data_stop_both(struct conn_stream *cs)
static inline void __cs_stop_both(struct conn_stream *cs)
{
cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA);
}
static inline void cs_data_stop_both(struct conn_stream *cs)
static inline void cs_stop_both(struct conn_stream *cs)
{
__cs_data_stop_both(cs);
__cs_stop_both(cs);
cs_update_mux_polling(cs);
}
@ -537,6 +537,45 @@ static inline void conn_xprt_shutw_hard(struct connection *c)
c->xprt->shutw(c, 0);
}
/* shut read after draining possibly pending data */
static inline void cs_shutr(struct conn_stream *cs)
{
__cs_stop_recv(cs);
/* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutr)
cs->conn->mux->shutr(cs, 1);
}
/* shut read after disabling lingering */
static inline void cs_shutr_hard(struct conn_stream *cs)
{
__cs_stop_recv(cs);
/* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutr)
cs->conn->mux->shutr(cs, 0);
}
static inline void cs_shutw(struct conn_stream *cs)
{
__cs_stop_send(cs);
/* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutw)
cs->conn->mux->shutw(cs, 1);
}
static inline void cs_shutw_hard(struct conn_stream *cs)
{
__cs_stop_send(cs);
/* unclean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutw)
cs->conn->mux->shutw(cs, 0);
}
/* detect sock->data read0 transition */
static inline int conn_xprt_read0_pending(struct connection *c)
{
@ -576,7 +615,6 @@ static inline void conn_init(struct connection *conn)
{
conn->obj_type = OBJ_TYPE_CONN;
conn->flags = CO_FL_NONE;
conn->data = NULL;
conn->tmp_early_data = -1;
conn->mux = NULL;
conn->mux_ctx = NULL;
@ -622,31 +660,43 @@ static inline struct connection *conn_new()
return conn;
}
/* Tries to allocate a new conn_stream and initialize its main fields. The
* connection is returned on success, NULL on failure. The connection must
* be released using pool_free2() or conn_free().
*/
static inline struct conn_stream *cs_new(struct connection *conn)
{
struct conn_stream *cs;
cs = pool_alloc2(pool2_connstream);
if (likely(cs != NULL))
cs_init(cs, conn);
return cs;
}
/* Releases a conn_stream previously allocated by cs_new() */
static inline void cs_free(struct conn_stream *cs)
{
pool_free2(pool2_connstream, cs);
}
/* Tries to allocate a new conn_stream and initialize its main fields. If
* <conn> is NULL, then a new connection is allocated on the fly, initialized,
* and assigned to cs->conn ; this connection will then have to be released
* using pool_free2() or conn_free(). The conn_stream is initialized and added
* to the mux's stream list on success, then returned. On failure, nothing is
* allocated and NULL is returned.
*/
static inline struct conn_stream *cs_new(struct connection *conn)
{
struct conn_stream *cs;
cs = pool_alloc2(pool2_connstream);
if (!likely(cs))
return NULL;
if (!conn) {
conn = conn_new();
if (!likely(conn)) {
cs_free(cs);
return NULL;
}
conn_init(conn);
}
cs_init(cs, conn);
return cs;
}
/* Releases a connection previously allocated by conn_new() */
static inline void conn_free(struct connection *conn)
{
if (conn->mux && conn->mux->release)
conn->mux->release(conn);
pool_free2(pool2_connection, conn);
}
@ -700,11 +750,11 @@ static inline void conn_get_to_addr(struct connection *conn)
conn->flags |= CO_FL_ADDR_TO_SET;
}
/* Attaches a connection to an owner and assigns a data layer */
static inline void conn_attach(struct connection *conn, void *owner, const struct data_cb *data)
/* Attaches a conn_stream to a data layer and sets the relevant callbacks */
static inline void cs_attach(struct conn_stream *cs, void *data, const struct data_cb *data_cb)
{
conn->data = data;
conn->owner = owner;
cs->data_cb = data_cb;
cs->data = data;
}
/* Installs the connection's mux layer for upper context <ctx>.
@ -789,11 +839,11 @@ static inline const char *conn_get_mux_name(const struct connection *conn)
return conn->mux->name;
}
static inline const char *conn_get_data_name(const struct connection *conn)
static inline const char *cs_get_data_name(const struct conn_stream *cs)
{
if (!conn->data)
if (!cs->data_cb)
return "NONE";
return conn->data->name;
return cs->data_cb->name;
}
/* registers pointer to transport layer <id> (XPRT_*) */

View File

@ -36,7 +36,7 @@ extern struct list streams;
extern struct data_cb sess_conn_cb;
struct stream *stream_new(struct session *sess, enum obj_type *origin);
int stream_create_from_conn(struct connection *conn);
int stream_create_from_cs(struct conn_stream *cs);
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
int init_stream();

View File

@ -152,18 +152,14 @@ static inline enum obj_type *si_detach_endpoint(struct stream_interface *si)
*/
static inline void si_release_endpoint(struct stream_interface *si)
{
struct connection *conn;
struct conn_stream *cs;
struct appctx *appctx;
if (!si->end)
return;
if ((conn = objt_conn(si->end))) {
LIST_DEL(&conn->list);
conn_stop_tracking(conn);
conn_full_close(conn);
conn_free(conn);
}
if ((cs = objt_cs(si->end)))
cs_destroy(cs);
else if ((appctx = objt_appctx(si->end))) {
if (appctx->applet->release && si->state < SI_ST_DIS)
appctx->applet->release(appctx);
@ -178,26 +174,27 @@ static inline void si_release_endpoint(struct stream_interface *si)
* connection will also be added at the head of this list. This connection
* remains assigned to the stream interface it is currently attached to.
*/
static inline void si_idle_conn(struct stream_interface *si, struct list *pool)
static inline void si_idle_cs(struct stream_interface *si, struct list *pool)
{
struct connection *conn = __objt_conn(si->end);
struct conn_stream *cs = __objt_cs(si->end);
struct connection *conn = cs->conn;
if (pool)
LIST_ADD(pool, &conn->list);
conn_attach(conn, si, &si_idle_conn_cb);
conn_xprt_want_recv(conn);
cs_attach(cs, si, &si_idle_conn_cb);
cs_want_recv(cs);
}
/* Attach connection <conn> to the stream interface <si>. The stream interface
/* Attach conn_stream <cs> to the stream interface <si>. The stream interface
* is configured to work with a connection and the connection it configured
* with a stream interface data layer.
*/
static inline void si_attach_conn(struct stream_interface *si, struct connection *conn)
static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs)
{
si->ops = &si_conn_ops;
si->end = &conn->obj_type;
conn_attach(conn, si, &si_conn_cb);
si->end = &cs->obj_type;
cs_attach(cs, si, &si_conn_cb);
}
/* Returns true if a connection is attached to the stream interface <si> and
@ -205,7 +202,7 @@ static inline void si_attach_conn(struct stream_interface *si, struct connection
*/
static inline int si_conn_ready(struct stream_interface *si)
{
struct connection *conn = objt_conn(si->end);
struct connection *conn = cs_conn(objt_cs(si->end));
return conn && conn_ctrl_ready(conn) && conn_xprt_ready(conn);
}
@ -276,22 +273,22 @@ static inline void si_applet_stop_get(struct stream_interface *si)
si->flags &= ~SI_FL_WANT_GET;
}
/* Try to allocate a new connection and assign it to the interface. If
/* Try to allocate a new conn_stream and assign it to the interface. If
* an endpoint was previously allocated, it is released first. The newly
* allocated connection is initialized, assigned to the stream interface,
* allocated conn_stream is initialized, assigned to the stream interface,
* and returned.
*/
static inline struct connection *si_alloc_conn(struct stream_interface *si)
static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struct connection *conn)
{
struct connection *conn;
struct conn_stream *cs;
si_release_endpoint(si);
conn = conn_new();
if (conn)
si_attach_conn(si, conn);
cs = cs_new(conn);
if (cs)
si_attach_cs(si, cs);
return conn;
return cs;
}
/* Release the interface's existing endpoint (connection or appctx) and
@ -346,7 +343,8 @@ static inline void si_chk_snd(struct stream_interface *si)
/* Calls chk_snd on the connection using the ctrl layer */
static inline int si_connect(struct stream_interface *si)
{
struct connection *conn = objt_conn(si->end);
struct conn_stream *cs = objt_cs(si->end);
struct connection *conn = cs_conn(cs);
int ret = SF_ERR_NONE;
if (unlikely(!conn || !conn->ctrl || !conn->ctrl->connect))
@ -364,7 +362,7 @@ static inline int si_connect(struct stream_interface *si)
/* reuse the existing connection */
if (!channel_is_empty(si_oc(si))) {
/* we'll have to send a request there. */
conn_xprt_want_send(conn);
cs_want_send(cs);
}
/* the connection is established */

View File

@ -157,7 +157,7 @@ enum {
struct check {
struct xprt_ops *xprt; /* transport layer operations for health checks */
struct connection *conn; /* connection state for health checks */
struct conn_stream *cs; /* conn_stream state for health checks */
unsigned short port; /* the port to use for the health checks */
struct buffer *bi, *bo; /* input and output buffers to send/recv check */
struct task *task; /* the task associated to the health check processing, NULL if disabled */

View File

@ -288,9 +288,9 @@ struct mux_ops {
* data movement. It may abort a connection by returning < 0.
*/
struct data_cb {
void (*recv)(struct connection *conn); /* data-layer recv callback */
void (*send)(struct connection *conn); /* data-layer send callback */
int (*wake)(struct connection *conn); /* data-layer callback to report activity */
void (*recv)(struct conn_stream *cs); /* data-layer recv callback */
void (*send)(struct conn_stream *cs); /* data-layer send callback */
int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */
char name[8]; /* data layer name, zero-terminated */
};
@ -347,10 +347,9 @@ struct connection {
const struct protocol *ctrl; /* operations at the socket layer */
const struct xprt_ops *xprt; /* operations at the transport layer */
const struct mux_ops *mux; /* mux layer opreations. Must be set before xprt->init() */
const struct data_cb *data; /* data layer callbacks. Must be set before xprt->init() */
void *xprt_ctx; /* general purpose pointer, initialized to NULL */
void *mux_ctx; /* mux-specific context, initialized to NULL */
void *owner; /* pointer to upper layer's entity (eg: session, stream interface) */
void *owner; /* pointer to the owner session for incoming connections, or NULL */
int xprt_st; /* transport layer state, initialized to zero */
int tmp_early_data; /* 1st byte of early data, if any */
union conn_handle handle; /* connection handle at the socket layer */

View File

@ -567,7 +567,7 @@ int assign_server(struct stream *s)
srv = NULL;
s->target = NULL;
conn = objt_conn(s->si[1].end);
conn = cs_conn(objt_cs(s->si[1].end));
if (conn &&
(conn->flags & CO_FL_CONNECTED) &&
@ -720,8 +720,7 @@ int assign_server(struct stream *s)
s->target = &s->be->obj_type;
}
else if ((s->be->options & PR_O_HTTP_PROXY) &&
(conn = objt_conn(s->si[1].end)) &&
is_addr(&conn->addr.to)) {
conn && is_addr(&conn->addr.to)) {
/* in proxy mode, we need a valid destination address */
s->target = &s->be->obj_type;
}
@ -769,7 +768,7 @@ int assign_server(struct stream *s)
int assign_server_address(struct stream *s)
{
struct connection *cli_conn = objt_conn(strm_orig(s));
struct connection *srv_conn = objt_conn(s->si[1].end);
struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end));
#ifdef DEBUG_FULL
fprintf(stderr,"assign_server_address : s=%p\n",s);
@ -973,7 +972,7 @@ static void assign_tproxy_address(struct stream *s)
struct server *srv = objt_server(s->target);
struct conn_src *src;
struct connection *cli_conn;
struct connection *srv_conn = objt_conn(s->si[1].end);
struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end));
if (srv && srv->conn_src.opts & CO_SRC_BIND)
src = &srv->conn_src;
@ -1041,21 +1040,23 @@ int connect_server(struct stream *s)
{
struct connection *cli_conn;
struct connection *srv_conn;
struct connection *old_conn;
struct conn_stream *srv_cs;
struct conn_stream *old_cs;
struct server *srv;
int reuse = 0;
int err;
srv = objt_server(s->target);
srv_conn = objt_conn(s->si[1].end);
srv_cs = objt_cs(s->si[1].end);
srv_conn = cs_conn(srv_cs);
if (srv_conn)
reuse = s->target == srv_conn->target;
if (srv && !reuse) {
old_conn = srv_conn;
if (old_conn) {
old_cs = srv_cs;
if (old_cs) {
srv_conn = NULL;
old_conn->owner = NULL;
srv_cs->data = NULL;
si_detach_endpoint(&s->si[1]);
/* note: if the connection was in a server's idle
* queue, it doesn't get dequeued.
@ -1101,23 +1102,25 @@ int connect_server(struct stream *s)
LIST_DEL(&srv_conn->list);
LIST_INIT(&srv_conn->list);
if (srv_conn->owner) {
si_detach_endpoint(srv_conn->owner);
if (old_conn && !(old_conn->flags & CO_FL_PRIVATE)) {
si_attach_conn(srv_conn->owner, old_conn);
si_idle_conn(srv_conn->owner, NULL);
/* XXX cognet: this assumes only 1 conn_stream per
* connection, has to be revisited later
*/
srv_cs = srv_conn->mux_ctx;
if (srv_conn->mux == &mux_pt_ops && srv_cs->data) {
si_detach_endpoint(srv_cs->data);
if (old_cs && !(old_cs->conn->flags & CO_FL_PRIVATE)) {
si_attach_cs(srv_cs->data, old_cs);
si_idle_cs(srv_cs->data, NULL);
}
}
si_attach_conn(&s->si[1], srv_conn);
si_attach_cs(&s->si[1], srv_cs);
reuse = 1;
}
/* we may have to release our connection if we couldn't swap it */
if (old_conn && !old_conn->owner) {
LIST_DEL(&old_conn->list);
conn_full_close(old_conn);
conn_free(old_conn);
}
if (old_cs && !old_cs->data)
cs_destroy(old_cs);
}
if (reuse) {
@ -1136,15 +1139,16 @@ int connect_server(struct stream *s)
}
}
if (!reuse)
srv_conn = si_alloc_conn(&s->si[1]);
else {
if (!reuse) {
srv_cs = si_alloc_cs(&s->si[1], NULL);
srv_conn = cs_conn(srv_cs);
} else {
/* reusing our connection, take it out of the idle list */
LIST_DEL(&srv_conn->list);
LIST_INIT(&srv_conn->list);
}
if (!srv_conn)
if (!srv_cs)
return SF_ERR_RESOURCE;
if (!(s->flags & SF_ADDR_SET)) {
@ -1160,14 +1164,16 @@ int connect_server(struct stream *s)
/* set the correct protocol on the output stream interface */
if (srv) {
conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), srv->xprt);
conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
/* XXX: Pick the right mux, when we finally have one */
conn_install_mux(srv_conn, &mux_pt_ops, srv_cs);
}
else if (obj_type(s->target) == OBJ_TYPE_PROXY) {
/* proxies exclusively run on raw_sock right now */
conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), xprt_get(XPRT_RAW));
if (!objt_conn(s->si[1].end) || !objt_conn(s->si[1].end)->ctrl)
if (!objt_cs(s->si[1].end) || !objt_cs(s->si[1].end)->conn->ctrl)
return SF_ERR_INTERNAL;
conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
/* XXX: Pick the right mux, when we finally have one */
conn_install_mux(srv_conn, &mux_pt_ops, srv_cs);
}
else
return SF_ERR_INTERNAL; /* how did we get there ? */
@ -1182,13 +1188,13 @@ int connect_server(struct stream *s)
conn_get_to_addr(cli_conn);
}
si_attach_conn(&s->si[1], srv_conn);
si_attach_cs(&s->si[1], srv_cs);
assign_tproxy_address(s);
}
else {
/* the connection is being reused, just re-attach it */
si_attach_conn(&s->si[1], srv_conn);
si_attach_cs(&s->si[1], srv_cs);
s->flags |= SF_SRV_REUSED;
}

View File

@ -582,7 +582,8 @@ static int retrieve_errno_from_socket(struct connection *conn)
*/
static void chk_report_conn_err(struct check *check, int errno_bck, int expired)
{
struct connection *conn = check->conn;
struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs);
const char *err_msg;
struct chunk *chk;
int step;
@ -705,9 +706,10 @@ static void chk_report_conn_err(struct check *check, int errno_bck, int expired)
* it sends the request. In other cases, it calls set_server_check_status()
* to set check->status, check->duration and check->result.
*/
static void event_srv_chk_w(struct connection *conn)
static void event_srv_chk_w(struct conn_stream *cs)
{
struct check *check = conn->owner;
struct connection *conn = cs->conn;
struct check *check = cs->data;
struct server *s = check->server;
struct task *t = check->task;
@ -719,7 +721,7 @@ static void event_srv_chk_w(struct connection *conn)
if (retrieve_errno_from_socket(conn)) {
chk_report_conn_err(check, errno, 0);
__conn_xprt_stop_both(conn);
__cs_stop_both(cs);
goto out_wakeup;
}
@ -741,10 +743,10 @@ static void event_srv_chk_w(struct connection *conn)
return;
if (check->bo->o) {
conn->xprt->snd_buf(conn, check->bo, 0);
conn->mux->snd_buf(cs, check->bo, 0);
if (conn->flags & CO_FL_ERROR) {
chk_report_conn_err(check, errno, 0);
__conn_xprt_stop_both(conn);
__cs_stop_both(cs);
goto out_wakeup;
}
if (check->bo->o)
@ -761,7 +763,7 @@ static void event_srv_chk_w(struct connection *conn)
out_wakeup:
task_wakeup(t, TASK_WOKEN_IO);
out_nowake:
__conn_xprt_stop_send(conn); /* nothing more to write */
__cs_stop_send(cs); /* nothing more to write */
}
/*
@ -778,9 +780,10 @@ static void event_srv_chk_w(struct connection *conn)
* call it with a proper error status like HCHK_STATUS_L7STS, HCHK_STATUS_L6RSP,
* etc.
*/
static void event_srv_chk_r(struct connection *conn)
static void event_srv_chk_r(struct conn_stream *cs)
{
struct check *check = conn->owner;
struct connection *conn = cs->conn;
struct check *check = cs->data;
struct server *s = check->server;
struct task *t = check->task;
char *desc;
@ -815,7 +818,7 @@ static void event_srv_chk_r(struct connection *conn)
done = 0;
conn->xprt->rcv_buf(conn, check->bi, check->bi->size);
conn->mux->rcv_buf(cs, check->bi, check->bi->size);
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
done = 1;
if ((conn->flags & CO_FL_ERROR) && !check->bi->i) {
@ -1339,8 +1342,8 @@ static void event_srv_chk_r(struct connection *conn)
* range quickly. To avoid sending RSTs all the time, we first try to
* drain pending data.
*/
__conn_xprt_stop_both(conn);
conn_xprt_shutw(conn);
__cs_stop_both(cs);
cs_shutw(cs);
/* OK, let's not stay here forever */
if (check->result == CHK_RES_FAILED)
@ -1350,7 +1353,7 @@ static void event_srv_chk_r(struct connection *conn)
return;
wait_more_data:
__conn_xprt_want_recv(conn);
__cs_want_recv(cs);
}
/*
@ -1359,15 +1362,17 @@ static void event_srv_chk_r(struct connection *conn)
* It returns 0 on normal cases, <0 if at least one close() has happened on the
* connection (eg: reconnect).
*/
static int wake_srv_chk(struct connection *conn)
static int wake_srv_chk(struct conn_stream *cs)
{
struct check *check = conn->owner;
struct connection *conn = cs->conn;
struct check *check = cs->data;
int ret = 0;
/* we may have to make progress on the TCP checks */
if (check->type == PR_O2_TCPCHK_CHK) {
ret = tcpcheck_main(check);
conn = check->conn;
cs = check->cs;
conn = cs_conn(cs);
}
if (unlikely(conn->flags & CO_FL_ERROR)) {
@ -1378,8 +1383,7 @@ static int wake_srv_chk(struct connection *conn)
* we expect errno to still be valid.
*/
chk_report_conn_err(check, errno, 0);
__conn_xprt_stop_both(conn);
__cs_stop_both(cs);
task_wakeup(check->task, TASK_WOKEN_IO);
}
else if (!(conn->flags & (CO_FL_XPRT_RD_ENA|CO_FL_XPRT_WR_ENA|CO_FL_HANDSHAKE))) {
@ -1478,7 +1482,8 @@ static int connect_conn_chk(struct task *t)
{
struct check *check = t->context;
struct server *s = check->server;
struct connection *conn = check->conn;
struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs);
struct protocol *proto;
struct tcpcheck_rule *tcp_rule = NULL;
int ret;
@ -1535,9 +1540,10 @@ static int connect_conn_chk(struct task *t)
}
/* prepare a new connection */
conn = check->conn = conn_new();
if (!check->conn)
cs = check->cs = cs_new(NULL);
if (!check->cs)
return SF_ERR_RESOURCE;
conn = cs->conn;
if (is_addr(&check->addr)) {
/* we'll connect to the check addr specified on the server */
@ -1553,7 +1559,7 @@ static int connect_conn_chk(struct task *t)
i = srv_check_healthcheck_port(check);
if (i == 0) {
conn->owner = check;
cs->data = check;
return SF_ERR_CHK_PORT;
}
@ -1563,8 +1569,8 @@ static int connect_conn_chk(struct task *t)
proto = protocol_by_family(conn->addr.to.ss_family);
conn_prepare(conn, proto, check->xprt);
conn_install_mux(conn, &mux_pt_ops, conn);
conn_attach(conn, check, &check_conn_cb);
conn_install_mux(conn, &mux_pt_ops, cs);
cs_attach(cs, check, &check_conn_cb);
conn->target = &s->obj_type;
/* no client address */
@ -2077,7 +2083,8 @@ static struct task *process_chk_conn(struct task *t)
{
struct check *check = t->context;
struct server *s = check->server;
struct connection *conn = check->conn;
struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs);
int rv;
int ret;
int expired = tick_is_expired(t->expire, now_ms);
@ -2105,7 +2112,8 @@ static struct task *process_chk_conn(struct task *t)
check->bo->o = 0;
ret = connect_conn_chk(t);
conn = check->conn;
cs = check->cs;
conn = cs_conn(cs);
switch (ret) {
case SF_ERR_UP:
@ -2123,7 +2131,7 @@ static struct task *process_chk_conn(struct task *t)
}
if (check->type)
conn_xprt_want_recv(conn); /* prepare for reading a possible reply */
cs_want_recv(cs); /* prepare for reading a possible reply */
task_set_affinity(t, tid_bit);
goto reschedule;
@ -2147,9 +2155,10 @@ static struct task *process_chk_conn(struct task *t)
}
/* here, we have seen a synchronous error, no fd was allocated */
if (conn) {
conn_free(conn);
check->conn = conn = NULL;
if (cs) {
cs_destroy(cs);
cs = check->cs = NULL;
conn = NULL;
}
check->state &= ~CHK_ST_INPROGRESS;
@ -2201,8 +2210,9 @@ static struct task *process_chk_conn(struct task *t)
}
if (conn) {
conn_free(conn);
check->conn = conn = NULL;
cs_destroy(cs);
cs = check->cs = NULL;
conn = NULL;
}
if (check->result == CHK_RES_FAILED) {
@ -2550,7 +2560,8 @@ static int tcpcheck_main(struct check *check)
char *contentptr, *comment;
struct tcpcheck_rule *next;
int done = 0, ret = 0, step = 0;
struct connection *conn = check->conn;
struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs);
struct server *s = check->server;
struct task *t = check->task;
struct list *head = check->tcpcheck_rules;
@ -2619,8 +2630,8 @@ static int tcpcheck_main(struct check *check)
}
/* It's only the rules which will enable send/recv */
if (conn)
__conn_xprt_stop_both(conn);
if (cs)
cs_stop_both(cs);
while (1) {
/* We have to try to flush the output buffer before reading, at
@ -2633,11 +2644,11 @@ static int tcpcheck_main(struct check *check)
check->current_step->action != TCPCHK_ACT_SEND ||
check->current_step->string_len >= buffer_total_space(check->bo))) {
__conn_xprt_want_send(conn);
if (conn->xprt->snd_buf(conn, check->bo, 0) <= 0) {
__cs_want_send(cs);
if (conn->mux->snd_buf(cs, check->bo, 0) <= 0) {
if (conn->flags & CO_FL_ERROR) {
chk_report_conn_err(check, errno, 0);
__conn_xprt_stop_both(conn);
__cs_stop_both(cs);
goto out_end_tcpcheck;
}
break;
@ -2673,8 +2684,9 @@ static int tcpcheck_main(struct check *check)
* 2: try to get a new connection
* 3: release and replace the old one on success
*/
if (check->conn) {
conn_full_close(check->conn);
if (check->cs) {
/* XXX: need to kill all CS here as well but not to free them yet */
conn_full_close(check->cs->conn);
retcode = -1; /* do not reuse the fd! */
}
@ -2682,8 +2694,8 @@ static int tcpcheck_main(struct check *check)
check->last_started_step = check->current_step;
/* prepare new connection */
conn = conn_new();
if (!conn) {
cs = cs_new(NULL);
if (!cs) {
step = tcpcheck_get_step_id(check);
chunk_printf(&trash, "TCPCHK error allocating connection at step %d", step);
comment = tcpcheck_get_step_comment(check, step);
@ -2694,11 +2706,15 @@ static int tcpcheck_main(struct check *check)
return retcode;
}
if (check->conn)
conn_free(check->conn);
check->conn = conn;
if (check->cs) {
if (check->cs->conn)
conn_free(check->cs->conn);
cs_free(check->cs);
}
conn_attach(conn, check, &check_conn_cb);
check->cs = cs;
conn = cs->conn;
cs_attach(cs, check, &check_conn_cb);
conn->target = &s->obj_type;
/* no client address */
@ -2727,7 +2743,7 @@ static int tcpcheck_main(struct check *check)
xprt = xprt_get(XPRT_RAW);
}
conn_prepare(conn, proto, xprt);
conn_install_mux(conn, &mux_pt_ops, conn);
conn_install_mux(conn, &mux_pt_ops, cs);
ret = SF_ERR_INTERNAL;
if (proto->connect)
@ -2860,8 +2876,8 @@ static int tcpcheck_main(struct check *check)
if (unlikely(check->result == CHK_RES_FAILED))
goto out_end_tcpcheck;
__conn_xprt_want_recv(conn);
if (conn->xprt->rcv_buf(conn, check->bi, check->bi->size) <= 0) {
__cs_want_recv(cs);
if (conn->mux->rcv_buf(cs, check->bi, check->bi->size) <= 0) {
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
done = 1;
if ((conn->flags & CO_FL_ERROR) && !check->bi->i) {
@ -2958,7 +2974,7 @@ static int tcpcheck_main(struct check *check)
if (check->current_step->action == TCPCHK_ACT_EXPECT)
goto tcpcheck_expect;
__conn_xprt_stop_recv(conn);
__cs_stop_recv(cs);
}
}
else {
@ -2978,7 +2994,7 @@ static int tcpcheck_main(struct check *check)
if (check->current_step->action == TCPCHK_ACT_EXPECT)
goto tcpcheck_expect;
__conn_xprt_stop_recv(conn);
__cs_stop_recv(cs);
}
/* not matched but was supposed to => ERROR */
else {
@ -3012,11 +3028,11 @@ static int tcpcheck_main(struct check *check)
/* warning, current_step may now point to the head */
if (check->bo->o)
__conn_xprt_want_send(conn);
__cs_want_send(cs);
if (&check->current_step->list != head &&
check->current_step->action == TCPCHK_ACT_EXPECT)
__conn_xprt_want_recv(conn);
__cs_want_recv(cs);
return retcode;
out_end_tcpcheck:
@ -3030,7 +3046,7 @@ static int tcpcheck_main(struct check *check)
if (check->result == CHK_RES_FAILED)
conn->flags |= CO_FL_ERROR;
__conn_xprt_stop_both(conn);
__cs_stop_both(cs);
return retcode;
}
@ -3049,7 +3065,6 @@ const char *init_check(struct check *check, int type)
return "out of memory while allocating check buffer";
}
check->bo->size = global.tune.chksize;
return NULL;
}
@ -3059,8 +3074,10 @@ void free_check(struct check *check)
check->bi = NULL;
free(check->bo);
check->bo = NULL;
free(check->conn);
check->conn = NULL;
free(check->cs->conn);
check->cs->conn = NULL;
cs_free(check->cs);
check->cs = NULL;
}
void email_alert_free(struct email_alert *alert)

View File

@ -1268,7 +1268,7 @@ static int _getsocks(char **args, struct appctx *appctx, void *private)
struct cmsghdr *cmsg;
struct stream_interface *si = appctx->owner;
struct stream *s = si_strm(si);
struct connection *remote = objt_conn(si_opposite(si)->end);
struct connection *remote = cs_conn(objt_cs(si_opposite(si)->end));
struct msghdr msghdr;
struct iovec iov;
struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };

View File

@ -101,7 +101,7 @@ int frontend_accept(struct stream *s)
/* try to report the ALPN value when available (also works for NPN) */
if (conn && conn->owner == &s->si[0]) {
if (conn && conn == cs_conn(objt_cs(s->si[0].end))) {
if (conn_get_alpn(conn, &alpn_str, &alpn_len) && alpn_str) {
int len = MIN(alpn_len, sizeof(alpn) - 1);
memcpy(alpn, alpn_str, len);

View File

@ -1521,7 +1521,7 @@ __LJMP static struct hlua_socket *hlua_checksocket(lua_State *L, int ud)
static void hlua_socket_handler(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
struct connection *c = objt_conn(si_opposite(si)->end);
struct connection *c = cs_conn(objt_cs(si_opposite(si)->end));
if (appctx->ctx.hlua_cosocket.die) {
si_shutw(si);
@ -2167,7 +2167,7 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L)
si = appctx->owner;
s = si_strm(si);
conn = objt_conn(s->si[1].end);
conn = cs_conn(objt_cs(s->si[1].end));
if (!conn) {
xref_unlock(&socket->xref, peer);
lua_pushnil(L);
@ -2217,7 +2217,7 @@ static int hlua_socket_getsockname(struct lua_State *L)
si = appctx->owner;
s = si_strm(si);
conn = objt_conn(s->si[1].end);
conn = cs_conn(objt_cs(s->si[1].end));
if (!conn) {
xref_unlock(&socket->xref, peer);
lua_pushnil(L);
@ -2346,7 +2346,7 @@ __LJMP static int hlua_socket_connect(struct lua_State *L)
s = si_strm(si);
/* Initialise connection. */
conn = si_alloc_conn(&s->si[1]);
conn = cs_conn(si_alloc_cs(&s->si[1], NULL));
if (!conn) {
xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "connect: internal error"));

View File

@ -1525,7 +1525,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
break;
case LOG_FMT_BACKENDIP: // %bi
conn = objt_conn(s->si[1].end);
conn = cs_conn(objt_cs(s->si[1].end));
if (conn)
ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp);
else
@ -1538,7 +1538,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
break;
case LOG_FMT_BACKENDPORT: // %bp
conn = objt_conn(s->si[1].end);
conn = cs_conn(objt_cs(s->si[1].end));
if (conn)
ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp);
else
@ -1551,7 +1551,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
break;
case LOG_FMT_SERVERIP: // %si
conn = objt_conn(s->si[1].end);
conn = cs_conn(objt_cs(s->si[1].end));
if (conn)
ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp);
else
@ -1564,7 +1564,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
break;
case LOG_FMT_SERVERPORT: // %sp
conn = objt_conn(s->si[1].end);
conn = cs_conn(objt_cs(s->si[1].end));
if (conn)
ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp);
else

View File

@ -14,17 +14,31 @@
#include <proto/connection.h>
#include <proto/stream.h>
/* Initialize the mux once it's attached. If conn->mux_ctx is NULL, it is
* assumed that no data layer has yet been instanciated so the mux is
* attached to an incoming connection and will instanciate a new stream. If
* conn->mux_ctx exists, it is assumed that it is an outgoing connection
* requested for this context. Returns < 0 on error.
/* Initialize the mux once it's attached. It is expected that conn->mux_ctx
* points to the existing conn_stream (for outgoing connections) or NULL (for
* incoming ones, in which case one will be allocated and a new stream will be
* instanciated). Returns < 0 on error.
*/
static int mux_pt_init(struct connection *conn)
{
if (!conn->mux_ctx)
return stream_create_from_conn(conn);
struct conn_stream *cs = conn->mux_ctx;
if (!cs) {
cs = cs_new(conn);
if (!cs)
goto fail;
if (stream_create_from_cs(cs) < 0)
goto fail_free;
conn->mux_ctx = cs;
}
return 0;
fail_free:
cs_free(cs);
fail:
return -1;
}
/* callback to be used by default for the pass-through mux. It calls the data
@ -32,7 +46,13 @@ static int mux_pt_init(struct connection *conn)
*/
static int mux_pt_wake(struct connection *conn)
{
return conn->data->wake ? conn->data->wake(conn) : 0;
struct conn_stream *cs = conn->mux_ctx;
int ret;
ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
cs_update_mux_polling(cs);
return (ret);
}
/* callback used to update the mux's polling flags after changing a cs' status.
@ -60,7 +80,12 @@ static void mux_pt_update_poll(struct conn_stream *cs)
*/
static void mux_pt_recv(struct connection *conn)
{
conn->data->recv(conn);
struct conn_stream *cs = conn->mux_ctx;
if (conn_xprt_read0_pending(conn))
cs->flags |= CS_FL_EOS;
cs->data_cb->recv(cs);
cs_update_mux_polling(cs);
}
/* callback to be used by default for the pass-through mux. It simply calls the
@ -68,7 +93,10 @@ static void mux_pt_recv(struct connection *conn)
*/
static void mux_pt_send(struct connection *conn)
{
conn->data->send(conn);
struct conn_stream *cs = conn->mux_ctx;
cs->data_cb->send(cs);
cs_update_mux_polling(cs);
}
/*

View File

@ -1871,6 +1871,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
struct session *sess;
struct stream *s;
struct connection *conn;
struct conn_stream *cs;
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
peer->statuscode = PEER_SESS_SC_CONNECTCODE;
@ -1912,9 +1913,12 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
if (unlikely((conn = conn_new()) == NULL))
goto out_free_strm;
if (unlikely((cs = cs_new(conn)) == NULL))
goto out_free_conn;
conn_prepare(conn, peer->proto, peer->xprt);
conn_install_mux(conn, &mux_pt_ops, conn);
si_attach_conn(&s->si[1], conn);
conn_install_mux(conn, &mux_pt_ops, cs);
si_attach_cs(&s->si[1], cs);
conn->target = s->target = &s->be->obj_type;
memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
@ -1928,6 +1932,8 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
return appctx;
/* Error unrolling */
out_free_conn:
conn_free(conn);
out_free_strm:
LIST_DEL(&s->list);
pool_free2(pool2_stream, s);

View File

@ -3662,7 +3662,7 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit)
char *path;
/* Note that for now we don't reuse existing proxy connections */
if (unlikely((conn = si_alloc_conn(&s->si[1])) == NULL)) {
if (unlikely((conn = cs_conn(si_alloc_cs(&s->si[1], NULL))) == NULL)) {
txn->req.err_state = txn->req.msg_state;
txn->req.msg_state = HTTP_MSG_ERROR;
txn->status = 500;
@ -4212,6 +4212,7 @@ void http_end_txn_clean_session(struct stream *s)
int prev_status = s->txn->status;
struct proxy *fe = strm_fe(s);
struct proxy *be = s->be;
struct conn_stream *cs;
struct connection *srv_conn;
struct server *srv;
unsigned int prev_flags = s->txn->flags;
@ -4221,7 +4222,14 @@ void http_end_txn_clean_session(struct stream *s)
* flags. We also need a more accurate method for computing per-request
* data.
*/
srv_conn = objt_conn(s->si[1].end);
/*
* XXX cognet: This is probably wrong, this is killing a whole
* connection, in the new world order, we probably want to just kill
* the stream, this is to be revisited the day we handle multiple
* streams in one server connection.
*/
cs = objt_cs(s->si[1].end);
srv_conn = cs_conn(cs);
/* unless we're doing keep-alive, we want to quickly close the connection
* to the server.
@ -4364,17 +4372,17 @@ void http_end_txn_clean_session(struct stream *s)
if (srv_conn && LIST_ISEMPTY(&srv_conn->list)) {
srv = objt_server(srv_conn->target);
if (!srv)
si_idle_conn(&s->si[1], NULL);
si_idle_cs(&s->si[1], NULL);
else if (srv_conn->flags & CO_FL_PRIVATE)
si_idle_conn(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL));
si_idle_cs(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL));
else if (prev_flags & TX_NOT_FIRST)
/* note: we check the request, not the connection, but
* this is valid for strategies SAFE and AGGR, and in
* case of ALWS, we don't care anyway.
*/
si_idle_conn(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL));
si_idle_cs(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL));
else
si_idle_conn(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL));
si_idle_cs(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL));
}
s->req.analysers = strm_li(s) ? strm_li(s)->analysers : 0;
s->res.analysers = 0;
@ -7936,7 +7944,7 @@ void debug_hdr(const char *dir, struct stream *s, const char *start, const char
chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
dir,
objt_conn(sess->origin) ? (unsigned short)objt_conn(sess->origin)->handle.fd : -1,
objt_conn(s->si[1].end) ? (unsigned short)objt_conn(s->si[1].end)->handle.fd : -1);
objt_cs(s->si[1].end) ? (unsigned short)objt_cs(s->si[1].end)->conn->handle.fd : -1);
for (max = 0; start + max < end; max++)
if (start[max] == '\r' || start[max] == '\n')

View File

@ -1598,11 +1598,11 @@ static inline int get_tcp_info(const struct arg *args, struct sample *smp,
/* get the object associated with the stream interface.The
* object can be other thing than a connection. For example,
* it be a appctx. */
conn = objt_conn(smp->strm->si[dir].end);
conn = cs_conn(objt_cs(smp->strm->si[dir].end));
if (!conn)
return 0;
/* The fd may not be avalaible for the tcp_info struct, and the
/* The fd may not be available for the tcp_info struct, and the
syscal can fail. */
optlen = sizeof(info);
if (getsockopt(conn->handle.fd, SOL_TCP, TCP_INFO, &info, &optlen) == -1)

View File

@ -74,11 +74,11 @@ static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
* valid right after the handshake, before the connection's data layer is
* initialized, because it relies on the session to be in conn->owner.
*/
int stream_create_from_conn(struct connection *conn)
int stream_create_from_cs(struct conn_stream *cs)
{
struct stream *strm;
strm = stream_new(conn->owner, &conn->obj_type);
strm = stream_new(cs->conn->owner, &cs->obj_type);
if (strm == NULL)
return -1;
@ -99,7 +99,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
{
struct stream *s;
struct task *t;
struct connection *conn = objt_conn(origin);
struct conn_stream *cs = objt_cs(origin);
struct appctx *appctx = objt_appctx(origin);
if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
@ -198,8 +198,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
s->si[0].hcto = sess->fe->timeout.clientfin;
/* attach the incoming connection to the stream interface now. */
if (conn)
si_attach_conn(&s->si[0], conn);
if (cs)
si_attach_cs(&s->si[0], cs);
else if (appctx)
si_attach_appctx(&s->si[0], appctx);
@ -261,8 +261,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
goto out_fail_accept;
/* finish initialization of the accepted file descriptor */
if (conn)
conn_xprt_want_recv(conn);
if (cs)
cs_want_recv(cs);
else if (appctx)
si_applet_want_get(&s->si[0]);
@ -295,7 +295,8 @@ static void stream_free(struct stream *s)
struct session *sess = strm_sess(s);
struct proxy *fe = sess->fe;
struct bref *bref, *back;
struct connection *cli_conn = objt_conn(s->si[0].end);
struct conn_stream *cli_cs = objt_cs(s->si[0].end);
struct connection *cli_conn = cs_conn(cli_cs);
int i;
if (s->pend_pos)
@ -343,6 +344,7 @@ static void stream_free(struct stream *s)
http_end_txn(s);
/* ensure the client-side transport layer is destroyed */
/* XXX cognet: wrong for multiple streams in one connection */
if (cli_conn) {
conn_stop_tracking(cli_conn);
conn_full_close(cli_conn);
@ -577,7 +579,7 @@ static int sess_update_st_con_tcp(struct stream *s)
struct stream_interface *si = &s->si[1];
struct channel *req = &s->req;
struct channel *rep = &s->res;
struct connection *srv_conn = __objt_conn(si->end);
struct connection *srv_conn = __objt_cs(si->end)->conn;
/* If we got an error, or if nothing happened and the connection timed
* out, we must give up. The CER state handler will take care of retry
@ -597,6 +599,9 @@ static int sess_update_st_con_tcp(struct stream *s)
si->exp = TICK_ETERNITY;
si->state = SI_ST_CER;
/* XXX cognet: do we really want to kill the connection here ?
* Probably not for multiple streams.
*/
conn_full_close(srv_conn);
if (si->err_type)
@ -647,7 +652,8 @@ static int sess_update_st_con_tcp(struct stream *s)
static int sess_update_st_cer(struct stream *s)
{
struct stream_interface *si = &s->si[1];
struct connection *conn = objt_conn(si->end);
struct conn_stream *cs = objt_cs(si->end);
struct connection *conn = cs_conn(cs);
/* we probably have to release last stream from the server */
if (objt_server(s->target)) {
@ -812,7 +818,7 @@ static void sess_establish(struct stream *s)
req->flags |= CF_WAKE_ONCE;
req->flags &= ~CF_WAKE_CONNECT;
}
if (objt_conn(si->end)) {
if (objt_cs(si->end)) {
/* real connections have timeouts */
req->wto = s->be->timeout.server;
rep->rto = s->be->timeout.server;
@ -2111,8 +2117,8 @@ struct task *process_stream(struct task *t)
if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
req->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) &&
(objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->rcv_pipe) &&
(objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->snd_pipe) &&
(objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe) &&
(objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe) &&
(pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
@ -2292,8 +2298,8 @@ struct task *process_stream(struct task *t)
if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
res->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) &&
(objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->snd_pipe) &&
(objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->rcv_pipe) &&
(objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe) &&
(objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe) &&
(pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
@ -2361,8 +2367,8 @@ struct task *process_stream(struct task *t)
si_b->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
s->uniq_id, s->be->id,
objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1,
objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1);
objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
}
@ -2370,8 +2376,8 @@ struct task *process_stream(struct task *t)
si_f->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
s->uniq_id, s->be->id,
objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1,
objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1);
objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
}
}
@ -2460,8 +2466,8 @@ struct task *process_stream(struct task *t)
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
s->uniq_id, s->be->id,
objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1,
objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1);
objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
}
@ -2692,6 +2698,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
struct tm tm;
extern const char *monthname[12];
char pn[INET6_ADDRSTRLEN];
struct conn_stream *cs;
struct connection *conn;
struct appctx *tmpctx;
@ -2777,7 +2784,9 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
else
chunk_appendf(&trash, " backend=<NONE> (id=-1 mode=-)");
conn = objt_conn(strm->si[1].end);
cs = objt_cs(strm->si[1].end);
conn = cs_conn(cs);
if (conn)
conn_get_from_addr(conn);
@ -2869,14 +2878,16 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
TICKS_TO_MS(1000)) : "<NEVER>",
strm->si[1].err_type);
if ((conn = objt_conn(strm->si[0].end)) != NULL) {
if ((cs = objt_cs(strm->si[0].end)) != NULL) {
conn = cs->conn;
chunk_appendf(&trash,
" co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
conn,
conn_get_ctrl_name(conn),
conn_get_xprt_name(conn),
conn_get_mux_name(conn),
conn_get_data_name(conn),
cs_get_data_name(cs),
obj_type_name(conn->target),
obj_base_ptr(conn->target));
@ -2898,14 +2909,16 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
tmpctx->applet->name);
}
if ((conn = objt_conn(strm->si[1].end)) != NULL) {
if ((cs = objt_cs(strm->si[1].end)) != NULL) {
conn = cs->conn;
chunk_appendf(&trash,
" co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
conn,
conn_get_ctrl_name(conn),
conn_get_xprt_name(conn),
conn_get_mux_name(conn),
conn_get_data_name(conn),
cs_get_data_name(cs),
obj_type_name(conn->target),
obj_base_ptr(conn->target));
@ -3171,7 +3184,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
human_time(TICKS_TO_MS(curr_strm->res.analyse_exp - now_ms),
TICKS_TO_MS(1000)) : "");
conn = objt_conn(curr_strm->si[0].end);
conn = cs_conn(objt_cs(curr_strm->si[0].end));
chunk_appendf(&trash,
" s0=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[0].state,
@ -3181,7 +3194,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms),
TICKS_TO_MS(1000)) : "");
conn = objt_conn(curr_strm->si[1].end);
conn = cs_conn(objt_cs(curr_strm->si[1].end));
chunk_appendf(&trash,
" s1=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[1].state,

View File

@ -30,6 +30,7 @@
#include <proto/applet.h>
#include <proto/channel.h>
#include <proto/connection.h>
#include <proto/mux_pt.h>
#include <proto/pipe.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
@ -50,11 +51,11 @@ static void stream_int_shutr_applet(struct stream_interface *si);
static void stream_int_shutw_applet(struct stream_interface *si);
static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si);
static void si_conn_recv_cb(struct connection *conn);
static void si_conn_send_cb(struct connection *conn);
static int si_conn_wake_cb(struct connection *conn);
static int si_idle_conn_wake_cb(struct connection *conn);
static void si_idle_conn_null_cb(struct connection *conn);
static void si_cs_recv_cb(struct conn_stream *cs);
static void si_cs_send_cb(struct conn_stream *cs);
static int si_cs_wake_cb(struct conn_stream *cs);
static int si_idle_conn_wake_cb(struct conn_stream *cs);
static void si_idle_conn_null_cb(struct conn_stream *cs);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = {
@ -83,9 +84,9 @@ struct si_ops si_applet_ops = {
};
struct data_cb si_conn_cb = {
.recv = si_conn_recv_cb,
.send = si_conn_send_cb,
.wake = si_conn_wake_cb,
.recv = si_cs_recv_cb,
.send = si_cs_send_cb,
.wake = si_cs_wake_cb,
.name = "STRM",
};
@ -337,8 +338,10 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
* we've sent the whole proxy line. Otherwise we use connect().
*/
while (conn->send_proxy_ofs) {
struct conn_stream *cs;
int ret;
cs = conn->mux_ctx;
/* The target server expects a PROXY line to be sent first.
* If the send_proxy_ofs is negative, it corresponds to the
* offset to start sending from then end of the proxy string
@ -348,10 +351,10 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
* is attached to a stream interface. Otherwise we can only
* send a LOCAL line (eg: for use with health checks).
*/
if (conn->data == &si_conn_cb) {
struct stream_interface *si = conn->owner;
struct connection *remote = objt_conn(si_opposite(si)->end);
ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote);
if (conn->mux == &mux_pt_ops && cs->data_cb == &si_conn_cb) {
struct stream_interface *si = cs->data;
struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end);
ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote_cs ? remote_cs->conn : NULL);
}
else {
/* The target server expects a LOCAL line to be sent first. Retrieving
@ -414,9 +417,9 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
* It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb()
* is notified and can kill the connection.
*/
static void si_idle_conn_null_cb(struct connection *conn)
static void si_idle_conn_null_cb(struct conn_stream *cs)
{
conn_sock_drain(conn);
conn_sock_drain(cs->conn);
}
/* Callback to be used by connection I/O handlers when some activity is detected
@ -424,9 +427,10 @@ static void si_idle_conn_null_cb(struct connection *conn)
* a close was detected on it. It returns 0 if it did nothing serious, or -1 if
* it killed the connection.
*/
static int si_idle_conn_wake_cb(struct connection *conn)
static int si_idle_conn_wake_cb(struct conn_stream *cs)
{
struct stream_interface *si = conn->owner;
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
if (!conn_ctrl_ready(conn))
return 0;
@ -560,9 +564,10 @@ void stream_int_notify(struct stream_interface *si)
* connection's polling based on the channels and stream interface's final
* states. The function always returns 0.
*/
static int si_conn_wake_cb(struct connection *conn)
static int si_cs_wake_cb(struct conn_stream *cs)
{
struct stream_interface *si = conn->owner;
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
@ -599,36 +604,37 @@ static int si_conn_wake_cb(struct connection *conn)
* was done above (eg: maybe some buffers got emptied).
*/
if (channel_is_empty(oc))
__conn_xprt_stop_send(conn);
__cs_stop_send(cs);
if (si->flags & SI_FL_WAIT_ROOM) {
__conn_xprt_stop_recv(conn);
__cs_stop_recv(cs);
}
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
channel_may_recv(ic)) {
__conn_xprt_want_recv(conn);
__cs_want_recv(cs);
}
return 0;
}
/*
* This function is called to send buffer data to a stream socket.
* It calls the transport layer's snd_buf function. It relies on the
* It calls the mux layer's snd_buf function. It relies on the
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
static void si_conn_send(struct connection *conn)
static void si_cs_send(struct conn_stream *cs)
{
struct stream_interface *si = conn->owner;
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *oc = si_oc(si);
int ret;
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
if (oc->pipe && conn->xprt->snd_pipe) {
ret = conn->xprt->snd_pipe(conn, oc->pipe);
if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
ret = conn->mux->snd_pipe(cs, oc->pipe);
if (ret > 0)
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
@ -672,7 +678,7 @@ static void si_conn_send(struct connection *conn)
if (oc->flags & CF_STREAMER)
send_flag |= CO_SFL_STREAMER;
ret = conn->xprt->snd_buf(conn, oc->buf, send_flag);
ret = conn->mux->snd_buf(cs, oc->buf, send_flag);
if (ret > 0) {
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
@ -766,25 +772,25 @@ void stream_int_update_conn(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
struct connection *conn = __objt_conn(si->end);
struct conn_stream *cs = __objt_cs(si->end);
if (!(ic->flags & CF_SHUTR)) {
/* Read not closed */
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic))
__conn_xprt_stop_recv(conn);
__cs_stop_recv(cs);
else
__conn_xprt_want_recv(conn);
__cs_want_recv(cs);
}
if (!(oc->flags & CF_SHUTW)) {
/* Write not closed */
if (channel_is_empty(oc))
__conn_xprt_stop_send(conn);
__cs_stop_send(cs);
else
__conn_xprt_want_send(conn);
__cs_want_send(cs);
}
conn_cond_update_xprt_polling(conn);
cs_update_mux_polling(cs);
}
/*
@ -799,7 +805,8 @@ void stream_int_update_conn(struct stream_interface *si)
*/
static void stream_int_shutr_conn(struct stream_interface *si)
{
struct connection *conn = __objt_conn(si->end);
struct conn_stream *cs = __objt_cs(si->end);
struct connection *conn = cs->conn;
struct channel *ic = si_ic(si);
ic->flags &= ~CF_SHUTR_NOW;
@ -813,6 +820,7 @@ static void stream_int_shutr_conn(struct stream_interface *si)
return;
if (si_oc(si)->flags & CF_SHUTW) {
/* XXX: should just close cs ? */
conn_full_close(conn);
si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY;
@ -823,7 +831,7 @@ static void stream_int_shutr_conn(struct stream_interface *si)
}
else if (conn->ctrl) {
/* we want the caller to disable polling on this FD */
conn_xprt_stop_recv(conn);
cs_stop_recv(cs);
}
}
@ -837,7 +845,8 @@ static void stream_int_shutr_conn(struct stream_interface *si)
*/
static void stream_int_shutw_conn(struct stream_interface *si)
{
struct connection *conn = __objt_conn(si->end);
struct conn_stream *cs = __objt_cs(si->end);
struct connection *conn = cs->conn;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
@ -865,13 +874,21 @@ static void stream_int_shutw_conn(struct stream_interface *si)
/* quick close, the socket is alredy shut anyway */
}
else if (si->flags & SI_FL_NOLINGER) {
/* unclean data-layer shutdown */
conn_xprt_shutw_hard(conn);
/* unclean data-layer shutdown, typically an aborted request
* or a forwarded shutdown from a client to a server due to
* option abortonclose. No need for the TLS layer to try to
* emit a shutdown message.
*/
cs_shutw_hard(cs);
}
else {
/* clean data-layer shutdown */
conn_xprt_shutw(conn);
conn_sock_shutw(conn);
/* clean data-layer shutdown. This only happens on the
* frontend side, or on the backend side when forwarding
* a client close in TCP mode or in HTTP TUNNEL mode
* while option abortonclose is set. We want the TLS
* layer to try to signal it to the peer before we close.
*/
cs_shutw(cs);
/* If the stream interface is configured to disable half-open
* connections, we'll skip the shutdown(), but only if the
@ -920,25 +937,23 @@ static void stream_int_shutw_conn(struct stream_interface *si)
static void stream_int_chk_rcv_conn(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct connection *conn = __objt_conn(si->end);
struct conn_stream *cs = __objt_cs(si->end);
if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR)))
return;
conn_refresh_polling_flags(conn);
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
/* stop reading */
if (!(ic->flags & CF_DONT_READ)) /* full */
si->flags |= SI_FL_WAIT_ROOM;
__conn_xprt_stop_recv(conn);
__cs_stop_recv(cs);
}
else {
/* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM;
__conn_xprt_want_recv(conn);
__cs_want_recv(cs);
}
conn_cond_update_xprt_polling(conn);
cs_update_mux_polling(cs);
}
@ -950,7 +965,7 @@ static void stream_int_chk_rcv_conn(struct stream_interface *si)
static void stream_int_chk_snd_conn(struct stream_interface *si)
{
struct channel *oc = si_oc(si);
struct connection *conn = __objt_conn(si->end);
struct conn_stream *cs = __objt_cs(si->end);
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
@ -965,7 +980,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return;
if (conn->flags & CO_FL_XPRT_WR_ENA) {
if (cs->flags & CS_FL_DATA_WR_ENA) {
/* already subscribed to write notifications, will be called
* anyway, so let's avoid calling it especially if the reader
* is not ready.
@ -973,16 +988,12 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
return;
}
/* Before calling the data-level operations, we have to prepare
* the polling flags to ensure we properly detect changes.
*/
conn_refresh_polling_flags(conn);
__conn_xprt_want_send(conn);
__cs_want_send(cs);
si_conn_send(conn);
if (conn->flags & CO_FL_ERROR) {
si_cs_send(cs);
if (cs->conn->flags & CO_FL_ERROR) {
/* Write error on the file descriptor */
__conn_xprt_stop_both(conn);
__cs_stop_both(cs);
si->flags |= SI_FL_ERR;
goto out_wakeup;
}
@ -996,7 +1007,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
* ->o limit was reached. Maybe we just wrote the last
* chunk and need to close.
*/
__conn_xprt_stop_send(conn);
__cs_stop_send(cs);
if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
(si->state == SI_ST_EST)) {
@ -1012,7 +1023,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
/* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so.
*/
__conn_xprt_want_send(conn);
__cs_want_send(cs);
si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto);
@ -1052,17 +1063,18 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
}
/* commit possible polling changes */
conn_cond_update_polling(conn);
cs_update_mux_polling(cs);
}
/*
* This is the callback which is called by the connection layer to receive data
* into the buffer from the connection. It iterates over the transport layer's
* into the buffer from the connection. It iterates over the mux layer's
* rcv_buf function.
*/
static void si_conn_recv_cb(struct connection *conn)
static void si_cs_recv_cb(struct conn_stream *cs)
{
struct stream_interface *si = conn->owner;
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
int ret, max, cur_read;
int read_poll = MAX_READ_POLL_LOOPS;
@ -1081,7 +1093,7 @@ static void si_conn_recv_cb(struct connection *conn)
return;
/* stop here if we reached the end of data */
if (conn_xprt_read0_pending(conn))
if (cs->flags & CS_FL_EOS)
goto out_shutdown_r;
cur_read = 0;
@ -1101,7 +1113,7 @@ static void si_conn_recv_cb(struct connection *conn)
/* First, let's see if we may splice data across the channel without
* using a buffer.
*/
if (conn->xprt->rcv_pipe &&
if (conn->xprt->rcv_pipe && conn->mux->rcv_pipe &&
(ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
ic->flags & CF_KERN_SPLICING) {
if (buffer_not_empty(ic->buf)) {
@ -1120,7 +1132,7 @@ static void si_conn_recv_cb(struct connection *conn)
}
}
ret = conn->xprt->rcv_pipe(conn, ic->pipe, ic->to_forward);
ret = conn->mux->rcv_pipe(cs, ic->pipe, ic->to_forward);
if (ret < 0) {
/* splice not supported on this end, let's disable it */
ic->flags &= ~CF_KERN_SPLICING;
@ -1135,7 +1147,7 @@ static void si_conn_recv_cb(struct connection *conn)
ic->flags |= CF_READ_PARTIAL;
}
if (conn_xprt_read0_pending(conn))
if (cs->flags & CS_FL_EOS)
goto out_shutdown_r;
if (conn->flags & CO_FL_ERROR)
@ -1146,7 +1158,7 @@ static void si_conn_recv_cb(struct connection *conn)
* could soon be full. Let's stop before needing to poll.
*/
si->flags |= SI_FL_WAIT_ROOM;
__conn_xprt_stop_recv(conn);
__cs_stop_recv(cs);
}
/* splice not possible (anymore), let's go on on standard copy */
@ -1177,7 +1189,7 @@ static void si_conn_recv_cb(struct connection *conn)
break;
}
ret = conn->xprt->rcv_buf(conn, ic->buf, max);
ret = conn->mux->rcv_buf(cs, ic->buf, max);
if (ret <= 0)
break;
@ -1203,9 +1215,12 @@ static void si_conn_recv_cb(struct connection *conn)
}
if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
if (__conn_xprt_done_recv(conn))
si->flags |= SI_FL_WAIT_ROOM;
break;
/*
* This used to be __conn_xprt_done_recv()
* This was changed to accomodate with the mux code,
* but we may have lost a worthwhile optimization.
*/
__cs_stop_recv(cs);
}
/* if too many bytes were missing from last read, it means that
@ -1271,7 +1286,7 @@ static void si_conn_recv_cb(struct connection *conn)
if (conn->flags & CO_FL_ERROR)
return;
if (conn_xprt_read0_pending(conn))
if (cs->flags & CS_FL_EOS)
/* connection closed */
goto out_shutdown_r;
@ -1291,9 +1306,10 @@ static void si_conn_recv_cb(struct connection *conn)
* from the buffer to the connection. It iterates over the transport layer's
* snd_buf function.
*/
static void si_conn_send_cb(struct connection *conn)
static void si_cs_send_cb(struct conn_stream *cs)
{
struct stream_interface *si = conn->owner;
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
if (conn->flags & CO_FL_ERROR)
return;
@ -1307,7 +1323,7 @@ static void si_conn_send_cb(struct connection *conn)
return;
/* OK there are data waiting to be sent */
si_conn_send(conn);
si_cs_send(cs);
/* OK all done */
return;
@ -1320,7 +1336,7 @@ static void si_conn_send_cb(struct connection *conn)
*/
void stream_sock_read0(struct stream_interface *si)
{
struct connection *conn = __objt_conn(si->end);
struct conn_stream *cs = __objt_cs(si->end);
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
@ -1340,17 +1356,17 @@ void stream_sock_read0(struct stream_interface *si)
if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */
/* force flag on ssl to keep stream in cache */
conn_xprt_shutw_hard(conn);
cs_shutw_hard(cs);
goto do_close;
}
/* otherwise that's just a normal read shutdown */
__conn_xprt_stop_recv(conn);
__cs_stop_recv(cs);
return;
do_close:
/* OK we completely close the socket here just as if we went through si_shut[rw]() */
conn_full_close(conn);
conn_full_close(cs->conn);
ic->flags &= ~CF_SHUTR_NOW;
ic->flags |= CF_SHUTR;