diff --git a/include/proto/stream.h b/include/proto/stream.h index 5ff22916a..44fc8bea5 100644 --- a/include/proto/stream.h +++ b/include/proto/stream.h @@ -35,7 +35,7 @@ extern struct list streams; extern struct data_cb sess_conn_cb; -struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin); +struct stream *stream_new(struct session *sess, enum obj_type *origin); /* perform minimal intializations, report 0 in case of error, 1 if OK. */ int init_stream(); diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 1a8bd2c0f..47aef5762 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1901,7 +1901,6 @@ spoe_create_appctx(struct spoe_config *conf) { struct appctx *appctx; struct session *sess; - struct task *task; struct stream *strm; if ((appctx = appctx_new(&spoe_applet)) == NULL) @@ -1937,12 +1936,9 @@ spoe_create_appctx(struct spoe_config *conf) if (!sess) goto out_free_spoe; - if ((task = task_new()) == NULL) + if ((strm = stream_new(sess, &appctx->obj_type)) == NULL) goto out_free_sess; - if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL) - goto out_free_task; - stream_set_backend(strm, conf->agent->b.be); /* applet is waiting for data */ @@ -1960,12 +1956,10 @@ spoe_create_appctx(struct spoe_config *conf) LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list); conf->agent->applets_act++; - task_wakeup(task, TASK_WOKEN_INIT); + task_wakeup(strm->task, TASK_WOKEN_INIT); return appctx; /* Error unrolling */ - out_free_task: - task_free(task); out_free_sess: session_free(sess); out_free_spoe: diff --git a/src/hlua.c b/src/hlua.c index 0f82425de..594d880ed 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -2297,7 +2297,6 @@ __LJMP static int hlua_socket_new(lua_State *L) struct appctx *appctx; struct session *sess; struct stream *strm; - struct task *task; /* Check stack size. */ if (!lua_checkstack(L, 3)) { @@ -2341,14 +2340,7 @@ __LJMP static int hlua_socket_new(lua_State *L) goto out_fail_sess; } - task = task_new(); - if (!task) { - hlua_pusherror(L, "socket: out of memory"); - goto out_fail_task; - } - task->nice = 0; - - strm = stream_new(sess, task, &appctx->obj_type); + strm = stream_new(sess, &appctx->obj_type); if (!strm) { hlua_pusherror(L, "socket: out of memory"); goto out_fail_stream; @@ -2372,13 +2364,11 @@ __LJMP static int hlua_socket_new(lua_State *L) jobs++; totalconn++; - task_wakeup(task, TASK_WOKEN_INIT); + task_wakeup(strm->task, TASK_WOKEN_INIT); /* Return yield waiting for connection. */ return 1; out_fail_stream: - task_free(task); - out_fail_task: session_free(sess); out_fail_sess: appctx_free(appctx); diff --git a/src/peers.c b/src/peers.c index 249edf7a3..d03e72fef 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1784,7 +1784,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer struct appctx *appctx; struct session *sess; struct stream *s; - struct task *t; struct connection *conn; peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000)); @@ -1804,15 +1803,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer goto out_free_appctx; } - if ((t = task_new()) == NULL) { - Alert("out of memory in peer_session_create().\n"); - goto out_free_sess; - } - t->nice = l->nice; - - if ((s = stream_new(sess, t, &appctx->obj_type)) == NULL) { + if ((s = stream_new(sess, &appctx->obj_type)) == NULL) { Alert("Failed to initialize stream in peer_session_create().\n"); - goto out_free_task; + goto out_free_sess; } /* The tasks below are normally what is supposed to be done by @@ -1851,7 +1844,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer totalconn++; peer->appctx = appctx; - task_wakeup(t, TASK_WOKEN_INIT); + task_wakeup(s->task, TASK_WOKEN_INIT); return appctx; /* Error unrolling */ @@ -1859,8 +1852,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer LIST_DEL(&s->by_sess); LIST_DEL(&s->list); pool_free2(pool2_stream, s); - out_free_task: - task_free(t); out_free_sess: session_free(sess); out_free_appctx: diff --git a/src/session.c b/src/session.c index cc2a0b87a..ea4e020a6 100644 --- a/src/session.c +++ b/src/session.c @@ -109,7 +109,7 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr struct connection *cli_conn; struct proxy *p = l->bind_conf->frontend; struct session *sess; - struct task *t; + struct stream *strm; int ret; @@ -222,12 +222,6 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr if (global.tune.client_rcvbuf) setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &global.tune.client_rcvbuf, sizeof(global.tune.client_rcvbuf)); - if (unlikely((t = task_new()) == NULL)) - goto out_free_sess; - - t->context = sess; - t->nice = l->nice; - /* OK, now either we have a pending handshake to execute with and * then we must return to the I/O layer, or we can proceed with the * end of the stream initialization. In case of handshake, we also @@ -241,10 +235,18 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr * conn -- owner ---> task */ if (cli_conn->flags & CO_FL_HANDSHAKE) { + struct task *t; + + if (unlikely((t = task_new()) == NULL)) + goto out_free_sess; + conn_set_owner(cli_conn, t); conn_set_xprt_done_cb(cli_conn, conn_complete_session); + + t->context = sess; + t->nice = l->nice; t->process = session_expire_embryonic; - t->expire = tick_add_ifset(now_ms, p->timeout.client); + t->expire = tick_add_ifset(now_ms, p->timeout.client); task_queue(t); return 1; } @@ -261,14 +263,12 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr goto out_free_sess; session_count_new(sess); - if (!stream_new(sess, t, &cli_conn->obj_type)) - goto out_free_task; + if ((strm = stream_new(sess, &cli_conn->obj_type)) == NULL) + goto out_free_sess; - task_wakeup(t, TASK_WOKEN_INIT); + task_wakeup(strm->task, TASK_WOKEN_INIT); return 1; - out_free_task: - task_free(t); out_free_sess: p->feconn--; session_free(sess); @@ -412,6 +412,7 @@ static int conn_complete_session(struct connection *conn) { struct task *task = conn->owner; struct session *sess = task->context; + struct stream *strm; conn_clear_xprt_done_cb(conn); @@ -430,11 +431,14 @@ static int conn_complete_session(struct connection *conn) goto fail; session_count_new(sess); - task->process = sess->listener->handler; - if (!stream_new(sess, task, &conn->obj_type)) + if ((strm = stream_new(sess, &conn->obj_type)) == NULL) goto fail; - task_wakeup(task, TASK_WOKEN_INIT); + task_wakeup(strm->task, TASK_WOKEN_INIT); + + /* the embryonic session's task is not needed anymore */ + task_delete(task); + task_free(task); return 0; fail: diff --git a/src/stream.c b/src/stream.c index 6f7a1be64..8527c297f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -67,20 +67,22 @@ static struct list service_keywords = LIST_HEAD_INIT(service_keywords); /* This function is called from the session handler which detects the end of * handshake, in order to complete initialization of a valid stream. It must be - * called with a session (which may be embryonic). It returns the pointer to + * called with a completley initialized session. It returns the pointer to * the newly created stream, or NULL in case of fatal error. The client-facing - * end point is assigned to , which must be valid. The task's context - * is set to the new stream, and its function is set to process_stream(). - * Target and analysers are null. + * end point is assigned to , which must be valid. The stream's task + * is configured with a nice value inherited from the listener's nice if any. + * The task's context is set to the new stream, and its function is set to + * process_stream(). Target and analysers are null. */ -struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin) +struct stream *stream_new(struct session *sess, enum obj_type *origin) { struct stream *s; + struct task *t; struct connection *conn = objt_conn(origin); struct appctx *appctx = objt_appctx(origin); if (unlikely((s = pool_alloc2(pool2_stream)) == NULL)) - return s; + goto out_fail_alloc; /* minimum stream initialization required for an embryonic stream is * fairly low. We need very little to execute L4 ACLs, then we need a @@ -145,11 +147,16 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o s->flags |= SF_INITIALIZED; s->unique_id = NULL; + if ((t = task_new()) == NULL) + goto out_fail_alloc; + s->task = t; s->pending_events = 0; t->process = process_stream; t->context = s; t->expire = TICK_ETERNITY; + if (sess->listener) + t->nice = sess->listener->nice; /* Note: initially, the stream's backend points to the frontend. * This changes later when switching rules are executed or @@ -250,6 +257,8 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o /* Error unrolling */ out_fail_accept: flt_stream_release(s, 0); + task_free(t); + out_fail_alloc: LIST_DEL(&s->by_sess); LIST_DEL(&s->list); pool_free2(pool2_stream, s);