MEDIUM: mux-quic: implement RESET_STREAM emission

Implement functions to be able to reset a stream via RESET_STREAM.

If needed, a qcs instance is flagged with QC_SF_TO_RESET to schedule a
stream reset. This will interrupt all future send operations.

On stream emission, if a stream is flagged with QC_SF_TO_RESET, a
RESET_STREAM frame is generated and emitted to the transport layer. If
this operation succeeds, the stream is locally closed. If upper layer is
instantiated, error flag is set on it.
This commit is contained in:
Amaury Denoyelle 2022-07-04 11:44:38 +02:00
parent 20d1f84ce4
commit 843a1196b3
3 changed files with 67 additions and 1 deletions

View File

@ -107,6 +107,7 @@ struct qcc {
#define QC_SF_BLK_SFCTL 0x00000010 /* stream blocked due to stream flow control limit */
#define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */
#define QC_SF_READ_ABORTED 0x00000040 /* stream rejected by app layer */
#define QC_SF_TO_RESET 0x00000080 /* a RESET_STREAM must be sent */
/* Maximum size of stream Rx buffer. */
#define QC_S_RX_BUF_SZ (global.tune.bufsize - NCB_RESERVED_SZ)
@ -163,6 +164,8 @@ struct qcs {
struct wait_event wait_event;
struct wait_event *subs;
uint64_t err; /* error code to transmit via RESET_STREAM */
};
/* QUIC application layer operations */

View File

@ -22,6 +22,7 @@ void qcs_notify_recv(struct qcs *qcs);
void qcs_notify_send(struct qcs *qcs);
void qcc_emit_cc_app(struct qcc *qcc, int err);
void qcc_reset_stream(struct qcs *qcs, int err);
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
char fin, char *data);
int qcc_recv_max_data(struct qcc *qcc, uint64_t max);

View File

@ -171,6 +171,8 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
qcs->wait_event.events = 0;
qcs->subs = NULL;
qcs->err = 0;
out:
TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn, qcs);
return qcs;
@ -674,6 +676,20 @@ void qcc_emit_cc_app(struct qcc *qcc, int err)
tasklet_wakeup(qcc->wait_event.tasklet);
}
/* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */
void qcc_reset_stream(struct qcs *qcs, int err)
{
struct qcc *qcc = qcs->qcc;
if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs))
return;
qcs->flags |= QC_SF_TO_RESET;
qcs->err = err;
tasklet_wakeup(qcc->wait_event.tasklet);
TRACE_DEVEL("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs);
}
/* Handle a new STREAM frame for stream with id <id>. Payload is pointed by
* <data> with length <len> and represents the offset <offset>. <fin> is set if
* the QUIC frame FIN bit is set.
@ -1275,6 +1291,46 @@ static int qc_send_frames(struct qcc *qcc, struct list *frms)
return 0;
}
/* Emit a RESET_STREAM on <qcs>.
*
* Returns 0 if the frame has been successfully sent else non-zero.
*/
static int qcs_send_reset(struct qcs *qcs)
{
struct list frms = LIST_HEAD_INIT(frms);
struct quic_frame *frm;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
frm = pool_zalloc(pool_head_quic_frame);
if (!frm)
return 1;
LIST_INIT(&frm->reflist);
frm->type = QUIC_FT_RESET_STREAM;
frm->reset_stream.id = qcs->id;
frm->reset_stream.app_error_code = qcs->err;
frm->reset_stream.final_size = qcs->tx.sent_offset;
LIST_APPEND(&frms, &frm->list);
if (qc_send_frames(qcs->qcc, &frms)) {
pool_free(pool_head_quic_frame, frm);
TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
return 1;
}
if (qcs_sc(qcs)) {
se_fl_set_error(qcs->sd);
qcs_alert(qcs);
}
qcs_close_local(qcs);
qcs->flags &= ~QC_SF_TO_RESET;
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
return 0;
}
/* Used internally by qc_send function. Proceed to send for <qcs>. This will
* transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
* is then generated and inserted in <frms> list.
@ -1378,6 +1434,12 @@ static int qc_send(struct qcc *qcc)
continue;
}
if (qcs->flags & QC_SF_TO_RESET) {
qcs_send_reset(qcs);
node = eb64_next(node);
continue;
}
if (qcs_is_close_local(qcs)) {
node = eb64_next(node);
continue;
@ -1834,7 +1896,7 @@ static size_t qc_snd_buf(struct stconn *sc, struct buffer *buf,
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
if (qcs_is_close_local(qcs)) {
if (qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)) {
ret = count;
goto end;
}