From 0c6a64cd5f20cd4489c47f625737727a4ad8525a Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Fri, 1 Apr 2022 08:58:29 +0200 Subject: [PATCH] MEDIUM: stream-int/conn-stream: Move si_ops in the conn-stream scope The si_ops structure is renamed to cs_app_ops and the callback functions are changed to manipulate a conn-stream instead of a stream-interface.. --- include/haproxy/conn_stream-t.h | 9 + include/haproxy/stream_interface-t.h | 9 - include/haproxy/stream_interface.h | 15 +- src/conn_stream.c | 15 +- src/stream_interface.c | 299 ++++++++++++++------------- 5 files changed, 175 insertions(+), 172 deletions(-) diff --git a/include/haproxy/conn_stream-t.h b/include/haproxy/conn_stream-t.h index 3491cfed7..ef13a2098 100644 --- a/include/haproxy/conn_stream-t.h +++ b/include/haproxy/conn_stream-t.h @@ -147,6 +147,14 @@ struct cs_endpoint { unsigned int flags; }; +/* operations available on a conn-stream */ +struct cs_app_ops { + void (*chk_rcv)(struct conn_stream *); /* chk_rcv function, may not be null */ + void (*chk_snd)(struct conn_stream *); /* chk_snd function, may not be null */ + void (*shutr)(struct conn_stream *); /* shut read function, may not be null */ + void (*shutw)(struct conn_stream *); /* shut write function, may not be null */ +}; + /* * This structure describes the elements of a connection relevant to a stream */ @@ -162,6 +170,7 @@ struct conn_stream { enum obj_type *app; /* points to the applicative point (stream or check) */ struct stream_interface *si; const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */ + struct cs_app_ops *ops; /* general operations used at the app layer */ struct sockaddr_storage *src; /* source address (pool), when known, otherwise NULL */ struct sockaddr_storage *dst; /* destination address (pool), when known, otherwise NULL */ }; diff --git a/include/haproxy/stream_interface-t.h b/include/haproxy/stream_interface-t.h index 2b44e30cd..ea02d6c17 100644 --- a/include/haproxy/stream_interface-t.h +++ b/include/haproxy/stream_interface-t.h @@ -62,15 +62,6 @@ struct stream_interface { /* 16-bit hole here */ unsigned int flags; /* SI_FL_* */ struct conn_stream *cs; /* points to the conn-streams that owns the endpoint (connection or applet) */ - struct si_ops *ops; /* general operations at the stream interface layer */ -}; - -/* operations available on a stream-interface */ -struct si_ops { - void (*chk_rcv)(struct stream_interface *); /* chk_rcv function, may not be null */ - void (*chk_snd)(struct stream_interface *); /* chk_snd function, may not be null */ - void (*shutr)(struct stream_interface *); /* shut read function, may not be null */ - void (*shutw)(struct stream_interface *); /* shut write function, may not be null */ }; #endif /* _HAPROXY_STREAM_INTERFACE_T_H */ diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h index 2a75d6a6e..cdc13200a 100644 --- a/include/haproxy/stream_interface.h +++ b/include/haproxy/stream_interface.h @@ -30,9 +30,9 @@ #include #include -extern struct si_ops si_embedded_ops; -extern struct si_ops si_conn_ops; -extern struct si_ops si_applet_ops; +extern struct cs_app_ops cs_app_embedded_ops; +extern struct cs_app_ops cs_app_conn_ops; +extern struct cs_app_ops cs_app_applet_ops; extern struct data_cb si_conn_cb; extern struct data_cb check_conn_cb; @@ -107,7 +107,6 @@ static inline int si_init(struct stream_interface *si) { si->flags &= SI_FL_ISBACK; si->cs = NULL; - si->ops = &si_embedded_ops; return 0; } @@ -276,13 +275,13 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait /* Sends a shutr to the endpoint using the data layer */ static inline void cs_shutr(struct conn_stream *cs) { - cs->si->ops->shutr(cs->si); + cs->ops->shutr(cs); } /* Sends a shutw to the endpoint using the data layer */ static inline void cs_shutw(struct conn_stream *cs) { - cs->si->ops->shutw(cs->si); + cs->ops->shutw(cs); } /* This is to be used after making some room available in a channel. It will @@ -303,13 +302,13 @@ static inline void cs_chk_rcv(struct conn_stream *cs) return; cs->si->flags |= SI_FL_RX_WAIT_EP; - cs->si->ops->chk_rcv(cs->si); + cs->ops->chk_rcv(cs); } /* Calls chk_snd on the endpoint using the data layer */ static inline void cs_chk_snd(struct conn_stream *cs) { - cs->si->ops->chk_snd(cs->si); + cs->ops->chk_snd(cs); } /* Combines both si_update_rx() and si_update_tx() at once */ diff --git a/src/conn_stream.c b/src/conn_stream.c index 68db216f2..2589dc148 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -127,8 +127,9 @@ struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags) cs_free(cs); return NULL; } + cs->app = &strm->obj_type; - cs->si->ops = &si_embedded_ops; + cs->ops = &cs_app_embedded_ops; cs->data_cb = NULL; return cs; } @@ -186,7 +187,7 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx) cs->wait_event.events = 0; } - cs->si->ops = &si_conn_ops; + cs->ops = &cs_app_conn_ops; cs->data_cb = &si_conn_cb; } else if (cs_check(cs)) @@ -205,7 +206,7 @@ void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx) cs->endp->flags &= ~CS_EP_DETACHED; appctx->owner = cs; if (cs_strm(cs)) { - cs->si->ops = &si_applet_ops; + cs->ops = &cs_app_applet_ops; cs->data_cb = NULL; } } @@ -231,15 +232,15 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm) cs->wait_event.tasklet->context = cs->si; cs->wait_event.events = 0; - cs->si->ops = &si_conn_ops; + cs->ops = &cs_app_conn_ops; cs->data_cb = &si_conn_cb; } else if (cs->endp->flags & CS_EP_T_APPLET) { - cs->si->ops = &si_applet_ops; + cs->ops = &cs_app_applet_ops; cs->data_cb = NULL; } else { - cs->si->ops = &si_embedded_ops; + cs->ops = &cs_app_embedded_ops; cs->data_cb = NULL; } return 0; @@ -300,7 +301,7 @@ void cs_detach_endp(struct conn_stream *cs) */ cs->flags &= CS_FL_ISBACK; if (cs->si) - cs->si->ops = &si_embedded_ops; + cs->ops = &cs_app_embedded_ops; cs->data_cb = NULL; if (cs->app == NULL) diff --git a/src/stream_interface.c b/src/stream_interface.c index b119cff82..7b9481267 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -40,23 +42,23 @@ DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface)); -/* functions used by default on a detached stream-interface */ -static void stream_int_shutr(struct stream_interface *si); -static void stream_int_shutw(struct stream_interface *si); -static void stream_int_chk_rcv(struct stream_interface *si); -static void stream_int_chk_snd(struct stream_interface *si); +/* functions used by default on a detached conn-stream */ +static void cs_app_shutr(struct conn_stream *cs); +static void cs_app_shutw(struct conn_stream *cs); +static void cs_app_chk_rcv(struct conn_stream *cs); +static void cs_app_chk_snd(struct conn_stream *cs); -/* functions used on a conn_stream-based stream-interface */ -static void stream_int_shutr_conn(struct stream_interface *si); -static void stream_int_shutw_conn(struct stream_interface *si); -static void stream_int_chk_rcv_conn(struct stream_interface *si); -static void stream_int_chk_snd_conn(struct stream_interface *si); +/* functions used on a mux-based conn-stream */ +static void cs_app_shutr_conn(struct conn_stream *cs); +static void cs_app_shutw_conn(struct conn_stream *cs); +static void cs_app_chk_rcv_conn(struct conn_stream *cs); +static void cs_app_chk_snd_conn(struct conn_stream *cs); -/* functions used on an applet-based stream-interface */ -static void stream_int_shutr_applet(struct stream_interface *si); -static void stream_int_shutw_applet(struct stream_interface *si); -static void stream_int_chk_rcv_applet(struct stream_interface *si); -static void stream_int_chk_snd_applet(struct stream_interface *si); +/* functions used on an applet-based conn-stream */ +static void cs_app_shutr_applet(struct conn_stream *cs); +static void cs_app_shutw_applet(struct conn_stream *cs); +static void cs_app_chk_rcv_applet(struct conn_stream *cs); +static void cs_app_chk_snd_applet(struct conn_stream *cs); /* last read notification */ static void stream_int_read0(struct stream_interface *si); @@ -64,31 +66,31 @@ static void stream_int_read0(struct stream_interface *si); /* post-IO notification callback */ static void stream_int_notify(struct stream_interface *si); -/* stream-interface operations for embedded tasks */ -struct si_ops si_embedded_ops = { - .chk_rcv = stream_int_chk_rcv, - .chk_snd = stream_int_chk_snd, - .shutr = stream_int_shutr, - .shutw = stream_int_shutw, + +/* conn-stream operations for connections */ +struct cs_app_ops cs_app_conn_ops = { + .chk_rcv = cs_app_chk_rcv_conn, + .chk_snd = cs_app_chk_snd_conn, + .shutr = cs_app_shutr_conn, + .shutw = cs_app_shutw_conn, }; -/* stream-interface operations for connections */ -struct si_ops si_conn_ops = { - .chk_rcv = stream_int_chk_rcv_conn, - .chk_snd = stream_int_chk_snd_conn, - .shutr = stream_int_shutr_conn, - .shutw = stream_int_shutw_conn, +/* conn-stream operations for embedded tasks */ +struct cs_app_ops cs_app_embedded_ops = { + .chk_rcv = cs_app_chk_rcv, + .chk_snd = cs_app_chk_snd, + .shutr = cs_app_shutr, + .shutw = cs_app_shutw, }; -/* stream-interface operations for connections */ -struct si_ops si_applet_ops = { - .chk_rcv = stream_int_chk_rcv_applet, - .chk_snd = stream_int_chk_snd_applet, - .shutr = stream_int_shutr_applet, - .shutw = stream_int_shutw_applet, +/* conn-stream operations for connections */ +struct cs_app_ops cs_app_applet_ops = { + .chk_rcv = cs_app_chk_rcv_applet, + .chk_snd = cs_app_chk_snd_applet, + .shutr = cs_app_shutr_applet, + .shutw = cs_app_shutw_applet, }; - /* Functions used to communicate with a conn_stream. The first two may be used * directly, the last one is mostly a wake callback. */ @@ -127,65 +129,66 @@ void si_free(struct stream_interface *si) pool_free(pool_head_streaminterface, si); } + /* - * This function performs a shutdown-read on a detached stream interface in a + * This function performs a shutdown-read on a detached conn-stream in a * connected or init state (it does nothing for other states). It either shuts * the read side or marks itself as closed. The buffer flags are updated to * reflect the new state. If the stream interface has CS_FL_NOHALF, we also * forward the close to the write side. The owner task is woken up if it exists. */ -static void stream_int_shutr(struct stream_interface *si) +static void cs_app_shutr(struct conn_stream *cs) { - struct channel *ic = si_ic(si); + struct channel *ic = cs_ic(cs); - si_rx_shut_blk(si); + si_rx_shut_blk(cs->si); if (ic->flags & CF_SHUTR) return; ic->flags |= CF_SHUTR; ic->rex = TICK_ETERNITY; - if (!cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) + if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) return; - if (si_oc(si)->flags & CF_SHUTW) { - si->cs->state = CS_ST_DIS; - __cs_strm(si->cs)->conn_exp = TICK_ETERNITY; + if (cs_oc(cs)->flags & CF_SHUTW) { + cs->state = CS_ST_DIS; + __cs_strm(cs)->conn_exp = TICK_ETERNITY; } - else if (si->cs->flags & CS_FL_NOHALF) { + else if (cs->flags & CS_FL_NOHALF) { /* we want to immediately forward this close to the write side */ - return stream_int_shutw(si); + return cs_app_shutw(cs); } /* note that if the task exists, it must unregister itself once it runs */ - if (!(si->cs->flags & CS_FL_DONT_WAKE)) - task_wakeup(si_task(si), TASK_WOKEN_IO); + if (!(cs->flags & CS_FL_DONT_WAKE)) + task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO); } /* - * This function performs a shutdown-write on a detached stream interface in a + * This function performs a shutdown-write on a detached conn-stream in a * connected or init state (it does nothing for other states). It either shuts * the write side or marks itself as closed. The buffer flags are updated to * reflect the new state. It does also close everything if the SI was marked as * being in error state. The owner task is woken up if it exists. */ -static void stream_int_shutw(struct stream_interface *si) +static void cs_app_shutw(struct conn_stream *cs) { - struct channel *ic = si_ic(si); - struct channel *oc = si_oc(si); + struct channel *ic = cs_ic(cs); + struct channel *oc = cs_oc(cs); oc->flags &= ~CF_SHUTW_NOW; if (oc->flags & CF_SHUTW) return; oc->flags |= CF_SHUTW; oc->wex = TICK_ETERNITY; - si_done_get(si); + si_done_get(cs->si); - if (tick_isset(si->cs->hcto)) { - ic->rto = si->cs->hcto; + if (tick_isset(cs->hcto)) { + ic->rto = cs->hcto; ic->rex = tick_add(now_ms, ic->rto); } - switch (si->cs->state) { + switch (cs->state) { case CS_ST_RDY: case CS_ST_EST: /* we have to shut before closing, otherwise some short messages @@ -194,7 +197,7 @@ static void stream_int_shutw(struct stream_interface *si) * However, if CS_FL_NOLINGER is explicitly set, we know there is * no risk so we close both sides immediately. */ - if (!(si->cs->endp->flags & CS_EP_ERROR) && !(si->cs->flags & CS_FL_NOLINGER) && + if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) && !(ic->flags & (CF_SHUTR|CF_DONT_READ))) return; @@ -204,66 +207,66 @@ static void stream_int_shutw(struct stream_interface *si) case CS_ST_QUE: case CS_ST_TAR: /* Note that none of these states may happen with applets */ - si->cs->state = CS_ST_DIS; + cs->state = CS_ST_DIS; /* fall through */ default: - si->cs->flags &= ~CS_FL_NOLINGER; - si_rx_shut_blk(si); + cs->flags &= ~CS_FL_NOLINGER; + si_rx_shut_blk(cs->si); ic->flags |= CF_SHUTR; ic->rex = TICK_ETERNITY; - __cs_strm(si->cs)->conn_exp = TICK_ETERNITY; + __cs_strm(cs)->conn_exp = TICK_ETERNITY; } /* note that if the task exists, it must unregister itself once it runs */ - if (!(si->cs->flags & CS_FL_DONT_WAKE)) - task_wakeup(si_task(si), TASK_WOKEN_IO); + if (!(cs->flags & CS_FL_DONT_WAKE)) + task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO); } /* default chk_rcv function for scheduled tasks */ -static void stream_int_chk_rcv(struct stream_interface *si) +static void cs_app_chk_rcv(struct conn_stream *cs) { - struct channel *ic = si_ic(si); + struct channel *ic = cs_ic(cs); - DPRINTF(stderr, "%s: si=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", + DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", __FUNCTION__, - si, si->cs->state, ic->flags, si_oc(si)->flags); + cs, cs->state, ic->flags, cs_oc(cs)->flags); if (ic->pipe) { /* stop reading */ - si_rx_room_blk(si); + si_rx_room_blk(cs->si); } else { /* (re)start reading */ - if (!(si->cs->flags & CS_FL_DONT_WAKE)) - task_wakeup(si_task(si), TASK_WOKEN_IO); + if (!(cs->flags & CS_FL_DONT_WAKE)) + task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO); } } /* default chk_snd function for scheduled tasks */ -static void stream_int_chk_snd(struct stream_interface *si) +static void cs_app_chk_snd(struct conn_stream *cs) { - struct channel *oc = si_oc(si); + struct channel *oc = cs_oc(cs); - DPRINTF(stderr, "%s: si=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", + DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", __FUNCTION__, - si, si->cs->state, si_ic(si)->flags, oc->flags); + cs, cs->state, cs_ic(cs)->flags, oc->flags); - if (unlikely(si->cs->state != CS_ST_EST || (oc->flags & CF_SHUTW))) + if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW))) return; - if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */ - channel_is_empty(oc)) /* called with nothing to send ! */ + if (!(cs->si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */ + channel_is_empty(oc)) /* called with nothing to send ! */ return; /* Otherwise there are remaining data to be sent in the buffer, * so we tell the handler. */ - si->flags &= ~SI_FL_WAIT_DATA; + cs->si->flags &= ~SI_FL_WAIT_DATA; if (!tick_isset(oc->wex)) oc->wex = tick_add_ifset(now_ms, oc->wto); - if (!(si->cs->flags & CS_FL_DONT_WAKE)) - task_wakeup(si_task(si), TASK_WOKEN_IO); + if (!(cs->flags & CS_FL_DONT_WAKE)) + task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO); } /* Register an applet to handle a stream_interface as a new appctx. The SI will @@ -980,7 +983,7 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b } /* - * This function performs a shutdown-read on a stream interface attached to + * This function performs a shutdown-read on a conn-stream attached to * a connection in a connected or init state (it does nothing for other * states). It either shuts the read side or marks itself as closed. The buffer * flags are updated to reflect the new state. If the stream interface has @@ -989,14 +992,13 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b * descriptors are then shutdown or closed accordingly. The function * automatically disables polling if needed. */ -static void stream_int_shutr_conn(struct stream_interface *si) +static void cs_app_shutr_conn(struct conn_stream *cs) { - struct conn_stream *cs = si->cs; - struct channel *ic = si_ic(si); + struct channel *ic = cs_ic(cs); BUG_ON(!cs_conn(cs)); - si_rx_shut_blk(si); + si_rx_shut_blk(cs->si); if (ic->flags & CF_SHUTR) return; ic->flags |= CF_SHUTR; @@ -1005,30 +1007,29 @@ static void stream_int_shutr_conn(struct stream_interface *si) if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) return; - if (si_oc(si)->flags & CF_SHUTW) { + if (cs_oc(cs)->flags & CF_SHUTW) { cs_conn_close(cs); cs->state = CS_ST_DIS; __cs_strm(cs)->conn_exp = TICK_ETERNITY; } - else if (si->cs->flags & CS_FL_NOHALF) { + else if (cs->flags & CS_FL_NOHALF) { /* we want to immediately forward this close to the write side */ - return stream_int_shutw_conn(si); + return cs_app_shutw_conn(cs); } } /* - * This function performs a shutdown-write on a stream interface attached to + * This function performs a shutdown-write on a conn-stream attached to * a connection in a connected or init state (it does nothing for other * states). It either shuts the write side or marks itself as closed. The * buffer flags are updated to reflect the new state. It does also close * everything if the SI was marked as being in error state. If there is a * data-layer shutdown, it is called. */ -static void stream_int_shutw_conn(struct stream_interface *si) +static void cs_app_shutw_conn(struct conn_stream *cs) { - struct conn_stream *cs = si->cs; - struct channel *ic = si_ic(si); - struct channel *oc = si_oc(si); + struct channel *ic = cs_ic(cs); + struct channel *oc = cs_oc(cs); BUG_ON(!cs_conn(cs)); @@ -1037,10 +1038,10 @@ static void stream_int_shutw_conn(struct stream_interface *si) return; oc->flags |= CF_SHUTW; oc->wex = TICK_ETERNITY; - si_done_get(si); + si_done_get(cs->si); - if (tick_isset(si->cs->hcto)) { - ic->rto = si->cs->hcto; + if (tick_isset(cs->hcto)) { + ic->rto = cs->hcto; ic->rex = tick_add(now_ms, ic->rto); } @@ -1092,36 +1093,37 @@ static void stream_int_shutw_conn(struct stream_interface *si) /* fall through */ default: cs->flags &= ~CS_FL_NOLINGER; - si_rx_shut_blk(si); + si_rx_shut_blk(cs->si); ic->flags |= CF_SHUTR; ic->rex = TICK_ETERNITY; __cs_strm(cs)->conn_exp = TICK_ETERNITY; } } -/* This function is used for inter-stream-interface calls. It is called by the +/* This function is used for inter-conn-stream calls. It is called by the * consumer to inform the producer side that it may be interested in checking * for free space in the buffer. Note that it intentionally does not update * timeouts, so that we can still check them later at wake-up. This function is * dedicated to connection-based stream interfaces. */ -static void stream_int_chk_rcv_conn(struct stream_interface *si) +static void cs_app_chk_rcv_conn(struct conn_stream *cs) { + BUG_ON(!cs_conn(cs)); + /* (re)start reading */ - if (cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) - tasklet_wakeup(si->cs->wait_event.tasklet); + if (cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) + tasklet_wakeup(cs->wait_event.tasklet); } -/* This function is used for inter-stream-interface calls. It is called by the +/* This function is used for inter-conn-stream calls. It is called by the * producer to inform the consumer side that it may be interested in checking * for data in the buffer. Note that it intentionally does not update timeouts, * so that we can still check them later at wake-up. */ -static void stream_int_chk_snd_conn(struct stream_interface *si) +static void cs_app_chk_snd_conn(struct conn_stream *cs) { - struct channel *oc = si_oc(si); - struct conn_stream *cs = si->cs; + struct channel *oc = cs_oc(cs); BUG_ON(!cs_conn(cs)); @@ -1133,13 +1135,13 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) return; if (!oc->pipe && /* spliced data wants to be forwarded ASAP */ - !(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */ + !(cs->si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */ return; - if (!(si->cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si))) + if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs))) si_cs_send(cs); - if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) { + if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) { /* Write error on the file descriptor */ if (cs->state >= CS_ST_CON) cs->endp->flags |= CS_EP_ERROR; @@ -1626,21 +1628,22 @@ void si_applet_wake_cb(struct stream_interface *si) appctx_wakeup(__cs_appctx(si->cs)); } + /* - * This function performs a shutdown-read on a stream interface attached to an + * This function performs a shutdown-read on a conn-stream attached to an * applet in a connected or init state (it does nothing for other states). It * either shuts the read side or marks itself as closed. The buffer flags are * updated to reflect the new state. If the stream interface has CS_FL_NOHALF, * we also forward the close to the write side. The owner task is woken up if * it exists. */ -static void stream_int_shutr_applet(struct stream_interface *si) +static void cs_app_shutr_applet(struct conn_stream *cs) { - struct channel *ic = si_ic(si); + struct channel *ic = cs_ic(cs); - BUG_ON(!cs_appctx(si->cs)); + BUG_ON(!cs_appctx(cs)); - si_rx_shut_blk(si); + si_rx_shut_blk(cs->si); if (ic->flags & CF_SHUTR) return; ic->flags |= CF_SHUTR; @@ -1648,50 +1651,50 @@ static void stream_int_shutr_applet(struct stream_interface *si) /* Note: on shutr, we don't call the applet */ - if (!cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) + if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST)) return; - if (si_oc(si)->flags & CF_SHUTW) { - si_applet_release(si); - si->cs->state = CS_ST_DIS; - __cs_strm(si->cs)->conn_exp = TICK_ETERNITY; + if (cs_oc(cs)->flags & CF_SHUTW) { + si_applet_release(cs->si); + cs->state = CS_ST_DIS; + __cs_strm(cs)->conn_exp = TICK_ETERNITY; } - else if (si->cs->flags & CS_FL_NOHALF) { + else if (cs->flags & CS_FL_NOHALF) { /* we want to immediately forward this close to the write side */ - return stream_int_shutw_applet(si); + return cs_app_shutw_applet(cs); } } /* - * This function performs a shutdown-write on a stream interface attached to an + * This function performs a shutdown-write on a conn-stream attached to an * applet in a connected or init state (it does nothing for other states). It * either shuts the write side or marks itself as closed. The buffer flags are * updated to reflect the new state. It does also close everything if the SI * was marked as being in error state. The owner task is woken up if it exists. */ -static void stream_int_shutw_applet(struct stream_interface *si) +static void cs_app_shutw_applet(struct conn_stream *cs) { - struct channel *ic = si_ic(si); - struct channel *oc = si_oc(si); + struct channel *ic = cs_ic(cs); + struct channel *oc = cs_oc(cs); - BUG_ON(!cs_appctx(si->cs)); + BUG_ON(!cs_appctx(cs)); oc->flags &= ~CF_SHUTW_NOW; if (oc->flags & CF_SHUTW) return; oc->flags |= CF_SHUTW; oc->wex = TICK_ETERNITY; - si_done_get(si); + si_done_get(cs->si); - if (tick_isset(si->cs->hcto)) { - ic->rto = si->cs->hcto; + if (tick_isset(cs->hcto)) { + ic->rto = cs->hcto; ic->rex = tick_add(now_ms, ic->rto); } /* on shutw we always wake the applet up */ - appctx_wakeup(__cs_appctx(si->cs)); + appctx_wakeup(__cs_appctx(cs)); - switch (si->cs->state) { + switch (cs->state) { case CS_ST_RDY: case CS_ST_EST: /* we have to shut before closing, otherwise some short messages @@ -1700,7 +1703,7 @@ static void stream_int_shutw_applet(struct stream_interface *si) * However, if CS_FL_NOLINGER is explicitly set, we know there is * no risk so we close both sides immediately. */ - if (!(si->cs->endp->flags & CS_EP_ERROR) && !(si->cs->flags & CS_FL_NOLINGER) && + if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) && !(ic->flags & (CF_SHUTR|CF_DONT_READ))) return; @@ -1710,52 +1713,52 @@ static void stream_int_shutw_applet(struct stream_interface *si) case CS_ST_QUE: case CS_ST_TAR: /* Note that none of these states may happen with applets */ - si_applet_release(si); - si->cs->state = CS_ST_DIS; + si_applet_release(cs->si); + cs->state = CS_ST_DIS; /* fall through */ default: - si->cs->flags &= ~CS_FL_NOLINGER; - si_rx_shut_blk(si); + cs->flags &= ~CS_FL_NOLINGER; + si_rx_shut_blk(cs->si); ic->flags |= CF_SHUTR; ic->rex = TICK_ETERNITY; - __cs_strm(si->cs)->conn_exp = TICK_ETERNITY; + __cs_strm(cs)->conn_exp = TICK_ETERNITY; } } /* chk_rcv function for applets */ -static void stream_int_chk_rcv_applet(struct stream_interface *si) +static void cs_app_chk_rcv_applet(struct conn_stream *cs) { - struct channel *ic = si_ic(si); + struct channel *ic = cs_ic(cs); - BUG_ON(!cs_appctx(si->cs)); + BUG_ON(!cs_appctx(cs)); - DPRINTF(stderr, "%s: si=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", + DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", __FUNCTION__, - si, si->cs->state, ic->flags, si_oc(si)->flags); + cs, cs->state, ic->flags, cs_oc(cs)->flags); if (!ic->pipe) { /* (re)start reading */ - appctx_wakeup(__cs_appctx(si->cs)); + appctx_wakeup(__cs_appctx(cs)); } } /* chk_snd function for applets */ -static void stream_int_chk_snd_applet(struct stream_interface *si) +static void cs_app_chk_snd_applet(struct conn_stream *cs) { - struct channel *oc = si_oc(si); + struct channel *oc = cs_oc(cs); - BUG_ON(!cs_appctx(si->cs)); + BUG_ON(!cs_appctx(cs)); - DPRINTF(stderr, "%s: si=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", + DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n", __FUNCTION__, - si, si->cs->state, si_ic(si)->flags, oc->flags); + cs, cs->state, cs_ic(cs)->flags, oc->flags); - if (unlikely(si->cs->state != CS_ST_EST || (oc->flags & CF_SHUTW))) + if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW))) return; /* we only wake the applet up if it was waiting for some data */ - if (!(si->flags & SI_FL_WAIT_DATA)) + if (!(cs->si->flags & SI_FL_WAIT_DATA)) return; if (!tick_isset(oc->wex)) @@ -1763,7 +1766,7 @@ static void stream_int_chk_snd_applet(struct stream_interface *si) if (!channel_is_empty(oc)) { /* (re)start sending */ - appctx_wakeup(__cs_appctx(si->cs)); + appctx_wakeup(__cs_appctx(cs)); } }