]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream-int: make si_update() synchronize flag changes before the I/O
authorWilly Tarreau <w@1wt.eu>
Thu, 25 Oct 2018 09:06:57 +0000 (11:06 +0200)
committerWilly Tarreau <w@1wt.eu>
Sun, 28 Oct 2018 12:47:00 +0000 (13:47 +0100)
With the new synchronous si_cs_send() at the end of process_stream(),
we're seeing re-appear the I/O layer specific part of the stream interface
which is supposed to deal with I/O event subscription. The only difference
is that now we subscribe to I/Os only after having attempted (and failed)
them.

This patch brings a cleanup in this by reintroducing stream_int_update_conn()
with the send code from process_stream(). However this alone would not be
enough because the flags which are cleared afterwards would result in the
loss of the possible events (write events only at the moment). So the flags
clearing and stream-int state updates are also performed inside si_update()
between the generic code and the I/O specific code. This definitely makes
sense as after this call we can simply check again for channel and SI flag
changes and decide to loop once again or not.

include/proto/stream_interface.h
src/stream.c
src/stream_interface.c

index f3cf5e0e90f0eaab8967f32094528ec6ae5eb6be..4e400399b04ae98d32ec7d4729937b3e305c147b 100644 (file)
@@ -357,11 +357,23 @@ static inline void si_shutw(struct stream_interface *si)
        si->ops->shutw(si);
 }
 
-/* Updates the stream interface and timers, then updates the data layer below */
+/* Updates the stream interface and timers, to complete the work after the
+ * analysers, then clears the relevant channel flags, and the errors and
+ * expirations, then updates the data layer below. This will ensure that any
+ * synchronous update performed at the data layer will be reflected in the
+ * channel flags and/or stream-interface.
+ */
 static inline void si_update(struct stream_interface *si)
 {
-       stream_int_update(si);
-       if (si->ops->update)
+       if (si->state == SI_ST_EST)
+               stream_int_update(si);
+
+       si_ic(si)->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED);
+       si_oc(si)->flags &= ~(CF_WRITE_NULL|CF_WRITE_PARTIAL);
+       si->flags &= ~(SI_FL_ERR|SI_FL_EXP);
+       si->prev_state = si->state;
+
+       if (si->ops->update && (si->state == SI_ST_CON || si->state == SI_ST_EST))
                si->ops->update(si);
 }
 
index 4e2db45afc1d46cdd8ff6976d164aad30c4546f4..f33713dc9ce4a7f4da02716ed8395c182d8b7601 100644 (file)
@@ -1659,7 +1659,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
        struct channel *req, *res;
        struct stream_interface *si_f, *si_b;
        struct conn_stream *cs;
-       int ret;
 
        activity[tid].stream++;
 
@@ -2447,37 +2446,12 @@ redo:
                if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED))
                        stream_process_counters(s);
 
-               cs = objt_cs(si_f->end);
-               ret = 0;
-               if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
-                   !(cs->flags & CS_FL_ERROR) &&
-                   !(si_oc(si_f)->flags & CF_SHUTW) &&
-                   !(si_f->wait_event.wait_reason & SUB_CAN_SEND) &&
-                   co_data(si_oc(si_f)))
-                       ret = si_cs_send(cs);
-               cs = objt_cs(si_b->end);
-               if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
-                   !(cs->flags & CS_FL_ERROR) &&
-                   !(si_oc(si_b)->flags & CF_SHUTW) &&
-                   !(si_b->wait_event.wait_reason & SUB_CAN_SEND) &&
-                   co_data(si_oc(si_b)))
-                       ret |= si_cs_send(cs);
-
-               if (ret)
-                       goto redo;
-
-               if (si_f->state == SI_ST_EST)
-                       si_update(si_f);
+               si_update(si_f);
+               si_update(si_b);
 
-               if (si_b->state == SI_ST_EST)
-                       si_update(si_b);
-
-               req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED);
-               res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED);
-               si_f->prev_state = si_f->state;
-               si_b->prev_state = si_b->state;
-               si_f->flags &= ~(SI_FL_ERR|SI_FL_EXP);
-               si_b->flags &= ~(SI_FL_ERR|SI_FL_EXP);
+               if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS ||
+                   (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER))
+                       goto redo;
 
                /* Trick: if a request is being waiting for the server to respond,
                 * and if we know the server can timeout, we don't want the timeout
index 8574b4ca30a9ebded5f6f8f195cecd141b095d0e..08814cf39912bd9c4bb9c895f58b2599deafc8f1 100644 (file)
@@ -66,6 +66,7 @@ struct si_ops si_embedded_ops = {
 
 /* stream-interface operations for connections */
 struct si_ops si_conn_ops = {
+       .update  = stream_int_update_conn,
        .chk_rcv = stream_int_chk_rcv_conn,
        .chk_snd = stream_int_chk_snd_conn,
        .shutr   = stream_int_shutr_conn,
@@ -798,6 +799,33 @@ void stream_int_update(struct stream_interface *si)
        }
 }
 
+/* Updates the active status of a connection outside of the connection handler
+ * based on the channel's flags and the stream interface's flags. It needs to
+ * be called once after the channels' flags have settled down and the stream
+ * has been updated. It is not designed to be called from within the connection
+ * handler itself.
+ */
+void stream_int_update_conn(struct stream_interface *si)
+{
+       struct channel *ic = si_ic(si);
+       struct channel *oc = si_oc(si);
+       struct conn_stream *cs = __objt_cs(si->end);
+
+       if (!(ic->flags & CF_SHUTR)) {
+               /* Read not closed, it doesn't seem we have to do anything here */
+       }
+
+       if (!(oc->flags & CF_SHUTW)) {
+               /* Write not closed */
+               if (!channel_is_empty(oc) &&
+                   !(cs->conn->flags & CO_FL_ERROR) &&
+                   !(cs->flags & CS_FL_ERROR) &&
+                   !(oc->flags & CF_SHUTW) &&
+                   !(si->wait_event.wait_reason & SUB_CAN_SEND))
+                       si_cs_send(cs);
+       }
+}
+
 /*
  * This function performs a shutdown-read on a stream interface attached to
  * a connection in a connected or init state (it does nothing for other