[MINOR] stream_interface: add functions to support running as internal/external tasks
It will soon be necessary to have stream interfaces running as part of the current task, or as independant tasks. For instance when we want to implement compression or SSL. It will also be used for applets running as stream interfaces. These new functions are used to perform exactly that. Note that it's still not easy to write a simple echo applet and more functions will likely be needed.
This commit is contained in:
parent
b029f8cd7d
commit
fb90d94d7a
@ -2,7 +2,7 @@
|
||||
include/proto/stream_interface.h
|
||||
This file contains stream_interface function prototypes
|
||||
|
||||
Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
|
||||
Copyright (C) 2000-2009 Willy Tarreau - w@1wt.eu
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
@ -34,6 +34,20 @@ void stream_int_report_error(struct stream_interface *si);
|
||||
void stream_int_return(struct stream_interface *si, const struct chunk *msg);
|
||||
void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg);
|
||||
|
||||
/* functions used when running a stream interface as a task */
|
||||
void stream_int_update(struct stream_interface *si);
|
||||
void stream_int_update_embedded(struct stream_interface *si);
|
||||
void stream_int_shutr(struct stream_interface *si);
|
||||
void stream_int_shutw(struct stream_interface *si);
|
||||
void stream_int_chk_rcv(struct stream_interface *si);
|
||||
void stream_int_chk_snd(struct stream_interface *si);
|
||||
|
||||
struct task *stream_int_register_handler(struct stream_interface *si,
|
||||
void (*fct)(struct stream_interface *));
|
||||
struct task *stream_int_register_handler_task(struct stream_interface *si,
|
||||
struct task *(*fct)(struct task *));
|
||||
void stream_int_unregister_handler(struct stream_interface *si);
|
||||
|
||||
#endif /* _PROTO_STREAM_INTERFACE_H */
|
||||
|
||||
/*
|
||||
|
@ -76,6 +76,10 @@ enum {
|
||||
struct server;
|
||||
struct proxy;
|
||||
|
||||
/* Note that if an iohandler is set, the update function will not be called by
|
||||
* the session handler, so it may be used to resync flags at the end of the I/O
|
||||
* handler. See stream_int_update_embedded() for reference.
|
||||
*/
|
||||
struct stream_interface {
|
||||
unsigned int state; /* SI_ST* */
|
||||
unsigned int prev_state;/* SI_ST*, copy of previous state */
|
||||
|
@ -1,7 +1,7 @@
|
||||
/*
|
||||
* Functions managing stream_interface structures
|
||||
*
|
||||
* Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
|
||||
* Copyright 2000-2009 Willy Tarreau <w@1wt.eu>
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
@ -93,6 +93,255 @@ void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg)
|
||||
buffer_auto_close(si->ob);
|
||||
}
|
||||
|
||||
/* default update function for scheduled tasks, not used for embedded tasks */
|
||||
void stream_int_update(struct stream_interface *si)
|
||||
{
|
||||
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
|
||||
__FUNCTION__,
|
||||
si, si->state, si->ib->flags, si->ob->flags);
|
||||
|
||||
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
|
||||
/* default update function for embedded tasks, to be used at the end of the i/o handler */
|
||||
void stream_int_update_embedded(struct stream_interface *si)
|
||||
{
|
||||
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
|
||||
__FUNCTION__,
|
||||
si, si->state, si->ib->flags, si->ob->flags);
|
||||
|
||||
if (si->state != SI_ST_EST)
|
||||
return;
|
||||
|
||||
if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW))
|
||||
si->shutw(si);
|
||||
|
||||
if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
|
||||
if ((si->ib->flags & (BF_FULL|BF_SHUTR)) == BF_FULL)
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
|
||||
if (si->ob->flags & BF_WRITE_ACTIVITY || si->ib->flags & BF_READ_ACTIVITY) {
|
||||
if (tick_isset(si->ib->rex))
|
||||
si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
|
||||
if (tick_isset(si->ob->wex))
|
||||
si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
|
||||
}
|
||||
|
||||
if (si->ob->flags & BF_WRITE_PARTIAL)
|
||||
si->ob->prod->chk_rcv(si->ob->prod);
|
||||
|
||||
if (si->ib->flags & BF_READ_PARTIAL)
|
||||
si->ib->cons->chk_snd(si->ib->cons);
|
||||
|
||||
/* Note that we're trying to wake up in two conditions here :
|
||||
* - special event, which needs the holder task attention
|
||||
* - status indicating that the applet can go on working. This
|
||||
* is rather hard because we might be blocking on output and
|
||||
* don't want to wake up on input and vice-versa. The idea is
|
||||
* the to only rely the changes the chk_* might have performed.
|
||||
*/
|
||||
if (/* check stream interface changes */
|
||||
(si->flags & SI_FL_ERR) || si->state != SI_ST_EST || si->ib->cons->state != SI_ST_EST ||
|
||||
/* check response buffer changes */
|
||||
(si->ib->flags & (BF_READ_NULL|BF_READ_ERROR|BF_READ_DONTWAIT)) ||
|
||||
((si->ib->flags & BF_READ_ACTIVITY) && !si->ib->to_forward) ||
|
||||
(!(si->ib->flags & BF_FULL) && (si->ib->flags & BF_WRITE_ACTIVITY) && si->ib->to_forward) ||
|
||||
/* check request buffer changes */
|
||||
(si->ob->flags & (BF_WRITE_ERROR)) ||
|
||||
((si->ob->flags & BF_WRITE_ACTIVITY) && (si->ob->flags & BF_OUT_EMPTY) && !si->ob->to_forward) ||
|
||||
(si->ob->flags & BF_READ_ACTIVITY)) {
|
||||
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
}
|
||||
|
||||
/* default shutr function for scheduled tasks */
|
||||
void stream_int_shutr(struct stream_interface *si)
|
||||
{
|
||||
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
|
||||
__FUNCTION__,
|
||||
si, si->state, si->ib->flags, si->ob->flags);
|
||||
|
||||
si->ib->flags &= ~BF_SHUTR_NOW;
|
||||
if (si->ib->flags & BF_SHUTR)
|
||||
return;
|
||||
si->ib->flags |= BF_SHUTR;
|
||||
si->ib->rex = TICK_ETERNITY;
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
|
||||
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
|
||||
return;
|
||||
|
||||
if (si->ob->flags & BF_SHUTW) {
|
||||
si->state = SI_ST_DIS;
|
||||
si->exp = TICK_ETERNITY;
|
||||
}
|
||||
|
||||
/* note that if the task exist, it must unregister itself once it runs */
|
||||
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
|
||||
/* default shutw function for scheduled tasks */
|
||||
void stream_int_shutw(struct stream_interface *si)
|
||||
{
|
||||
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
|
||||
__FUNCTION__,
|
||||
si, si->state, si->ib->flags, si->ob->flags);
|
||||
|
||||
si->ob->flags &= ~BF_SHUTW_NOW;
|
||||
if (si->ob->flags & BF_SHUTW)
|
||||
return;
|
||||
si->ob->flags |= BF_SHUTW;
|
||||
si->ob->wex = TICK_ETERNITY;
|
||||
si->flags &= ~SI_FL_WAIT_DATA;
|
||||
|
||||
switch (si->state) {
|
||||
case SI_ST_EST:
|
||||
if (!(si->ib->flags & BF_SHUTR))
|
||||
break;
|
||||
|
||||
/* fall through */
|
||||
case SI_ST_CON:
|
||||
case SI_ST_CER:
|
||||
si->state = SI_ST_DIS;
|
||||
/* fall through */
|
||||
default:
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
si->ib->flags |= BF_SHUTR;
|
||||
si->ib->rex = TICK_ETERNITY;
|
||||
si->exp = TICK_ETERNITY;
|
||||
}
|
||||
|
||||
/* note that if the task exist, it must unregister itself once it runs */
|
||||
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
|
||||
/* default chk_rcv function for scheduled tasks */
|
||||
void stream_int_chk_rcv(struct stream_interface *si)
|
||||
{
|
||||
struct buffer *ib = si->ib;
|
||||
|
||||
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
|
||||
__FUNCTION__,
|
||||
si, si->state, si->ib->flags, si->ob->flags);
|
||||
|
||||
if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
|
||||
return;
|
||||
|
||||
if (ib->flags & (BF_FULL|BF_HIJACK)) {
|
||||
/* stop reading */
|
||||
if ((ib->flags & (BF_FULL|BF_HIJACK)) == BF_FULL)
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
}
|
||||
else {
|
||||
/* (re)start reading */
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
}
|
||||
|
||||
/* default chk_snd function for scheduled tasks */
|
||||
void stream_int_chk_snd(struct stream_interface *si)
|
||||
{
|
||||
struct buffer *ob = si->ob;
|
||||
|
||||
DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
|
||||
__FUNCTION__,
|
||||
si, si->state, si->ib->flags, si->ob->flags);
|
||||
|
||||
if (unlikely(si->state != SI_ST_EST || (si->ob->flags & BF_SHUTW)))
|
||||
return;
|
||||
|
||||
if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
|
||||
(ob->flags & BF_OUT_EMPTY)) /* called with nothing to send ! */
|
||||
return;
|
||||
|
||||
/* Otherwise there are remaining data to be sent in the buffer,
|
||||
* so we tell the handler.
|
||||
*/
|
||||
si->flags &= ~SI_FL_WAIT_DATA;
|
||||
if (!tick_isset(ob->wex))
|
||||
ob->wex = tick_add_ifset(now_ms, ob->wto);
|
||||
|
||||
if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
|
||||
task_wakeup(si->owner, TASK_WOKEN_IO);
|
||||
}
|
||||
|
||||
/* Register a function to handle a stream_interface as part of the stream
|
||||
* interface's owner task, which is returned. The SI will wake it up everytime
|
||||
* it is solicited. The task's processing function must call the specified
|
||||
* function before returning. It must be deleted by the task handler using
|
||||
* stream_int_unregister_handler(), possibly from withing the function itself.
|
||||
*/
|
||||
struct task *stream_int_register_handler(struct stream_interface *si,
|
||||
void (*fct)(struct stream_interface *))
|
||||
{
|
||||
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner);
|
||||
|
||||
si->update = stream_int_update_embedded;
|
||||
si->shutr = stream_int_shutr;
|
||||
si->shutw = stream_int_shutw;
|
||||
si->chk_rcv = stream_int_chk_rcv;
|
||||
si->chk_snd = stream_int_chk_snd;
|
||||
si->connect = NULL;
|
||||
si->iohandler = fct;
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
return si->owner;
|
||||
}
|
||||
|
||||
/* Register a function to handle a stream_interface as a standalone task. The
|
||||
* new task itself is returned and is assigned as si->owner. The stream_interface
|
||||
* pointer will be pointed to by the task's context. The handler can be detached
|
||||
* by using stream_int_unregister_handler().
|
||||
*/
|
||||
struct task *stream_int_register_handler_task(struct stream_interface *si,
|
||||
struct task *(*fct)(struct task *))
|
||||
{
|
||||
struct task *t;
|
||||
|
||||
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner);
|
||||
|
||||
si->update = stream_int_update;
|
||||
si->shutr = stream_int_shutr;
|
||||
si->shutw = stream_int_shutw;
|
||||
si->chk_rcv = stream_int_chk_rcv;
|
||||
si->chk_snd = stream_int_chk_snd;
|
||||
si->connect = NULL;
|
||||
si->iohandler = NULL; /* not used when running as an external task */
|
||||
si->flags |= SI_FL_WAIT_DATA;
|
||||
|
||||
t = task_new();
|
||||
si->owner = t;
|
||||
if (!t)
|
||||
return t;
|
||||
t->process = fct;
|
||||
t->context = si;
|
||||
task_wakeup(si->owner, TASK_WOKEN_INIT);
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
/* Unregister a stream interface handler. This must be called by the handler task
|
||||
* itself when it detects that it is in the SI_ST_DIS state. This function can
|
||||
* both detach standalone handlers and embedded handlers.
|
||||
*/
|
||||
void stream_int_unregister_handler(struct stream_interface *si)
|
||||
{
|
||||
if (!si->iohandler && si->owner) {
|
||||
/* external handler : kill the task */
|
||||
task_delete(si->owner);
|
||||
task_free(si->owner);
|
||||
}
|
||||
si->iohandler = NULL;
|
||||
si->owner = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Local variables:
|
||||
* c-indent-level: 8
|
||||
|
Loading…
x
Reference in New Issue
Block a user