MEDIUM: mux-quic/h3/qpack: use ncbuf for uni streams
This commit is the equivalent for uni-streams of previous commit MEDIUM: mux-quic/h3/hq-interop: use ncbuf for bidir streams All unidirectional streams data is now handle in MUX Rx ncbuf. The obsolete buffer is not unused and will be cleared in the following patches.
This commit is contained in:
parent
1290f1ebfb
commit
3db98e9d13
31
src/h3.c
31
src/h3.c
@ -334,12 +334,12 @@ static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
|
||||
* <rxbuf> buffer. This function does not update this buffer.
|
||||
* Returns 0 if something wrong happened, 1 if not.
|
||||
*/
|
||||
static int h3_parse_settings_frm(struct h3 *h3, const struct buffer *rxbuf, size_t flen)
|
||||
static int h3_parse_settings_frm(struct h3 *h3, const struct ncbuf *rxbuf, size_t flen)
|
||||
{
|
||||
uint64_t id, value;
|
||||
const unsigned char *buf, *end;
|
||||
|
||||
buf = (const unsigned char *)b_head(rxbuf);
|
||||
buf = (const unsigned char *)ncb_head(rxbuf);
|
||||
end = buf + flen;
|
||||
|
||||
while (buf <= end) {
|
||||
@ -376,20 +376,20 @@ static int h3_parse_settings_frm(struct h3 *h3, const struct buffer *rxbuf, size
|
||||
*/
|
||||
static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
|
||||
{
|
||||
struct buffer *rxbuf = &h3_uqs->qcs->rx.buf;
|
||||
struct ncbuf *rxbuf = &h3_uqs->qcs->rx.ncbuf;
|
||||
struct h3 *h3 = ctx;
|
||||
|
||||
h3_debug_printf(stderr, "%s STREAM ID: %lu\n", __func__, h3_uqs->qcs->id);
|
||||
if (!b_data(rxbuf))
|
||||
if (!ncb_data(rxbuf, 0))
|
||||
return 1;
|
||||
|
||||
while (b_data(rxbuf)) {
|
||||
while (ncb_data(rxbuf, 0)) {
|
||||
size_t hlen;
|
||||
uint64_t ftype, flen;
|
||||
struct buffer b;
|
||||
|
||||
/* Work on a copy of <rxbuf> */
|
||||
b = b_make(rxbuf->area, rxbuf->size, rxbuf->head, rxbuf->data);
|
||||
b = h3_b_dup(rxbuf);
|
||||
hlen = h3_decode_frm_header(&ftype, &flen, &b);
|
||||
if (!hlen)
|
||||
break;
|
||||
@ -399,7 +399,8 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
|
||||
if (flen > b_data(&b))
|
||||
break;
|
||||
|
||||
b_del(rxbuf, hlen);
|
||||
ncb_advance(rxbuf, hlen);
|
||||
h3_uqs->qcs->rx.offset += hlen;
|
||||
/* From here, a frame must not be truncated */
|
||||
switch (ftype) {
|
||||
case H3_FT_CANCEL_PUSH:
|
||||
@ -423,14 +424,15 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
|
||||
h3->err = H3_FRAME_UNEXPECTED;
|
||||
return 0;
|
||||
}
|
||||
b_del(rxbuf, flen);
|
||||
ncb_advance(rxbuf, flen);
|
||||
h3_uqs->qcs->rx.offset += flen;
|
||||
}
|
||||
|
||||
/* Handle the case where remaining data are present in the buffer. This
|
||||
* can happen if there is an incomplete frame. In this case, subscribe
|
||||
* on the lower layer to restart receive operation.
|
||||
*/
|
||||
if (b_data(rxbuf))
|
||||
if (ncb_data(rxbuf, 0))
|
||||
qcs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3_uqs->wait_event);
|
||||
|
||||
return 1;
|
||||
@ -773,12 +775,19 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
|
||||
{
|
||||
uint64_t strm_type;
|
||||
struct h3 *h3 = ctx;
|
||||
struct buffer *rxbuf = &qcs->rx.buf;
|
||||
struct ncbuf *rxbuf = &qcs->rx.ncbuf;
|
||||
struct buffer b;
|
||||
size_t len = 0;
|
||||
|
||||
b = h3_b_dup(rxbuf);
|
||||
|
||||
/* First octets: the uni-stream type */
|
||||
if (!b_quic_dec_int(&strm_type, rxbuf, NULL) || strm_type > H3_UNI_STRM_TP_MAX)
|
||||
if (!b_quic_dec_int(&strm_type, &b, &len) || strm_type > H3_UNI_STRM_TP_MAX)
|
||||
return 0;
|
||||
|
||||
ncb_advance(rxbuf, len);
|
||||
qcs->rx.offset += len;
|
||||
|
||||
/* Note that for all the uni-streams below, this is an error to receive two times the
|
||||
* same type of uni-stream (even for Push stream which is not supported at this time.
|
||||
*/
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <haproxy/buf.h>
|
||||
#include <haproxy/chunk.h>
|
||||
#include <haproxy/h3.h>
|
||||
#include <haproxy/ncbuf.h>
|
||||
#include <haproxy/qpack-t.h>
|
||||
#include <haproxy/qpack-dec.h>
|
||||
#include <haproxy/qpack-tbl.h>
|
||||
@ -96,19 +97,19 @@ static uint64_t qpack_get_varint(const unsigned char **buf, uint64_t *len_in, in
|
||||
int qpack_decode_enc(struct h3_uqs *h3_uqs, void *ctx)
|
||||
{
|
||||
size_t len;
|
||||
struct buffer *rxbuf;
|
||||
struct ncbuf *rxbuf;
|
||||
unsigned char inst;
|
||||
|
||||
rxbuf = &h3_uqs->qcs->rx.buf;
|
||||
len = b_data(rxbuf);
|
||||
qpack_debug_hexdump(stderr, "[QPACK-DEC-ENC] ", b_head(rxbuf), 0, len);
|
||||
rxbuf = &h3_uqs->qcs->rx.ncbuf;
|
||||
len = ncb_data(rxbuf, 0);
|
||||
qpack_debug_hexdump(stderr, "[QPACK-DEC-ENC] ", ncb_head(rxbuf), 0, len);
|
||||
|
||||
if (!len) {
|
||||
qpack_debug_printf(stderr, "[QPACK-DEC-ENC] empty stream\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
inst = (unsigned char)*b_head(rxbuf) & QPACK_ENC_INST_BITMASK;
|
||||
inst = (unsigned char)*ncb_head(rxbuf) & QPACK_ENC_INST_BITMASK;
|
||||
if (inst == QPACK_ENC_INST_DUP) {
|
||||
/* Duplicate */
|
||||
}
|
||||
@ -129,19 +130,19 @@ int qpack_decode_enc(struct h3_uqs *h3_uqs, void *ctx)
|
||||
int qpack_decode_dec(struct h3_uqs *h3_uqs, void *ctx)
|
||||
{
|
||||
size_t len;
|
||||
struct buffer *rxbuf;
|
||||
struct ncbuf *rxbuf;
|
||||
unsigned char inst;
|
||||
|
||||
rxbuf = &h3_uqs->qcs->rx.buf;
|
||||
len = b_data(rxbuf);
|
||||
qpack_debug_hexdump(stderr, "[QPACK-DEC-DEC] ", b_head(rxbuf), 0, len);
|
||||
rxbuf = &h3_uqs->qcs->rx.ncbuf;
|
||||
len = ncb_data(rxbuf, 0);
|
||||
qpack_debug_hexdump(stderr, "[QPACK-DEC-DEC] ", ncb_head(rxbuf), 0, len);
|
||||
|
||||
if (!len) {
|
||||
qpack_debug_printf(stderr, "[QPACK-DEC-DEC] empty stream\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
inst = (unsigned char)*b_head(rxbuf) & QPACK_DEC_INST_BITMASK;
|
||||
inst = (unsigned char)*ncb_head(rxbuf) & QPACK_DEC_INST_BITMASK;
|
||||
if (inst == QPACK_DEC_INST_ICINC) {
|
||||
/* Insert count increment */
|
||||
}
|
||||
|
@ -38,6 +38,7 @@
|
||||
#include <haproxy/hq_interop.h>
|
||||
#include <haproxy/log.h>
|
||||
#include <haproxy/mux_quic.h>
|
||||
#include <haproxy/ncbuf.h>
|
||||
#include <haproxy/pipe.h>
|
||||
#include <haproxy/proxy.h>
|
||||
#include <haproxy/quic_cc.h>
|
||||
@ -2170,20 +2171,6 @@ struct quic_rx_strm_frm *new_quic_rx_strm_frm(struct quic_stream *stream_frm,
|
||||
return frm;
|
||||
}
|
||||
|
||||
/* Copy as most as possible STREAM data from <strm_frm> into <strm> stream.
|
||||
* Also update <strm_frm> frame to reflect the data which have been consumed.
|
||||
*/
|
||||
static size_t qc_strm_cpy(struct buffer *buf, struct quic_stream *strm_frm)
|
||||
{
|
||||
size_t ret;
|
||||
|
||||
ret = b_putblk(buf, (char *)strm_frm->data, strm_frm->len);
|
||||
strm_frm->len -= ret;
|
||||
strm_frm->offset.key += ret;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Handle <strm_frm> bidirectional STREAM frame. Depending on its ID, several
|
||||
* streams may be open. The data are copied to the stream RX buffer if possible.
|
||||
* If not, the STREAM frame is stored to be treated again later.
|
||||
@ -2221,8 +2208,7 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
|
||||
struct quic_conn *qc)
|
||||
{
|
||||
struct qcs *strm;
|
||||
struct quic_rx_strm_frm *frm;
|
||||
size_t strm_frm_len;
|
||||
enum ncb_ret ret;
|
||||
|
||||
strm = qcc_get_qcs(qc->qcc, strm_frm->id);
|
||||
if (!strm) {
|
||||
@ -2246,47 +2232,23 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
|
||||
strm_frm->data += diff;
|
||||
}
|
||||
|
||||
strm_frm_len = strm_frm->len;
|
||||
if (strm_frm->offset.key == strm->rx.offset) {
|
||||
int ret;
|
||||
qc_get_ncbuf(strm, &strm->rx.ncbuf);
|
||||
if (ncb_is_null(&strm->rx.ncbuf))
|
||||
return 0;
|
||||
|
||||
if (!qc_get_buf(strm, &strm->rx.buf))
|
||||
goto store_frm;
|
||||
ret = ncb_add(&strm->rx.ncbuf, strm_frm->offset.key - strm->rx.offset,
|
||||
(char *)strm_frm->data, strm_frm->len, NCB_ADD_COMPARE);
|
||||
if (ret != NCB_RET_OK)
|
||||
return 0;
|
||||
|
||||
/* qc_strm_cpy() will modify the offset, depending on the number
|
||||
* of bytes copied.
|
||||
*/
|
||||
ret = qc_strm_cpy(&strm->rx.buf, strm_frm);
|
||||
/* Inform the application of the arrival of this new stream */
|
||||
if (!strm->rx.offset && !qc->qcc->app_ops->attach_ruqs(strm, qc->qcc->ctx)) {
|
||||
TRACE_PROTO("Could not set an uni-stream", QUIC_EV_CONN_PSTRM, qc);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (ret)
|
||||
qcs_notify_recv(strm);
|
||||
|
||||
strm_frm->offset.key += ret;
|
||||
}
|
||||
/* Take this frame into an account for the stream flow control */
|
||||
strm->rx.offset += strm_frm_len;
|
||||
/* It all the data were provided to the application, there is no need to
|
||||
* store any more information for it.
|
||||
*/
|
||||
if (!strm_frm->len)
|
||||
goto out;
|
||||
|
||||
store_frm:
|
||||
frm = new_quic_rx_strm_frm(strm_frm, pkt);
|
||||
if (!frm) {
|
||||
TRACE_PROTO("Could not alloc RX STREAM frame",
|
||||
QUIC_EV_CONN_PSTRM, qc);
|
||||
return 0;
|
||||
}
|
||||
|
||||
eb64_insert(&strm->rx.frms, &frm->offset_node);
|
||||
quic_rx_packet_refinc(pkt);
|
||||
|
||||
out:
|
||||
return 1;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user