]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: dynbuf: pass offer_buffers() the number of buffers instead of a threshold
authorWilly Tarreau <w@1wt.eu>
Sat, 20 Feb 2021 11:02:46 +0000 (12:02 +0100)
committerWilly Tarreau <w@1wt.eu>
Sat, 20 Feb 2021 11:38:18 +0000 (12:38 +0100)
Historically this function would try to wake the most accurate number of
process_stream() waiters. But since the introduction of filters which could
also require buffers (e.g. for compression), things started not to be as
accurate anymore. Nowadays muxes and transport layers also use buffers, so
the runqueue size has nothing to do anymore with the number of supposed
users to come.

In addition to this, the threshold was compared to the number of free buffer
calculated as allocated minus used, but this didn't work anymore with local
pools since these counts are not updated upon alloc/free!

Let's clean this up and pass the number of released buffers instead, and
consider that each waiter successfully called counts as one buffer. This
is not rocket science and will not suddenly fix everything, but at least
it cannot be as wrong as it is today.

This could have been marked as a bug given that the current situation is
totally broken regarding this, but this probably doesn't completely fix
it, it only goes in a better direction. It is possible however that it
makes sense in the future to backport this as part of a larger series if
the situation significantly improves.

include/haproxy/channel.h
include/haproxy/dynbuf.h
src/check.c
src/dynbuf.c
src/flt_spoe.c
src/mux_fcgi.c
src/mux_h1.c
src/mux_h2.c
src/stream.c

index 47e4f44278319f91678021f809f7f3703a9de0eb..296e465df5cb7a9201945d31e6ef47b1083b5731 100644 (file)
@@ -864,7 +864,7 @@ static inline void channel_release_buffer(struct channel *chn, struct buffer_wai
 {
        if (c_size(chn) && c_empty(chn)) {
                b_free(&chn->buf);
-               offer_buffers(wait->target, tasks_run_queue);
+               offer_buffers(wait->target, 1);
        }
 }
 
index 83c2a9827800bd2696f24e383e2d87d8b01625be..c31b83aaac439f8db3e7b3930a88197f3b571744 100644 (file)
@@ -186,19 +186,19 @@ static inline struct buffer *b_alloc_margin(struct buffer *buf, int margin)
 }
 
 
-/* Offer a buffer currently belonging to target <from> to whoever needs one.
- * Any pointer is valid for <from>, including NULL. Its purpose is to avoid
- * passing a buffer to oneself in case of failed allocations (e.g. need two
- * buffers, get one, fail, release it and wake up self again). In case of
- * normal buffer release where it is expected that the caller is not waiting
+/* Offer one or multiple buffer currently belonging to target <from> to whoever
+ * needs one. Any pointer is valid for <from>, including NULL. Its purpose is
+ * to avoid passing a buffer to oneself in case of failed allocations (e.g.
+ * need two buffers, get one, fail, release it and wake up self again). In case
+ * of normal buffer release where it is expected that the caller is not waiting
  * for a buffer, NULL is fine. It will wake waiters on the current thread only.
  */
-void __offer_buffer(void *from, unsigned int threshold);
+void __offer_buffers(void *from, unsigned int count);
 
-static inline void offer_buffers(void *from, unsigned int threshold)
+static inline void offer_buffers(void *from, unsigned int count)
 {
        if (!LIST_ISEMPTY(&ti->buffer_wq))
-               __offer_buffer(from, threshold);
+               __offer_buffers(from, count);
 }
 
 
index 6451dcf15c3dca9aca2403b09596eb5be8de4f90..5b6535478142610291d7fcb992d8edfae576f658 100644 (file)
@@ -1032,7 +1032,7 @@ void check_release_buf(struct check *check, struct buffer *bptr)
 {
        if (bptr->size) {
                b_free(bptr);
-               offer_buffers(check->buf_wait.target, tasks_run_queue);
+               offer_buffers(check->buf_wait.target, 1);
        }
 }
 
index a6d1d40ac0f89d033411e77917fc5e4a860dc685..8b492e29db447c79339004c524451b86ecc52023 100644 (file)
@@ -96,32 +96,26 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to)
        fflush(o);
 }
 
-/* see offer_buffer() for details */
-void __offer_buffer(void *from, unsigned int threshold)
+/* see offer_buffers() for details */
+void __offer_buffers(void *from, unsigned int count)
 {
        struct buffer_wait *wait, *wait_back;
-       int avail;
 
        /* For now, we consider that all objects need 1 buffer, so we can stop
         * waking up them once we have enough of them to eat all the available
         * buffers. Note that we don't really know if they are streams or just
         * other tasks, but that's a rough estimate. Similarly, for each cached
-        * event we'll need 1 buffer. If no buffer is currently used, always
-        * wake up the number of tasks we can offer a buffer based on what is
-        * allocated, and in any case at least one task per two reserved
-        * buffers.
+        * event we'll need 1 buffer.
         */
-       avail = pool_head_buffer->allocated - pool_head_buffer->used - global.tune.reserved_bufs / 2;
-
        list_for_each_entry_safe(wait, wait_back, &ti->buffer_wq, list) {
-               if (avail <= threshold)
+               if (!count)
                        break;
 
                if (wait->target == from || !wait->wakeup_cb(wait->target))
                        continue;
 
                LIST_DEL_INIT(&wait->list);
-               avail--;
+               count--;
        }
 }
 
index eee77cbed7452b35bfd0dab2bdcff6365d696f38..92a5af425a039d2a35fd31187c071d269fa0cce7 100644 (file)
@@ -2841,7 +2841,7 @@ spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait)
        /* Release the buffer if needed */
        if (buf->size) {
                b_free(buf);
-               offer_buffers(buffer_wait->target, tasks_run_queue);
+               offer_buffers(buffer_wait->target, 1);
        }
 }
 
index 464a075a9608adf96433ad78c4398cbfbf07d6dd..494517b4430a28211e2c2a2ed05382e6ba58d935 100644 (file)
@@ -617,7 +617,7 @@ static inline void fcgi_release_buf(struct fcgi_conn *fconn, struct buffer *bptr
 {
        if (bptr->size) {
                b_free(bptr);
-               offer_buffers(NULL, tasks_run_queue);
+               offer_buffers(NULL, 1);
        }
 }
 
@@ -631,7 +631,7 @@ static inline void fcgi_release_mbuf(struct fcgi_conn *fconn)
                count++;
        }
        if (count)
-               offer_buffers(NULL, tasks_run_queue);
+               offer_buffers(NULL, count);
 }
 
 /* Returns the number of allocatable outgoing streams for the connection taking
@@ -1027,7 +1027,7 @@ static void fcgi_strm_destroy(struct fcgi_strm *fstrm)
        eb32_delete(&fstrm->by_id);
        if (b_size(&fstrm->rxbuf)) {
                b_free(&fstrm->rxbuf);
-               offer_buffers(NULL, tasks_run_queue);
+               offer_buffers(NULL, 1);
        }
        if (fstrm->subs)
                fstrm->subs->events = 0;
@@ -2910,7 +2910,7 @@ static int fcgi_send(struct fcgi_conn *fconn)
                }
 
                if (released)
-                       offer_buffers(NULL, tasks_run_queue);
+                       offer_buffers(NULL, released);
 
                /* wrote at least one byte, the buffer is not full anymore */
                if (fconn->flags & (FCGI_CF_MUX_MFULL | FCGI_CF_DEM_MROOM))
@@ -3224,7 +3224,7 @@ do_leave:
                }
 
                if (released)
-                       offer_buffers(NULL, tasks_run_queue);
+                       offer_buffers(NULL, released);
        }
 
   end:
index 8e17d727d62b029c9227781be27b59cacc5f63d9..1bfdab61c4298ad371decedb78a59e45bd6a48af 100644 (file)
@@ -465,7 +465,7 @@ static inline void h1_release_buf(struct h1c *h1c, struct buffer *bptr)
 {
        if (bptr->size) {
                b_free(bptr);
-               offer_buffers(h1c->buf_wait.target, tasks_run_queue);
+               offer_buffers(h1c->buf_wait.target, 1);
        }
 }
 
index 30bedc3e61539e488c2bfe0864830b8f9b31c90d..09b42684f6cd8c7400b3825a3d5410df3d2ca411 100644 (file)
@@ -819,7 +819,7 @@ static inline void h2_release_buf(struct h2c *h2c, struct buffer *bptr)
 {
        if (bptr->size) {
                b_free(bptr);
-               offer_buffers(NULL, tasks_run_queue);
+               offer_buffers(NULL, 1);
        }
 }
 
@@ -833,7 +833,7 @@ static inline void h2_release_mbuf(struct h2c *h2c)
                count++;
        }
        if (count)
-               offer_buffers(NULL, tasks_run_queue);
+               offer_buffers(NULL, count);
 }
 
 /* returns the number of allocatable outgoing streams for the connection taking
@@ -1393,7 +1393,7 @@ static void h2s_destroy(struct h2s *h2s)
        eb32_delete(&h2s->by_id);
        if (b_size(&h2s->rxbuf)) {
                b_free(&h2s->rxbuf);
-               offer_buffers(NULL, tasks_run_queue);
+               offer_buffers(NULL, 1);
        }
 
        if (h2s->subs)
@@ -3740,7 +3740,7 @@ static int h2_send(struct h2c *h2c)
                }
 
                if (released)
-                       offer_buffers(NULL, tasks_run_queue);
+                       offer_buffers(NULL, released);
 
                /* wrote at least one byte, the buffer is not full anymore */
                if (sent)
@@ -4049,7 +4049,7 @@ do_leave:
                }
 
                if (released)
-                       offer_buffers(NULL, tasks_run_queue);
+                       offer_buffers(NULL, released);
        }
 
        /* in any case this connection must not be considered idle anymore */
@@ -6242,7 +6242,7 @@ static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
                        cs->flags |= CS_FL_ERROR;
                if (b_size(&h2s->rxbuf)) {
                        b_free(&h2s->rxbuf);
-                       offer_buffers(NULL, tasks_run_queue);
+                       offer_buffers(NULL, 1);
                }
        }
 
index 29d54c777e2cf83501978481e403c5d86a865510..6d534cdec24aaad12b3bb16e86e33384565f3f0e 100644 (file)
@@ -638,9 +638,11 @@ static void stream_free(struct stream *s)
                LIST_DEL_INIT(&s->buffer_wait.list);
 
        if (s->req.buf.size || s->res.buf.size) {
+               int count = !!s->req.buf.size + !!s->res.buf.size;
+
                b_free(&s->req.buf);
                b_free(&s->res.buf);
-               offer_buffers(NULL, tasks_run_queue);
+               offer_buffers(NULL, count);
        }
 
        pool_free(pool_head_uniqueid, s->unique_id.ptr);
@@ -788,11 +790,11 @@ void stream_release_buffers(struct stream *s)
        int offer = 0;
 
        if (c_size(&s->req) && c_empty(&s->req)) {
-               offer = 1;
+               offer++;
                b_free(&s->req.buf);
        }
        if (c_size(&s->res) && c_empty(&s->res)) {
-               offer = 1;
+               offer++;
                b_free(&s->res.buf);
        }
 
@@ -800,7 +802,7 @@ void stream_release_buffers(struct stream *s)
         * someone waiting, we can wake up a waiter and offer them.
         */
        if (offer)
-               offer_buffers(s, tasks_run_queue);
+               offer_buffers(s, offer);
 }
 
 void stream_process_counters(struct stream *s)