MAJOR: mux-quic: implement a simplified mux version

Re-implement the QUIC mux. It will reuse the mechanics from the previous
mux without all untested/unsupported features. This should ease the
maintenance.

Note that a lot of features are broken for the moment. They will be
re-implemented on the following commits to have a clean commit history.
This commit is contained in:
Amaury Denoyelle 2021-12-03 11:36:46 +01:00
parent d1202edadd
commit deed777766
10 changed files with 210 additions and 2051 deletions

View File

@ -427,10 +427,6 @@ struct mux_ops {
int (*show_fd)(struct buffer *, struct connection *); /* append some data about connection into chunk for "show fd"; returns non-zero if suspicious */ int (*show_fd)(struct buffer *, struct connection *); /* append some data about connection into chunk for "show fd"; returns non-zero if suspicious */
int (*subscribe)(struct conn_stream *cs, int event_type, struct wait_event *es); /* Subscribe <es> to events, such as "being able to send" */ int (*subscribe)(struct conn_stream *cs, int event_type, struct wait_event *es); /* Subscribe <es> to events, such as "being able to send" */
int (*unsubscribe)(struct conn_stream *cs, int event_type, struct wait_event *es); /* Unsubscribe <es> from events */ int (*unsubscribe)(struct conn_stream *cs, int event_type, struct wait_event *es); /* Unsubscribe <es> from events */
int (*ruqs_subscribe)(struct qcs *qcs, int event_type, struct wait_event *es); /* Subscribe <es> to events, "being able to receive" only */
int (*ruqs_unsubscribe)(struct qcs *qcs, int event_type, struct wait_event *es); /* Unsubscribe <es> from events */
int (*luqs_subscribe)(struct qcs *qcs, int event_type, struct wait_event *es); /* Subscribe <es> to events, "being able to send" only */
int (*luqs_unsubscribe)(struct qcs *qcs, int event_type, struct wait_event *es); /* Unsubscribe <es> from events */
int (*avail_streams)(struct connection *conn); /* Returns the number of streams still available for a connection */ int (*avail_streams)(struct connection *conn); /* Returns the number of streams still available for a connection */
int (*avail_streams_bidi)(struct connection *conn); /* Returns the number of bidirectional streams still available for a connection */ int (*avail_streams_bidi)(struct connection *conn); /* Returns the number of bidirectional streams still available for a connection */
int (*avail_streams_uni)(struct connection *conn); /* Returns the number of unidirectional streams still available for a connection */ int (*avail_streams_uni)(struct connection *conn); /* Returns the number of unidirectional streams still available for a connection */

View File

@ -1,85 +1,15 @@
/*
* include/haproxy/mux_quic-t.h
* This file contains types for QUIC mux-demux.
*
* Copyright 2021 HAProxy Technologies, Frédéric Lécaille <flecaille@haproxy.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, version 2.1
* exclusively.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef _HAPROXY_MUX_QUIC_T_H #ifndef _HAPROXY_MUX_QUIC_T_H
#define _HAPROXY_MUX_QUIC_T_H #define _HAPROXY_MUX_QUIC_T_H
#ifdef USE_QUIC #ifdef USE_QUIC
#ifndef USE_OPENSSL #ifndef USE_OPENSSL
#error "Must define USE_OPENSSL" #error "Must define USE_OPENSSL"
#endif #endif
#include <stdint.h> #include <import/ebtree-t.h>
#include <haproxy/buf-t.h> #include <haproxy/buf-t.h>
#include <haproxy/connection-t.h> #include <haproxy/connection-t.h>
#include <haproxy/dynbuf-t.h>
#include <import/ebtree-t.h>
/* Bit shift to get the stream sub ID for internal use which is obtained
* shifting the stream IDs by this value, knowing that the
* QCS_ID_TYPE_SHIFT less significant bits identify the stream ID
* types (client initiated bidirectional, server initiated bidirectional,
* client initiated unidirectional, server initiated bidirectional).
* Note that there is no reference to such stream sub IDs in the RFC.
*/
#define QCS_ID_TYPE_MASK 0x3
#define QCS_ID_TYPE_SHIFT 2
/* The less significant bit of a stream ID is set for a server initiated stream */
#define QCS_ID_SRV_INTIATOR_BIT 0x1
/* This bit is set for unidirectional streams */
#define QCS_ID_DIR_BIT 0x2
#define QCS_ID_DIR_BIT_SHIFT 1
#define OUQS_SF_TXBUF_MALLOC 0x00000001
#define OUQS_SF_TXBUF_FULL 0x00000002
/* Connection flags (32 bit), in qcc->flags */
#define QC_CF_NONE 0x00000000
/* Flags indicating why writing to the mux is blocked. */
#define QC_CF_MUX_MALLOC 0x00000001 // mux blocked on lack of connection's mux buffer
#define QC_CF_MUX_MFULL 0x00000002 // mux blocked on connection's mux buffer full
#define QC_CF_MUX_BLOCK_ANY 0x00000003 // aggregate of the mux flags above
/* Flags indicating why writing to the demux is blocked.
* The first two ones directly affect the ability for the mux to receive data
* from the connection. The other ones affect the mux's ability to demux
* received data.
*/
#define QC_CF_DEM_DFULL 0x00000004 // demux blocked on connection's demux buffer full
#define QC_CF_DEM_MBUSY 0x00000008 // demux blocked on connection's mux side busy
#define QC_CF_DEM_MROOM 0x00000010 // demux blocked on lack of room in mux buffer
#define QC_CF_DEM_SALLOC 0x00000020 // demux blocked on lack of stream's request buffer
#define QC_CF_DEM_SFULL 0x00000040 // demux blocked on stream request buffer full
#define QC_CF_DEM_TOOMANY 0x00000100 // demux blocked waiting for some conn_streams to leave
#define QC_CF_DEM_BLOCK_ANY 0x00000170 // aggregate of the demux flags above except DFULL
/* other flags */
#define QC_CF_IS_BACK 0x00008000 // this is an outgoing connection
#define QC_CF_CC_RECV 0x00010000 // CONNECTION_CLOSE received
extern struct pool_head *pool_head_qcs;
/* Stream types */ /* Stream types */
enum qcs_type { enum qcs_type {
@ -87,32 +17,15 @@ enum qcs_type {
QCS_SRV_BIDI, QCS_SRV_BIDI,
QCS_CLT_UNI, QCS_CLT_UNI,
QCS_SRV_UNI, QCS_SRV_UNI,
/* Must be the last one */ /* Must be the last one */
QCS_MAX_TYPES, QCS_MAX_TYPES
}; };
/* Stream direction types */
enum qcs_dir {
QCS_BIDI = 0,
QCS_UNI = 1,
/* Must be the last one */
QCS_MAX_DIR = 2,
};
/* QUIC connection state, in qcc->st0 */
enum qc_cs {
/* Initial state */
QC_CS_NOERR,
QC_CS_ERROR,
};
/* QUIC connection descriptor */
struct qcc { struct qcc {
struct connection *conn; /* mux state */ struct connection *conn;
enum qc_cs st0; /* connection flags: QC_CF_* */
unsigned int errcode;
uint32_t flags; uint32_t flags;
/* Stream information, one by direction and by initiator */
struct { struct {
uint64_t max_streams; /* maximum number of concurrent streams */ uint64_t max_streams; /* maximum number of concurrent streams */
uint64_t largest_id; /* Largest ID of the open streams */ uint64_t largest_id; /* Largest ID of the open streams */
@ -126,112 +39,44 @@ struct qcc {
uint64_t bytes; /* Number of bytes sent */ uint64_t bytes; /* Number of bytes sent */
} tx; } tx;
} strms[QCS_MAX_TYPES]; } strms[QCS_MAX_TYPES];
struct {
uint64_t max_data; /* Maximum number of bytes which may be received */
uint64_t bytes; /* Number of bytes received */
uint64_t inmux; /* Number of bytes received but not already demultiplexed. */
} rx;
struct { struct {
uint64_t max_data; /* Maximum number of bytes which may be sent */ uint64_t max_data; /* Maximum number of bytes which may be sent */
uint64_t bytes; /* Number of bytes sent */
} tx; } tx;
struct eb_root streams_by_id; /* all active streams by their ID */ struct eb_root streams_by_id; /* all active streams by their ID */
int timeout; /* idle timeout duration in ticks */
int shut_timeout; /* idle timeout duration in ticks after GOAWAY was sent */
unsigned int nb_cs; /* number of attached conn_streams */
unsigned int stream_cnt; /* total number of streams seen */
struct proxy *proxy; /* the proxy this connection was created for */
struct task *task; /* timeout management task */
struct qc_counters *px_counters; /* quic counters attached to proxy */
struct list send_list; /* list of blocked streams requesting to send */
struct list fctl_list; /* list of streams blocked by connection's fctl */
struct list blocked_list; /* list of streams blocked for other reasons (e.g. sfctl, dep) */
struct buffer_wait buf_wait; /* wait list for buffer allocations */
struct wait_event wait_event; /* To be used if we're waiting for I/Os */ struct wait_event wait_event; /* To be used if we're waiting for I/Os */
struct wait_event *subs; /* recv wait_event the mux associated is waiting on (via quic_conn_subscribe) */ struct wait_event *subs;
struct mt_list qcs_rxbuf_wlist; /* list of streams waiting for their rxbuf */
void *ctx; /* Application layer context */
const struct qcc_app_ops *app_ops; const struct qcc_app_ops *app_ops;
void *ctx; /* Application layer context */
}; };
/* QUIC RX states */
enum qcs_rx_st {
QC_RX_SS_IDLE = 0, // idle
QC_RX_SS_RECV, // receive
QC_RX_SS_SIZE_KNOWN, // stream size known
/* Terminal states */
QC_RX_SS_DATA_RECVD, // all data received
QC_RX_SS_DATA_READ, // app. read all data
QC_RX_SS_RST_RECVD, // reset received
QC_RX_SS_RST_READ, // app. read reset
};
/* QUIC TX states */
enum qcs_tx_st {
QC_TX_SS_IDLE = 0,
QC_TX_SS_READY, // ready
QC_TX_SS_SEND, // send
QC_TX_SS_DATA_SENT, // all data sent
/* Terminal states */
QC_TX_SS_DATA_RECVD, // all data received
QC_TX_SS_RST_SENT, // reset sent
QC_TX_SS_RST_RECVD, // reset received
};
/* QUIC stream flags (32 bit), in qcs->flags */
#define QC_SF_NONE 0x00000000 #define QC_SF_NONE 0x00000000
#define QC_SF_TXBUF_MALLOC 0x00000001 // blocked on lack of TX buffer
/* stream flags indicating the reason the stream is blocked */
#define QC_SF_BLK_MBUSY 0x00000010 // blocked waiting for mux access (transient)
#define QC_SF_BLK_MROOM 0x00000020 // blocked waiting for room in the mux (must be in send list)
#define QC_SF_BLK_MFCTL 0x00000040 // blocked due to mux fctl (must be in fctl list)
#define QC_SF_BLK_SFCTL 0x00000080 // blocked due to stream fctl (must be in blocked list)
#define QC_SF_BLK_ANY 0x000000F0 // any of the reasons above
#define QC_SF_NOTIFIED 0x00000800 // a paused stream was notified to try to send again
#define QC_SF_WANT_SHUTR 0x00008000 // a stream couldn't shutr() (mux full/busy)
#define QC_SF_WANT_SHUTW 0x00010000 // a stream couldn't shutw() (mux full/busy)
#define QC_SF_KILL_CONN 0x00020000 // kill the whole connection with this stream
#define QC_SF_FIN_STREAM 0x00040000 // FIN bit must be set for last frame of the stream
#define QC_SF_DETACH 0x00080000
/* QUIC stream descriptor, describing the stream as it appears in the QUIC_CONN, and as
* it is being processed in the internal HTTP representation (HTX).
*/
struct qcs { struct qcs {
struct conn_stream *cs;
struct session *sess;
struct qcc *qcc; struct qcc *qcc;
struct eb64_node by_id; /* place in qcc's streams_by_id */ struct conn_stream *cs;
uint64_t id; /* stream ID */
uint32_t flags; /* QC_SF_* */ uint32_t flags; /* QC_SF_* */
struct { struct {
enum qcs_rx_st st; /* RX state */
uint64_t max_data; /* maximum number of bytes which may be received */
uint64_t offset; /* the current offset of received data */
uint64_t bytes; /* number of bytes received */
struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
struct eb_root frms; /* received frames ordered by their offsets */ struct eb_root frms; /* received frames ordered by their offsets */
uint64_t offset; /* the current offset of received data */
struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
} rx; } rx;
struct { struct {
enum qcs_tx_st st; /* TX state */ uint64_t offset; /* the current offset of received data */
uint64_t max_data; /* maximum number of bytes which may be sent */
uint64_t offset; /* the current offset of data to send */
uint64_t bytes; /* number of bytes sent */
uint64_t ack_offset; /* last acked ordered byte offset */
struct eb_root acked_frms; /* acked frames ordered by their offsets */ struct eb_root acked_frms; /* acked frames ordered by their offsets */
struct buffer buf; /* transmit buffer before sending via xprt */ uint64_t ack_offset; /* last acked ordered byte offset */
struct buffer buf; /* transmit buffer before sending via xprt */
struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */ struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */
} tx; } tx;
struct wait_event *subs; /* recv wait_event the conn_stream associated is waiting on (via qc_subscribe) */
struct list list; /* To be used when adding in qcc->send_list or qcc->fctl_lsit */ struct eb64_node by_id; /* place in qcc's streams_by_id */
struct tasklet *shut_tl; /* deferred shutdown tasklet, to retry to send an RST after we failed to,
* in case there's no other subscription to do it */ struct wait_event wait_event;
struct wait_event *subs;
}; };
/* QUIC application layer operations */ /* QUIC application layer operations */
@ -244,4 +89,5 @@ struct qcc_app_ops {
}; };
#endif /* USE_QUIC */ #endif /* USE_QUIC */
#endif /* _HAPROXY_MUX_QUIC_T_H */ #endif /* _HAPROXY_MUX_QUIC_T_H */

View File

@ -1,125 +1,53 @@
/*
* include/haproxy/mux_quic-t.h
* This file contains prototypes for QUIC mux-demux.
*
* Copyright 2021 HAProxy Technologies, Frédéric Lécaille <flecaille@haproxy.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, version 2.1
* exclusively.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef _HAPROXY_MUX_QUIC_H #ifndef _HAPROXY_MUX_QUIC_H
#define _HAPROXY_MUX_QUIC_H #define _HAPROXY_MUX_QUIC_H
#ifdef USE_QUIC #ifdef USE_QUIC
#ifndef USE_OPENSSL #ifndef USE_OPENSSL
#error "Must define USE_OPENSSL" #error "Must define USE_OPENSSL"
#endif #endif
#include <haproxy/buf-t.h> #include <haproxy/bug.h>
#include <haproxy/mux_quic-t.h> #include <haproxy/mux_quic-t.h>
#include <haproxy/obj_type.h>
void quic_mux_transport_params_update(struct qcc *qcc); void quic_mux_transport_params_update(struct qcc *qcc);
void qc_error(struct qcc *qcc, int err); struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type);
void uni_qcs_free(struct qcs *qcs);
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr); struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
struct qcs *qcc_get_stream(struct qcc *qcc, uint64_t id);
struct qcs *bidi_qcs_new(struct qcc *qcc, uint64_t id);
struct qcs *luqs_new(struct qcc *qcc);
struct qcs *ruqs_new(struct qcc *qcc, uint64_t id);
size_t luqs_snd_buf(struct qcs *qcs, struct buffer *buf, size_t count, int flags);
void qcs_release(struct qcs *qcs);
void ruqs_notify_recv(struct qcs *qcs); /* Bit shift to get the stream sub ID for internal use which is obtained
* shifting the stream IDs by this value, knowing that the
/* Return 1 if the stream with <id> as ID attached to <qcc> connection * QCS_ID_TYPE_SHIFT less significant bits identify the stream ID
* has been locally initiated, 0 if not. * types (client initiated bidirectional, server initiated bidirectional,
* client initiated unidirectional, server initiated bidirectional).
* Note that there is no reference to such stream sub IDs in the RFC.
*/ */
static inline int qc_local_stream_id(struct qcc *qcc, uint64_t id) #define QCS_ID_TYPE_MASK 0x3
{ #define QCS_ID_TYPE_SHIFT 2
if ((objt_listener(qcc->conn->target) && (id & QCS_ID_SRV_INTIATOR_BIT)) || /* The less significant bit of a stream ID is set for a server initiated stream */
(objt_server(qcc->conn->target) && !(id & QCS_ID_SRV_INTIATOR_BIT))) #define QCS_ID_SRV_INTIATOR_BIT 0x1
return 1; /* This bit is set for unidirectional streams */
#define QCS_ID_DIR_BIT 0x2
return 0;
}
/* Return 1 if <qcs> stream has been locally initiated, 0 if not. */
static inline int qcs_local(struct qcs *qcs)
{
if ((objt_listener(qcs->qcc->conn->target) && (qcs->id & QCS_ID_SRV_INTIATOR_BIT)) ||
(objt_server(qcs->qcc->conn->target) && !(qcs->id & QCS_ID_SRV_INTIATOR_BIT)))
return 1;
return 0;
}
/* Return the direction of a stream with <id> as ID. */
static inline enum qcs_dir qcs_id_dir(uint64_t id)
{
return (id & QCS_ID_DIR_BIT) >> QCS_ID_DIR_BIT_SHIFT;
}
/* Return the direction of <qcs> QUIC stream. */
static inline enum qcs_dir qcs_dir(struct qcs *qcs)
{
return (qcs->id & QCS_ID_DIR_BIT) >> QCS_ID_DIR_BIT_SHIFT;
}
static inline enum qcs_type qcs_id_type(uint64_t id) static inline enum qcs_type qcs_id_type(uint64_t id)
{ {
return id & QCS_ID_TYPE_MASK; return id & QCS_ID_TYPE_MASK;
} }
static inline enum qcs_type qcs_type_from_dir(struct qcc *qcc, enum qcs_dir dir) /* Return 1 if the stream with <id> as ID attached to <qcc> connection
* has been locally initiated, 0 if not.
*/
static inline int qc_local_stream_id(struct qcc *qcc, uint64_t id)
{ {
return (dir << QCS_ID_DIR_BIT_SHIFT) | return id & QCS_ID_SRV_INTIATOR_BIT;
(!!objt_listener(qcc->conn->target) ? QCS_ID_SRV_INTIATOR_BIT : 0);
} }
static inline int64_t qcc_wnd(struct qcc *qcc) static inline int qcs_get_next_id(struct qcc *qcc, enum qcs_type type)
{ {
return qcc->tx.max_data - qcc->tx.bytes; BUG_ON(qcc->strms[type].nb_streams + 1 > qcc->strms[type].max_streams);
} return (qcc->strms[type].nb_streams++ << QCS_ID_TYPE_SHIFT) | type;
/* Return 1 if <qcs> is unidirectional, 0 if not. */
static inline int qcs_uni(struct qcs *qcs)
{
return qcs->id & QCS_ID_DIR_BIT;
}
/* Return 1 if <qcs> is bidirectional, 0 if not. */
static inline int qcs_bidi(struct qcs *qcs)
{
return !qcs_uni(qcs);
}
/* Return the next stream ID with <qcs_type> as type if succeeded, (uint64_t)-1 if not. */
static inline uint64_t qcs_next_id(struct qcc *qcc, enum qcs_type qcs_type)
{
if (qcc->strms[qcs_type].nb_streams + 1 > qcc->strms[qcs_type].max_streams)
return (uint64_t)-1;
return (qcc->strms[qcs_type].nb_streams++ << QCS_ID_TYPE_SHIFT) | qcs_type;
}
static inline void *qcs_new(struct qcc *qcc, uint64_t id)
{
if (id & QCS_ID_DIR_BIT)
return ruqs_new(qcc, id);
else
return bidi_qcs_new(qcc, id);
} }
#endif /* USE_QUIC */ #endif /* USE_QUIC */
#endif /* _HAPROXY_MUX_QUIC_H */ #endif /* _HAPROXY_MUX_QUIC_H */

View File

@ -21,8 +21,6 @@
#ifndef _HAPROXY_QPACK_DEC_H #ifndef _HAPROXY_QPACK_DEC_H
#define _HAPROXY_QPACK_DEC_H #define _HAPROXY_QPACK_DEC_H
#include <haproxy/mux_quic-t.h>
struct h3_uqs; struct h3_uqs;
struct http_hdr; struct http_hdr;

View File

@ -33,6 +33,8 @@
#include <import/ebtree-t.h> #include <import/ebtree-t.h>
#include <haproxy/mux_quic-t.h>
/* QUIC frame types. */ /* QUIC frame types. */
enum quic_frame_type { enum quic_frame_type {
QUIC_FT_PADDING = 0x00, QUIC_FT_PADDING = 0x00,

View File

@ -32,6 +32,7 @@
#include <haproxy/cbuf-t.h> #include <haproxy/cbuf-t.h>
#include <haproxy/list.h> #include <haproxy/list.h>
#include <haproxy/mux_quic-t.h>
#include <haproxy/quic_cc-t.h> #include <haproxy/quic_cc-t.h>
#include <haproxy/quic_frame-t.h> #include <haproxy/quic_frame-t.h>
#include <haproxy/quic_tls-t.h> #include <haproxy/quic_tls-t.h>

View File

@ -23,7 +23,7 @@
#include <haproxy/http.h> #include <haproxy/http.h>
#include <haproxy/htx.h> #include <haproxy/htx.h>
#include <haproxy/istbuf.h> #include <haproxy/istbuf.h>
#include <haproxy/mux_quic.h> #include <haproxy/mux_quic-t.h>
#include <haproxy/pool.h> #include <haproxy/pool.h>
#include <haproxy/qpack-dec.h> #include <haproxy/qpack-dec.h>
#include <haproxy/qpack-enc.h> #include <haproxy/qpack-enc.h>
@ -322,8 +322,9 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
b_del(rxbuf, flen); b_del(rxbuf, flen);
} }
if (b_data(rxbuf)) /* TODO handle the case when the buffer is not empty. This can happens
h3->qcc->conn->mux->ruqs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3->rctrl.wait_event); * if there is an incomplete frame.
*/
return 1; return 1;
} }
@ -533,10 +534,8 @@ static int h3_resp_data_send(struct qcs *qcs, struct buffer *buf, size_t count)
/* not enough room for headers and at least one data byte, block the /* not enough room for headers and at least one data byte, block the
* stream * stream
*/ */
if (b_size(&outbuf) <= hsize) { if (b_size(&outbuf) <= hsize)
qcs->flags |= QC_SF_BLK_MROOM; ABORT_NOW();
goto end;
}
if (b_size(&outbuf) < hsize + fsize) if (b_size(&outbuf) < hsize + fsize)
fsize = b_size(&outbuf) - hsize; fsize = b_size(&outbuf) - hsize;
@ -573,9 +572,11 @@ size_t h3_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int
int32_t idx; int32_t idx;
int ret; int ret;
fprintf(stderr, "%s\n", __func__);
htx = htx_from_buf(buf); htx = htx_from_buf(buf);
while (count && !htx_is_empty(htx) && !(qcs->flags & QC_SF_BLK_MROOM)) { while (count && !htx_is_empty(htx)) {
idx = htx_get_head(htx); idx = htx_get_head(htx);
blk = htx_get_blk(htx, idx); blk = htx_get_blk(htx, idx);
btype = htx_get_blk_type(blk); btype = htx_get_blk_type(blk);
@ -619,9 +620,6 @@ size_t h3_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int
} }
} }
if ((htx->flags & HTX_FL_EOM) && htx_is_empty(htx))
qcs->flags |= QC_SF_FIN_STREAM;
out: out:
if (total) { if (total) {
if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND)) if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
@ -657,7 +655,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
h3->rctrl.qcs = qcs; h3->rctrl.qcs = qcs;
h3->rctrl.cb = h3_control_recv; h3->rctrl.cb = h3_control_recv;
h3->qcc->conn->mux->ruqs_subscribe(qcs, SUB_RETRY_RECV, &h3->rctrl.wait_event); // TODO wake-up rctrl tasklet on reception
break; break;
case H3_UNI_STRM_TP_PUSH_STREAM: case H3_UNI_STRM_TP_PUSH_STREAM:
/* NOT SUPPORTED */ /* NOT SUPPORTED */
@ -670,7 +668,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
h3->rqpack_enc.qcs = qcs; h3->rqpack_enc.qcs = qcs;
h3->rqpack_enc.cb = qpack_decode_enc; h3->rqpack_enc.cb = qpack_decode_enc;
h3->qcc->conn->mux->ruqs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_enc.wait_event); // TODO wake-up rqpack_enc tasklet on reception
break; break;
case H3_UNI_STRM_TP_QPACK_DECODER: case H3_UNI_STRM_TP_QPACK_DECODER:
if (h3->rqpack_dec.qcs) { if (h3->rqpack_dec.qcs) {
@ -680,7 +678,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
h3->rqpack_dec.qcs = qcs; h3->rqpack_dec.qcs = qcs;
h3->rqpack_dec.cb = qpack_decode_dec; h3->rqpack_dec.cb = qpack_decode_dec;
h3->qcc->conn->mux->ruqs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_dec.wait_event); // TODO wake-up rqpack_dec tasklet on reception
break; break;
default: default:
/* Error */ /* Error */
@ -694,8 +692,10 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
static int h3_finalize(void *ctx) static int h3_finalize(void *ctx)
{ {
struct h3 *h3 = ctx; struct h3 *h3 = ctx;
int lctrl_id;
h3->lctrl.qcs = luqs_new(h3->qcc); lctrl_id = qcs_get_next_id(h3->qcc, QCS_SRV_UNI);
h3->lctrl.qcs = qcs_new(h3->qcc, lctrl_id, QCS_SRV_UNI);
if (!h3->lctrl.qcs) if (!h3->lctrl.qcs)
return 0; return 0;
@ -766,7 +766,7 @@ static int h3_uqs_init(struct h3_uqs *h3_uqs, struct h3 *h3,
static inline void h3_uqs_release(struct h3_uqs *h3_uqs) static inline void h3_uqs_release(struct h3_uqs *h3_uqs)
{ {
if (h3_uqs->qcs) if (h3_uqs->qcs)
qcs_release(h3_uqs->qcs); uni_qcs_free(h3_uqs->qcs);
} }
static inline void h3_uqs_release_all(struct h3 *h3) static inline void h3_uqs_release_all(struct h3 *h3)

View File

@ -81,7 +81,7 @@ static size_t hq_interop_snd_buf(struct conn_stream *cs, struct buffer *buf,
res = mux_get_buf(qcs); res = mux_get_buf(qcs);
outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0); outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
while (count && !htx_is_empty(htx) && !(qcs->flags & QC_SF_BLK_MROOM)) { while (count && !htx_is_empty(htx)) {
/* Not implemented : QUIC on backend side */ /* Not implemented : QUIC on backend side */
idx = htx_get_head(htx); idx = htx_get_head(htx);
blk = htx_get_blk(htx, idx); blk = htx_get_blk(htx, idx);
@ -116,9 +116,6 @@ static size_t hq_interop_snd_buf(struct conn_stream *cs, struct buffer *buf,
} }
} }
if ((htx->flags & HTX_FL_EOM) && htx_is_empty(htx))
qcs->flags |= QC_SF_FIN_STREAM;
b_add(res, b_data(&outbuf)); b_add(res, b_data(&outbuf));
if (total) { if (total) {

File diff suppressed because it is too large Load Diff

View File

@ -1426,16 +1426,12 @@ static inline void qc_treat_acked_tx_frm(struct quic_frame *frm,
qcs->tx.ack_offset += strm->len; qcs->tx.ack_offset += strm->len;
LIST_DELETE(&frm->list); LIST_DELETE(&frm->list);
pool_free(pool_head_quic_frame, frm); pool_free(pool_head_quic_frame, frm);
qc->qcc->flags &= ~QC_CF_MUX_MFULL;
stream_acked = 1; stream_acked = 1;
} }
else { else {
eb64_insert(&qcs->tx.acked_frms, &strm->offset); eb64_insert(&qcs->tx.acked_frms, &strm->offset);
} }
stream_acked |= qcs_try_to_consume(qcs); stream_acked |= qcs_try_to_consume(qcs);
if (qcs->flags & QC_SF_DETACH)
tasklet_wakeup(qcs->shut_tl);
} }
break; break;
default: default:
@ -1989,7 +1985,9 @@ static struct eb64_node *qcc_get_qcs(struct qcc *qcc, uint64_t id)
qcs = NULL; qcs = NULL;
for (i = largest_id + 1; i <= sub_id; i++) { for (i = largest_id + 1; i <= sub_id; i++) {
qcs = qcs_new(qcc, (i << QCS_ID_TYPE_SHIFT) | strm_type); uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
qcs = qcs_new(qcc, id, type);
if (!qcs) { if (!qcs) {
TRACE_PROTO("Could not allocate a new stream", TRACE_PROTO("Could not allocate a new stream",
QUIC_EV_CONN_PSTRM, qcc->conn); QUIC_EV_CONN_PSTRM, qcc->conn);
@ -2151,9 +2149,6 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
return 0; return 0;
} }
if (ret)
ruqs_notify_recv(strm);
strm_frm->offset.key += ret; strm_frm->offset.key += ret;
} }
/* Take this frame into an account for the stream flow control */ /* Take this frame into an account for the stream flow control */
@ -2297,7 +2292,7 @@ static int qc_parse_pkt_frms(struct quic_rx_packet *pkt, struct ssl_sock_ctx *ct
break; break;
case QUIC_FT_CONNECTION_CLOSE: case QUIC_FT_CONNECTION_CLOSE:
case QUIC_FT_CONNECTION_CLOSE_APP: case QUIC_FT_CONNECTION_CLOSE_APP:
conn->qcc->flags |= QC_CF_CC_RECV; /* TODO warn the mux to close the connection */
break; break;
case QUIC_FT_HANDSHAKE_DONE: case QUIC_FT_HANDSHAKE_DONE:
if (objt_listener(ctx->conn->target)) if (objt_listener(ctx->conn->target))