MAJOR: conn-stream: Invert conn-stream endpoint and its context
This change is only significant for the multiplexer part. For the applets, the context and the endpoint are the same. Thus, there is no much change. For the multiplexer part, the connection was used to set the conn-stream endpoint and the mux's stream was the context. But it is a bit strange because once a mux is installed, it takes over the connection. In a wonderful world, the connection should be totally hidden behind the mux. The stream-interface and, in a lesser extent, the stream, still access the connection because that was inherited from the pre-multiplexer era. Now, the conn-stream endpoint is the mux's stream (an opaque entity for the conn-stream) and the connection is the context. Dedicated functions have been added to attached an applet or a mux to a conn-stream.
This commit is contained in:
parent
2479e5f775
commit
9388204db1
@ -95,11 +95,11 @@ struct conn_stream {
|
||||
enum obj_type obj_type; /* differentiates connection from applet context */
|
||||
/* 3 bytes hole here */
|
||||
unsigned int flags; /* CS_FL_* */
|
||||
enum obj_type *end; /* points to the end point (connection or appctx) */
|
||||
void *end; /* points to the end point (MUX stream or appctx) */
|
||||
enum obj_type *app; /* points to the applicative point (stream or check) */
|
||||
struct stream_interface *si;
|
||||
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
|
||||
void *ctx; /* mux-specific context */
|
||||
void *ctx; /* endpoint-specific context */
|
||||
};
|
||||
|
||||
|
||||
|
@ -36,7 +36,8 @@ struct check;
|
||||
|
||||
struct conn_stream *cs_new();
|
||||
void cs_free(struct conn_stream *cs);
|
||||
void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx);
|
||||
void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx);
|
||||
void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx);
|
||||
int cs_attach_app(struct conn_stream *cs, enum obj_type *app);
|
||||
void cs_detach_endp(struct conn_stream *cs);
|
||||
void cs_detach_app(struct conn_stream *cs);
|
||||
@ -55,13 +56,13 @@ static inline void cs_init(struct conn_stream *cs)
|
||||
cs->data_cb = NULL;
|
||||
}
|
||||
|
||||
/* Returns the connection from a cs if the endpoint is a connection. Otherwise
|
||||
/* Returns the connection from a cs if the endpoint is a mux stream. Otherwise
|
||||
* NULL is returned. __cs_conn() returns the connection without any control
|
||||
* while cs_conn() check the endpoint type.
|
||||
*/
|
||||
static inline struct connection *__cs_conn(const struct conn_stream *cs)
|
||||
{
|
||||
return __objt_conn(cs->end);
|
||||
return cs->ctx;
|
||||
}
|
||||
static inline struct connection *cs_conn(const struct conn_stream *cs)
|
||||
{
|
||||
@ -70,8 +71,8 @@ static inline struct connection *cs_conn(const struct conn_stream *cs)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Returns the mux of the connection from a cs if the endpoint is a
|
||||
* connection. Otherwise NULL is returned.
|
||||
/* Returns the mux ops of the connection from a cs if the endpoint is a
|
||||
* mux stream. Otherwise NULL is returned.
|
||||
*/
|
||||
static inline const struct mux_ops *cs_conn_mux(const struct conn_stream *cs)
|
||||
{
|
||||
@ -86,7 +87,7 @@ static inline const struct mux_ops *cs_conn_mux(const struct conn_stream *cs)
|
||||
*/
|
||||
static inline struct appctx *__cs_appctx(const struct conn_stream *cs)
|
||||
{
|
||||
return __objt_appctx(cs->end);
|
||||
return cs->end;
|
||||
}
|
||||
static inline struct appctx *cs_appctx(const struct conn_stream *cs)
|
||||
{
|
||||
|
@ -110,10 +110,8 @@ static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *b
|
||||
struct conn_stream *cs = cs_new();
|
||||
if (!cs)
|
||||
return NULL;
|
||||
cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs);
|
||||
|
||||
cs_attach_endp_mux(cs, qcs, qcs->qcc->conn);
|
||||
qcs->cs = cs;
|
||||
cs->ctx = qcs;
|
||||
stream_new(qcs->qcc->conn->owner, cs, buf);
|
||||
|
||||
++qcs->qcc->nb_cs;
|
||||
|
@ -1495,7 +1495,6 @@ static int connect_server(struct stream *s)
|
||||
}
|
||||
|
||||
if (avail >= 1) {
|
||||
cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
|
||||
if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) {
|
||||
cs_detach_endp(s->csb);
|
||||
srv_conn = NULL;
|
||||
@ -1571,7 +1570,7 @@ skip_reuse:
|
||||
return SF_ERR_INTERNAL; /* how did we get there ? */
|
||||
}
|
||||
|
||||
cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
|
||||
cs_attach_endp_mux(s->csb, NULL, srv_conn);
|
||||
#if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
|
||||
if (!srv ||
|
||||
(srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) ||
|
||||
|
@ -43,33 +43,37 @@ void cs_free(struct conn_stream *cs)
|
||||
}
|
||||
|
||||
|
||||
/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */
|
||||
void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx)
|
||||
/* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */
|
||||
void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx)
|
||||
{
|
||||
struct connection *conn;
|
||||
struct appctx *appctx;
|
||||
struct connection *conn = ctx;
|
||||
|
||||
cs->end = endp;
|
||||
cs->ctx = ctx;
|
||||
if ((conn = objt_conn(endp)) != NULL) {
|
||||
if (!conn->ctx)
|
||||
conn->ctx = cs;
|
||||
if (cs_strm(cs)) {
|
||||
cs->si->ops = &si_conn_ops;
|
||||
cs->data_cb = &si_conn_cb;
|
||||
}
|
||||
else if (cs_check(cs))
|
||||
cs->data_cb = &check_conn_cb;
|
||||
cs->flags |= CS_FL_ENDP_MUX;
|
||||
if (!conn->ctx)
|
||||
conn->ctx = cs;
|
||||
if (cs_strm(cs)) {
|
||||
cs->si->ops = &si_conn_ops;
|
||||
cs->data_cb = &si_conn_cb;
|
||||
}
|
||||
else if ((appctx = objt_appctx(endp)) != NULL) {
|
||||
appctx->owner = cs;
|
||||
if (cs->si) {
|
||||
cs->si->ops = &si_applet_ops;
|
||||
cs->data_cb = NULL;
|
||||
}
|
||||
cs->flags |= CS_FL_ENDP_APP;
|
||||
else if (cs_check(cs))
|
||||
cs->data_cb = &check_conn_cb;
|
||||
cs->flags |= CS_FL_ENDP_MUX;
|
||||
}
|
||||
|
||||
/* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */
|
||||
void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx)
|
||||
{
|
||||
struct appctx *appctx = endp;
|
||||
|
||||
cs->end = endp;
|
||||
cs->ctx = ctx;
|
||||
appctx->owner = cs;
|
||||
if (cs->si) {
|
||||
cs->si->ops = &si_applet_ops;
|
||||
cs->data_cb = NULL;
|
||||
}
|
||||
cs->flags |= CS_FL_ENDP_APP;
|
||||
}
|
||||
|
||||
/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
|
||||
|
@ -917,7 +917,7 @@ static struct appctx *dns_session_create(struct dns_session *ds)
|
||||
if (!sockaddr_alloc(&cs_si(s->csb)->dst, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
|
||||
goto out_free_strm;
|
||||
|
||||
cs_attach_endp(cs, &appctx->obj_type, appctx);
|
||||
cs_attach_endp_app(cs, appctx, appctx);
|
||||
s->flags = SF_ASSIGNED|SF_ADDR_SET;
|
||||
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
|
||||
|
||||
|
@ -2031,7 +2031,7 @@ spoe_create_appctx(struct spoe_config *conf)
|
||||
if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL)
|
||||
goto out_free_sess;
|
||||
|
||||
cs_attach_endp(cs, &appctx->obj_type, appctx);
|
||||
cs_attach_endp_app(cs, appctx, appctx);
|
||||
stream_set_backend(strm, conf->agent->b.be);
|
||||
|
||||
/* applet is waiting for data */
|
||||
|
2
src/h3.c
2
src/h3.c
@ -636,7 +636,7 @@ static int h3_resp_data_send(struct qcs *qcs, struct buffer *buf, size_t count)
|
||||
size_t h3_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
|
||||
{
|
||||
size_t total = 0;
|
||||
struct qcs *qcs = cs->ctx;
|
||||
struct qcs *qcs = cs->end;
|
||||
struct htx *htx;
|
||||
enum htx_blk_type btype;
|
||||
struct htx_blk *blk;
|
||||
|
@ -2975,7 +2975,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
|
||||
goto out_fail_sess;
|
||||
}
|
||||
|
||||
cs_attach_endp(cs, &appctx->obj_type, appctx);
|
||||
cs_attach_endp_app(cs, appctx, appctx);
|
||||
|
||||
/* Initialise cross reference between stream and Lua socket object. */
|
||||
xref_create(&socket->xref, &appctx->ctx.hlua_cosocket.xref);
|
||||
|
@ -75,6 +75,7 @@ static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
||||
if (!cs)
|
||||
return -1;
|
||||
|
||||
|
||||
b_del(rxbuf, b_data(rxbuf));
|
||||
b_free(&htx_buf);
|
||||
|
||||
@ -95,7 +96,7 @@ static struct buffer *mux_get_buf(struct qcs *qcs)
|
||||
static size_t hq_interop_snd_buf(struct conn_stream *cs, struct buffer *buf,
|
||||
size_t count, int flags)
|
||||
{
|
||||
struct qcs *qcs = cs->ctx;
|
||||
struct qcs *qcs = cs->end;
|
||||
struct htx *htx;
|
||||
enum htx_blk_type btype;
|
||||
struct htx_blk *blk;
|
||||
|
@ -529,7 +529,7 @@ struct appctx *httpclient_start(struct httpclient *hc)
|
||||
break;
|
||||
}
|
||||
|
||||
cs_attach_endp(cs, &appctx->obj_type, appctx);
|
||||
cs_attach_endp_app(cs, appctx, appctx);
|
||||
s->flags |= SF_ASSIGNED|SF_ADDR_SET;
|
||||
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
|
||||
s->res.flags |= CF_READ_DONTWAIT;
|
||||
|
@ -1130,10 +1130,10 @@ static struct fcgi_strm *fcgi_conn_stream_new(struct fcgi_conn *fconn, struct co
|
||||
TRACE_ERROR("fstream allocation failure", FCGI_EV_FSTRM_NEW|FCGI_EV_FSTRM_END|FCGI_EV_FSTRM_ERR, fconn->conn);
|
||||
goto out;
|
||||
}
|
||||
|
||||
cs_attach_endp_mux(cs, fstrm, fconn->conn);
|
||||
fstrm->cs = cs;
|
||||
fstrm->sess = sess;
|
||||
cs->ctx = fstrm;
|
||||
cs->end = fstrm;
|
||||
fconn->nb_cs++;
|
||||
|
||||
TRACE_LEAVE(FCGI_EV_FSTRM_NEW, fconn->conn, fstrm);
|
||||
@ -3579,12 +3579,13 @@ static void fcgi_destroy(void *ctx)
|
||||
*/
|
||||
static void fcgi_detach(struct conn_stream *cs)
|
||||
{
|
||||
struct fcgi_strm *fstrm = cs->ctx;
|
||||
struct fcgi_strm *fstrm = cs->end;
|
||||
struct fcgi_conn *fconn;
|
||||
struct session *sess;
|
||||
|
||||
TRACE_ENTER(FCGI_EV_STRM_END, (fstrm ? fstrm->fconn->conn : NULL), fstrm);
|
||||
|
||||
cs->end = NULL;
|
||||
cs->ctx = NULL;
|
||||
if (!fstrm) {
|
||||
TRACE_LEAVE(FCGI_EV_STRM_END);
|
||||
@ -3853,7 +3854,7 @@ struct task *fcgi_deferred_shut(struct task *t, void *ctx, unsigned int state)
|
||||
/* shutr() called by the conn_stream (mux_ops.shutr) */
|
||||
static void fcgi_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
||||
{
|
||||
struct fcgi_strm *fstrm = cs->ctx;
|
||||
struct fcgi_strm *fstrm = cs->end;
|
||||
|
||||
TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
|
||||
if (cs->flags & CS_FL_KILL_CONN)
|
||||
@ -3868,7 +3869,7 @@ static void fcgi_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
||||
/* shutw() called by the conn_stream (mux_ops.shutw) */
|
||||
static void fcgi_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
|
||||
{
|
||||
struct fcgi_strm *fstrm = cs->ctx;
|
||||
struct fcgi_strm *fstrm = cs->end;
|
||||
|
||||
TRACE_POINT(FCGI_EV_STRM_SHUT, fstrm->fconn->conn, fstrm);
|
||||
if (cs->flags & CS_FL_KILL_CONN)
|
||||
@ -3884,7 +3885,7 @@ static void fcgi_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
|
||||
*/
|
||||
static int fcgi_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
|
||||
{
|
||||
struct fcgi_strm *fstrm = cs->ctx;
|
||||
struct fcgi_strm *fstrm = cs->end;
|
||||
struct fcgi_conn *fconn = fstrm->fconn;
|
||||
|
||||
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
|
||||
@ -3910,7 +3911,7 @@ static int fcgi_subscribe(struct conn_stream *cs, int event_type, struct wait_ev
|
||||
*/
|
||||
static int fcgi_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
|
||||
{
|
||||
struct fcgi_strm *fstrm = cs->ctx;
|
||||
struct fcgi_strm *fstrm = cs->end;
|
||||
struct fcgi_conn *fconn = fstrm->fconn;
|
||||
|
||||
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
|
||||
@ -3946,7 +3947,7 @@ static int fcgi_unsubscribe(struct conn_stream *cs, int event_type, struct wait_
|
||||
*/
|
||||
static size_t fcgi_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
|
||||
{
|
||||
struct fcgi_strm *fstrm = cs->ctx;
|
||||
struct fcgi_strm *fstrm = cs->end;
|
||||
struct fcgi_conn *fconn = fstrm->fconn;
|
||||
size_t ret = 0;
|
||||
|
||||
@ -3990,7 +3991,7 @@ static size_t fcgi_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t co
|
||||
*/
|
||||
static size_t fcgi_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
|
||||
{
|
||||
struct fcgi_strm *fstrm = cs->ctx;
|
||||
struct fcgi_strm *fstrm = cs->end;
|
||||
struct fcgi_conn *fconn = fstrm->fconn;
|
||||
size_t total = 0;
|
||||
size_t ret;
|
||||
|
26
src/mux_h1.c
26
src/mux_h1.c
@ -724,7 +724,7 @@ static struct conn_stream *h1s_new_cs(struct h1s *h1s, struct buffer *input)
|
||||
TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
|
||||
goto err;
|
||||
}
|
||||
cs_attach_endp(cs, &h1c->conn->obj_type, h1s);
|
||||
cs_attach_endp_mux(cs, h1s, h1c->conn);
|
||||
h1s->cs = cs;
|
||||
|
||||
if (h1s->flags & H1S_F_NOT_FIRST)
|
||||
@ -848,10 +848,10 @@ static struct h1s *h1c_bck_stream_new(struct h1c *h1c, struct conn_stream *cs, s
|
||||
if (!h1s)
|
||||
goto fail;
|
||||
|
||||
cs_attach_endp_mux(cs, h1s, h1c->conn);
|
||||
h1s->flags |= H1S_F_RX_BLK;
|
||||
h1s->cs = cs;
|
||||
h1s->sess = sess;
|
||||
cs->ctx = h1s;
|
||||
|
||||
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY;
|
||||
|
||||
@ -995,9 +995,8 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
|
||||
|
||||
if (!h1c_frt_stream_new(h1c))
|
||||
goto fail;
|
||||
|
||||
h1c->h1s->cs = cs;
|
||||
cs->ctx = h1c->h1s;
|
||||
cs_attach_endp_mux(cs, h1c->h1s, conn);
|
||||
|
||||
/* Attach the CS but Not ready yet */
|
||||
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED;
|
||||
@ -3338,13 +3337,14 @@ static void h1_destroy(void *ctx)
|
||||
*/
|
||||
static void h1_detach(struct conn_stream *cs)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1s *h1s = cs->end;
|
||||
struct h1c *h1c;
|
||||
struct session *sess;
|
||||
int is_not_first;
|
||||
|
||||
TRACE_ENTER(H1_EV_STRM_END, h1s ? h1s->h1c->conn : NULL, h1s);
|
||||
|
||||
cs->end = NULL;
|
||||
cs->ctx = NULL;
|
||||
if (!h1s) {
|
||||
TRACE_LEAVE(H1_EV_STRM_END);
|
||||
@ -3447,7 +3447,7 @@ static void h1_detach(struct conn_stream *cs)
|
||||
|
||||
static void h1_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1s *h1s = cs->end;
|
||||
struct h1c *h1c;
|
||||
|
||||
if (!h1s)
|
||||
@ -3490,7 +3490,7 @@ static void h1_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
||||
|
||||
static void h1_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1s *h1s = cs->end;
|
||||
struct h1c *h1c;
|
||||
|
||||
if (!h1s)
|
||||
@ -3550,7 +3550,7 @@ static void h1_shutw_conn(struct connection *conn)
|
||||
*/
|
||||
static int h1_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1s *h1s = cs->end;
|
||||
|
||||
if (!h1s)
|
||||
return 0;
|
||||
@ -3579,7 +3579,7 @@ static int h1_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ev
|
||||
*/
|
||||
static int h1_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1s *h1s = cs->end;
|
||||
struct h1c *h1c;
|
||||
|
||||
if (!h1s)
|
||||
@ -3627,7 +3627,7 @@ static int h1_subscribe(struct conn_stream *cs, int event_type, struct wait_even
|
||||
*/
|
||||
static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1s *h1s = cs->end;
|
||||
struct h1c *h1c = h1s->h1c;
|
||||
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
|
||||
size_t ret = 0;
|
||||
@ -3663,7 +3663,7 @@ static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
|
||||
/* Called from the upper layer, to send data */
|
||||
static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1s *h1s = cs->end;
|
||||
struct h1c *h1c;
|
||||
size_t total = 0;
|
||||
|
||||
@ -3728,7 +3728,7 @@ static size_t h1_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
|
||||
/* Send and get, using splicing */
|
||||
static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1s *h1s = cs->end;
|
||||
struct h1c *h1c = h1s->h1c;
|
||||
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->req : &h1s->res);
|
||||
int ret = 0;
|
||||
@ -3798,7 +3798,7 @@ static int h1_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int c
|
||||
|
||||
static int h1_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
|
||||
{
|
||||
struct h1s *h1s = cs->ctx;
|
||||
struct h1s *h1s = cs->end;
|
||||
struct h1c *h1c = h1s->h1c;
|
||||
struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
|
||||
int ret = 0;
|
||||
|
19
src/mux_h2.c
19
src/mux_h2.c
@ -1606,7 +1606,7 @@ static struct h2s *h2c_frt_stream_new(struct h2c *h2c, int id, struct buffer *in
|
||||
if (!cs)
|
||||
goto out_close;
|
||||
cs->flags |= CS_FL_NOT_FIRST;
|
||||
cs_attach_endp(cs, &h2c->conn->obj_type, h2s);
|
||||
cs_attach_endp_mux(cs, h2s, h2c->conn);
|
||||
h2s->cs = cs;
|
||||
h2c->nb_cs++;
|
||||
|
||||
@ -1676,9 +1676,9 @@ static struct h2s *h2c_bck_stream_new(struct h2c *h2c, struct conn_stream *cs, s
|
||||
if (!h2s)
|
||||
goto out;
|
||||
|
||||
cs_attach_endp_mux(cs, h2s, h2c->conn);
|
||||
h2s->cs = cs;
|
||||
h2s->sess = sess;
|
||||
cs->ctx = h2s;
|
||||
h2c->nb_cs++;
|
||||
|
||||
out:
|
||||
@ -4349,12 +4349,13 @@ static void h2_destroy(void *ctx)
|
||||
*/
|
||||
static void h2_detach(struct conn_stream *cs)
|
||||
{
|
||||
struct h2s *h2s = cs->ctx;
|
||||
struct h2s *h2s = cs->end;
|
||||
struct h2c *h2c;
|
||||
struct session *sess;
|
||||
|
||||
TRACE_ENTER(H2_EV_STRM_END, h2s ? h2s->h2c->conn : NULL, h2s);
|
||||
|
||||
cs->end = NULL;
|
||||
cs->ctx = NULL;
|
||||
if (!h2s) {
|
||||
TRACE_LEAVE(H2_EV_STRM_END);
|
||||
@ -4669,7 +4670,7 @@ struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned int state)
|
||||
/* shutr() called by the conn_stream (mux_ops.shutr) */
|
||||
static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
||||
{
|
||||
struct h2s *h2s = cs->ctx;
|
||||
struct h2s *h2s = cs->end;
|
||||
|
||||
TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s);
|
||||
if (cs->flags & CS_FL_KILL_CONN)
|
||||
@ -4684,7 +4685,7 @@ static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
||||
/* shutw() called by the conn_stream (mux_ops.shutw) */
|
||||
static void h2_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
|
||||
{
|
||||
struct h2s *h2s = cs->ctx;
|
||||
struct h2s *h2s = cs->end;
|
||||
|
||||
TRACE_ENTER(H2_EV_STRM_SHUT, h2s->h2c->conn, h2s);
|
||||
if (cs->flags & CS_FL_KILL_CONN)
|
||||
@ -6361,7 +6362,7 @@ static size_t h2s_make_trailers(struct h2s *h2s, struct htx *htx)
|
||||
*/
|
||||
static int h2_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
|
||||
{
|
||||
struct h2s *h2s = cs->ctx;
|
||||
struct h2s *h2s = cs->end;
|
||||
struct h2c *h2c = h2s->h2c;
|
||||
|
||||
TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s);
|
||||
@ -6395,7 +6396,7 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, struct wait_even
|
||||
*/
|
||||
static int h2_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
|
||||
{
|
||||
struct h2s *h2s = cs->ctx;
|
||||
struct h2s *h2s = cs->end;
|
||||
|
||||
TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s);
|
||||
|
||||
@ -6435,7 +6436,7 @@ static int h2_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ev
|
||||
*/
|
||||
static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
|
||||
{
|
||||
struct h2s *h2s = cs->ctx;
|
||||
struct h2s *h2s = cs->end;
|
||||
struct h2c *h2c = h2s->h2c;
|
||||
struct htx *h2s_htx = NULL;
|
||||
struct htx *buf_htx = NULL;
|
||||
@ -6518,7 +6519,7 @@ static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
|
||||
*/
|
||||
static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
|
||||
{
|
||||
struct h2s *h2s = cs->ctx;
|
||||
struct h2s *h2s = cs->end;
|
||||
size_t total = 0;
|
||||
size_t ret;
|
||||
struct htx *htx;
|
||||
|
@ -296,7 +296,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
|
||||
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
|
||||
goto fail_free_ctx;
|
||||
}
|
||||
cs_attach_endp(cs, &conn->obj_type, NULL);
|
||||
cs_attach_endp_mux(cs, ctx, conn);
|
||||
|
||||
if (!stream_new(conn->owner, cs, &BUF_NULL)) {
|
||||
TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
|
||||
@ -372,6 +372,7 @@ static int mux_pt_attach(struct connection *conn, struct conn_stream *cs, struct
|
||||
TRACE_ENTER(PT_EV_STRM_NEW, conn);
|
||||
if (ctx->wait_event.events)
|
||||
conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
|
||||
cs_attach_endp_mux(cs, ctx, conn);
|
||||
ctx->cs = cs;
|
||||
cs->flags |= CS_FL_RCV_MORE;
|
||||
|
||||
@ -414,6 +415,9 @@ static void mux_pt_detach(struct conn_stream *cs)
|
||||
|
||||
TRACE_ENTER(PT_EV_STRM_END, conn, cs);
|
||||
|
||||
cs->end = NULL;
|
||||
cs->ctx = NULL;
|
||||
|
||||
/* Subscribe, to know if we got disconnected */
|
||||
if (!conn_is_back(conn) && conn->owner != NULL &&
|
||||
!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
|
||||
|
@ -1056,7 +1056,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
|
||||
|
||||
static void qc_detach(struct conn_stream *cs)
|
||||
{
|
||||
struct qcs *qcs = cs->ctx;
|
||||
struct qcs *qcs = cs->end;
|
||||
struct qcc *qcc = qcs->qcc;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_STRM_END, qcc->conn, qcs);
|
||||
@ -1092,7 +1092,7 @@ static void qc_detach(struct conn_stream *cs)
|
||||
static size_t qc_rcv_buf(struct conn_stream *cs, struct buffer *buf,
|
||||
size_t count, int flags)
|
||||
{
|
||||
struct qcs *qcs = cs->ctx;
|
||||
struct qcs *qcs = cs->end;
|
||||
struct htx *qcs_htx = NULL;
|
||||
struct htx *cs_htx = NULL;
|
||||
size_t ret = 0;
|
||||
@ -1160,7 +1160,7 @@ static size_t qc_rcv_buf(struct conn_stream *cs, struct buffer *buf,
|
||||
static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf,
|
||||
size_t count, int flags)
|
||||
{
|
||||
struct qcs *qcs = cs->ctx;
|
||||
struct qcs *qcs = cs->end;
|
||||
size_t ret;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
|
||||
@ -1180,7 +1180,7 @@ static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf,
|
||||
static int qc_subscribe(struct conn_stream *cs, int event_type,
|
||||
struct wait_event *es)
|
||||
{
|
||||
return qcs_subscribe(cs->ctx, event_type, es);
|
||||
return qcs_subscribe(cs->end, event_type, es);
|
||||
}
|
||||
|
||||
/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
|
||||
@ -1189,7 +1189,7 @@ static int qc_subscribe(struct conn_stream *cs, int event_type,
|
||||
*/
|
||||
static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
|
||||
{
|
||||
struct qcs *qcs = cs->ctx;
|
||||
struct qcs *qcs = cs->end;
|
||||
|
||||
BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
|
||||
BUG_ON(qcs->subs && qcs->subs != es);
|
||||
|
@ -3224,7 +3224,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
||||
if (!sockaddr_alloc(&(cs_si(s->csb)->dst), &peer->addr, sizeof(peer->addr)))
|
||||
goto out_free_strm;
|
||||
|
||||
cs_attach_endp(cs, &appctx->obj_type, appctx);
|
||||
cs_attach_endp_app(cs, appctx, appctx);
|
||||
s->flags = SF_ASSIGNED|SF_ADDR_SET;
|
||||
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
|
||||
|
||||
|
@ -671,7 +671,7 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
|
||||
if (!sockaddr_alloc(&cs_si(s->csb)->dst, &sft->srv->addr, sizeof(sft->srv->addr)))
|
||||
goto out_free_strm;
|
||||
|
||||
cs_attach_endp(cs, &appctx->obj_type, appctx);
|
||||
cs_attach_endp_app(cs, appctx, appctx);
|
||||
s->flags = SF_ASSIGNED|SF_ADDR_SET;
|
||||
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
|
||||
|
||||
|
@ -988,7 +988,7 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px,
|
||||
if (unlikely(!appctx))
|
||||
return ACT_RET_ERR;
|
||||
|
||||
/* Initialise the context. */
|
||||
/* Finish initialisation of the context. */
|
||||
memset(&appctx->ctx, 0, sizeof(appctx->ctx));
|
||||
appctx->rule = rule;
|
||||
if (appctx->applet->init && !appctx->applet->init(appctx))
|
||||
@ -3275,7 +3275,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
|
||||
strm->csb->si->err_type, strm->csb->si->wait_event.events);
|
||||
|
||||
cs = strm->csf;
|
||||
chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx);
|
||||
chunk_appendf(&trash, " cs=%p csf=0x%08x endp=%p\n", cs, cs->flags, cs->end);
|
||||
|
||||
if ((conn = cs_conn(cs)) != NULL) {
|
||||
chunk_appendf(&trash,
|
||||
@ -3311,7 +3311,7 @@ static int stats_dump_full_strm_to_buffer(struct stream_interface *si, struct st
|
||||
}
|
||||
|
||||
cs = strm->csb;
|
||||
chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx);
|
||||
chunk_appendf(&trash, " cs=%p csf=0x%08x end=%p\n", cs, cs->flags, cs->end);
|
||||
if ((conn = cs_conn(cs)) != NULL) {
|
||||
chunk_appendf(&trash,
|
||||
" co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
|
||||
|
@ -342,7 +342,7 @@ struct appctx *si_register_handler(struct stream_interface *si, struct applet *a
|
||||
appctx = appctx_new(app, si->cs);
|
||||
if (!appctx)
|
||||
return NULL;
|
||||
cs_attach_endp(si->cs, &appctx->obj_type, appctx);
|
||||
cs_attach_endp_app(si->cs, appctx, appctx);
|
||||
appctx->t->nice = si_strm(si)->task->nice;
|
||||
si_cant_get(si);
|
||||
appctx_wakeup(appctx);
|
||||
|
@ -1101,7 +1101,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
|
||||
TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
|
||||
goto out;
|
||||
}
|
||||
cs_attach_endp(check->cs, &conn->obj_type, conn);
|
||||
cs_attach_endp_mux(check->cs, NULL, conn);
|
||||
tasklet_set_tid(check->wait_list.tasklet, tid);
|
||||
conn_set_owner(conn, check->sess, NULL);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user