]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: stream-int: implement the stream_int_notify() function
authorWilly Tarreau <w@1wt.eu>
Wed, 23 Sep 2015 16:40:09 +0000 (18:40 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 25 Sep 2015 19:16:02 +0000 (21:16 +0200)
stream_int_notify() was taken from the common part between si_conn_wake_cb()
and si_applet_done(). It is designed to report activity to a stream from
outside its handler. It'll generally be used by lower layers to report I/O
completion but may also be used by remote streams if the buffer processing
is shared.

include/proto/stream_interface.h
src/stream_interface.c

index d1d75baabb56b3bf1513a195f264ad6bc1fc72d8..22d05be80598c390cd6c366f5a36261901768cf7 100644 (file)
@@ -51,6 +51,7 @@ void si_applet_done(struct stream_interface *si);
 void stream_int_update(struct stream_interface *si);
 void stream_int_update_conn(struct stream_interface *si);
 void stream_int_update_applet(struct stream_interface *si);
+void stream_int_notify(struct stream_interface *si);
 
 /* returns the channel which receives data from this stream interface (input channel) */
 static inline struct channel *si_ic(struct stream_interface *si)
index 1da5248e0092daaeeafc7a932f052123f0b846cd..dbc481f7998160de2d74becdf0398207d3f17d64 100644 (file)
@@ -519,6 +519,115 @@ static int si_idle_conn_wake_cb(struct connection *conn)
        return 0;
 }
 
+/* This function is the equivalent to stream_int_update() except that it's
+ * designed to be called from outside the stream handlers, typically the lower
+ * layers (applets, connections) after I/O completion. After updating the stream
+ * interface and timeouts, it will try to forward what can be forwarded, then to
+ * wake the associated task up if an important event requires special handling.
+ * It should not be called from within the stream itself, stream_int_update()
+ * is designed for this.
+ */
+void stream_int_notify(struct stream_interface *si)
+{
+       struct channel *ic = si_ic(si);
+       struct channel *oc = si_oc(si);
+
+       /* process consumer side */
+       if (channel_is_empty(oc)) {
+               if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
+                   (si->state == SI_ST_EST))
+                       si_shutw(si);
+               oc->wex = TICK_ETERNITY;
+       }
+
+       /* indicate that we may be waiting for data from the output channel */
+       if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
+               si->flags |= SI_FL_WAIT_DATA;
+
+       /* update OC timeouts and wake the other side up if it's waiting for room */
+       if (oc->flags & CF_WRITE_ACTIVITY) {
+               if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
+                   !channel_is_empty(oc))
+                       if (tick_isset(oc->wex))
+                               oc->wex = tick_add_ifset(now_ms, oc->wto);
+
+               if (!(si->flags & SI_FL_INDEP_STR))
+                       if (tick_isset(ic->rex))
+                               ic->rex = tick_add_ifset(now_ms, ic->rto);
+
+               if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
+                          channel_may_recv(oc) &&
+                          (si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
+                       si_chk_rcv(si_opposite(si));
+       }
+
+       /* Notify the other side when we've injected data into the IC that
+        * needs to be forwarded. We can do fast-forwarding as soon as there
+        * are output data, but we avoid doing this if some of the data are
+        * not yet scheduled for being forwarded, because it is very likely
+        * that it will be done again immediately afterwards once the following
+        * data are parsed (eg: HTTP chunking). We only SI_FL_WAIT_ROOM once
+        * we've emptied *some* of the output buffer, and not just when there
+        * is available room, because applets are often forced to stop before
+        * the buffer is full. We must not stop based on input data alone because
+        * an HTTP parser might need more data to complete the parsing.
+        */
+       if (!channel_is_empty(ic) &&
+           (si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
+           (ic->buf->i == 0 || ic->pipe)) {
+               int new_len, last_len;
+
+               last_len = ic->buf->o;
+               if (ic->pipe)
+                       last_len += ic->pipe->data;
+
+               si_chk_snd(si_opposite(si));
+
+               new_len = ic->buf->o;
+               if (ic->pipe)
+                       new_len += ic->pipe->data;
+
+               /* check if the consumer has freed some space either in the
+                * buffer or in the pipe.
+                */
+               if (channel_may_recv(ic) && new_len < last_len)
+                       si->flags &= ~SI_FL_WAIT_ROOM;
+       }
+
+       if (si->flags & SI_FL_WAIT_ROOM) {
+               ic->rex = TICK_ETERNITY;
+       }
+       else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
+                channel_may_recv(ic)) {
+               /* we must re-enable reading if si_chk_snd() has freed some space */
+               if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
+                       ic->rex = tick_add_ifset(now_ms, ic->rto);
+       }
+
+       /* wake the task up only when needed */
+       if (/* changes on the production side */
+           (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
+           si->state != SI_ST_EST ||
+           (si->flags & SI_FL_ERR) ||
+           ((ic->flags & CF_READ_PARTIAL) &&
+            (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
+
+           /* changes on the consumption side */
+           (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
+           ((oc->flags & CF_WRITE_ACTIVITY) &&
+            ((oc->flags & CF_SHUTW) ||
+             ((oc->flags & CF_WAKE_WRITE) &&
+              (si_opposite(si)->state != SI_ST_EST ||
+               (channel_is_empty(oc) && !oc->to_forward)))))) {
+               task_wakeup(si_task(si), TASK_WOKEN_IO);
+       }
+       if (ic->flags & CF_READ_ACTIVITY)
+               ic->flags &= ~CF_READ_DONTWAIT;
+
+       stream_release_buffers(si_strm(si));
+}
+
+
 /* Callback to be used by connection I/O handlers upon completion. It differs from
  * the update function in that it is designed to be called by lower layers after I/O
  * events have been completed. It will also try to wake the associated task up if