CLEANUP: muxes: make mux->attach/detach take a conn_stream endpoint

The mux ->detach() function currently takes a conn_stream. This causes
an awkward situation where the caller cs_detach_endp() has to partially
mark it as released but not completely so that ->detach() finds its
endpoint and context, and it cannot be done later since it's possible
that ->detach() deletes the endpoint. As such the endpoint link between
the conn_stream and the mux's stream is in a transient situation while
we'd like it to be clean so that the mux's ->detach() code can call any
regular function it wants that knows the regular semantics of the
relation between the CS and the endpoint.

A better approach consists in slightly modifying the detach() API to
better match the reality, which is that the endpoint is detached but
still alive and that it's the only part the function is interested in.

As such, this patch modifies the function to take an endpoint there,
and by analogy (or simplicity) does the same for ->attach(), even
though it looks less important there since we're always attaching an
endpoint to a conn_stream anyway. It is possible that in the future
the API could evolve to use more endpoints that provide a bit more
flexibility in the API, but at this point we don't need to go further.
This commit is contained in:
Willy Tarreau 2022-05-10 19:18:52 +02:00
parent cfbfc3f091
commit 4201ab791d
8 changed files with 30 additions and 28 deletions

View File

@ -41,6 +41,7 @@
/* referenced below */
struct connection;
struct conn_stream;
struct cs_endpoint;
struct cs_info;
struct buffer;
struct proxy;
@ -394,9 +395,9 @@ struct mux_ops {
void (*shutr)(struct conn_stream *cs, enum co_shr_mode); /* shutr function */
void (*shutw)(struct conn_stream *cs, enum co_shw_mode); /* shutw function */
int (*attach)(struct connection *conn, struct conn_stream *, struct session *sess); /* attach a conn_stream to an outgoing connection */
int (*attach)(struct connection *conn, struct cs_endpoint *, struct session *sess); /* attach a conn_stream to an outgoing connection */
struct conn_stream *(*get_first_cs)(const struct connection *); /* retrieves any valid conn_stream from this connection */
void (*detach)(struct conn_stream *); /* Detach a conn_stream from an outgoing connection, when the request is done */
void (*detach)(struct cs_endpoint *); /* Detach a conn_stream from an outgoing connection, when the request is done */
int (*show_fd)(struct buffer *, struct connection *); /* append some data about connection into chunk for "show fd"; returns non-zero if suspicious */
int (*subscribe)(struct conn_stream *cs, int event_type, struct wait_event *es); /* Subscribe <es> to events, such as "being able to send" */
int (*unsubscribe)(struct conn_stream *cs, int event_type, struct wait_event *es); /* Unsubscribe <es> from events */

View File

@ -1574,7 +1574,7 @@ static int connect_server(struct stream *s)
}
if (avail >= 1) {
if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) {
if (srv_conn->mux->attach(srv_conn, s->csb->endp, s->sess) == -1) {
srv_conn = NULL;
if (cs_reset_endp(s->csb) < 0)
return SF_ERR_INTERNAL;

View File

@ -363,15 +363,16 @@ static void cs_detach_endp(struct conn_stream **csp)
if (cs->endp->flags & CS_EP_T_MUX) {
struct connection *conn = __cs_conn(cs);
struct cs_endpoint *endp = cs->endp;
if (conn->mux) {
/* TODO: handle unsubscribe for healthchecks too */
if (cs->wait_event.events != 0)
conn->mux->unsubscribe(cs, cs->wait_event.events, &cs->wait_event);
cs->endp->flags |= CS_EP_ORPHAN;
cs->endp->cs = NULL;
conn->mux->detach(cs);
endp->flags |= CS_EP_ORPHAN;
endp->cs = NULL;
cs->endp = NULL;
conn->mux->detach(endp);
}
else {
/* It's too early to have a mux, let's just destroy

View File

@ -3521,13 +3521,13 @@ static size_t fcgi_strm_parse_response(struct fcgi_strm *fstrm, struct buffer *b
* Attach a new stream to a connection
* (Used for outgoing connections)
*/
static int fcgi_attach(struct connection *conn, struct conn_stream *cs, struct session *sess)
static int fcgi_attach(struct connection *conn, struct cs_endpoint *endp, struct session *sess)
{
struct fcgi_strm *fstrm;
struct fcgi_conn *fconn = conn->ctx;
TRACE_ENTER(FCGI_EV_FSTRM_NEW, conn);
fstrm = fcgi_conn_stream_new(fconn, cs, sess);
fstrm = fcgi_conn_stream_new(fconn, endp->cs, sess);
if (!fstrm)
goto err;
@ -3581,9 +3581,9 @@ static void fcgi_destroy(void *ctx)
/*
* Detach the stream from the connection and possibly release the connection.
*/
static void fcgi_detach(struct conn_stream *cs)
static void fcgi_detach(struct cs_endpoint *endp)
{
struct fcgi_strm *fstrm = __cs_mux(cs);
struct fcgi_strm *fstrm = endp->target;
struct fcgi_conn *fconn;
struct session *sess;

View File

@ -3303,7 +3303,7 @@ struct task *h1_timeout_task(struct task *t, void *context, unsigned int state)
* Attach a new stream to a connection
* (Used for outgoing connections)
*/
static int h1_attach(struct connection *conn, struct conn_stream *cs, struct session *sess)
static int h1_attach(struct connection *conn, struct cs_endpoint *endp, struct session *sess)
{
struct h1c *h1c = conn->ctx;
struct h1s *h1s;
@ -3314,7 +3314,7 @@ static int h1_attach(struct connection *conn, struct conn_stream *cs, struct ses
goto err;
}
h1s = h1c_bck_stream_new(h1c, cs, sess);
h1s = h1c_bck_stream_new(h1c, endp->cs, sess);
if (h1s == NULL) {
TRACE_ERROR("h1s creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, conn);
goto err;
@ -3357,9 +3357,9 @@ static void h1_destroy(void *ctx)
/*
* Detach the stream from the connection and possibly release the connection.
*/
static void h1_detach(struct conn_stream *cs)
static void h1_detach(struct cs_endpoint *endp)
{
struct h1s *h1s = __cs_mux(cs);
struct h1s *h1s = endp->target;
struct h1c *h1c;
struct session *sess;
int is_not_first;

View File

@ -4289,13 +4289,13 @@ do_leave:
* Attach a new stream to a connection
* (Used for outgoing connections)
*/
static int h2_attach(struct connection *conn, struct conn_stream *cs, struct session *sess)
static int h2_attach(struct connection *conn, struct cs_endpoint *endp, struct session *sess)
{
struct h2s *h2s;
struct h2c *h2c = conn->ctx;
TRACE_ENTER(H2_EV_H2S_NEW, conn);
h2s = h2c_bck_stream_new(h2c, cs, sess);
h2s = h2c_bck_stream_new(h2c, endp->cs, sess);
if (!h2s) {
TRACE_DEVEL("leaving on stream creation failure", H2_EV_H2S_NEW|H2_EV_H2S_ERR, conn);
return -1;
@ -4368,9 +4368,9 @@ static void h2_destroy(void *ctx)
/*
* Detach the stream from the connection and possibly release the connection.
*/
static void h2_detach(struct conn_stream *cs)
static void h2_detach(struct cs_endpoint *endp)
{
struct h2s *h2s = __cs_mux(cs);
struct h2s *h2s = endp->target;
struct h2c *h2c;
struct session *sess;

View File

@ -373,19 +373,19 @@ static int mux_pt_wake(struct connection *conn)
* Attach a new stream to a connection
* (Used for outgoing connections)
*/
static int mux_pt_attach(struct connection *conn, struct conn_stream *cs, struct session *sess)
static int mux_pt_attach(struct connection *conn, struct cs_endpoint *endp, struct session *sess)
{
struct mux_pt_ctx *ctx = conn->ctx;
TRACE_ENTER(PT_EV_STRM_NEW, conn);
if (ctx->wait_event.events)
conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
if (cs_attach_mux(cs, ctx, conn) < 0)
if (cs_attach_mux(endp->cs, ctx, conn) < 0)
return -1;
ctx->endp = cs->endp;
ctx->endp = endp;
ctx->endp->flags |= CS_EP_RCV_MORE;
TRACE_LEAVE(PT_EV_STRM_NEW, conn, cs);
TRACE_LEAVE(PT_EV_STRM_NEW, conn, endp->cs);
return 0;
}
@ -417,12 +417,12 @@ static void mux_pt_destroy_meth(void *ctx)
/*
* Detach the stream from the connection and possibly release the connection.
*/
static void mux_pt_detach(struct conn_stream *cs)
static void mux_pt_detach(struct cs_endpoint *endp)
{
struct connection *conn = __cs_conn(cs);
struct connection *conn = endp->ctx;
struct mux_pt_ctx *ctx;
TRACE_ENTER(PT_EV_STRM_END, conn, cs);
TRACE_ENTER(PT_EV_STRM_END, conn, endp->cs);
ctx = conn->ctx;
@ -432,7 +432,7 @@ static void mux_pt_detach(struct conn_stream *cs)
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
} else {
/* There's no session attached to that connection, destroy it */
TRACE_DEVEL("killing dead connection", PT_EV_STRM_END, conn, cs);
TRACE_DEVEL("killing dead connection", PT_EV_STRM_END, conn, endp->cs);
mux_pt_destroy(ctx);
}

View File

@ -1220,9 +1220,9 @@ static void qc_destroy(void *ctx)
TRACE_LEAVE(QMUX_EV_QCC_END);
}
static void qc_detach(struct conn_stream *cs)
static void qc_detach(struct cs_endpoint *endp)
{
struct qcs *qcs = __cs_mux(cs);
struct qcs *qcs = __cs_mux(endp->cs);
struct qcc *qcc = qcs->qcc;
TRACE_ENTER(QMUX_EV_STRM_END, qcc->conn, qcs);