MAJOR: connection : Split struct connection into struct connection and struct conn_stream.

All the references to connections in the data path from streams and
stream_interfaces were changed to use conn_streams. Most functions named
"something_conn" were renamed to "something_cs" for this. Sometimes the
connection still is what matters (eg during a connection establishment)
and were not always renamed. The change is significant and minimal at the
same time, and was quite thoroughly tested now. As of this patch, all
accesses to the connection from upper layers go through the pass-through
mux.
This commit is contained in:
Olivier Houchard
2017-09-13 18:30:23 +02:00
committed by Willy Tarreau
parent 7a3f0dfb7b
commit 9aaf778129
17 changed files with 439 additions and 298 deletions

View File

@ -315,25 +315,25 @@ static inline void __conn_xprt_stop_recv(struct connection *c)
c->flags &= ~CO_FL_XPRT_RD_ENA; c->flags &= ~CO_FL_XPRT_RD_ENA;
} }
static inline void __cs_data_want_recv(struct conn_stream *cs) static inline void __cs_want_recv(struct conn_stream *cs)
{ {
cs->flags |= CS_FL_DATA_RD_ENA; cs->flags |= CS_FL_DATA_RD_ENA;
} }
static inline void __cs_data_stop_recv(struct conn_stream *cs) static inline void __cs_stop_recv(struct conn_stream *cs)
{ {
cs->flags &= ~CS_FL_DATA_RD_ENA; cs->flags &= ~CS_FL_DATA_RD_ENA;
} }
static inline void cs_data_want_recv(struct conn_stream *cs) static inline void cs_want_recv(struct conn_stream *cs)
{ {
__cs_data_want_recv(cs); __cs_want_recv(cs);
cs_update_mux_polling(cs); cs_update_mux_polling(cs);
} }
static inline void cs_data_stop_recv(struct conn_stream *cs) static inline void cs_stop_recv(struct conn_stream *cs)
{ {
__cs_data_stop_recv(cs); __cs_stop_recv(cs);
cs_update_mux_polling(cs); cs_update_mux_polling(cs);
} }
@ -366,36 +366,36 @@ static inline void __conn_xprt_stop_both(struct connection *c)
c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA); c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA);
} }
static inline void __cs_data_want_send(struct conn_stream *cs) static inline void __cs_want_send(struct conn_stream *cs)
{ {
cs->flags |= CS_FL_DATA_WR_ENA; cs->flags |= CS_FL_DATA_WR_ENA;
} }
static inline void __cs_data_stop_send(struct conn_stream *cs) static inline void __cs_stop_send(struct conn_stream *cs)
{ {
cs->flags &= ~CS_FL_DATA_WR_ENA; cs->flags &= ~CS_FL_DATA_WR_ENA;
} }
static inline void cs_data_stop_send(struct conn_stream *cs) static inline void cs_stop_send(struct conn_stream *cs)
{ {
__cs_data_stop_send(cs); __cs_stop_send(cs);
cs_update_mux_polling(cs); cs_update_mux_polling(cs);
} }
static inline void cs_data_want_send(struct conn_stream *cs) static inline void cs_want_send(struct conn_stream *cs)
{ {
__cs_data_want_send(cs); __cs_want_send(cs);
cs_update_mux_polling(cs); cs_update_mux_polling(cs);
} }
static inline void __cs_data_stop_both(struct conn_stream *cs) static inline void __cs_stop_both(struct conn_stream *cs)
{ {
cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA); cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA);
} }
static inline void cs_data_stop_both(struct conn_stream *cs) static inline void cs_stop_both(struct conn_stream *cs)
{ {
__cs_data_stop_both(cs); __cs_stop_both(cs);
cs_update_mux_polling(cs); cs_update_mux_polling(cs);
} }
@ -537,6 +537,45 @@ static inline void conn_xprt_shutw_hard(struct connection *c)
c->xprt->shutw(c, 0); c->xprt->shutw(c, 0);
} }
/* shut read after draining possibly pending data */
static inline void cs_shutr(struct conn_stream *cs)
{
__cs_stop_recv(cs);
/* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutr)
cs->conn->mux->shutr(cs, 1);
}
/* shut read after disabling lingering */
static inline void cs_shutr_hard(struct conn_stream *cs)
{
__cs_stop_recv(cs);
/* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutr)
cs->conn->mux->shutr(cs, 0);
}
static inline void cs_shutw(struct conn_stream *cs)
{
__cs_stop_send(cs);
/* clean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutw)
cs->conn->mux->shutw(cs, 1);
}
static inline void cs_shutw_hard(struct conn_stream *cs)
{
__cs_stop_send(cs);
/* unclean data-layer shutdown */
if (cs->conn->mux && cs->conn->mux->shutw)
cs->conn->mux->shutw(cs, 0);
}
/* detect sock->data read0 transition */ /* detect sock->data read0 transition */
static inline int conn_xprt_read0_pending(struct connection *c) static inline int conn_xprt_read0_pending(struct connection *c)
{ {
@ -576,7 +615,6 @@ static inline void conn_init(struct connection *conn)
{ {
conn->obj_type = OBJ_TYPE_CONN; conn->obj_type = OBJ_TYPE_CONN;
conn->flags = CO_FL_NONE; conn->flags = CO_FL_NONE;
conn->data = NULL;
conn->tmp_early_data = -1; conn->tmp_early_data = -1;
conn->mux = NULL; conn->mux = NULL;
conn->mux_ctx = NULL; conn->mux_ctx = NULL;
@ -622,31 +660,43 @@ static inline struct connection *conn_new()
return conn; return conn;
} }
/* Tries to allocate a new conn_stream and initialize its main fields. The
* connection is returned on success, NULL on failure. The connection must
* be released using pool_free2() or conn_free().
*/
static inline struct conn_stream *cs_new(struct connection *conn)
{
struct conn_stream *cs;
cs = pool_alloc2(pool2_connstream);
if (likely(cs != NULL))
cs_init(cs, conn);
return cs;
}
/* Releases a conn_stream previously allocated by cs_new() */ /* Releases a conn_stream previously allocated by cs_new() */
static inline void cs_free(struct conn_stream *cs) static inline void cs_free(struct conn_stream *cs)
{ {
pool_free2(pool2_connstream, cs); pool_free2(pool2_connstream, cs);
} }
/* Tries to allocate a new conn_stream and initialize its main fields. If
* <conn> is NULL, then a new connection is allocated on the fly, initialized,
* and assigned to cs->conn ; this connection will then have to be released
* using pool_free2() or conn_free(). The conn_stream is initialized and added
* to the mux's stream list on success, then returned. On failure, nothing is
* allocated and NULL is returned.
*/
static inline struct conn_stream *cs_new(struct connection *conn)
{
struct conn_stream *cs;
cs = pool_alloc2(pool2_connstream);
if (!likely(cs))
return NULL;
if (!conn) {
conn = conn_new();
if (!likely(conn)) {
cs_free(cs);
return NULL;
}
conn_init(conn);
}
cs_init(cs, conn);
return cs;
}
/* Releases a connection previously allocated by conn_new() */ /* Releases a connection previously allocated by conn_new() */
static inline void conn_free(struct connection *conn) static inline void conn_free(struct connection *conn)
{ {
if (conn->mux && conn->mux->release)
conn->mux->release(conn);
pool_free2(pool2_connection, conn); pool_free2(pool2_connection, conn);
} }
@ -700,11 +750,11 @@ static inline void conn_get_to_addr(struct connection *conn)
conn->flags |= CO_FL_ADDR_TO_SET; conn->flags |= CO_FL_ADDR_TO_SET;
} }
/* Attaches a connection to an owner and assigns a data layer */ /* Attaches a conn_stream to a data layer and sets the relevant callbacks */
static inline void conn_attach(struct connection *conn, void *owner, const struct data_cb *data) static inline void cs_attach(struct conn_stream *cs, void *data, const struct data_cb *data_cb)
{ {
conn->data = data; cs->data_cb = data_cb;
conn->owner = owner; cs->data = data;
} }
/* Installs the connection's mux layer for upper context <ctx>. /* Installs the connection's mux layer for upper context <ctx>.
@ -789,11 +839,11 @@ static inline const char *conn_get_mux_name(const struct connection *conn)
return conn->mux->name; return conn->mux->name;
} }
static inline const char *conn_get_data_name(const struct connection *conn) static inline const char *cs_get_data_name(const struct conn_stream *cs)
{ {
if (!conn->data) if (!cs->data_cb)
return "NONE"; return "NONE";
return conn->data->name; return cs->data_cb->name;
} }
/* registers pointer to transport layer <id> (XPRT_*) */ /* registers pointer to transport layer <id> (XPRT_*) */

View File

@ -36,7 +36,7 @@ extern struct list streams;
extern struct data_cb sess_conn_cb; extern struct data_cb sess_conn_cb;
struct stream *stream_new(struct session *sess, enum obj_type *origin); struct stream *stream_new(struct session *sess, enum obj_type *origin);
int stream_create_from_conn(struct connection *conn); int stream_create_from_cs(struct conn_stream *cs);
/* perform minimal intializations, report 0 in case of error, 1 if OK. */ /* perform minimal intializations, report 0 in case of error, 1 if OK. */
int init_stream(); int init_stream();

View File

@ -152,18 +152,14 @@ static inline enum obj_type *si_detach_endpoint(struct stream_interface *si)
*/ */
static inline void si_release_endpoint(struct stream_interface *si) static inline void si_release_endpoint(struct stream_interface *si)
{ {
struct connection *conn; struct conn_stream *cs;
struct appctx *appctx; struct appctx *appctx;
if (!si->end) if (!si->end)
return; return;
if ((conn = objt_conn(si->end))) { if ((cs = objt_cs(si->end)))
LIST_DEL(&conn->list); cs_destroy(cs);
conn_stop_tracking(conn);
conn_full_close(conn);
conn_free(conn);
}
else if ((appctx = objt_appctx(si->end))) { else if ((appctx = objt_appctx(si->end))) {
if (appctx->applet->release && si->state < SI_ST_DIS) if (appctx->applet->release && si->state < SI_ST_DIS)
appctx->applet->release(appctx); appctx->applet->release(appctx);
@ -178,26 +174,27 @@ static inline void si_release_endpoint(struct stream_interface *si)
* connection will also be added at the head of this list. This connection * connection will also be added at the head of this list. This connection
* remains assigned to the stream interface it is currently attached to. * remains assigned to the stream interface it is currently attached to.
*/ */
static inline void si_idle_conn(struct stream_interface *si, struct list *pool) static inline void si_idle_cs(struct stream_interface *si, struct list *pool)
{ {
struct connection *conn = __objt_conn(si->end); struct conn_stream *cs = __objt_cs(si->end);
struct connection *conn = cs->conn;
if (pool) if (pool)
LIST_ADD(pool, &conn->list); LIST_ADD(pool, &conn->list);
conn_attach(conn, si, &si_idle_conn_cb); cs_attach(cs, si, &si_idle_conn_cb);
conn_xprt_want_recv(conn); cs_want_recv(cs);
} }
/* Attach connection <conn> to the stream interface <si>. The stream interface /* Attach conn_stream <cs> to the stream interface <si>. The stream interface
* is configured to work with a connection and the connection it configured * is configured to work with a connection and the connection it configured
* with a stream interface data layer. * with a stream interface data layer.
*/ */
static inline void si_attach_conn(struct stream_interface *si, struct connection *conn) static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs)
{ {
si->ops = &si_conn_ops; si->ops = &si_conn_ops;
si->end = &conn->obj_type; si->end = &cs->obj_type;
conn_attach(conn, si, &si_conn_cb); cs_attach(cs, si, &si_conn_cb);
} }
/* Returns true if a connection is attached to the stream interface <si> and /* Returns true if a connection is attached to the stream interface <si> and
@ -205,7 +202,7 @@ static inline void si_attach_conn(struct stream_interface *si, struct connection
*/ */
static inline int si_conn_ready(struct stream_interface *si) static inline int si_conn_ready(struct stream_interface *si)
{ {
struct connection *conn = objt_conn(si->end); struct connection *conn = cs_conn(objt_cs(si->end));
return conn && conn_ctrl_ready(conn) && conn_xprt_ready(conn); return conn && conn_ctrl_ready(conn) && conn_xprt_ready(conn);
} }
@ -276,22 +273,22 @@ static inline void si_applet_stop_get(struct stream_interface *si)
si->flags &= ~SI_FL_WANT_GET; si->flags &= ~SI_FL_WANT_GET;
} }
/* Try to allocate a new connection and assign it to the interface. If /* Try to allocate a new conn_stream and assign it to the interface. If
* an endpoint was previously allocated, it is released first. The newly * an endpoint was previously allocated, it is released first. The newly
* allocated connection is initialized, assigned to the stream interface, * allocated conn_stream is initialized, assigned to the stream interface,
* and returned. * and returned.
*/ */
static inline struct connection *si_alloc_conn(struct stream_interface *si) static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struct connection *conn)
{ {
struct connection *conn; struct conn_stream *cs;
si_release_endpoint(si); si_release_endpoint(si);
conn = conn_new(); cs = cs_new(conn);
if (conn) if (cs)
si_attach_conn(si, conn); si_attach_cs(si, cs);
return conn; return cs;
} }
/* Release the interface's existing endpoint (connection or appctx) and /* Release the interface's existing endpoint (connection or appctx) and
@ -346,7 +343,8 @@ static inline void si_chk_snd(struct stream_interface *si)
/* Calls chk_snd on the connection using the ctrl layer */ /* Calls chk_snd on the connection using the ctrl layer */
static inline int si_connect(struct stream_interface *si) static inline int si_connect(struct stream_interface *si)
{ {
struct connection *conn = objt_conn(si->end); struct conn_stream *cs = objt_cs(si->end);
struct connection *conn = cs_conn(cs);
int ret = SF_ERR_NONE; int ret = SF_ERR_NONE;
if (unlikely(!conn || !conn->ctrl || !conn->ctrl->connect)) if (unlikely(!conn || !conn->ctrl || !conn->ctrl->connect))
@ -364,7 +362,7 @@ static inline int si_connect(struct stream_interface *si)
/* reuse the existing connection */ /* reuse the existing connection */
if (!channel_is_empty(si_oc(si))) { if (!channel_is_empty(si_oc(si))) {
/* we'll have to send a request there. */ /* we'll have to send a request there. */
conn_xprt_want_send(conn); cs_want_send(cs);
} }
/* the connection is established */ /* the connection is established */

View File

@ -157,7 +157,7 @@ enum {
struct check { struct check {
struct xprt_ops *xprt; /* transport layer operations for health checks */ struct xprt_ops *xprt; /* transport layer operations for health checks */
struct connection *conn; /* connection state for health checks */ struct conn_stream *cs; /* conn_stream state for health checks */
unsigned short port; /* the port to use for the health checks */ unsigned short port; /* the port to use for the health checks */
struct buffer *bi, *bo; /* input and output buffers to send/recv check */ struct buffer *bi, *bo; /* input and output buffers to send/recv check */
struct task *task; /* the task associated to the health check processing, NULL if disabled */ struct task *task; /* the task associated to the health check processing, NULL if disabled */

View File

@ -288,9 +288,9 @@ struct mux_ops {
* data movement. It may abort a connection by returning < 0. * data movement. It may abort a connection by returning < 0.
*/ */
struct data_cb { struct data_cb {
void (*recv)(struct connection *conn); /* data-layer recv callback */ void (*recv)(struct conn_stream *cs); /* data-layer recv callback */
void (*send)(struct connection *conn); /* data-layer send callback */ void (*send)(struct conn_stream *cs); /* data-layer send callback */
int (*wake)(struct connection *conn); /* data-layer callback to report activity */ int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */
char name[8]; /* data layer name, zero-terminated */ char name[8]; /* data layer name, zero-terminated */
}; };
@ -347,10 +347,9 @@ struct connection {
const struct protocol *ctrl; /* operations at the socket layer */ const struct protocol *ctrl; /* operations at the socket layer */
const struct xprt_ops *xprt; /* operations at the transport layer */ const struct xprt_ops *xprt; /* operations at the transport layer */
const struct mux_ops *mux; /* mux layer opreations. Must be set before xprt->init() */ const struct mux_ops *mux; /* mux layer opreations. Must be set before xprt->init() */
const struct data_cb *data; /* data layer callbacks. Must be set before xprt->init() */
void *xprt_ctx; /* general purpose pointer, initialized to NULL */ void *xprt_ctx; /* general purpose pointer, initialized to NULL */
void *mux_ctx; /* mux-specific context, initialized to NULL */ void *mux_ctx; /* mux-specific context, initialized to NULL */
void *owner; /* pointer to upper layer's entity (eg: session, stream interface) */ void *owner; /* pointer to the owner session for incoming connections, or NULL */
int xprt_st; /* transport layer state, initialized to zero */ int xprt_st; /* transport layer state, initialized to zero */
int tmp_early_data; /* 1st byte of early data, if any */ int tmp_early_data; /* 1st byte of early data, if any */
union conn_handle handle; /* connection handle at the socket layer */ union conn_handle handle; /* connection handle at the socket layer */

View File

@ -567,7 +567,7 @@ int assign_server(struct stream *s)
srv = NULL; srv = NULL;
s->target = NULL; s->target = NULL;
conn = objt_conn(s->si[1].end); conn = cs_conn(objt_cs(s->si[1].end));
if (conn && if (conn &&
(conn->flags & CO_FL_CONNECTED) && (conn->flags & CO_FL_CONNECTED) &&
@ -720,8 +720,7 @@ int assign_server(struct stream *s)
s->target = &s->be->obj_type; s->target = &s->be->obj_type;
} }
else if ((s->be->options & PR_O_HTTP_PROXY) && else if ((s->be->options & PR_O_HTTP_PROXY) &&
(conn = objt_conn(s->si[1].end)) && conn && is_addr(&conn->addr.to)) {
is_addr(&conn->addr.to)) {
/* in proxy mode, we need a valid destination address */ /* in proxy mode, we need a valid destination address */
s->target = &s->be->obj_type; s->target = &s->be->obj_type;
} }
@ -769,7 +768,7 @@ int assign_server(struct stream *s)
int assign_server_address(struct stream *s) int assign_server_address(struct stream *s)
{ {
struct connection *cli_conn = objt_conn(strm_orig(s)); struct connection *cli_conn = objt_conn(strm_orig(s));
struct connection *srv_conn = objt_conn(s->si[1].end); struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end));
#ifdef DEBUG_FULL #ifdef DEBUG_FULL
fprintf(stderr,"assign_server_address : s=%p\n",s); fprintf(stderr,"assign_server_address : s=%p\n",s);
@ -973,7 +972,7 @@ static void assign_tproxy_address(struct stream *s)
struct server *srv = objt_server(s->target); struct server *srv = objt_server(s->target);
struct conn_src *src; struct conn_src *src;
struct connection *cli_conn; struct connection *cli_conn;
struct connection *srv_conn = objt_conn(s->si[1].end); struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end));
if (srv && srv->conn_src.opts & CO_SRC_BIND) if (srv && srv->conn_src.opts & CO_SRC_BIND)
src = &srv->conn_src; src = &srv->conn_src;
@ -1041,21 +1040,23 @@ int connect_server(struct stream *s)
{ {
struct connection *cli_conn; struct connection *cli_conn;
struct connection *srv_conn; struct connection *srv_conn;
struct connection *old_conn; struct conn_stream *srv_cs;
struct conn_stream *old_cs;
struct server *srv; struct server *srv;
int reuse = 0; int reuse = 0;
int err; int err;
srv = objt_server(s->target); srv = objt_server(s->target);
srv_conn = objt_conn(s->si[1].end); srv_cs = objt_cs(s->si[1].end);
srv_conn = cs_conn(srv_cs);
if (srv_conn) if (srv_conn)
reuse = s->target == srv_conn->target; reuse = s->target == srv_conn->target;
if (srv && !reuse) { if (srv && !reuse) {
old_conn = srv_conn; old_cs = srv_cs;
if (old_conn) { if (old_cs) {
srv_conn = NULL; srv_conn = NULL;
old_conn->owner = NULL; srv_cs->data = NULL;
si_detach_endpoint(&s->si[1]); si_detach_endpoint(&s->si[1]);
/* note: if the connection was in a server's idle /* note: if the connection was in a server's idle
* queue, it doesn't get dequeued. * queue, it doesn't get dequeued.
@ -1101,23 +1102,25 @@ int connect_server(struct stream *s)
LIST_DEL(&srv_conn->list); LIST_DEL(&srv_conn->list);
LIST_INIT(&srv_conn->list); LIST_INIT(&srv_conn->list);
if (srv_conn->owner) { /* XXX cognet: this assumes only 1 conn_stream per
si_detach_endpoint(srv_conn->owner); * connection, has to be revisited later
if (old_conn && !(old_conn->flags & CO_FL_PRIVATE)) { */
si_attach_conn(srv_conn->owner, old_conn); srv_cs = srv_conn->mux_ctx;
si_idle_conn(srv_conn->owner, NULL);
if (srv_conn->mux == &mux_pt_ops && srv_cs->data) {
si_detach_endpoint(srv_cs->data);
if (old_cs && !(old_cs->conn->flags & CO_FL_PRIVATE)) {
si_attach_cs(srv_cs->data, old_cs);
si_idle_cs(srv_cs->data, NULL);
} }
} }
si_attach_conn(&s->si[1], srv_conn); si_attach_cs(&s->si[1], srv_cs);
reuse = 1; reuse = 1;
} }
/* we may have to release our connection if we couldn't swap it */ /* we may have to release our connection if we couldn't swap it */
if (old_conn && !old_conn->owner) { if (old_cs && !old_cs->data)
LIST_DEL(&old_conn->list); cs_destroy(old_cs);
conn_full_close(old_conn);
conn_free(old_conn);
}
} }
if (reuse) { if (reuse) {
@ -1136,15 +1139,16 @@ int connect_server(struct stream *s)
} }
} }
if (!reuse) if (!reuse) {
srv_conn = si_alloc_conn(&s->si[1]); srv_cs = si_alloc_cs(&s->si[1], NULL);
else { srv_conn = cs_conn(srv_cs);
} else {
/* reusing our connection, take it out of the idle list */ /* reusing our connection, take it out of the idle list */
LIST_DEL(&srv_conn->list); LIST_DEL(&srv_conn->list);
LIST_INIT(&srv_conn->list); LIST_INIT(&srv_conn->list);
} }
if (!srv_conn) if (!srv_cs)
return SF_ERR_RESOURCE; return SF_ERR_RESOURCE;
if (!(s->flags & SF_ADDR_SET)) { if (!(s->flags & SF_ADDR_SET)) {
@ -1160,14 +1164,16 @@ int connect_server(struct stream *s)
/* set the correct protocol on the output stream interface */ /* set the correct protocol on the output stream interface */
if (srv) { if (srv) {
conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), srv->xprt); conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), srv->xprt);
conn_install_mux(srv_conn, &mux_pt_ops, srv_conn); /* XXX: Pick the right mux, when we finally have one */
conn_install_mux(srv_conn, &mux_pt_ops, srv_cs);
} }
else if (obj_type(s->target) == OBJ_TYPE_PROXY) { else if (obj_type(s->target) == OBJ_TYPE_PROXY) {
/* proxies exclusively run on raw_sock right now */ /* proxies exclusively run on raw_sock right now */
conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), xprt_get(XPRT_RAW)); conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), xprt_get(XPRT_RAW));
if (!objt_conn(s->si[1].end) || !objt_conn(s->si[1].end)->ctrl) if (!objt_cs(s->si[1].end) || !objt_cs(s->si[1].end)->conn->ctrl)
return SF_ERR_INTERNAL; return SF_ERR_INTERNAL;
conn_install_mux(srv_conn, &mux_pt_ops, srv_conn); /* XXX: Pick the right mux, when we finally have one */
conn_install_mux(srv_conn, &mux_pt_ops, srv_cs);
} }
else else
return SF_ERR_INTERNAL; /* how did we get there ? */ return SF_ERR_INTERNAL; /* how did we get there ? */
@ -1182,13 +1188,13 @@ int connect_server(struct stream *s)
conn_get_to_addr(cli_conn); conn_get_to_addr(cli_conn);
} }
si_attach_conn(&s->si[1], srv_conn); si_attach_cs(&s->si[1], srv_cs);
assign_tproxy_address(s); assign_tproxy_address(s);
} }
else { else {
/* the connection is being reused, just re-attach it */ /* the connection is being reused, just re-attach it */
si_attach_conn(&s->si[1], srv_conn); si_attach_cs(&s->si[1], srv_cs);
s->flags |= SF_SRV_REUSED; s->flags |= SF_SRV_REUSED;
} }

View File

@ -582,7 +582,8 @@ static int retrieve_errno_from_socket(struct connection *conn)
*/ */
static void chk_report_conn_err(struct check *check, int errno_bck, int expired) static void chk_report_conn_err(struct check *check, int errno_bck, int expired)
{ {
struct connection *conn = check->conn; struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs);
const char *err_msg; const char *err_msg;
struct chunk *chk; struct chunk *chk;
int step; int step;
@ -705,9 +706,10 @@ static void chk_report_conn_err(struct check *check, int errno_bck, int expired)
* it sends the request. In other cases, it calls set_server_check_status() * it sends the request. In other cases, it calls set_server_check_status()
* to set check->status, check->duration and check->result. * to set check->status, check->duration and check->result.
*/ */
static void event_srv_chk_w(struct connection *conn) static void event_srv_chk_w(struct conn_stream *cs)
{ {
struct check *check = conn->owner; struct connection *conn = cs->conn;
struct check *check = cs->data;
struct server *s = check->server; struct server *s = check->server;
struct task *t = check->task; struct task *t = check->task;
@ -719,7 +721,7 @@ static void event_srv_chk_w(struct connection *conn)
if (retrieve_errno_from_socket(conn)) { if (retrieve_errno_from_socket(conn)) {
chk_report_conn_err(check, errno, 0); chk_report_conn_err(check, errno, 0);
__conn_xprt_stop_both(conn); __cs_stop_both(cs);
goto out_wakeup; goto out_wakeup;
} }
@ -741,10 +743,10 @@ static void event_srv_chk_w(struct connection *conn)
return; return;
if (check->bo->o) { if (check->bo->o) {
conn->xprt->snd_buf(conn, check->bo, 0); conn->mux->snd_buf(cs, check->bo, 0);
if (conn->flags & CO_FL_ERROR) { if (conn->flags & CO_FL_ERROR) {
chk_report_conn_err(check, errno, 0); chk_report_conn_err(check, errno, 0);
__conn_xprt_stop_both(conn); __cs_stop_both(cs);
goto out_wakeup; goto out_wakeup;
} }
if (check->bo->o) if (check->bo->o)
@ -761,7 +763,7 @@ static void event_srv_chk_w(struct connection *conn)
out_wakeup: out_wakeup:
task_wakeup(t, TASK_WOKEN_IO); task_wakeup(t, TASK_WOKEN_IO);
out_nowake: out_nowake:
__conn_xprt_stop_send(conn); /* nothing more to write */ __cs_stop_send(cs); /* nothing more to write */
} }
/* /*
@ -778,9 +780,10 @@ static void event_srv_chk_w(struct connection *conn)
* call it with a proper error status like HCHK_STATUS_L7STS, HCHK_STATUS_L6RSP, * call it with a proper error status like HCHK_STATUS_L7STS, HCHK_STATUS_L6RSP,
* etc. * etc.
*/ */
static void event_srv_chk_r(struct connection *conn) static void event_srv_chk_r(struct conn_stream *cs)
{ {
struct check *check = conn->owner; struct connection *conn = cs->conn;
struct check *check = cs->data;
struct server *s = check->server; struct server *s = check->server;
struct task *t = check->task; struct task *t = check->task;
char *desc; char *desc;
@ -815,7 +818,7 @@ static void event_srv_chk_r(struct connection *conn)
done = 0; done = 0;
conn->xprt->rcv_buf(conn, check->bi, check->bi->size); conn->mux->rcv_buf(cs, check->bi, check->bi->size);
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) { if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
done = 1; done = 1;
if ((conn->flags & CO_FL_ERROR) && !check->bi->i) { if ((conn->flags & CO_FL_ERROR) && !check->bi->i) {
@ -1339,8 +1342,8 @@ static void event_srv_chk_r(struct connection *conn)
* range quickly. To avoid sending RSTs all the time, we first try to * range quickly. To avoid sending RSTs all the time, we first try to
* drain pending data. * drain pending data.
*/ */
__conn_xprt_stop_both(conn); __cs_stop_both(cs);
conn_xprt_shutw(conn); cs_shutw(cs);
/* OK, let's not stay here forever */ /* OK, let's not stay here forever */
if (check->result == CHK_RES_FAILED) if (check->result == CHK_RES_FAILED)
@ -1350,7 +1353,7 @@ static void event_srv_chk_r(struct connection *conn)
return; return;
wait_more_data: wait_more_data:
__conn_xprt_want_recv(conn); __cs_want_recv(cs);
} }
/* /*
@ -1359,15 +1362,17 @@ static void event_srv_chk_r(struct connection *conn)
* It returns 0 on normal cases, <0 if at least one close() has happened on the * It returns 0 on normal cases, <0 if at least one close() has happened on the
* connection (eg: reconnect). * connection (eg: reconnect).
*/ */
static int wake_srv_chk(struct connection *conn) static int wake_srv_chk(struct conn_stream *cs)
{ {
struct check *check = conn->owner; struct connection *conn = cs->conn;
struct check *check = cs->data;
int ret = 0; int ret = 0;
/* we may have to make progress on the TCP checks */ /* we may have to make progress on the TCP checks */
if (check->type == PR_O2_TCPCHK_CHK) { if (check->type == PR_O2_TCPCHK_CHK) {
ret = tcpcheck_main(check); ret = tcpcheck_main(check);
conn = check->conn; cs = check->cs;
conn = cs_conn(cs);
} }
if (unlikely(conn->flags & CO_FL_ERROR)) { if (unlikely(conn->flags & CO_FL_ERROR)) {
@ -1378,8 +1383,7 @@ static int wake_srv_chk(struct connection *conn)
* we expect errno to still be valid. * we expect errno to still be valid.
*/ */
chk_report_conn_err(check, errno, 0); chk_report_conn_err(check, errno, 0);
__cs_stop_both(cs);
__conn_xprt_stop_both(conn);
task_wakeup(check->task, TASK_WOKEN_IO); task_wakeup(check->task, TASK_WOKEN_IO);
} }
else if (!(conn->flags & (CO_FL_XPRT_RD_ENA|CO_FL_XPRT_WR_ENA|CO_FL_HANDSHAKE))) { else if (!(conn->flags & (CO_FL_XPRT_RD_ENA|CO_FL_XPRT_WR_ENA|CO_FL_HANDSHAKE))) {
@ -1478,7 +1482,8 @@ static int connect_conn_chk(struct task *t)
{ {
struct check *check = t->context; struct check *check = t->context;
struct server *s = check->server; struct server *s = check->server;
struct connection *conn = check->conn; struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs);
struct protocol *proto; struct protocol *proto;
struct tcpcheck_rule *tcp_rule = NULL; struct tcpcheck_rule *tcp_rule = NULL;
int ret; int ret;
@ -1535,9 +1540,10 @@ static int connect_conn_chk(struct task *t)
} }
/* prepare a new connection */ /* prepare a new connection */
conn = check->conn = conn_new(); cs = check->cs = cs_new(NULL);
if (!check->conn) if (!check->cs)
return SF_ERR_RESOURCE; return SF_ERR_RESOURCE;
conn = cs->conn;
if (is_addr(&check->addr)) { if (is_addr(&check->addr)) {
/* we'll connect to the check addr specified on the server */ /* we'll connect to the check addr specified on the server */
@ -1553,7 +1559,7 @@ static int connect_conn_chk(struct task *t)
i = srv_check_healthcheck_port(check); i = srv_check_healthcheck_port(check);
if (i == 0) { if (i == 0) {
conn->owner = check; cs->data = check;
return SF_ERR_CHK_PORT; return SF_ERR_CHK_PORT;
} }
@ -1563,8 +1569,8 @@ static int connect_conn_chk(struct task *t)
proto = protocol_by_family(conn->addr.to.ss_family); proto = protocol_by_family(conn->addr.to.ss_family);
conn_prepare(conn, proto, check->xprt); conn_prepare(conn, proto, check->xprt);
conn_install_mux(conn, &mux_pt_ops, conn); conn_install_mux(conn, &mux_pt_ops, cs);
conn_attach(conn, check, &check_conn_cb); cs_attach(cs, check, &check_conn_cb);
conn->target = &s->obj_type; conn->target = &s->obj_type;
/* no client address */ /* no client address */
@ -2077,7 +2083,8 @@ static struct task *process_chk_conn(struct task *t)
{ {
struct check *check = t->context; struct check *check = t->context;
struct server *s = check->server; struct server *s = check->server;
struct connection *conn = check->conn; struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs);
int rv; int rv;
int ret; int ret;
int expired = tick_is_expired(t->expire, now_ms); int expired = tick_is_expired(t->expire, now_ms);
@ -2105,7 +2112,8 @@ static struct task *process_chk_conn(struct task *t)
check->bo->o = 0; check->bo->o = 0;
ret = connect_conn_chk(t); ret = connect_conn_chk(t);
conn = check->conn; cs = check->cs;
conn = cs_conn(cs);
switch (ret) { switch (ret) {
case SF_ERR_UP: case SF_ERR_UP:
@ -2123,7 +2131,7 @@ static struct task *process_chk_conn(struct task *t)
} }
if (check->type) if (check->type)
conn_xprt_want_recv(conn); /* prepare for reading a possible reply */ cs_want_recv(cs); /* prepare for reading a possible reply */
task_set_affinity(t, tid_bit); task_set_affinity(t, tid_bit);
goto reschedule; goto reschedule;
@ -2147,9 +2155,10 @@ static struct task *process_chk_conn(struct task *t)
} }
/* here, we have seen a synchronous error, no fd was allocated */ /* here, we have seen a synchronous error, no fd was allocated */
if (conn) { if (cs) {
conn_free(conn); cs_destroy(cs);
check->conn = conn = NULL; cs = check->cs = NULL;
conn = NULL;
} }
check->state &= ~CHK_ST_INPROGRESS; check->state &= ~CHK_ST_INPROGRESS;
@ -2201,8 +2210,9 @@ static struct task *process_chk_conn(struct task *t)
} }
if (conn) { if (conn) {
conn_free(conn); cs_destroy(cs);
check->conn = conn = NULL; cs = check->cs = NULL;
conn = NULL;
} }
if (check->result == CHK_RES_FAILED) { if (check->result == CHK_RES_FAILED) {
@ -2550,7 +2560,8 @@ static int tcpcheck_main(struct check *check)
char *contentptr, *comment; char *contentptr, *comment;
struct tcpcheck_rule *next; struct tcpcheck_rule *next;
int done = 0, ret = 0, step = 0; int done = 0, ret = 0, step = 0;
struct connection *conn = check->conn; struct conn_stream *cs = check->cs;
struct connection *conn = cs_conn(cs);
struct server *s = check->server; struct server *s = check->server;
struct task *t = check->task; struct task *t = check->task;
struct list *head = check->tcpcheck_rules; struct list *head = check->tcpcheck_rules;
@ -2619,8 +2630,8 @@ static int tcpcheck_main(struct check *check)
} }
/* It's only the rules which will enable send/recv */ /* It's only the rules which will enable send/recv */
if (conn) if (cs)
__conn_xprt_stop_both(conn); cs_stop_both(cs);
while (1) { while (1) {
/* We have to try to flush the output buffer before reading, at /* We have to try to flush the output buffer before reading, at
@ -2633,11 +2644,11 @@ static int tcpcheck_main(struct check *check)
check->current_step->action != TCPCHK_ACT_SEND || check->current_step->action != TCPCHK_ACT_SEND ||
check->current_step->string_len >= buffer_total_space(check->bo))) { check->current_step->string_len >= buffer_total_space(check->bo))) {
__conn_xprt_want_send(conn); __cs_want_send(cs);
if (conn->xprt->snd_buf(conn, check->bo, 0) <= 0) { if (conn->mux->snd_buf(cs, check->bo, 0) <= 0) {
if (conn->flags & CO_FL_ERROR) { if (conn->flags & CO_FL_ERROR) {
chk_report_conn_err(check, errno, 0); chk_report_conn_err(check, errno, 0);
__conn_xprt_stop_both(conn); __cs_stop_both(cs);
goto out_end_tcpcheck; goto out_end_tcpcheck;
} }
break; break;
@ -2673,8 +2684,9 @@ static int tcpcheck_main(struct check *check)
* 2: try to get a new connection * 2: try to get a new connection
* 3: release and replace the old one on success * 3: release and replace the old one on success
*/ */
if (check->conn) { if (check->cs) {
conn_full_close(check->conn); /* XXX: need to kill all CS here as well but not to free them yet */
conn_full_close(check->cs->conn);
retcode = -1; /* do not reuse the fd! */ retcode = -1; /* do not reuse the fd! */
} }
@ -2682,8 +2694,8 @@ static int tcpcheck_main(struct check *check)
check->last_started_step = check->current_step; check->last_started_step = check->current_step;
/* prepare new connection */ /* prepare new connection */
conn = conn_new(); cs = cs_new(NULL);
if (!conn) { if (!cs) {
step = tcpcheck_get_step_id(check); step = tcpcheck_get_step_id(check);
chunk_printf(&trash, "TCPCHK error allocating connection at step %d", step); chunk_printf(&trash, "TCPCHK error allocating connection at step %d", step);
comment = tcpcheck_get_step_comment(check, step); comment = tcpcheck_get_step_comment(check, step);
@ -2694,11 +2706,15 @@ static int tcpcheck_main(struct check *check)
return retcode; return retcode;
} }
if (check->conn) if (check->cs) {
conn_free(check->conn); if (check->cs->conn)
check->conn = conn; conn_free(check->cs->conn);
cs_free(check->cs);
}
conn_attach(conn, check, &check_conn_cb); check->cs = cs;
conn = cs->conn;
cs_attach(cs, check, &check_conn_cb);
conn->target = &s->obj_type; conn->target = &s->obj_type;
/* no client address */ /* no client address */
@ -2727,7 +2743,7 @@ static int tcpcheck_main(struct check *check)
xprt = xprt_get(XPRT_RAW); xprt = xprt_get(XPRT_RAW);
} }
conn_prepare(conn, proto, xprt); conn_prepare(conn, proto, xprt);
conn_install_mux(conn, &mux_pt_ops, conn); conn_install_mux(conn, &mux_pt_ops, cs);
ret = SF_ERR_INTERNAL; ret = SF_ERR_INTERNAL;
if (proto->connect) if (proto->connect)
@ -2860,8 +2876,8 @@ static int tcpcheck_main(struct check *check)
if (unlikely(check->result == CHK_RES_FAILED)) if (unlikely(check->result == CHK_RES_FAILED))
goto out_end_tcpcheck; goto out_end_tcpcheck;
__conn_xprt_want_recv(conn); __cs_want_recv(cs);
if (conn->xprt->rcv_buf(conn, check->bi, check->bi->size) <= 0) { if (conn->mux->rcv_buf(cs, check->bi, check->bi->size) <= 0) {
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) { if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
done = 1; done = 1;
if ((conn->flags & CO_FL_ERROR) && !check->bi->i) { if ((conn->flags & CO_FL_ERROR) && !check->bi->i) {
@ -2958,7 +2974,7 @@ static int tcpcheck_main(struct check *check)
if (check->current_step->action == TCPCHK_ACT_EXPECT) if (check->current_step->action == TCPCHK_ACT_EXPECT)
goto tcpcheck_expect; goto tcpcheck_expect;
__conn_xprt_stop_recv(conn); __cs_stop_recv(cs);
} }
} }
else { else {
@ -2978,7 +2994,7 @@ static int tcpcheck_main(struct check *check)
if (check->current_step->action == TCPCHK_ACT_EXPECT) if (check->current_step->action == TCPCHK_ACT_EXPECT)
goto tcpcheck_expect; goto tcpcheck_expect;
__conn_xprt_stop_recv(conn); __cs_stop_recv(cs);
} }
/* not matched but was supposed to => ERROR */ /* not matched but was supposed to => ERROR */
else { else {
@ -3012,11 +3028,11 @@ static int tcpcheck_main(struct check *check)
/* warning, current_step may now point to the head */ /* warning, current_step may now point to the head */
if (check->bo->o) if (check->bo->o)
__conn_xprt_want_send(conn); __cs_want_send(cs);
if (&check->current_step->list != head && if (&check->current_step->list != head &&
check->current_step->action == TCPCHK_ACT_EXPECT) check->current_step->action == TCPCHK_ACT_EXPECT)
__conn_xprt_want_recv(conn); __cs_want_recv(cs);
return retcode; return retcode;
out_end_tcpcheck: out_end_tcpcheck:
@ -3030,7 +3046,7 @@ static int tcpcheck_main(struct check *check)
if (check->result == CHK_RES_FAILED) if (check->result == CHK_RES_FAILED)
conn->flags |= CO_FL_ERROR; conn->flags |= CO_FL_ERROR;
__conn_xprt_stop_both(conn); __cs_stop_both(cs);
return retcode; return retcode;
} }
@ -3049,7 +3065,6 @@ const char *init_check(struct check *check, int type)
return "out of memory while allocating check buffer"; return "out of memory while allocating check buffer";
} }
check->bo->size = global.tune.chksize; check->bo->size = global.tune.chksize;
return NULL; return NULL;
} }
@ -3059,8 +3074,10 @@ void free_check(struct check *check)
check->bi = NULL; check->bi = NULL;
free(check->bo); free(check->bo);
check->bo = NULL; check->bo = NULL;
free(check->conn); free(check->cs->conn);
check->conn = NULL; check->cs->conn = NULL;
cs_free(check->cs);
check->cs = NULL;
} }
void email_alert_free(struct email_alert *alert) void email_alert_free(struct email_alert *alert)

View File

@ -1268,7 +1268,7 @@ static int _getsocks(char **args, struct appctx *appctx, void *private)
struct cmsghdr *cmsg; struct cmsghdr *cmsg;
struct stream_interface *si = appctx->owner; struct stream_interface *si = appctx->owner;
struct stream *s = si_strm(si); struct stream *s = si_strm(si);
struct connection *remote = objt_conn(si_opposite(si)->end); struct connection *remote = cs_conn(objt_cs(si_opposite(si)->end));
struct msghdr msghdr; struct msghdr msghdr;
struct iovec iov; struct iovec iov;
struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };

View File

@ -101,7 +101,7 @@ int frontend_accept(struct stream *s)
/* try to report the ALPN value when available (also works for NPN) */ /* try to report the ALPN value when available (also works for NPN) */
if (conn && conn->owner == &s->si[0]) { if (conn && conn == cs_conn(objt_cs(s->si[0].end))) {
if (conn_get_alpn(conn, &alpn_str, &alpn_len) && alpn_str) { if (conn_get_alpn(conn, &alpn_str, &alpn_len) && alpn_str) {
int len = MIN(alpn_len, sizeof(alpn) - 1); int len = MIN(alpn_len, sizeof(alpn) - 1);
memcpy(alpn, alpn_str, len); memcpy(alpn, alpn_str, len);

View File

@ -1521,7 +1521,7 @@ __LJMP static struct hlua_socket *hlua_checksocket(lua_State *L, int ud)
static void hlua_socket_handler(struct appctx *appctx) static void hlua_socket_handler(struct appctx *appctx)
{ {
struct stream_interface *si = appctx->owner; struct stream_interface *si = appctx->owner;
struct connection *c = objt_conn(si_opposite(si)->end); struct connection *c = cs_conn(objt_cs(si_opposite(si)->end));
if (appctx->ctx.hlua_cosocket.die) { if (appctx->ctx.hlua_cosocket.die) {
si_shutw(si); si_shutw(si);
@ -2167,7 +2167,7 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L)
si = appctx->owner; si = appctx->owner;
s = si_strm(si); s = si_strm(si);
conn = objt_conn(s->si[1].end); conn = cs_conn(objt_cs(s->si[1].end));
if (!conn) { if (!conn) {
xref_unlock(&socket->xref, peer); xref_unlock(&socket->xref, peer);
lua_pushnil(L); lua_pushnil(L);
@ -2217,7 +2217,7 @@ static int hlua_socket_getsockname(struct lua_State *L)
si = appctx->owner; si = appctx->owner;
s = si_strm(si); s = si_strm(si);
conn = objt_conn(s->si[1].end); conn = cs_conn(objt_cs(s->si[1].end));
if (!conn) { if (!conn) {
xref_unlock(&socket->xref, peer); xref_unlock(&socket->xref, peer);
lua_pushnil(L); lua_pushnil(L);
@ -2346,7 +2346,7 @@ __LJMP static int hlua_socket_connect(struct lua_State *L)
s = si_strm(si); s = si_strm(si);
/* Initialise connection. */ /* Initialise connection. */
conn = si_alloc_conn(&s->si[1]); conn = cs_conn(si_alloc_cs(&s->si[1], NULL));
if (!conn) { if (!conn) {
xref_unlock(&socket->xref, peer); xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "connect: internal error")); WILL_LJMP(luaL_error(L, "connect: internal error"));

View File

@ -1525,7 +1525,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
break; break;
case LOG_FMT_BACKENDIP: // %bi case LOG_FMT_BACKENDIP: // %bi
conn = objt_conn(s->si[1].end); conn = cs_conn(objt_cs(s->si[1].end));
if (conn) if (conn)
ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp); ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp);
else else
@ -1538,7 +1538,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
break; break;
case LOG_FMT_BACKENDPORT: // %bp case LOG_FMT_BACKENDPORT: // %bp
conn = objt_conn(s->si[1].end); conn = cs_conn(objt_cs(s->si[1].end));
if (conn) if (conn)
ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp); ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp);
else else
@ -1551,7 +1551,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
break; break;
case LOG_FMT_SERVERIP: // %si case LOG_FMT_SERVERIP: // %si
conn = objt_conn(s->si[1].end); conn = cs_conn(objt_cs(s->si[1].end));
if (conn) if (conn)
ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp); ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp);
else else
@ -1564,7 +1564,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
break; break;
case LOG_FMT_SERVERPORT: // %sp case LOG_FMT_SERVERPORT: // %sp
conn = objt_conn(s->si[1].end); conn = cs_conn(objt_cs(s->si[1].end));
if (conn) if (conn)
ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp); ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp);
else else

View File

@ -14,17 +14,31 @@
#include <proto/connection.h> #include <proto/connection.h>
#include <proto/stream.h> #include <proto/stream.h>
/* Initialize the mux once it's attached. If conn->mux_ctx is NULL, it is /* Initialize the mux once it's attached. It is expected that conn->mux_ctx
* assumed that no data layer has yet been instanciated so the mux is * points to the existing conn_stream (for outgoing connections) or NULL (for
* attached to an incoming connection and will instanciate a new stream. If * incoming ones, in which case one will be allocated and a new stream will be
* conn->mux_ctx exists, it is assumed that it is an outgoing connection * instanciated). Returns < 0 on error.
* requested for this context. Returns < 0 on error.
*/ */
static int mux_pt_init(struct connection *conn) static int mux_pt_init(struct connection *conn)
{ {
if (!conn->mux_ctx) struct conn_stream *cs = conn->mux_ctx;
return stream_create_from_conn(conn);
if (!cs) {
cs = cs_new(conn);
if (!cs)
goto fail;
if (stream_create_from_cs(cs) < 0)
goto fail_free;
conn->mux_ctx = cs;
}
return 0; return 0;
fail_free:
cs_free(cs);
fail:
return -1;
} }
/* callback to be used by default for the pass-through mux. It calls the data /* callback to be used by default for the pass-through mux. It calls the data
@ -32,7 +46,13 @@ static int mux_pt_init(struct connection *conn)
*/ */
static int mux_pt_wake(struct connection *conn) static int mux_pt_wake(struct connection *conn)
{ {
return conn->data->wake ? conn->data->wake(conn) : 0; struct conn_stream *cs = conn->mux_ctx;
int ret;
ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
cs_update_mux_polling(cs);
return (ret);
} }
/* callback used to update the mux's polling flags after changing a cs' status. /* callback used to update the mux's polling flags after changing a cs' status.
@ -60,7 +80,12 @@ static void mux_pt_update_poll(struct conn_stream *cs)
*/ */
static void mux_pt_recv(struct connection *conn) static void mux_pt_recv(struct connection *conn)
{ {
conn->data->recv(conn); struct conn_stream *cs = conn->mux_ctx;
if (conn_xprt_read0_pending(conn))
cs->flags |= CS_FL_EOS;
cs->data_cb->recv(cs);
cs_update_mux_polling(cs);
} }
/* callback to be used by default for the pass-through mux. It simply calls the /* callback to be used by default for the pass-through mux. It simply calls the
@ -68,7 +93,10 @@ static void mux_pt_recv(struct connection *conn)
*/ */
static void mux_pt_send(struct connection *conn) static void mux_pt_send(struct connection *conn)
{ {
conn->data->send(conn); struct conn_stream *cs = conn->mux_ctx;
cs->data_cb->send(cs);
cs_update_mux_polling(cs);
} }
/* /*

View File

@ -1871,6 +1871,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
struct session *sess; struct session *sess;
struct stream *s; struct stream *s;
struct connection *conn; struct connection *conn;
struct conn_stream *cs;
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000)); peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
peer->statuscode = PEER_SESS_SC_CONNECTCODE; peer->statuscode = PEER_SESS_SC_CONNECTCODE;
@ -1912,9 +1913,12 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
if (unlikely((conn = conn_new()) == NULL)) if (unlikely((conn = conn_new()) == NULL))
goto out_free_strm; goto out_free_strm;
if (unlikely((cs = cs_new(conn)) == NULL))
goto out_free_conn;
conn_prepare(conn, peer->proto, peer->xprt); conn_prepare(conn, peer->proto, peer->xprt);
conn_install_mux(conn, &mux_pt_ops, conn); conn_install_mux(conn, &mux_pt_ops, cs);
si_attach_conn(&s->si[1], conn); si_attach_cs(&s->si[1], cs);
conn->target = s->target = &s->be->obj_type; conn->target = s->target = &s->be->obj_type;
memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to)); memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
@ -1928,6 +1932,8 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
return appctx; return appctx;
/* Error unrolling */ /* Error unrolling */
out_free_conn:
conn_free(conn);
out_free_strm: out_free_strm:
LIST_DEL(&s->list); LIST_DEL(&s->list);
pool_free2(pool2_stream, s); pool_free2(pool2_stream, s);

View File

@ -3662,7 +3662,7 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit)
char *path; char *path;
/* Note that for now we don't reuse existing proxy connections */ /* Note that for now we don't reuse existing proxy connections */
if (unlikely((conn = si_alloc_conn(&s->si[1])) == NULL)) { if (unlikely((conn = cs_conn(si_alloc_cs(&s->si[1], NULL))) == NULL)) {
txn->req.err_state = txn->req.msg_state; txn->req.err_state = txn->req.msg_state;
txn->req.msg_state = HTTP_MSG_ERROR; txn->req.msg_state = HTTP_MSG_ERROR;
txn->status = 500; txn->status = 500;
@ -4212,6 +4212,7 @@ void http_end_txn_clean_session(struct stream *s)
int prev_status = s->txn->status; int prev_status = s->txn->status;
struct proxy *fe = strm_fe(s); struct proxy *fe = strm_fe(s);
struct proxy *be = s->be; struct proxy *be = s->be;
struct conn_stream *cs;
struct connection *srv_conn; struct connection *srv_conn;
struct server *srv; struct server *srv;
unsigned int prev_flags = s->txn->flags; unsigned int prev_flags = s->txn->flags;
@ -4221,7 +4222,14 @@ void http_end_txn_clean_session(struct stream *s)
* flags. We also need a more accurate method for computing per-request * flags. We also need a more accurate method for computing per-request
* data. * data.
*/ */
srv_conn = objt_conn(s->si[1].end); /*
* XXX cognet: This is probably wrong, this is killing a whole
* connection, in the new world order, we probably want to just kill
* the stream, this is to be revisited the day we handle multiple
* streams in one server connection.
*/
cs = objt_cs(s->si[1].end);
srv_conn = cs_conn(cs);
/* unless we're doing keep-alive, we want to quickly close the connection /* unless we're doing keep-alive, we want to quickly close the connection
* to the server. * to the server.
@ -4364,17 +4372,17 @@ void http_end_txn_clean_session(struct stream *s)
if (srv_conn && LIST_ISEMPTY(&srv_conn->list)) { if (srv_conn && LIST_ISEMPTY(&srv_conn->list)) {
srv = objt_server(srv_conn->target); srv = objt_server(srv_conn->target);
if (!srv) if (!srv)
si_idle_conn(&s->si[1], NULL); si_idle_cs(&s->si[1], NULL);
else if (srv_conn->flags & CO_FL_PRIVATE) else if (srv_conn->flags & CO_FL_PRIVATE)
si_idle_conn(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL)); si_idle_cs(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL));
else if (prev_flags & TX_NOT_FIRST) else if (prev_flags & TX_NOT_FIRST)
/* note: we check the request, not the connection, but /* note: we check the request, not the connection, but
* this is valid for strategies SAFE and AGGR, and in * this is valid for strategies SAFE and AGGR, and in
* case of ALWS, we don't care anyway. * case of ALWS, we don't care anyway.
*/ */
si_idle_conn(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL)); si_idle_cs(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL));
else else
si_idle_conn(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL)); si_idle_cs(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL));
} }
s->req.analysers = strm_li(s) ? strm_li(s)->analysers : 0; s->req.analysers = strm_li(s) ? strm_li(s)->analysers : 0;
s->res.analysers = 0; s->res.analysers = 0;
@ -7936,7 +7944,7 @@ void debug_hdr(const char *dir, struct stream *s, const char *start, const char
chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id, chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
dir, dir,
objt_conn(sess->origin) ? (unsigned short)objt_conn(sess->origin)->handle.fd : -1, objt_conn(sess->origin) ? (unsigned short)objt_conn(sess->origin)->handle.fd : -1,
objt_conn(s->si[1].end) ? (unsigned short)objt_conn(s->si[1].end)->handle.fd : -1); objt_cs(s->si[1].end) ? (unsigned short)objt_cs(s->si[1].end)->conn->handle.fd : -1);
for (max = 0; start + max < end; max++) for (max = 0; start + max < end; max++)
if (start[max] == '\r' || start[max] == '\n') if (start[max] == '\r' || start[max] == '\n')

View File

@ -1598,11 +1598,11 @@ static inline int get_tcp_info(const struct arg *args, struct sample *smp,
/* get the object associated with the stream interface.The /* get the object associated with the stream interface.The
* object can be other thing than a connection. For example, * object can be other thing than a connection. For example,
* it be a appctx. */ * it be a appctx. */
conn = objt_conn(smp->strm->si[dir].end); conn = cs_conn(objt_cs(smp->strm->si[dir].end));
if (!conn) if (!conn)
return 0; return 0;
/* The fd may not be avalaible for the tcp_info struct, and the /* The fd may not be available for the tcp_info struct, and the
syscal can fail. */ syscal can fail. */
optlen = sizeof(info); optlen = sizeof(info);
if (getsockopt(conn->handle.fd, SOL_TCP, TCP_INFO, &info, &optlen) == -1) if (getsockopt(conn->handle.fd, SOL_TCP, TCP_INFO, &info, &optlen) == -1)

View File

@ -74,11 +74,11 @@ static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
* valid right after the handshake, before the connection's data layer is * valid right after the handshake, before the connection's data layer is
* initialized, because it relies on the session to be in conn->owner. * initialized, because it relies on the session to be in conn->owner.
*/ */
int stream_create_from_conn(struct connection *conn) int stream_create_from_cs(struct conn_stream *cs)
{ {
struct stream *strm; struct stream *strm;
strm = stream_new(conn->owner, &conn->obj_type); strm = stream_new(cs->conn->owner, &cs->obj_type);
if (strm == NULL) if (strm == NULL)
return -1; return -1;
@ -99,7 +99,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
{ {
struct stream *s; struct stream *s;
struct task *t; struct task *t;
struct connection *conn = objt_conn(origin); struct conn_stream *cs = objt_cs(origin);
struct appctx *appctx = objt_appctx(origin); struct appctx *appctx = objt_appctx(origin);
if (unlikely((s = pool_alloc2(pool2_stream)) == NULL)) if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
@ -198,8 +198,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
s->si[0].hcto = sess->fe->timeout.clientfin; s->si[0].hcto = sess->fe->timeout.clientfin;
/* attach the incoming connection to the stream interface now. */ /* attach the incoming connection to the stream interface now. */
if (conn) if (cs)
si_attach_conn(&s->si[0], conn); si_attach_cs(&s->si[0], cs);
else if (appctx) else if (appctx)
si_attach_appctx(&s->si[0], appctx); si_attach_appctx(&s->si[0], appctx);
@ -261,8 +261,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
goto out_fail_accept; goto out_fail_accept;
/* finish initialization of the accepted file descriptor */ /* finish initialization of the accepted file descriptor */
if (conn) if (cs)
conn_xprt_want_recv(conn); cs_want_recv(cs);
else if (appctx) else if (appctx)
si_applet_want_get(&s->si[0]); si_applet_want_get(&s->si[0]);
@ -295,7 +295,8 @@ static void stream_free(struct stream *s)
struct session *sess = strm_sess(s); struct session *sess = strm_sess(s);
struct proxy *fe = sess->fe; struct proxy *fe = sess->fe;
struct bref *bref, *back; struct bref *bref, *back;
struct connection *cli_conn = objt_conn(s->si[0].end); struct conn_stream *cli_cs = objt_cs(s->si[0].end);
struct connection *cli_conn = cs_conn(cli_cs);
int i; int i;
if (s->pend_pos) if (s->pend_pos)
@ -343,6 +344,7 @@ static void stream_free(struct stream *s)
http_end_txn(s); http_end_txn(s);
/* ensure the client-side transport layer is destroyed */ /* ensure the client-side transport layer is destroyed */
/* XXX cognet: wrong for multiple streams in one connection */
if (cli_conn) { if (cli_conn) {
conn_stop_tracking(cli_conn); conn_stop_tracking(cli_conn);
conn_full_close(cli_conn); conn_full_close(cli_conn);
@ -577,7 +579,7 @@ static int sess_update_st_con_tcp(struct stream *s)
struct stream_interface *si = &s->si[1]; struct stream_interface *si = &s->si[1];
struct channel *req = &s->req; struct channel *req = &s->req;
struct channel *rep = &s->res; struct channel *rep = &s->res;
struct connection *srv_conn = __objt_conn(si->end); struct connection *srv_conn = __objt_cs(si->end)->conn;
/* If we got an error, or if nothing happened and the connection timed /* If we got an error, or if nothing happened and the connection timed
* out, we must give up. The CER state handler will take care of retry * out, we must give up. The CER state handler will take care of retry
@ -597,6 +599,9 @@ static int sess_update_st_con_tcp(struct stream *s)
si->exp = TICK_ETERNITY; si->exp = TICK_ETERNITY;
si->state = SI_ST_CER; si->state = SI_ST_CER;
/* XXX cognet: do we really want to kill the connection here ?
* Probably not for multiple streams.
*/
conn_full_close(srv_conn); conn_full_close(srv_conn);
if (si->err_type) if (si->err_type)
@ -647,7 +652,8 @@ static int sess_update_st_con_tcp(struct stream *s)
static int sess_update_st_cer(struct stream *s) static int sess_update_st_cer(struct stream *s)
{ {
struct stream_interface *si = &s->si[1]; struct stream_interface *si = &s->si[1];
struct connection *conn = objt_conn(si->end); struct conn_stream *cs = objt_cs(si->end);
struct connection *conn = cs_conn(cs);
/* we probably have to release last stream from the server */ /* we probably have to release last stream from the server */
if (objt_server(s->target)) { if (objt_server(s->target)) {
@ -812,7 +818,7 @@ static void sess_establish(struct stream *s)
req->flags |= CF_WAKE_ONCE; req->flags |= CF_WAKE_ONCE;
req->flags &= ~CF_WAKE_CONNECT; req->flags &= ~CF_WAKE_CONNECT;
} }
if (objt_conn(si->end)) { if (objt_cs(si->end)) {
/* real connections have timeouts */ /* real connections have timeouts */
req->wto = s->be->timeout.server; req->wto = s->be->timeout.server;
rep->rto = s->be->timeout.server; rep->rto = s->be->timeout.server;
@ -2111,8 +2117,8 @@ struct task *process_stream(struct task *t)
if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) && if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
req->to_forward && req->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) && (global.tune.options & GTUNE_USE_SPLICE) &&
(objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->rcv_pipe) && (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe) &&
(objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->snd_pipe) && (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe) &&
(pipes_used < global.maxpipes) && (pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
@ -2292,8 +2298,8 @@ struct task *process_stream(struct task *t)
if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) && if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
res->to_forward && res->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) && (global.tune.options & GTUNE_USE_SPLICE) &&
(objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->snd_pipe) && (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe) &&
(objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->rcv_pipe) && (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe) &&
(pipes_used < global.maxpipes) && (pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) || (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
@ -2361,8 +2367,8 @@ struct task *process_stream(struct task *t)
si_b->prev_state == SI_ST_EST) { si_b->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n", chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
s->uniq_id, s->be->id, s->uniq_id, s->be->id,
objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1, objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1); objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
shut_your_big_mouth_gcc(write(1, trash.str, trash.len)); shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
} }
@ -2370,8 +2376,8 @@ struct task *process_stream(struct task *t)
si_f->prev_state == SI_ST_EST) { si_f->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n", chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
s->uniq_id, s->be->id, s->uniq_id, s->be->id,
objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1, objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1); objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
shut_your_big_mouth_gcc(write(1, trash.str, trash.len)); shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
} }
} }
@ -2460,8 +2466,8 @@ struct task *process_stream(struct task *t)
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) { (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n", chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
s->uniq_id, s->be->id, s->uniq_id, s->be->id,
objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1, objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1); objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
shut_your_big_mouth_gcc(write(1, trash.str, trash.len)); shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
} }
@ -2692,6 +2698,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
struct tm tm; struct tm tm;
extern const char *monthname[12]; extern const char *monthname[12];
char pn[INET6_ADDRSTRLEN]; char pn[INET6_ADDRSTRLEN];
struct conn_stream *cs;
struct connection *conn; struct connection *conn;
struct appctx *tmpctx; struct appctx *tmpctx;
@ -2777,7 +2784,9 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
else else
chunk_appendf(&trash, " backend=<NONE> (id=-1 mode=-)"); chunk_appendf(&trash, " backend=<NONE> (id=-1 mode=-)");
conn = objt_conn(strm->si[1].end); cs = objt_cs(strm->si[1].end);
conn = cs_conn(cs);
if (conn) if (conn)
conn_get_from_addr(conn); conn_get_from_addr(conn);
@ -2869,14 +2878,16 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
TICKS_TO_MS(1000)) : "<NEVER>", TICKS_TO_MS(1000)) : "<NEVER>",
strm->si[1].err_type); strm->si[1].err_type);
if ((conn = objt_conn(strm->si[0].end)) != NULL) { if ((cs = objt_cs(strm->si[0].end)) != NULL) {
conn = cs->conn;
chunk_appendf(&trash, chunk_appendf(&trash,
" co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", " co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
conn, conn,
conn_get_ctrl_name(conn), conn_get_ctrl_name(conn),
conn_get_xprt_name(conn), conn_get_xprt_name(conn),
conn_get_mux_name(conn), conn_get_mux_name(conn),
conn_get_data_name(conn), cs_get_data_name(cs),
obj_type_name(conn->target), obj_type_name(conn->target),
obj_base_ptr(conn->target)); obj_base_ptr(conn->target));
@ -2898,14 +2909,16 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
tmpctx->applet->name); tmpctx->applet->name);
} }
if ((conn = objt_conn(strm->si[1].end)) != NULL) { if ((cs = objt_cs(strm->si[1].end)) != NULL) {
conn = cs->conn;
chunk_appendf(&trash, chunk_appendf(&trash,
" co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", " co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
conn, conn,
conn_get_ctrl_name(conn), conn_get_ctrl_name(conn),
conn_get_xprt_name(conn), conn_get_xprt_name(conn),
conn_get_mux_name(conn), conn_get_mux_name(conn),
conn_get_data_name(conn), cs_get_data_name(cs),
obj_type_name(conn->target), obj_type_name(conn->target),
obj_base_ptr(conn->target)); obj_base_ptr(conn->target));
@ -3171,7 +3184,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
human_time(TICKS_TO_MS(curr_strm->res.analyse_exp - now_ms), human_time(TICKS_TO_MS(curr_strm->res.analyse_exp - now_ms),
TICKS_TO_MS(1000)) : ""); TICKS_TO_MS(1000)) : "");
conn = objt_conn(curr_strm->si[0].end); conn = cs_conn(objt_cs(curr_strm->si[0].end));
chunk_appendf(&trash, chunk_appendf(&trash,
" s0=[%d,%1xh,fd=%d,ex=%s]", " s0=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[0].state, curr_strm->si[0].state,
@ -3181,7 +3194,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms), human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms),
TICKS_TO_MS(1000)) : ""); TICKS_TO_MS(1000)) : "");
conn = objt_conn(curr_strm->si[1].end); conn = cs_conn(objt_cs(curr_strm->si[1].end));
chunk_appendf(&trash, chunk_appendf(&trash,
" s1=[%d,%1xh,fd=%d,ex=%s]", " s1=[%d,%1xh,fd=%d,ex=%s]",
curr_strm->si[1].state, curr_strm->si[1].state,

View File

@ -30,6 +30,7 @@
#include <proto/applet.h> #include <proto/applet.h>
#include <proto/channel.h> #include <proto/channel.h>
#include <proto/connection.h> #include <proto/connection.h>
#include <proto/mux_pt.h>
#include <proto/pipe.h> #include <proto/pipe.h>
#include <proto/stream.h> #include <proto/stream.h>
#include <proto/stream_interface.h> #include <proto/stream_interface.h>
@ -50,11 +51,11 @@ static void stream_int_shutr_applet(struct stream_interface *si);
static void stream_int_shutw_applet(struct stream_interface *si); static void stream_int_shutw_applet(struct stream_interface *si);
static void stream_int_chk_rcv_applet(struct stream_interface *si); static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si); static void stream_int_chk_snd_applet(struct stream_interface *si);
static void si_conn_recv_cb(struct connection *conn); static void si_cs_recv_cb(struct conn_stream *cs);
static void si_conn_send_cb(struct connection *conn); static void si_cs_send_cb(struct conn_stream *cs);
static int si_conn_wake_cb(struct connection *conn); static int si_cs_wake_cb(struct conn_stream *cs);
static int si_idle_conn_wake_cb(struct connection *conn); static int si_idle_conn_wake_cb(struct conn_stream *cs);
static void si_idle_conn_null_cb(struct connection *conn); static void si_idle_conn_null_cb(struct conn_stream *cs);
/* stream-interface operations for embedded tasks */ /* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = { struct si_ops si_embedded_ops = {
@ -83,9 +84,9 @@ struct si_ops si_applet_ops = {
}; };
struct data_cb si_conn_cb = { struct data_cb si_conn_cb = {
.recv = si_conn_recv_cb, .recv = si_cs_recv_cb,
.send = si_conn_send_cb, .send = si_cs_send_cb,
.wake = si_conn_wake_cb, .wake = si_cs_wake_cb,
.name = "STRM", .name = "STRM",
}; };
@ -337,8 +338,10 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
* we've sent the whole proxy line. Otherwise we use connect(). * we've sent the whole proxy line. Otherwise we use connect().
*/ */
while (conn->send_proxy_ofs) { while (conn->send_proxy_ofs) {
struct conn_stream *cs;
int ret; int ret;
cs = conn->mux_ctx;
/* The target server expects a PROXY line to be sent first. /* The target server expects a PROXY line to be sent first.
* If the send_proxy_ofs is negative, it corresponds to the * If the send_proxy_ofs is negative, it corresponds to the
* offset to start sending from then end of the proxy string * offset to start sending from then end of the proxy string
@ -348,10 +351,10 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
* is attached to a stream interface. Otherwise we can only * is attached to a stream interface. Otherwise we can only
* send a LOCAL line (eg: for use with health checks). * send a LOCAL line (eg: for use with health checks).
*/ */
if (conn->data == &si_conn_cb) { if (conn->mux == &mux_pt_ops && cs->data_cb == &si_conn_cb) {
struct stream_interface *si = conn->owner; struct stream_interface *si = cs->data;
struct connection *remote = objt_conn(si_opposite(si)->end); struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end);
ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote); ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote_cs ? remote_cs->conn : NULL);
} }
else { else {
/* The target server expects a LOCAL line to be sent first. Retrieving /* The target server expects a LOCAL line to be sent first. Retrieving
@ -414,9 +417,9 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
* It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb() * It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb()
* is notified and can kill the connection. * is notified and can kill the connection.
*/ */
static void si_idle_conn_null_cb(struct connection *conn) static void si_idle_conn_null_cb(struct conn_stream *cs)
{ {
conn_sock_drain(conn); conn_sock_drain(cs->conn);
} }
/* Callback to be used by connection I/O handlers when some activity is detected /* Callback to be used by connection I/O handlers when some activity is detected
@ -424,9 +427,10 @@ static void si_idle_conn_null_cb(struct connection *conn)
* a close was detected on it. It returns 0 if it did nothing serious, or -1 if * a close was detected on it. It returns 0 if it did nothing serious, or -1 if
* it killed the connection. * it killed the connection.
*/ */
static int si_idle_conn_wake_cb(struct connection *conn) static int si_idle_conn_wake_cb(struct conn_stream *cs)
{ {
struct stream_interface *si = conn->owner; struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
if (!conn_ctrl_ready(conn)) if (!conn_ctrl_ready(conn))
return 0; return 0;
@ -560,9 +564,10 @@ void stream_int_notify(struct stream_interface *si)
* connection's polling based on the channels and stream interface's final * connection's polling based on the channels and stream interface's final
* states. The function always returns 0. * states. The function always returns 0.
*/ */
static int si_conn_wake_cb(struct connection *conn) static int si_cs_wake_cb(struct conn_stream *cs)
{ {
struct stream_interface *si = conn->owner; struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si); struct channel *oc = si_oc(si);
@ -599,36 +604,37 @@ static int si_conn_wake_cb(struct connection *conn)
* was done above (eg: maybe some buffers got emptied). * was done above (eg: maybe some buffers got emptied).
*/ */
if (channel_is_empty(oc)) if (channel_is_empty(oc))
__conn_xprt_stop_send(conn); __cs_stop_send(cs);
if (si->flags & SI_FL_WAIT_ROOM) { if (si->flags & SI_FL_WAIT_ROOM) {
__conn_xprt_stop_recv(conn); __cs_stop_recv(cs);
} }
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL && else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
channel_may_recv(ic)) { channel_may_recv(ic)) {
__conn_xprt_want_recv(conn); __cs_want_recv(cs);
} }
return 0; return 0;
} }
/* /*
* This function is called to send buffer data to a stream socket. * This function is called to send buffer data to a stream socket.
* It calls the transport layer's snd_buf function. It relies on the * It calls the mux layer's snd_buf function. It relies on the
* caller to commit polling changes. The caller should check conn->flags * caller to commit polling changes. The caller should check conn->flags
* for errors. * for errors.
*/ */
static void si_conn_send(struct connection *conn) static void si_cs_send(struct conn_stream *cs)
{ {
struct stream_interface *si = conn->owner; struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *oc = si_oc(si); struct channel *oc = si_oc(si);
int ret; int ret;
/* ensure it's only set if a write attempt has succeeded */ /* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL; oc->flags &= ~CF_WRITE_PARTIAL;
if (oc->pipe && conn->xprt->snd_pipe) { if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
ret = conn->xprt->snd_pipe(conn, oc->pipe); ret = conn->mux->snd_pipe(cs, oc->pipe);
if (ret > 0) if (ret > 0)
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA; oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
@ -672,7 +678,7 @@ static void si_conn_send(struct connection *conn)
if (oc->flags & CF_STREAMER) if (oc->flags & CF_STREAMER)
send_flag |= CO_SFL_STREAMER; send_flag |= CO_SFL_STREAMER;
ret = conn->xprt->snd_buf(conn, oc->buf, send_flag); ret = conn->mux->snd_buf(cs, oc->buf, send_flag);
if (ret > 0) { if (ret > 0) {
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA; oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
@ -766,25 +772,25 @@ void stream_int_update_conn(struct stream_interface *si)
{ {
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si); struct channel *oc = si_oc(si);
struct connection *conn = __objt_conn(si->end); struct conn_stream *cs = __objt_cs(si->end);
if (!(ic->flags & CF_SHUTR)) { if (!(ic->flags & CF_SHUTR)) {
/* Read not closed */ /* Read not closed */
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic))
__conn_xprt_stop_recv(conn); __cs_stop_recv(cs);
else else
__conn_xprt_want_recv(conn); __cs_want_recv(cs);
} }
if (!(oc->flags & CF_SHUTW)) { if (!(oc->flags & CF_SHUTW)) {
/* Write not closed */ /* Write not closed */
if (channel_is_empty(oc)) if (channel_is_empty(oc))
__conn_xprt_stop_send(conn); __cs_stop_send(cs);
else else
__conn_xprt_want_send(conn); __cs_want_send(cs);
} }
conn_cond_update_xprt_polling(conn); cs_update_mux_polling(cs);
} }
/* /*
@ -799,7 +805,8 @@ void stream_int_update_conn(struct stream_interface *si)
*/ */
static void stream_int_shutr_conn(struct stream_interface *si) static void stream_int_shutr_conn(struct stream_interface *si)
{ {
struct connection *conn = __objt_conn(si->end); struct conn_stream *cs = __objt_cs(si->end);
struct connection *conn = cs->conn;
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
ic->flags &= ~CF_SHUTR_NOW; ic->flags &= ~CF_SHUTR_NOW;
@ -813,6 +820,7 @@ static void stream_int_shutr_conn(struct stream_interface *si)
return; return;
if (si_oc(si)->flags & CF_SHUTW) { if (si_oc(si)->flags & CF_SHUTW) {
/* XXX: should just close cs ? */
conn_full_close(conn); conn_full_close(conn);
si->state = SI_ST_DIS; si->state = SI_ST_DIS;
si->exp = TICK_ETERNITY; si->exp = TICK_ETERNITY;
@ -823,7 +831,7 @@ static void stream_int_shutr_conn(struct stream_interface *si)
} }
else if (conn->ctrl) { else if (conn->ctrl) {
/* we want the caller to disable polling on this FD */ /* we want the caller to disable polling on this FD */
conn_xprt_stop_recv(conn); cs_stop_recv(cs);
} }
} }
@ -837,7 +845,8 @@ static void stream_int_shutr_conn(struct stream_interface *si)
*/ */
static void stream_int_shutw_conn(struct stream_interface *si) static void stream_int_shutw_conn(struct stream_interface *si)
{ {
struct connection *conn = __objt_conn(si->end); struct conn_stream *cs = __objt_cs(si->end);
struct connection *conn = cs->conn;
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si); struct channel *oc = si_oc(si);
@ -865,13 +874,21 @@ static void stream_int_shutw_conn(struct stream_interface *si)
/* quick close, the socket is alredy shut anyway */ /* quick close, the socket is alredy shut anyway */
} }
else if (si->flags & SI_FL_NOLINGER) { else if (si->flags & SI_FL_NOLINGER) {
/* unclean data-layer shutdown */ /* unclean data-layer shutdown, typically an aborted request
conn_xprt_shutw_hard(conn); * or a forwarded shutdown from a client to a server due to
* option abortonclose. No need for the TLS layer to try to
* emit a shutdown message.
*/
cs_shutw_hard(cs);
} }
else { else {
/* clean data-layer shutdown */ /* clean data-layer shutdown. This only happens on the
conn_xprt_shutw(conn); * frontend side, or on the backend side when forwarding
conn_sock_shutw(conn); * a client close in TCP mode or in HTTP TUNNEL mode
* while option abortonclose is set. We want the TLS
* layer to try to signal it to the peer before we close.
*/
cs_shutw(cs);
/* If the stream interface is configured to disable half-open /* If the stream interface is configured to disable half-open
* connections, we'll skip the shutdown(), but only if the * connections, we'll skip the shutdown(), but only if the
@ -920,25 +937,23 @@ static void stream_int_shutw_conn(struct stream_interface *si)
static void stream_int_chk_rcv_conn(struct stream_interface *si) static void stream_int_chk_rcv_conn(struct stream_interface *si)
{ {
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
struct connection *conn = __objt_conn(si->end); struct conn_stream *cs = __objt_cs(si->end);
if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR))) if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR)))
return; return;
conn_refresh_polling_flags(conn);
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) { if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
/* stop reading */ /* stop reading */
if (!(ic->flags & CF_DONT_READ)) /* full */ if (!(ic->flags & CF_DONT_READ)) /* full */
si->flags |= SI_FL_WAIT_ROOM; si->flags |= SI_FL_WAIT_ROOM;
__conn_xprt_stop_recv(conn); __cs_stop_recv(cs);
} }
else { else {
/* (re)start reading */ /* (re)start reading */
si->flags &= ~SI_FL_WAIT_ROOM; si->flags &= ~SI_FL_WAIT_ROOM;
__conn_xprt_want_recv(conn); __cs_want_recv(cs);
} }
conn_cond_update_xprt_polling(conn); cs_update_mux_polling(cs);
} }
@ -950,7 +965,7 @@ static void stream_int_chk_rcv_conn(struct stream_interface *si)
static void stream_int_chk_snd_conn(struct stream_interface *si) static void stream_int_chk_snd_conn(struct stream_interface *si)
{ {
struct channel *oc = si_oc(si); struct channel *oc = si_oc(si);
struct connection *conn = __objt_conn(si->end); struct conn_stream *cs = __objt_cs(si->end);
/* ensure it's only set if a write attempt has succeeded */ /* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL; oc->flags &= ~CF_WRITE_PARTIAL;
@ -965,7 +980,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */ !(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return; return;
if (conn->flags & CO_FL_XPRT_WR_ENA) { if (cs->flags & CS_FL_DATA_WR_ENA) {
/* already subscribed to write notifications, will be called /* already subscribed to write notifications, will be called
* anyway, so let's avoid calling it especially if the reader * anyway, so let's avoid calling it especially if the reader
* is not ready. * is not ready.
@ -973,16 +988,12 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
return; return;
} }
/* Before calling the data-level operations, we have to prepare __cs_want_send(cs);
* the polling flags to ensure we properly detect changes.
*/
conn_refresh_polling_flags(conn);
__conn_xprt_want_send(conn);
si_conn_send(conn); si_cs_send(cs);
if (conn->flags & CO_FL_ERROR) { if (cs->conn->flags & CO_FL_ERROR) {
/* Write error on the file descriptor */ /* Write error on the file descriptor */
__conn_xprt_stop_both(conn); __cs_stop_both(cs);
si->flags |= SI_FL_ERR; si->flags |= SI_FL_ERR;
goto out_wakeup; goto out_wakeup;
} }
@ -996,7 +1007,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
* ->o limit was reached. Maybe we just wrote the last * ->o limit was reached. Maybe we just wrote the last
* chunk and need to close. * chunk and need to close.
*/ */
__conn_xprt_stop_send(conn); __cs_stop_send(cs);
if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) == if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) && (CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
(si->state == SI_ST_EST)) { (si->state == SI_ST_EST)) {
@ -1012,7 +1023,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
/* Otherwise there are remaining data to be sent in the buffer, /* Otherwise there are remaining data to be sent in the buffer,
* which means we have to poll before doing so. * which means we have to poll before doing so.
*/ */
__conn_xprt_want_send(conn); __cs_want_send(cs);
si->flags &= ~SI_FL_WAIT_DATA; si->flags &= ~SI_FL_WAIT_DATA;
if (!tick_isset(oc->wex)) if (!tick_isset(oc->wex))
oc->wex = tick_add_ifset(now_ms, oc->wto); oc->wex = tick_add_ifset(now_ms, oc->wto);
@ -1052,17 +1063,18 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
} }
/* commit possible polling changes */ /* commit possible polling changes */
conn_cond_update_polling(conn); cs_update_mux_polling(cs);
} }
/* /*
* This is the callback which is called by the connection layer to receive data * This is the callback which is called by the connection layer to receive data
* into the buffer from the connection. It iterates over the transport layer's * into the buffer from the connection. It iterates over the mux layer's
* rcv_buf function. * rcv_buf function.
*/ */
static void si_conn_recv_cb(struct connection *conn) static void si_cs_recv_cb(struct conn_stream *cs)
{ {
struct stream_interface *si = conn->owner; struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
int ret, max, cur_read; int ret, max, cur_read;
int read_poll = MAX_READ_POLL_LOOPS; int read_poll = MAX_READ_POLL_LOOPS;
@ -1081,7 +1093,7 @@ static void si_conn_recv_cb(struct connection *conn)
return; return;
/* stop here if we reached the end of data */ /* stop here if we reached the end of data */
if (conn_xprt_read0_pending(conn)) if (cs->flags & CS_FL_EOS)
goto out_shutdown_r; goto out_shutdown_r;
cur_read = 0; cur_read = 0;
@ -1101,7 +1113,7 @@ static void si_conn_recv_cb(struct connection *conn)
/* First, let's see if we may splice data across the channel without /* First, let's see if we may splice data across the channel without
* using a buffer. * using a buffer.
*/ */
if (conn->xprt->rcv_pipe && if (conn->xprt->rcv_pipe && conn->mux->rcv_pipe &&
(ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) && (ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
ic->flags & CF_KERN_SPLICING) { ic->flags & CF_KERN_SPLICING) {
if (buffer_not_empty(ic->buf)) { if (buffer_not_empty(ic->buf)) {
@ -1120,7 +1132,7 @@ static void si_conn_recv_cb(struct connection *conn)
} }
} }
ret = conn->xprt->rcv_pipe(conn, ic->pipe, ic->to_forward); ret = conn->mux->rcv_pipe(cs, ic->pipe, ic->to_forward);
if (ret < 0) { if (ret < 0) {
/* splice not supported on this end, let's disable it */ /* splice not supported on this end, let's disable it */
ic->flags &= ~CF_KERN_SPLICING; ic->flags &= ~CF_KERN_SPLICING;
@ -1135,7 +1147,7 @@ static void si_conn_recv_cb(struct connection *conn)
ic->flags |= CF_READ_PARTIAL; ic->flags |= CF_READ_PARTIAL;
} }
if (conn_xprt_read0_pending(conn)) if (cs->flags & CS_FL_EOS)
goto out_shutdown_r; goto out_shutdown_r;
if (conn->flags & CO_FL_ERROR) if (conn->flags & CO_FL_ERROR)
@ -1146,7 +1158,7 @@ static void si_conn_recv_cb(struct connection *conn)
* could soon be full. Let's stop before needing to poll. * could soon be full. Let's stop before needing to poll.
*/ */
si->flags |= SI_FL_WAIT_ROOM; si->flags |= SI_FL_WAIT_ROOM;
__conn_xprt_stop_recv(conn); __cs_stop_recv(cs);
} }
/* splice not possible (anymore), let's go on on standard copy */ /* splice not possible (anymore), let's go on on standard copy */
@ -1177,7 +1189,7 @@ static void si_conn_recv_cb(struct connection *conn)
break; break;
} }
ret = conn->xprt->rcv_buf(conn, ic->buf, max); ret = conn->mux->rcv_buf(cs, ic->buf, max);
if (ret <= 0) if (ret <= 0)
break; break;
@ -1203,9 +1215,12 @@ static void si_conn_recv_cb(struct connection *conn)
} }
if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) { if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
if (__conn_xprt_done_recv(conn)) /*
si->flags |= SI_FL_WAIT_ROOM; * This used to be __conn_xprt_done_recv()
break; * This was changed to accomodate with the mux code,
* but we may have lost a worthwhile optimization.
*/
__cs_stop_recv(cs);
} }
/* if too many bytes were missing from last read, it means that /* if too many bytes were missing from last read, it means that
@ -1271,7 +1286,7 @@ static void si_conn_recv_cb(struct connection *conn)
if (conn->flags & CO_FL_ERROR) if (conn->flags & CO_FL_ERROR)
return; return;
if (conn_xprt_read0_pending(conn)) if (cs->flags & CS_FL_EOS)
/* connection closed */ /* connection closed */
goto out_shutdown_r; goto out_shutdown_r;
@ -1291,9 +1306,10 @@ static void si_conn_recv_cb(struct connection *conn)
* from the buffer to the connection. It iterates over the transport layer's * from the buffer to the connection. It iterates over the transport layer's
* snd_buf function. * snd_buf function.
*/ */
static void si_conn_send_cb(struct connection *conn) static void si_cs_send_cb(struct conn_stream *cs)
{ {
struct stream_interface *si = conn->owner; struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
if (conn->flags & CO_FL_ERROR) if (conn->flags & CO_FL_ERROR)
return; return;
@ -1307,7 +1323,7 @@ static void si_conn_send_cb(struct connection *conn)
return; return;
/* OK there are data waiting to be sent */ /* OK there are data waiting to be sent */
si_conn_send(conn); si_cs_send(cs);
/* OK all done */ /* OK all done */
return; return;
@ -1320,7 +1336,7 @@ static void si_conn_send_cb(struct connection *conn)
*/ */
void stream_sock_read0(struct stream_interface *si) void stream_sock_read0(struct stream_interface *si)
{ {
struct connection *conn = __objt_conn(si->end); struct conn_stream *cs = __objt_cs(si->end);
struct channel *ic = si_ic(si); struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si); struct channel *oc = si_oc(si);
@ -1340,17 +1356,17 @@ void stream_sock_read0(struct stream_interface *si)
if (si->flags & SI_FL_NOHALF) { if (si->flags & SI_FL_NOHALF) {
/* we want to immediately forward this close to the write side */ /* we want to immediately forward this close to the write side */
/* force flag on ssl to keep stream in cache */ /* force flag on ssl to keep stream in cache */
conn_xprt_shutw_hard(conn); cs_shutw_hard(cs);
goto do_close; goto do_close;
} }
/* otherwise that's just a normal read shutdown */ /* otherwise that's just a normal read shutdown */
__conn_xprt_stop_recv(conn); __cs_stop_recv(cs);
return; return;
do_close: do_close:
/* OK we completely close the socket here just as if we went through si_shut[rw]() */ /* OK we completely close the socket here just as if we went through si_shut[rw]() */
conn_full_close(conn); conn_full_close(cs->conn);
ic->flags &= ~CF_SHUTR_NOW; ic->flags &= ~CF_SHUTR_NOW;
ic->flags |= CF_SHUTR; ic->flags |= CF_SHUTR;