sink_proxies_list = px;
}
-/*
- * IO Handler to handle message push to syslog tcp server.
- * It takes its context from appctx->svcctx.
- */
-static void sink_forward_io_handler(struct appctx *appctx)
+static void _sink_forward_io_handler(struct appctx *appctx,
+ ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
{
struct stconn *sc = appctx_sc(appctx);
struct sink_forward_target *sft = appctx->svcctx;
MT_LIST_DELETE(&appctx->wait_entry);
- ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, applet_append_line);
+ ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, msg_handler);
if (ret) {
/* let's be woken up once new data arrive */
se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
}
+/*
+ * IO Handler to handle message push to syslog tcp server.
+ * It takes its context from appctx->svcctx.
+ */
+static inline void sink_forward_io_handler(struct appctx *appctx)
+{
+ _sink_forward_io_handler(appctx, applet_append_line);
+}
+
/*
* IO Handler to handle message push to syslog tcp server
* using octet counting frames
* It takes its context from appctx->svcctx.
*/
-static void sink_forward_oc_io_handler(struct appctx *appctx)
+static inline void sink_forward_oc_io_handler(struct appctx *appctx)
{
- struct stconn *sc = appctx_sc(appctx);
- struct sink_forward_target *sft = appctx->svcctx;
- struct sink *sink = sft->sink;
- struct ring *ring = sink->ctx.ring;
- size_t ofs, last_ofs;
- int ret = 0;
-
- if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR))))
- goto out;
-
- /* if stopping was requested, close immediately */
- if (unlikely(stopping))
- goto close;
-
- /* if the connection is not established, inform the stream that we want
- * to be notified whenever the connection completes.
- */
- if (sc_opposite(sc)->state < SC_ST_EST) {
- applet_need_more_data(appctx);
- se_need_remote_conn(appctx->sedesc);
- applet_have_more_data(appctx);
- goto out;
- }
-
- HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
- if (appctx != sft->appctx) {
- HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
- goto close;
- }
-
- MT_LIST_DELETE(&appctx->wait_entry);
-
- ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, syslog_applet_append_event);
- if (ret) {
- /* let's be woken up once new data arrive */
- MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry);
- ofs = ring_tail(ring);
- if (ofs != last_ofs) {
- /* more data was added into the ring between the
- * unlock and the lock, and the writer might not
- * have seen us. We need to reschedule a read.
- */
- applet_have_more_data(appctx);
- } else
- applet_have_no_more_data(appctx);
- }
- HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
-
- out:
- /* always drain data from server */
- co_skip(sc_oc(sc), sc_oc(sc)->output);
- return;
-
-close:
- se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
+ _sink_forward_io_handler(appctx, syslog_applet_append_event);
}
void __sink_forward_session_deinit(struct sink_forward_target *sft)