]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
REORG: stream-int: create si_applet_ops dedicated to applets
authorWilly Tarreau <w@1wt.eu>
Mon, 13 Apr 2015 14:30:14 +0000 (16:30 +0200)
committerWilly Tarreau <w@1wt.eu>
Thu, 23 Apr 2015 15:56:16 +0000 (17:56 +0200)
These functions are dedicated to applets so that we don't use the default
ones anymore in this case.

include/proto/stream_interface.h
src/stream_interface.c

index 036a4e1fead1026f9d868ae0292a911fb021f3c3..0812d63c266f60d7b1b7eb22f6b5cdc48489bea2 100644 (file)
@@ -41,6 +41,7 @@ void stream_sock_read0(struct stream_interface *si);
 
 extern struct si_ops si_embedded_ops;
 extern struct si_ops si_conn_ops;
+extern struct si_ops si_applet_ops;
 extern struct data_cb si_conn_cb;
 extern struct data_cb si_idle_conn_cb;
 
@@ -198,7 +199,7 @@ static inline int si_conn_ready(struct stream_interface *si)
  */
 static inline void si_attach_appctx(struct stream_interface *si, struct appctx *appctx)
 {
-       si->ops = &si_embedded_ops;
+       si->ops = &si_applet_ops;
        si->end = &appctx->obj_type;
        appctx->owner = si;
 }
index 28d65149f9186583e735e26b92566d7b2cabdc15..a1bc70d883fa72b8239e1ace5be9ae903c2a86e9 100644 (file)
@@ -48,6 +48,11 @@ static void stream_int_shutr_conn(struct stream_interface *si);
 static void stream_int_shutw_conn(struct stream_interface *si);
 static void stream_int_chk_rcv_conn(struct stream_interface *si);
 static void stream_int_chk_snd_conn(struct stream_interface *si);
+static void stream_int_update_applet(struct stream_interface *si);
+static void stream_int_shutr_applet(struct stream_interface *si);
+static void stream_int_shutw_applet(struct stream_interface *si);
+static void stream_int_chk_rcv_applet(struct stream_interface *si);
+static void stream_int_chk_snd_applet(struct stream_interface *si);
 static void si_conn_recv_cb(struct connection *conn);
 static void si_conn_send_cb(struct connection *conn);
 static int si_conn_wake_cb(struct connection *conn);
@@ -72,6 +77,15 @@ struct si_ops si_conn_ops = {
        .shutw   = stream_int_shutw_conn,
 };
 
+/* stream-interface operations for connections */
+struct si_ops si_applet_ops = {
+       .update  = stream_int_update_applet,
+       .chk_rcv = stream_int_chk_rcv_applet,
+       .chk_snd = stream_int_chk_snd_applet,
+       .shutr   = stream_int_shutr_applet,
+       .shutw   = stream_int_shutw_applet,
+};
+
 struct data_cb si_conn_cb = {
        .recv    = si_conn_recv_cb,
        .send    = si_conn_send_cb,
@@ -225,12 +239,11 @@ static void stream_int_update_embedded(struct stream_interface *si)
 }
 
 /*
- * This function performs a shutdown-read on a stream interface attached to an
- * applet in a connected or init state (it does nothing for other states). It
- * either shuts the read side or marks itself as closed. The buffer flags are
- * updated to reflect the new state. If the stream interface has SI_FL_NOHALF,
- * we also forward the close to the write side. The owner task is woken up if
- * it exists.
+ * This function performs a shutdown-read on a detached stream interface in a
+ * connected or init state (it does nothing for other states). It either shuts
+ * the read side or marks itself as closed. The buffer flags are updated to
+ * reflect the new state. If the stream interface has SI_FL_NOHALF, we also
+ * forward the close to the write side. The owner task is woken up if it exists.
  */
 static void stream_int_shutr(struct stream_interface *si)
 {
@@ -249,7 +262,6 @@ static void stream_int_shutr(struct stream_interface *si)
        if (si_oc(si)->flags & CF_SHUTW) {
                si->state = SI_ST_DIS;
                si->exp = TICK_ETERNITY;
-               si_applet_release(si);
        }
        else if (si->flags & SI_FL_NOHALF) {
                /* we want to immediately forward this close to the write side */
@@ -262,11 +274,11 @@ static void stream_int_shutr(struct stream_interface *si)
 }
 
 /*
- * This function performs a shutdown-write on a stream interface attached to an
- * applet in a connected or init state (it does nothing for other states). It
- * either shuts the write side or marks itself as closed. The buffer flags are
- * updated to reflect the new state. It does also close everything if the SI
- * was marked as being in error state. The owner task is woken up if it exists.
+ * This function performs a shutdown-write on a detached stream interface in a
+ * connected or init state (it does nothing for other states). It either shuts
+ * the write side or marks itself as closed. The buffer flags are updated to
+ * reflect the new state. It does also close everything if the SI was marked as
+ * being in error state. The owner task is woken up if it exists.
  */
 static void stream_int_shutw(struct stream_interface *si)
 {
@@ -299,7 +311,6 @@ static void stream_int_shutw(struct stream_interface *si)
        case SI_ST_TAR:
                /* Note that none of these states may happen with applets */
                si->state = SI_ST_DIS;
-               si_applet_release(si);
        default:
                si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
                ic->flags &= ~CF_SHUTR_NOW;
@@ -1358,6 +1369,231 @@ void stream_sock_read0(struct stream_interface *si)
        return;
 }
 
+/* default update function for applets, to be used at the end of the i/o handler */
+static void stream_int_update_applet(struct stream_interface *si)
+{
+       int old_flags = si->flags;
+       struct channel *ic = si_ic(si);
+       struct channel *oc = si_oc(si);
+
+       DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
+               __FUNCTION__,
+               si, si->state, ic->flags, oc->flags);
+
+       if (si->state != SI_ST_EST)
+               return;
+
+       if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
+           channel_is_empty(oc))
+               si_shutw(si);
+
+       if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
+               si->flags |= SI_FL_WAIT_DATA;
+
+       /* we're almost sure that we need some space if the buffer is not
+        * empty, even if it's not full, because the applets can't fill it.
+        */
+       if ((ic->flags & (CF_SHUTR|CF_DONT_READ)) == 0 && !channel_is_empty(ic))
+               si->flags |= SI_FL_WAIT_ROOM;
+
+       if (oc->flags & CF_WRITE_ACTIVITY) {
+               if (tick_isset(oc->wex))
+                       oc->wex = tick_add_ifset(now_ms, oc->wto);
+       }
+
+       if (ic->flags & CF_READ_ACTIVITY ||
+           (oc->flags & CF_WRITE_ACTIVITY && !(si->flags & SI_FL_INDEP_STR))) {
+               if (tick_isset(ic->rex))
+                       ic->rex = tick_add_ifset(now_ms, ic->rto);
+       }
+
+       /* save flags to detect changes */
+       old_flags = si->flags;
+       if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
+                  channel_may_recv(oc) &&
+                  (si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
+               si_chk_rcv(si_opposite(si));
+
+       if (((ic->flags & CF_READ_PARTIAL) && !channel_is_empty(ic)) &&
+           (ic->pipe /* always try to send spliced data */ ||
+            (ic->buf->i == 0 && (si_opposite(si)->flags & SI_FL_WAIT_DATA)))) {
+               si_chk_snd(si_opposite(si));
+               /* check if the consumer has freed some space */
+               if (channel_may_recv(ic) && !ic->pipe)
+                       si->flags &= ~SI_FL_WAIT_ROOM;
+       }
+
+       /* Note that we're trying to wake up in two conditions here :
+        *  - special event, which needs the holder task attention
+        *  - status indicating that the applet can go on working. This
+        *    is rather hard because we might be blocking on output and
+        *    don't want to wake up on input and vice-versa. The idea is
+        *    to only rely on the changes the chk_* might have performed.
+        */
+       if (/* check stream interface changes */
+           ((old_flags & ~si->flags) & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) ||
+
+           /* changes on the production side */
+           (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
+           si->state != SI_ST_EST ||
+           (si->flags & SI_FL_ERR) ||
+           ((ic->flags & CF_READ_PARTIAL) &&
+            (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
+
+           /* changes on the consumption side */
+           (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
+           ((oc->flags & CF_WRITE_ACTIVITY) &&
+            ((oc->flags & CF_SHUTW) ||
+             ((oc->flags & CF_WAKE_WRITE) &&
+              (si_opposite(si)->state != SI_ST_EST ||
+               (channel_is_empty(oc) && !oc->to_forward)))))) {
+               if (!(si->flags & SI_FL_DONT_WAKE))
+                       task_wakeup(si_task(si), TASK_WOKEN_IO);
+       }
+       if (ic->flags & CF_READ_ACTIVITY)
+               ic->flags &= ~CF_READ_DONTWAIT;
+}
+
+/*
+ * This function performs a shutdown-read on a stream interface attached to an
+ * applet in a connected or init state (it does nothing for other states). It
+ * either shuts the read side or marks itself as closed. The buffer flags are
+ * updated to reflect the new state. If the stream interface has SI_FL_NOHALF,
+ * we also forward the close to the write side. The owner task is woken up if
+ * it exists.
+ */
+static void stream_int_shutr_applet(struct stream_interface *si)
+{
+       struct channel *ic = si_ic(si);
+
+       ic->flags &= ~CF_SHUTR_NOW;
+       if (ic->flags & CF_SHUTR)
+               return;
+       ic->flags |= CF_SHUTR;
+       ic->rex = TICK_ETERNITY;
+       si->flags &= ~SI_FL_WAIT_ROOM;
+
+       if (si->state != SI_ST_EST && si->state != SI_ST_CON)
+               return;
+
+       if (si_oc(si)->flags & CF_SHUTW) {
+               si->state = SI_ST_DIS;
+               si->exp = TICK_ETERNITY;
+               si_applet_release(si);
+       }
+       else if (si->flags & SI_FL_NOHALF) {
+               /* we want to immediately forward this close to the write side */
+               return stream_int_shutw_applet(si);
+       }
+
+       /* note that if the task exists, it must unregister itself once it runs */
+       if (!(si->flags & SI_FL_DONT_WAKE))
+               task_wakeup(si_task(si), TASK_WOKEN_IO);
+}
+
+/*
+ * This function performs a shutdown-write on a stream interface attached to an
+ * applet in a connected or init state (it does nothing for other states). It
+ * either shuts the write side or marks itself as closed. The buffer flags are
+ * updated to reflect the new state. It does also close everything if the SI
+ * was marked as being in error state. The owner task is woken up if it exists.
+ */
+static void stream_int_shutw_applet(struct stream_interface *si)
+{
+       struct channel *ic = si_ic(si);
+       struct channel *oc = si_oc(si);
+
+       oc->flags &= ~CF_SHUTW_NOW;
+       if (oc->flags & CF_SHUTW)
+               return;
+       oc->flags |= CF_SHUTW;
+       oc->wex = TICK_ETERNITY;
+       si->flags &= ~SI_FL_WAIT_DATA;
+
+       switch (si->state) {
+       case SI_ST_EST:
+               /* we have to shut before closing, otherwise some short messages
+                * may never leave the system, especially when there are remaining
+                * unread data in the socket input buffer, or when nolinger is set.
+                * However, if SI_FL_NOLINGER is explicitly set, we know there is
+                * no risk so we close both sides immediately.
+                */
+               if (!(si->flags & (SI_FL_ERR | SI_FL_NOLINGER)) &&
+                   !(ic->flags & (CF_SHUTR|CF_DONT_READ)))
+                       return;
+
+               /* fall through */
+       case SI_ST_CON:
+       case SI_ST_CER:
+       case SI_ST_QUE:
+       case SI_ST_TAR:
+               /* Note that none of these states may happen with applets */
+               si->state = SI_ST_DIS;
+               si_applet_release(si);
+       default:
+               si->flags &= ~(SI_FL_WAIT_ROOM | SI_FL_NOLINGER);
+               ic->flags &= ~CF_SHUTR_NOW;
+               ic->flags |= CF_SHUTR;
+               ic->rex = TICK_ETERNITY;
+               si->exp = TICK_ETERNITY;
+       }
+
+       /* note that if the task exists, it must unregister itself once it runs */
+       if (!(si->flags & SI_FL_DONT_WAKE))
+               task_wakeup(si_task(si), TASK_WOKEN_IO);
+}
+
+/* chk_rcv function for applets */
+static void stream_int_chk_rcv_applet(struct stream_interface *si)
+{
+       struct channel *ic = si_ic(si);
+
+       DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
+               __FUNCTION__,
+               si, si->state, ic->flags, si_oc(si)->flags);
+
+       if (unlikely(si->state != SI_ST_EST || (ic->flags & (CF_SHUTR|CF_DONT_READ))))
+               return;
+
+       if (!channel_may_recv(ic) || ic->pipe) {
+               /* stop reading */
+               si->flags |= SI_FL_WAIT_ROOM;
+       }
+       else {
+               /* (re)start reading */
+               si->flags &= ~SI_FL_WAIT_ROOM;
+               if (!(si->flags & SI_FL_DONT_WAKE))
+                       task_wakeup(si_task(si), TASK_WOKEN_IO);
+       }
+}
+
+/* chk_snd function for applets */
+static void stream_int_chk_snd_applet(struct stream_interface *si)
+{
+       struct channel *oc = si_oc(si);
+
+       DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
+               __FUNCTION__,
+               si, si->state, si_ic(si)->flags, oc->flags);
+
+       if (unlikely(si->state != SI_ST_EST || (oc->flags & CF_SHUTW)))
+               return;
+
+       if (!(si->flags & SI_FL_WAIT_DATA) ||        /* not waiting for data */
+           channel_is_empty(oc))           /* called with nothing to send ! */
+               return;
+
+       /* Otherwise there are remaining data to be sent in the buffer,
+        * so we tell the handler.
+        */
+       si->flags &= ~SI_FL_WAIT_DATA;
+       if (!tick_isset(oc->wex))
+               oc->wex = tick_add_ifset(now_ms, oc->wto);
+
+       if (!(si->flags & SI_FL_DONT_WAKE))
+               task_wakeup(si_task(si), TASK_WOKEN_IO);
+}
+
 /*
  * Local variables:
  *  c-indent-level: 8