]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream-int: always call si_chk_rcv() when we make room in the buffer
authorWilly Tarreau <w@1wt.eu>
Wed, 7 Nov 2018 17:53:29 +0000 (18:53 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 11 Nov 2018 09:18:37 +0000 (10:18 +0100)
Instead of clearing the SI_FL_WAIT_ROOM flag and losing the information
about the need from the producer to be woken up, we now call si_chk_rcv()
immediately. This is cheap to do and it could possibly be further improved
by only doing it when SI_FL_WAIT_ROOM was still set, though this will
require some extra auditing of the code paths.

The only remaining place where the flag was cleared without a call to
si_chk_rcv() is si_alloc_ibuf(), but since this one is called from a
receive path woken up from si_chk_rcv() or not having failed, the
clearing was not necessary anymore either.

And there was one place in stream_int_notify() where si_chk_rcv() was
called with SI_FL_WAIT_ROOM still explicitly set so this place was
adjusted in order to clear the flag prior to calling si_chk_rcv().

Now we don't have any situation where we randomly clear SI_FL_WAIT_ROOM
without trying to wake the other side up, nor where we call si_chk_rcv()
with the flag set, so this flag should accurately represent a failed
attempt at putting data into the buffer.

include/proto/stream_interface.h
src/stream_interface.c

index c0f7cdae30ac1d00cb51172606a58c7648636a9c..2d3fe565188187284aa4a1a26a147bff0d042105 100644 (file)
@@ -315,9 +315,8 @@ static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struc
 }
 
 /* Try to allocate a buffer for the stream-int's input channel. It relies on
- * channel_alloc_buffer() for this so it abides by its rules. It returns 0 in
- * case of failure, non-zero otherwise. The stream-int's flag SI_FL_WAIT_ROOM
- * is cleared before trying. If no buffer are available, the requester,
+ * channel_alloc_buffer() for this so it abides by its rules. It returns 0 on
+ * failure, non-zero otherwise. If no buffer is available, the requester,
  * represented by <wait> pointer, will be added in the list of objects waiting
  * for an available buffer, and SI_FL_WAIT_ROOM will be set on the stream-int.
  * The requester will be responsible for calling this function to try again
@@ -327,7 +326,6 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait
 {
        int ret;
 
-       si->flags &= ~SI_FL_WAIT_ROOM;
        ret = channel_alloc_buffer(si_ic(si), wait);
        if (!ret)
                si_cant_put(si);
index 3372fc2a61b45f6bf8963fed11d9f9fcf55f2a0f..b29cb2f756430f593595dd39995645a86a6c1945 100644 (file)
@@ -480,8 +480,10 @@ void stream_int_notify(struct stream_interface *si)
 
                if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
                           channel_may_recv(oc) &&
-                          (si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
+                          (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) {
+                       si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM;
                        si_chk_rcv(si_opposite(si));
+               }
        }
 
        /* Notify the other side when we've injected data into the IC that
@@ -513,8 +515,10 @@ void stream_int_notify(struct stream_interface *si)
                /* check if the consumer has freed some space either in the
                 * buffer or in the pipe.
                 */
-               if (channel_may_recv(ic) && new_len < last_len)
+               if (channel_may_recv(ic) && new_len < last_len) {
                        si->flags &= ~SI_FL_WAIT_ROOM;
+                       si_chk_rcv(si);
+               }
        }
 
        if (si->flags & SI_FL_WAIT_ROOM) {
@@ -561,7 +565,6 @@ static int si_cs_process(struct conn_stream *cs)
        struct stream_interface *si = cs->data;
        struct channel *ic = si_ic(si);
        struct channel *oc = si_oc(si);
-       int wait_room = si->flags & SI_FL_WAIT_ROOM;
 
        /* If we have data to send, try it now */
        if (!channel_is_empty(oc) && !(si->wait_event.wait_reason & SUB_CAN_SEND))
@@ -597,10 +600,6 @@ static int si_cs_process(struct conn_stream *cs)
        stream_int_notify(si);
        channel_release_buffer(ic, &(si_strm(si)->buffer_wait));
 
-       /* Try to run again if we free'd some room in the process */
-       if (wait_room && !(si->flags & SI_FL_WAIT_ROOM))
-               tasklet_wakeup(si->wait_event.task);
-
        return 0;
 }
 
@@ -769,7 +768,7 @@ void stream_int_update(struct stream_interface *si)
                         * have updated it if there has been a completed I/O.
                         */
                        si->flags &= ~SI_FL_WAIT_ROOM;
-                       tasklet_wakeup(si->wait_event.task);
+                       si_chk_rcv(si);
                        if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
                                ic->rex = tick_add_ifset(now_ms, ic->rto);
                }