MEDIUM: stream: make stream_new() allocate its own task
Currently a task is allocated in session_new() and serves two purposes : - either the handshake is complete and it is offered to the stream via the second arg of stream_new() - or the handshake is not complete and it's diverted to be used as a timeout handler for the embryonic session and repurposed once we land into conn_complete_session() Furthermore, the task's process() function was taken from the listener's handler in conn_complete_session() prior to being replaced by a call to stream_new(). This will become a serious mess with the mux. Since it's impossible to have a stream without a task, this patch removes the second arg from stream_new() and make this function allocate its own task. In session_accept_fd(), we now only allocate the task if needed for the embryonic session and delete it later.
This commit is contained in:
parent
8e3c6ce75a
commit
87787acf72
@ -35,7 +35,7 @@ extern struct list streams;
|
|||||||
|
|
||||||
extern struct data_cb sess_conn_cb;
|
extern struct data_cb sess_conn_cb;
|
||||||
|
|
||||||
struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin);
|
struct stream *stream_new(struct session *sess, enum obj_type *origin);
|
||||||
|
|
||||||
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
|
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
|
||||||
int init_stream();
|
int init_stream();
|
||||||
|
@ -1901,7 +1901,6 @@ spoe_create_appctx(struct spoe_config *conf)
|
|||||||
{
|
{
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct task *task;
|
|
||||||
struct stream *strm;
|
struct stream *strm;
|
||||||
|
|
||||||
if ((appctx = appctx_new(&spoe_applet)) == NULL)
|
if ((appctx = appctx_new(&spoe_applet)) == NULL)
|
||||||
@ -1937,12 +1936,9 @@ spoe_create_appctx(struct spoe_config *conf)
|
|||||||
if (!sess)
|
if (!sess)
|
||||||
goto out_free_spoe;
|
goto out_free_spoe;
|
||||||
|
|
||||||
if ((task = task_new()) == NULL)
|
if ((strm = stream_new(sess, &appctx->obj_type)) == NULL)
|
||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
|
|
||||||
if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
|
|
||||||
goto out_free_task;
|
|
||||||
|
|
||||||
stream_set_backend(strm, conf->agent->b.be);
|
stream_set_backend(strm, conf->agent->b.be);
|
||||||
|
|
||||||
/* applet is waiting for data */
|
/* applet is waiting for data */
|
||||||
@ -1960,12 +1956,10 @@ spoe_create_appctx(struct spoe_config *conf)
|
|||||||
LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
|
LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
|
||||||
conf->agent->applets_act++;
|
conf->agent->applets_act++;
|
||||||
|
|
||||||
task_wakeup(task, TASK_WOKEN_INIT);
|
task_wakeup(strm->task, TASK_WOKEN_INIT);
|
||||||
return appctx;
|
return appctx;
|
||||||
|
|
||||||
/* Error unrolling */
|
/* Error unrolling */
|
||||||
out_free_task:
|
|
||||||
task_free(task);
|
|
||||||
out_free_sess:
|
out_free_sess:
|
||||||
session_free(sess);
|
session_free(sess);
|
||||||
out_free_spoe:
|
out_free_spoe:
|
||||||
|
14
src/hlua.c
14
src/hlua.c
@ -2297,7 +2297,6 @@ __LJMP static int hlua_socket_new(lua_State *L)
|
|||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct stream *strm;
|
struct stream *strm;
|
||||||
struct task *task;
|
|
||||||
|
|
||||||
/* Check stack size. */
|
/* Check stack size. */
|
||||||
if (!lua_checkstack(L, 3)) {
|
if (!lua_checkstack(L, 3)) {
|
||||||
@ -2341,14 +2340,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
|
|||||||
goto out_fail_sess;
|
goto out_fail_sess;
|
||||||
}
|
}
|
||||||
|
|
||||||
task = task_new();
|
strm = stream_new(sess, &appctx->obj_type);
|
||||||
if (!task) {
|
|
||||||
hlua_pusherror(L, "socket: out of memory");
|
|
||||||
goto out_fail_task;
|
|
||||||
}
|
|
||||||
task->nice = 0;
|
|
||||||
|
|
||||||
strm = stream_new(sess, task, &appctx->obj_type);
|
|
||||||
if (!strm) {
|
if (!strm) {
|
||||||
hlua_pusherror(L, "socket: out of memory");
|
hlua_pusherror(L, "socket: out of memory");
|
||||||
goto out_fail_stream;
|
goto out_fail_stream;
|
||||||
@ -2372,13 +2364,11 @@ __LJMP static int hlua_socket_new(lua_State *L)
|
|||||||
jobs++;
|
jobs++;
|
||||||
totalconn++;
|
totalconn++;
|
||||||
|
|
||||||
task_wakeup(task, TASK_WOKEN_INIT);
|
task_wakeup(strm->task, TASK_WOKEN_INIT);
|
||||||
/* Return yield waiting for connection. */
|
/* Return yield waiting for connection. */
|
||||||
return 1;
|
return 1;
|
||||||
|
|
||||||
out_fail_stream:
|
out_fail_stream:
|
||||||
task_free(task);
|
|
||||||
out_fail_task:
|
|
||||||
session_free(sess);
|
session_free(sess);
|
||||||
out_fail_sess:
|
out_fail_sess:
|
||||||
appctx_free(appctx);
|
appctx_free(appctx);
|
||||||
|
15
src/peers.c
15
src/peers.c
@ -1784,7 +1784,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
struct task *t;
|
|
||||||
struct connection *conn;
|
struct connection *conn;
|
||||||
|
|
||||||
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
|
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
|
||||||
@ -1804,15 +1803,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||||||
goto out_free_appctx;
|
goto out_free_appctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((t = task_new()) == NULL) {
|
if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
|
||||||
Alert("out of memory in peer_session_create().\n");
|
|
||||||
goto out_free_sess;
|
|
||||||
}
|
|
||||||
t->nice = l->nice;
|
|
||||||
|
|
||||||
if ((s = stream_new(sess, t, &appctx->obj_type)) == NULL) {
|
|
||||||
Alert("Failed to initialize stream in peer_session_create().\n");
|
Alert("Failed to initialize stream in peer_session_create().\n");
|
||||||
goto out_free_task;
|
goto out_free_sess;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* The tasks below are normally what is supposed to be done by
|
/* The tasks below are normally what is supposed to be done by
|
||||||
@ -1851,7 +1844,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||||||
totalconn++;
|
totalconn++;
|
||||||
|
|
||||||
peer->appctx = appctx;
|
peer->appctx = appctx;
|
||||||
task_wakeup(t, TASK_WOKEN_INIT);
|
task_wakeup(s->task, TASK_WOKEN_INIT);
|
||||||
return appctx;
|
return appctx;
|
||||||
|
|
||||||
/* Error unrolling */
|
/* Error unrolling */
|
||||||
@ -1859,8 +1852,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||||||
LIST_DEL(&s->by_sess);
|
LIST_DEL(&s->by_sess);
|
||||||
LIST_DEL(&s->list);
|
LIST_DEL(&s->list);
|
||||||
pool_free2(pool2_stream, s);
|
pool_free2(pool2_stream, s);
|
||||||
out_free_task:
|
|
||||||
task_free(t);
|
|
||||||
out_free_sess:
|
out_free_sess:
|
||||||
session_free(sess);
|
session_free(sess);
|
||||||
out_free_appctx:
|
out_free_appctx:
|
||||||
|
@ -109,7 +109,7 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
|
|||||||
struct connection *cli_conn;
|
struct connection *cli_conn;
|
||||||
struct proxy *p = l->bind_conf->frontend;
|
struct proxy *p = l->bind_conf->frontend;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct task *t;
|
struct stream *strm;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
|
|
||||||
@ -222,12 +222,6 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
|
|||||||
if (global.tune.client_rcvbuf)
|
if (global.tune.client_rcvbuf)
|
||||||
setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &global.tune.client_rcvbuf, sizeof(global.tune.client_rcvbuf));
|
setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &global.tune.client_rcvbuf, sizeof(global.tune.client_rcvbuf));
|
||||||
|
|
||||||
if (unlikely((t = task_new()) == NULL))
|
|
||||||
goto out_free_sess;
|
|
||||||
|
|
||||||
t->context = sess;
|
|
||||||
t->nice = l->nice;
|
|
||||||
|
|
||||||
/* OK, now either we have a pending handshake to execute with and
|
/* OK, now either we have a pending handshake to execute with and
|
||||||
* then we must return to the I/O layer, or we can proceed with the
|
* then we must return to the I/O layer, or we can proceed with the
|
||||||
* end of the stream initialization. In case of handshake, we also
|
* end of the stream initialization. In case of handshake, we also
|
||||||
@ -241,10 +235,18 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
|
|||||||
* conn -- owner ---> task
|
* conn -- owner ---> task
|
||||||
*/
|
*/
|
||||||
if (cli_conn->flags & CO_FL_HANDSHAKE) {
|
if (cli_conn->flags & CO_FL_HANDSHAKE) {
|
||||||
|
struct task *t;
|
||||||
|
|
||||||
|
if (unlikely((t = task_new()) == NULL))
|
||||||
|
goto out_free_sess;
|
||||||
|
|
||||||
conn_set_owner(cli_conn, t);
|
conn_set_owner(cli_conn, t);
|
||||||
conn_set_xprt_done_cb(cli_conn, conn_complete_session);
|
conn_set_xprt_done_cb(cli_conn, conn_complete_session);
|
||||||
|
|
||||||
|
t->context = sess;
|
||||||
|
t->nice = l->nice;
|
||||||
t->process = session_expire_embryonic;
|
t->process = session_expire_embryonic;
|
||||||
t->expire = tick_add_ifset(now_ms, p->timeout.client);
|
t->expire = tick_add_ifset(now_ms, p->timeout.client);
|
||||||
task_queue(t);
|
task_queue(t);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -261,14 +263,12 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
|
|||||||
goto out_free_sess;
|
goto out_free_sess;
|
||||||
|
|
||||||
session_count_new(sess);
|
session_count_new(sess);
|
||||||
if (!stream_new(sess, t, &cli_conn->obj_type))
|
if ((strm = stream_new(sess, &cli_conn->obj_type)) == NULL)
|
||||||
goto out_free_task;
|
goto out_free_sess;
|
||||||
|
|
||||||
task_wakeup(t, TASK_WOKEN_INIT);
|
task_wakeup(strm->task, TASK_WOKEN_INIT);
|
||||||
return 1;
|
return 1;
|
||||||
|
|
||||||
out_free_task:
|
|
||||||
task_free(t);
|
|
||||||
out_free_sess:
|
out_free_sess:
|
||||||
p->feconn--;
|
p->feconn--;
|
||||||
session_free(sess);
|
session_free(sess);
|
||||||
@ -412,6 +412,7 @@ static int conn_complete_session(struct connection *conn)
|
|||||||
{
|
{
|
||||||
struct task *task = conn->owner;
|
struct task *task = conn->owner;
|
||||||
struct session *sess = task->context;
|
struct session *sess = task->context;
|
||||||
|
struct stream *strm;
|
||||||
|
|
||||||
conn_clear_xprt_done_cb(conn);
|
conn_clear_xprt_done_cb(conn);
|
||||||
|
|
||||||
@ -430,11 +431,14 @@ static int conn_complete_session(struct connection *conn)
|
|||||||
goto fail;
|
goto fail;
|
||||||
|
|
||||||
session_count_new(sess);
|
session_count_new(sess);
|
||||||
task->process = sess->listener->handler;
|
if ((strm = stream_new(sess, &conn->obj_type)) == NULL)
|
||||||
if (!stream_new(sess, task, &conn->obj_type))
|
|
||||||
goto fail;
|
goto fail;
|
||||||
|
|
||||||
task_wakeup(task, TASK_WOKEN_INIT);
|
task_wakeup(strm->task, TASK_WOKEN_INIT);
|
||||||
|
|
||||||
|
/* the embryonic session's task is not needed anymore */
|
||||||
|
task_delete(task);
|
||||||
|
task_free(task);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
fail:
|
fail:
|
||||||
|
21
src/stream.c
21
src/stream.c
@ -67,20 +67,22 @@ static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
|
|||||||
|
|
||||||
/* This function is called from the session handler which detects the end of
|
/* This function is called from the session handler which detects the end of
|
||||||
* handshake, in order to complete initialization of a valid stream. It must be
|
* handshake, in order to complete initialization of a valid stream. It must be
|
||||||
* called with a session (which may be embryonic). It returns the pointer to
|
* called with a completley initialized session. It returns the pointer to
|
||||||
* the newly created stream, or NULL in case of fatal error. The client-facing
|
* the newly created stream, or NULL in case of fatal error. The client-facing
|
||||||
* end point is assigned to <origin>, which must be valid. The task's context
|
* end point is assigned to <origin>, which must be valid. The stream's task
|
||||||
* is set to the new stream, and its function is set to process_stream().
|
* is configured with a nice value inherited from the listener's nice if any.
|
||||||
* Target and analysers are null.
|
* The task's context is set to the new stream, and its function is set to
|
||||||
|
* process_stream(). Target and analysers are null.
|
||||||
*/
|
*/
|
||||||
struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin)
|
struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
||||||
{
|
{
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
|
struct task *t;
|
||||||
struct connection *conn = objt_conn(origin);
|
struct connection *conn = objt_conn(origin);
|
||||||
struct appctx *appctx = objt_appctx(origin);
|
struct appctx *appctx = objt_appctx(origin);
|
||||||
|
|
||||||
if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
|
if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
|
||||||
return s;
|
goto out_fail_alloc;
|
||||||
|
|
||||||
/* minimum stream initialization required for an embryonic stream is
|
/* minimum stream initialization required for an embryonic stream is
|
||||||
* fairly low. We need very little to execute L4 ACLs, then we need a
|
* fairly low. We need very little to execute L4 ACLs, then we need a
|
||||||
@ -145,11 +147,16 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
|
|||||||
s->flags |= SF_INITIALIZED;
|
s->flags |= SF_INITIALIZED;
|
||||||
s->unique_id = NULL;
|
s->unique_id = NULL;
|
||||||
|
|
||||||
|
if ((t = task_new()) == NULL)
|
||||||
|
goto out_fail_alloc;
|
||||||
|
|
||||||
s->task = t;
|
s->task = t;
|
||||||
s->pending_events = 0;
|
s->pending_events = 0;
|
||||||
t->process = process_stream;
|
t->process = process_stream;
|
||||||
t->context = s;
|
t->context = s;
|
||||||
t->expire = TICK_ETERNITY;
|
t->expire = TICK_ETERNITY;
|
||||||
|
if (sess->listener)
|
||||||
|
t->nice = sess->listener->nice;
|
||||||
|
|
||||||
/* Note: initially, the stream's backend points to the frontend.
|
/* Note: initially, the stream's backend points to the frontend.
|
||||||
* This changes later when switching rules are executed or
|
* This changes later when switching rules are executed or
|
||||||
@ -250,6 +257,8 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
|
|||||||
/* Error unrolling */
|
/* Error unrolling */
|
||||||
out_fail_accept:
|
out_fail_accept:
|
||||||
flt_stream_release(s, 0);
|
flt_stream_release(s, 0);
|
||||||
|
task_free(t);
|
||||||
|
out_fail_alloc:
|
||||||
LIST_DEL(&s->by_sess);
|
LIST_DEL(&s->by_sess);
|
||||||
LIST_DEL(&s->list);
|
LIST_DEL(&s->list);
|
||||||
pool_free2(pool2_stream, s);
|
pool_free2(pool2_stream, s);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user