From fe035eca3a24ea0f031fdcdad23809bea5de32e4 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Wed, 6 Apr 2022 15:46:30 +0200 Subject: [PATCH] MEDIUM: mux-quic: report errors on conn-streams Complete the error reporting. For each attached streams, if CO_FL_ERROR is set, mark them with CS_FL_ERR_PENDING|CS_FL_ERROR. This will notify the upper layer to trigger streams detach and release of the MUX. This reporting is implemented in a new function qc_wake_some_streams(), called by qc_wake(). This ensures that a lower-layer error is quickly reported to the individual streams. --- include/haproxy/mux_quic.h | 1 + src/mux_quic.c | 41 +++++++++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index c01d1560e..44dfe1efd 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -112,6 +112,7 @@ static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *b return NULL; cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs); + qcs->cs = cs; cs->ctx = qcs; stream_new(qcs->qcc->conn->owner, cs, buf); diff --git a/src/mux_quic.c b/src/mux_quic.c index 243fb09b2..cdac9a6f5 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -1068,7 +1068,8 @@ static void qc_detach(struct conn_stream *cs) --qcc->nb_cs; - if (b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) { + if ((b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) && + !(qcc->conn->flags & CO_FL_ERROR)) { TRACE_DEVEL("leaving with remaining data, detaching qcs", QMUX_EV_STRM_END, qcc->conn, qcs); qcs->flags |= QC_SF_DETACH; return; @@ -1203,6 +1204,42 @@ static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ev return 0; } +/* Loop through all qcs from . If CO_FL_ERROR is set on the connection, + * report CS_FL_ERR_PENDING|CS_FL_ERROR on the attached conn-streams and wake + * them. + */ +static int qc_wake_some_streams(struct qcc *qcc) +{ + struct qc_stream_desc *stream; + struct qcs *qcs; + struct eb64_node *node; + + for (node = eb64_first(&qcc->streams_by_id); node; + node = eb64_next(node)) { + stream = eb64_entry(node, struct qc_stream_desc, by_id); + qcs = stream->ctx; + + if (!qcs->cs) + continue; + + if (qcc->conn->flags & CO_FL_ERROR) { + qcs->cs->flags |= CS_FL_ERR_PENDING; + if (qcs->cs->flags & CS_FL_EOS) + qcs->cs->flags |= CS_FL_ERROR; + + if (qcs->subs) { + qcs_notify_recv(qcs); + qcs_notify_send(qcs); + } + else if (qcs->cs->data_cb->wake) { + qcs->cs->data_cb->wake(qcs->cs); + } + } + } + + return 0; +} + static int qc_wake(struct connection *conn) { struct qcc *qcc = conn->ctx; @@ -1220,6 +1257,8 @@ static int qc_wake(struct connection *conn) qc_send(qcc); + qc_wake_some_streams(qcc); + if (qcc_is_dead(qcc)) goto release;