MEDIUM: sink: Refactor sink forwarder appctx creation
A .init callback function is defined for the sink_forward_applet applet. This function finishes the appctx startup by calling appctx_finalize_startup() and its handles the stream customization.
This commit is contained in:
parent
2ae25ea24b
commit
aee57fc277
81
src/sink.c
81
src/sink.c
@ -594,6 +594,43 @@ void __sink_forward_session_deinit(struct sink_forward_target *sft)
|
|||||||
task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
|
task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int sink_forward_session_init(struct appctx *appctx)
|
||||||
|
{
|
||||||
|
struct sink_forward_target *sft = appctx->svcctx;
|
||||||
|
struct stream *s;
|
||||||
|
struct sockaddr_storage *addr = NULL;
|
||||||
|
|
||||||
|
if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
|
||||||
|
goto out_error;
|
||||||
|
|
||||||
|
if (appctx_finalize_startup(appctx, sft->sink->forward_px, &BUF_NULL) == -1)
|
||||||
|
goto out_free_addr;
|
||||||
|
|
||||||
|
s = appctx_strm(appctx);
|
||||||
|
s->csb->dst = addr;
|
||||||
|
s->csb->flags |= CS_FL_NOLINGER;
|
||||||
|
|
||||||
|
s->target = &sft->srv->obj_type;
|
||||||
|
s->flags = SF_ASSIGNED;
|
||||||
|
|
||||||
|
s->do_log = NULL;
|
||||||
|
s->uniq_id = 0;
|
||||||
|
|
||||||
|
s->res.flags |= CF_READ_DONTWAIT;
|
||||||
|
/* for rto and rex to eternity to not expire on idle recv:
|
||||||
|
* We are using a syslog server.
|
||||||
|
*/
|
||||||
|
s->res.rto = TICK_ETERNITY;
|
||||||
|
s->res.rex = TICK_ETERNITY;
|
||||||
|
sft->appctx = appctx;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
out_free_addr:
|
||||||
|
sockaddr_free(&addr);
|
||||||
|
out_error:
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
static void sink_forward_session_release(struct appctx *appctx)
|
static void sink_forward_session_release(struct appctx *appctx)
|
||||||
{
|
{
|
||||||
@ -612,6 +649,7 @@ static struct applet sink_forward_applet = {
|
|||||||
.obj_type = OBJ_TYPE_APPLET,
|
.obj_type = OBJ_TYPE_APPLET,
|
||||||
.name = "<SINKFWD>", /* used for logging */
|
.name = "<SINKFWD>", /* used for logging */
|
||||||
.fct = sink_forward_io_handler,
|
.fct = sink_forward_io_handler,
|
||||||
|
.init = sink_forward_session_init,
|
||||||
.release = sink_forward_session_release,
|
.release = sink_forward_session_release,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -619,6 +657,7 @@ static struct applet sink_forward_oc_applet = {
|
|||||||
.obj_type = OBJ_TYPE_APPLET,
|
.obj_type = OBJ_TYPE_APPLET,
|
||||||
.name = "<SINKFWDOC>", /* used for logging */
|
.name = "<SINKFWDOC>", /* used for logging */
|
||||||
.fct = sink_forward_oc_io_handler,
|
.fct = sink_forward_oc_io_handler,
|
||||||
|
.init = sink_forward_session_init,
|
||||||
.release = sink_forward_session_release,
|
.release = sink_forward_session_release,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -628,13 +667,8 @@ static struct applet sink_forward_oc_applet = {
|
|||||||
*/
|
*/
|
||||||
static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
|
static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
|
||||||
{
|
{
|
||||||
struct proxy *p = sink->forward_px;
|
|
||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
|
||||||
struct conn_stream *cs;
|
|
||||||
struct stream *s;
|
|
||||||
struct applet *applet = &sink_forward_applet;
|
struct applet *applet = &sink_forward_applet;
|
||||||
struct sockaddr_storage *addr = NULL;
|
|
||||||
|
|
||||||
if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
|
if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
|
||||||
applet = &sink_forward_oc_applet;
|
applet = &sink_forward_oc_applet;
|
||||||
@ -642,49 +676,16 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
|
|||||||
appctx = appctx_new(applet, NULL);
|
appctx = appctx_new(applet, NULL);
|
||||||
if (!appctx)
|
if (!appctx)
|
||||||
goto out_close;
|
goto out_close;
|
||||||
|
|
||||||
appctx->svcctx = (void *)sft;
|
appctx->svcctx = (void *)sft;
|
||||||
|
|
||||||
sess = session_new(p, NULL, &appctx->obj_type);
|
if (appctx_init(appctx) == -1)
|
||||||
if (!sess) {
|
|
||||||
ha_alert("out of memory in sink_forward_session_create().\n");
|
|
||||||
goto out_free_appctx;
|
|
||||||
}
|
|
||||||
appctx->sess = sess;
|
|
||||||
|
|
||||||
if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
|
|
||||||
goto out_free_appctx;
|
goto out_free_appctx;
|
||||||
|
|
||||||
cs = cs_new_from_endp(appctx->endp, sess, &BUF_NULL);
|
|
||||||
if (!cs) {
|
|
||||||
ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
|
|
||||||
goto out_free_appctx;
|
|
||||||
}
|
|
||||||
s = DISGUISE(cs_strm(cs));
|
|
||||||
|
|
||||||
s->csb->dst = addr;
|
|
||||||
s->csb->flags |= CS_FL_NOLINGER;
|
|
||||||
|
|
||||||
s->target = &sft->srv->obj_type;
|
|
||||||
s->flags = SF_ASSIGNED;
|
|
||||||
|
|
||||||
s->do_log = NULL;
|
|
||||||
s->uniq_id = 0;
|
|
||||||
|
|
||||||
s->res.flags |= CF_READ_DONTWAIT;
|
|
||||||
/* for rto and rex to eternity to not expire on idle recv:
|
|
||||||
* We are using a syslog server.
|
|
||||||
*/
|
|
||||||
s->res.rto = TICK_ETERNITY;
|
|
||||||
s->res.rex = TICK_ETERNITY;
|
|
||||||
sft->appctx = appctx;
|
|
||||||
return appctx;
|
return appctx;
|
||||||
|
|
||||||
/* Error unrolling */
|
/* Error unrolling */
|
||||||
out_free_addr:
|
|
||||||
sockaddr_free(&addr);
|
|
||||||
out_free_appctx:
|
out_free_appctx:
|
||||||
appctx_free(appctx);
|
appctx_free_on_early_error(appctx);
|
||||||
out_close:
|
out_close:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user