From: Willy Tarreau Date: Mon, 13 Apr 2015 14:30:14 +0000 (+0200) Subject: REORG: stream-int: create si_applet_ops dedicated to applets X-Git-Tag: v1.6-dev2~195 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=d45b9f899160be41ebe1307db26abba1aa4d0542;p=thirdparty%2Fhaproxy.git REORG: stream-int: create si_applet_ops dedicated to applets These functions are dedicated to applets so that we don't use the default ones anymore in this case. --- diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 036a4e1fea..0812d63c26 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -41,6 +41,7 @@ void stream_sock_read0(struct stream_interface *si); extern struct si_ops si_embedded_ops; extern struct si_ops si_conn_ops; +extern struct si_ops si_applet_ops; extern struct data_cb si_conn_cb; extern struct data_cb si_idle_conn_cb; @@ -198,7 +199,7 @@ static inline int si_conn_ready(struct stream_interface *si) */ static inline void si_attach_appctx(struct stream_interface *si, struct appctx *appctx) { - si->ops = &si_embedded_ops; + si->ops = &si_applet_ops; si->end = &appctx->obj_type; appctx->owner = si; } diff --git a/src/stream_interface.c b/src/stream_interface.c index 28d65149f9..a1bc70d883 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -48,6 +48,11 @@ static void stream_int_shutr_conn(struct stream_interface *si); static void stream_int_shutw_conn(struct stream_interface *si); static void stream_int_chk_rcv_conn(struct stream_interface *si); static void stream_int_chk_snd_conn(struct stream_interface *si); +static void stream_int_update_applet(struct stream_interface *si); +static void stream_int_shutr_applet(struct stream_interface *si); +static void stream_int_shutw_applet(struct stream_interface *si); +static void stream_int_chk_rcv_applet(struct stream_interface *si); +static void stream_int_chk_snd_applet(struct stream_interface *si); static void si_conn_recv_cb(struct connection *conn); static void si_conn_send_cb(struct connection *conn); static int si_conn_wake_cb(struct connection *conn); @@ -72,6 +77,15 @@ struct si_ops si_conn_ops = { .shutw = stream_int_shutw_conn, }; +/* stream-interface operations for connections */ +struct si_ops si_applet_ops = { + .update = stream_int_update_applet, + .chk_rcv = stream_int_chk_rcv_applet, + .chk_snd = stream_int_chk_snd_applet, + .shutr = stream_int_shutr_applet, + .shutw = stream_int_shutw_applet, +}; + struct data_cb si_conn_cb = { .recv = si_conn_recv_cb, .send = si_conn_send_cb, @@ -225,12 +239,11 @@ static void stream_int_update_embedded(struct stream_interface *si) } /* - * This function performs a shutdown-read on a stream interface attached to an - * applet in a connected or init state (it does nothing for other states). It - * either shuts the read side or marks itself as closed. The buffer flags are - * updated to reflect the new state. If the stream interface has SI_FL_NOHALF, - * we also forward the close to the write side. The owner task is woken up if - * it exists. + * This function performs a shutdown-read on a detached stream interface in a + * connected or init state (it does nothing for other states). It either shuts + * the read side or marks itself as closed. The buffer flags are updated to + * reflect the new state. If the stream interface has SI_FL_NOHALF, we also + * forward the close to the write side. The owner task is woken up if it exists. */ static void stream_int_shutr(struct stream_interface *si) { @@ -249,7 +262,6 @@ static void stream_int_shutr(struct stream_interface *si) if (si_oc(si)->flags & CF_SHUTW) { si->state = SI_ST_DIS; si->exp = TICK_ETERNITY; - si_applet_release(si); } else if (si->flags & SI_FL_NOHALF) { /* we want to immediately forward this close to the write side */ @@ -262,11 +274,11 @@ static void stream_int_shutr(struct stream_interface *si) } /* - * This function performs a shutdown-write on a stream interface attached to an - * applet in a connected or init state (it does nothing for other states). It - * either shuts the write side or marks itself as closed. The buffer flags are - * updated to reflect the new state. It does also close everything if the SI - * was marked as being in error state. The owner task is woken up if it exists. + * This function performs a shutdown-write on a detached stream interface in a + * connected or init state (it does nothing for other states). It either shuts + * the write side or marks itself as closed. The buffer flags are updated to + * reflect the new state. It does also close everything if the SI was marked as + * being in error state. The owner task is woken up if it exists. */ static void stream_int_shutw(struct stream_interface *si) { @@ -299,7 +311,6 @@ static void stream_int_shutw(struct stream_interface *si) case SI_ST_TAR: /* Note that none of these states may happen with applets */ si->state = SI_ST_DIS; - si_applet_release(si); default: si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER); ic->flags &= ~CF_SHUTR_NOW; @@ -1358,6 +1369,231 @@ void stream_sock_read0(struct stream_interface *si) return; } +/* default update function for applets, to be used at the end of the i/o handler */ +static void stream_int_update_applet(struct stream_interface *si) +{ + int old_flags = si->flags; + struct channel *ic = si_ic(si); + struct channel *oc = si_oc(si); + + DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n", + __FUNCTION__, + si, si->state, ic->flags, oc->flags); + + if (si->state != SI_ST_EST) + return; + + if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW && + channel_is_empty(oc)) + si_shutw(si); + + if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc)) + si->flags |= SI_FL_WAIT_DATA; + + /* we're almost sure that we need some space if the buffer is not + * empty, even if it's not full, because the applets can't fill it. + */ + if ((ic->flags & (CF_SHUTR|CF_DONT_READ)) == 0 && !channel_is_empty(ic)) + si->flags |= SI_FL_WAIT_ROOM; + + if (oc->flags & CF_WRITE_ACTIVITY) { + if (tick_isset(oc->wex)) + oc->wex = tick_add_ifset(now_ms, oc->wto); + } + + if (ic->flags & CF_READ_ACTIVITY || + (oc->flags & CF_WRITE_ACTIVITY && !(si->flags & SI_FL_INDEP_STR))) { + if (tick_isset(ic->rex)) + ic->rex = tick_add_ifset(now_ms, ic->rto); + } + + /* save flags to detect changes */ + old_flags = si->flags; + if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL && + channel_may_recv(oc) && + (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) + si_chk_rcv(si_opposite(si)); + + if (((ic->flags & CF_READ_PARTIAL) && !channel_is_empty(ic)) && + (ic->pipe /* always try to send spliced data */ || + (ic->buf->i == 0 && (si_opposite(si)->flags & SI_FL_WAIT_DATA)))) { + si_chk_snd(si_opposite(si)); + /* check if the consumer has freed some space */ + if (channel_may_recv(ic) && !ic->pipe) + si->flags &= ~SI_FL_WAIT_ROOM; + } + + /* Note that we're trying to wake up in two conditions here : + * - special event, which needs the holder task attention + * - status indicating that the applet can go on working. This + * is rather hard because we might be blocking on output and + * don't want to wake up on input and vice-versa. The idea is + * to only rely on the changes the chk_* might have performed. + */ + if (/* check stream interface changes */ + ((old_flags & ~si->flags) & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) || + + /* changes on the production side */ + (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) || + si->state != SI_ST_EST || + (si->flags & SI_FL_ERR) || + ((ic->flags & CF_READ_PARTIAL) && + (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) || + + /* changes on the consumption side */ + (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) || + ((oc->flags & CF_WRITE_ACTIVITY) && + ((oc->flags & CF_SHUTW) || + ((oc->flags & CF_WAKE_WRITE) && + (si_opposite(si)->state != SI_ST_EST || + (channel_is_empty(oc) && !oc->to_forward)))))) { + if (!(si->flags & SI_FL_DONT_WAKE)) + task_wakeup(si_task(si), TASK_WOKEN_IO); + } + if (ic->flags & CF_READ_ACTIVITY) + ic->flags &= ~CF_READ_DONTWAIT; +} + +/* + * This function performs a shutdown-read on a stream interface attached to an + * applet in a connected or init state (it does nothing for other states). It + * either shuts the read side or marks itself as closed. The buffer flags are + * updated to reflect the new state. If the stream interface has SI_FL_NOHALF, + * we also forward the close to the write side. The owner task is woken up if + * it exists. + */ +static void stream_int_shutr_applet(struct stream_interface *si) +{ + struct channel *ic = si_ic(si); + + ic->flags &= ~CF_SHUTR_NOW; + if (ic->flags & CF_SHUTR) + return; + ic->flags |= CF_SHUTR; + ic->rex = TICK_ETERNITY; + si->flags &= ~SI_FL_WAIT_ROOM; + + if (si->state != SI_ST_EST && si->state != SI_ST_CON) + return; + + if (si_oc(si)->flags & CF_SHUTW) { + si->state = SI_ST_DIS; + si->exp = TICK_ETERNITY; + si_applet_release(si); + } + else if (si->flags & SI_FL_NOHALF) { + /* we want to immediately forward this close to the write side */ + return stream_int_shutw_applet(si); + } + + /* note that if the task exists, it must unregister itself once it runs */ + if (!(si->flags & SI_FL_DONT_WAKE)) + task_wakeup(si_task(si), TASK_WOKEN_IO); +} + +/* + * This function performs a shutdown-write on a stream interface attached to an + * applet in a connected or init state (it does nothing for other states). It + * either shuts the write side or marks itself as closed. The buffer flags are + * updated to reflect the new state. It does also close everything if the SI + * was marked as being in error state. The owner task is woken up if it exists. + */ +static void stream_int_shutw_applet(struct stream_interface *si) +{ + struct channel *ic = si_ic(si); + struct channel *oc = si_oc(si); + + oc->flags &= ~CF_SHUTW_NOW; + if (oc->flags & CF_SHUTW) + return; + oc->flags |= CF_SHUTW; + oc->wex = TICK_ETERNITY; + si->flags &= ~SI_FL_WAIT_DATA; + + switch (si->state) { + case SI_ST_EST: + /* we have to shut before closing, otherwise some short messages + * may never leave the system, especially when there are remaining + * unread data in the socket input buffer, or when nolinger is set. + * However, if SI_FL_NOLINGER is explicitly set, we know there is + * no risk so we close both sides immediately. + */ + if (!(si->flags & (SI_FL_ERR | SI_FL_NOLINGER)) && + !(ic->flags & (CF_SHUTR|CF_DONT_READ))) + return; + + /* fall through */ + case SI_ST_CON: + case SI_ST_CER: + case SI_ST_QUE: + case SI_ST_TAR: + /* Note that none of these states may happen with applets */ + si->state = SI_ST_DIS; + si_applet_release(si); + default: + si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER); + ic->flags &= ~CF_SHUTR_NOW; + ic->flags |= CF_SHUTR; + ic->rex = TICK_ETERNITY; + si->exp = TICK_ETERNITY; + } + + /* note that if the task exists, it must unregister itself once it runs */ + if (!(si->flags & SI_FL_DONT_WAKE)) + task_wakeup(si_task(si), TASK_WOKEN_IO); +} + +/* chk_rcv function for applets */ +static void stream_int_chk_rcv_applet(struct stream_interface *si) +{ + struct channel *ic = si_ic(si); + + DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n", + __FUNCTION__, + si, si->state, ic->flags, si_oc(si)->flags); + + if (unlikely(si->state != SI_ST_EST || (ic->flags & (CF_SHUTR|CF_DONT_READ)))) + return; + + if (!channel_may_recv(ic) || ic->pipe) { + /* stop reading */ + si->flags |= SI_FL_WAIT_ROOM; + } + else { + /* (re)start reading */ + si->flags &= ~SI_FL_WAIT_ROOM; + if (!(si->flags & SI_FL_DONT_WAKE)) + task_wakeup(si_task(si), TASK_WOKEN_IO); + } +} + +/* chk_snd function for applets */ +static void stream_int_chk_snd_applet(struct stream_interface *si) +{ + struct channel *oc = si_oc(si); + + DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n", + __FUNCTION__, + si, si->state, si_ic(si)->flags, oc->flags); + + if (unlikely(si->state != SI_ST_EST || (oc->flags & CF_SHUTW))) + return; + + if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */ + channel_is_empty(oc)) /* called with nothing to send ! */ + return; + + /* Otherwise there are remaining data to be sent in the buffer, + * so we tell the handler. + */ + si->flags &= ~SI_FL_WAIT_DATA; + if (!tick_isset(oc->wex)) + oc->wex = tick_add_ifset(now_ms, oc->wto); + + if (!(si->flags & SI_FL_DONT_WAKE)) + task_wakeup(si_task(si), TASK_WOKEN_IO); +} + /* * Local variables: * c-indent-level: 8