MAJOR: applet: applet scheduler rework.
In order to authorize call of appctx_wakeup on running task: - from within the task handler itself. - in futur, from another thread. The appctx is considered paused as default after running the handler. The handler should explicitly call appctx_wakeup to be re-called. When the appctx_free is called on a running handler. The real free is postponed at the end of the handler process.
This commit is contained in:
parent
57ec32fb99
commit
c730606879
@ -48,6 +48,7 @@ static inline void appctx_init(struct appctx *appctx)
|
||||
{
|
||||
appctx->st0 = appctx->st1 = appctx->st2 = 0;
|
||||
appctx->io_release = NULL;
|
||||
appctx->state = APPLET_SLEEPING;
|
||||
}
|
||||
|
||||
/* Tries to allocate a new appctx and initialize its main fields. The appctx
|
||||
@ -76,7 +77,7 @@ static inline struct appctx *appctx_new(struct applet *applet)
|
||||
/* Releases an appctx previously allocated by appctx_new(). Note that
|
||||
* we share the connection pool.
|
||||
*/
|
||||
static inline void appctx_free(struct appctx *appctx)
|
||||
static inline void __appctx_free(struct appctx *appctx)
|
||||
{
|
||||
if (!LIST_ISEMPTY(&appctx->runq)) {
|
||||
LIST_DEL(&appctx->runq);
|
||||
@ -89,9 +90,17 @@ static inline void appctx_free(struct appctx *appctx)
|
||||
pool_free2(pool2_connection, appctx);
|
||||
nb_applets--;
|
||||
}
|
||||
static inline void appctx_free(struct appctx *appctx)
|
||||
{
|
||||
if (appctx->state & APPLET_RUNNING) {
|
||||
appctx->state |= APPLET_WANT_DIE;
|
||||
return;
|
||||
}
|
||||
__appctx_free(appctx);
|
||||
}
|
||||
|
||||
/* wakes up an applet when conditions have changed */
|
||||
static inline void appctx_wakeup(struct appctx *appctx)
|
||||
static inline void __appctx_wakeup(struct appctx *appctx)
|
||||
{
|
||||
if (LIST_ISEMPTY(&appctx->runq)) {
|
||||
LIST_ADDQ(&applet_active_queue, &appctx->runq);
|
||||
@ -99,25 +108,34 @@ static inline void appctx_wakeup(struct appctx *appctx)
|
||||
}
|
||||
}
|
||||
|
||||
/* removes an applet from the list of active applets */
|
||||
static inline void appctx_pause(struct appctx *appctx)
|
||||
static inline void appctx_wakeup(struct appctx *appctx)
|
||||
{
|
||||
if (!LIST_ISEMPTY(&appctx->runq)) {
|
||||
LIST_DEL(&appctx->runq);
|
||||
LIST_INIT(&appctx->runq);
|
||||
applets_active_queue--;
|
||||
if (appctx->state & APPLET_RUNNING) {
|
||||
appctx->state |= APPLET_WOKEN_UP;
|
||||
return;
|
||||
}
|
||||
__appctx_wakeup(appctx);
|
||||
}
|
||||
|
||||
/* Callback used to wake up an applet when a buffer is available. The applet
|
||||
* <appctx> is woken up is if it is not already in the list of "active"
|
||||
* applets. This functions returns 1 is the stream is woken up, otherwise it
|
||||
* returns 0. */
|
||||
* returns 0. If task is running we request we check if woken was already
|
||||
* requested */
|
||||
static inline int appctx_res_wakeup(struct appctx *appctx)
|
||||
{
|
||||
if (!LIST_ISEMPTY(&appctx->runq))
|
||||
if (appctx->state & APPLET_RUNNING) {
|
||||
if (appctx->state & APPLET_WOKEN_UP) {
|
||||
return 0;
|
||||
}
|
||||
appctx->state |= APPLET_WOKEN_UP;
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!LIST_ISEMPTY(&appctx->runq)) {
|
||||
return 0;
|
||||
appctx_wakeup(appctx);
|
||||
}
|
||||
__appctx_wakeup(appctx);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -44,11 +44,17 @@ struct applet {
|
||||
unsigned int timeout; /* execution timeout. */
|
||||
};
|
||||
|
||||
#define APPLET_SLEEPING 0x00 /* applet is currently sleeping or pending in active queue */
|
||||
#define APPLET_RUNNING 0x01 /* applet is currently running */
|
||||
#define APPLET_WOKEN_UP 0x02 /* applet was running and requested to woken up again */
|
||||
#define APPLET_WANT_DIE 0x04 /* applet was running and requested to die */
|
||||
|
||||
/* Context of a running applet. */
|
||||
struct appctx {
|
||||
struct list runq; /* chaining in the applet run queue */
|
||||
enum obj_type obj_type; /* OBJ_TYPE_APPCTX */
|
||||
/* 3 unused bytes here */
|
||||
unsigned short state; /* Internal appctx state */
|
||||
unsigned int st0; /* CLI state for stats, session state for peers */
|
||||
unsigned int st1; /* prompt for stats, session error for peers */
|
||||
unsigned int st2; /* output state for stats, unused by peers */
|
||||
@ -59,6 +65,7 @@ struct appctx {
|
||||
void (*io_release)(struct appctx *appctx); /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK,
|
||||
if the command is terminated or the session released */
|
||||
struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
|
||||
unsigned long process_mask; /* mask of thread IDs authorized to process the applet */
|
||||
|
||||
union {
|
||||
struct {
|
||||
|
37
src/applet.c
37
src/applet.c
@ -24,22 +24,22 @@ unsigned int nb_applets = 0;
|
||||
unsigned int applets_active_queue = 0;
|
||||
|
||||
struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
|
||||
struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue);
|
||||
|
||||
void applet_run_active()
|
||||
{
|
||||
struct appctx *curr;
|
||||
struct appctx *curr, *next;
|
||||
struct stream_interface *si;
|
||||
struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue);
|
||||
|
||||
if (LIST_ISEMPTY(&applet_active_queue))
|
||||
return;
|
||||
|
||||
/* move active queue to run queue */
|
||||
applet_active_queue.n->p = &applet_cur_queue;
|
||||
applet_active_queue.p->n = &applet_cur_queue;
|
||||
|
||||
applet_cur_queue = applet_active_queue;
|
||||
LIST_INIT(&applet_active_queue);
|
||||
curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
|
||||
while (&curr->runq != &applet_active_queue) {
|
||||
next = LIST_NEXT(&curr->runq, typeof(next), runq);
|
||||
LIST_DEL(&curr->runq);
|
||||
curr->state = APPLET_RUNNING;
|
||||
LIST_ADDQ(&applet_cur_queue, &curr->runq);
|
||||
applets_active_queue--;
|
||||
curr = next;
|
||||
}
|
||||
|
||||
/* The list is only scanned from the head. This guarantees that if any
|
||||
* applet removes another one, there is no side effect while walking
|
||||
@ -70,7 +70,20 @@ void applet_run_active()
|
||||
if (applet_cur_queue.n == &curr->runq) {
|
||||
/* curr was left in the list, move it back to the active list */
|
||||
LIST_DEL(&curr->runq);
|
||||
LIST_ADDQ(&applet_active_queue, &curr->runq);
|
||||
LIST_INIT(&curr->runq);
|
||||
if (curr->state & APPLET_WANT_DIE) {
|
||||
curr->state = APPLET_SLEEPING;
|
||||
__appctx_free(curr);
|
||||
}
|
||||
else {
|
||||
if (curr->state & APPLET_WOKEN_UP) {
|
||||
curr->state = APPLET_SLEEPING;
|
||||
__appctx_wakeup(curr);
|
||||
}
|
||||
else {
|
||||
curr->state = APPLET_SLEEPING;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1061,7 +1061,6 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px,
|
||||
/* Stops the applet sheduling, in case of the init function miss
|
||||
* some data.
|
||||
*/
|
||||
appctx_pause(appctx);
|
||||
si_applet_stop_get(&s->si[1]);
|
||||
|
||||
/* Call initialisation. */
|
||||
|
@ -1369,16 +1369,6 @@ void si_applet_wake_cb(struct stream_interface *si)
|
||||
|
||||
/* update the stream-int, channels, and possibly wake the stream up */
|
||||
stream_int_notify(si);
|
||||
|
||||
/* Get away from the active list if we can't work anymore.
|
||||
* We also do that if the main task has already scheduled, because it
|
||||
* saves a useless wakeup/pause/wakeup cycle causing one useless call
|
||||
* per session on average.
|
||||
*/
|
||||
if (task_in_rq(si_task(si)) ||
|
||||
(((si->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) != SI_FL_WANT_PUT) &&
|
||||
((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) != SI_FL_WANT_GET)))
|
||||
appctx_pause(si_appctx(si));
|
||||
}
|
||||
|
||||
|
||||
@ -1393,8 +1383,6 @@ void stream_int_update_applet(struct stream_interface *si)
|
||||
if (((si->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) ||
|
||||
((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET))
|
||||
appctx_wakeup(si_appctx(si));
|
||||
else
|
||||
appctx_pause(si_appctx(si));
|
||||
}
|
||||
|
||||
/*
|
||||
|
Loading…
Reference in New Issue
Block a user