]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: channel: add a new flag CF_WAKE_WRITE to notify the task of writes
authorWilly Tarreau <w@1wt.eu>
Tue, 31 Dec 2013 16:26:25 +0000 (17:26 +0100)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Dec 2013 17:37:36 +0000 (18:37 +0100)
Since commit 6b66f3e ([MAJOR] implement autonomous inter-socket forwarding)
introduced in 1.3.16-rc1, we've been relying on a stupid mechanism to wake
up the task after a write, which was an exact copy-paste of the reader side.

The principle was that if we empty a buffer and there's no forwarding
scheduled or if the *producer* is not in a connected state, then we wake
the task up.

That does not make any sense. It happens to wake up too late sometimes (eg,
when the request analyser waits for some room in the buffer to start to
work), and leads to unneeded wakeups in client-side keep-alive, because
the task is woken up when the response is sent, while the analysers are
simply waiting for a new request.

In order to fix this, we introduce a new channel flag : CF_WAKE_WRITE. It
is designed so that an analyser can explicitly request being notified when
some data were written. It is used only when the HTTP request or response
analysers need to wait for more room in the buffers. It is automatically
cleared upon wake up.

The flag is also automatically set by the functions which try to write into
a buffer from an applet when they fail (bi_putblk() etc...).

That allows us to remove the stupid condition above and avoid some wakeups.
In http-server-close and in http-keep-alive modes, this reduces from 4 to 3
the average number of wakeups per request, and increases the overall
performance by about 1.5%.

include/types/channel.h
src/channel.c
src/dumpstats.c
src/proto_http.c
src/session.c
src/stream_interface.c

index 7f524c1f31b84123e030ef73204b32db558bcc9d..905308e1fd8c45b85e67e4e15f528357e2832736 100644 (file)
@@ -70,7 +70,7 @@
 #define CF_WRITE_ERROR    0x00000800  /* unrecoverable error on consumer side */
 #define CF_WRITE_ACTIVITY (CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_WRITE_ERROR)
 
-/* unused: 0x00001000 */
+#define CF_WAKE_WRITE     0x00001000  /* wake the task up when there's write activity */
 #define CF_SHUTW          0x00002000  /* consumer has already shut down */
 #define CF_SHUTW_NOW      0x00004000  /* the consumer must shut down for writes ASAP */
 #define CF_AUTO_CLOSE     0x00008000  /* producer can forward shutdown to other side */
 #define CF_WAKE_ONCE      0x10000000  /* pretend there is activity on this channel (one-shoot) */
 /* unused: 0x20000000, 0x40000000, 0x80000000 */
 
-/* Use these masks to clear the flags before going back to lower layers */
-#define CF_CLEAR_READ     (~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ERROR|CF_READ_ATTACHED))
-#define CF_CLEAR_WRITE    (~(CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_WRITE_ERROR))
-#define CF_CLEAR_TIMEOUT  (~(CF_READ_TIMEOUT|CF_WRITE_TIMEOUT|CF_ANA_TIMEOUT))
-
 /* Masks which define input events for stream analysers */
 #define CF_MASK_ANALYSER  (CF_READ_ATTACHED|CF_READ_ACTIVITY|CF_READ_TIMEOUT|CF_ANA_TIMEOUT|CF_WRITE_ACTIVITY|CF_WAKE_ONCE)
 
index 2942d9241da5c42227d6073cc2de7e07bb097a7a..1250370767a04d8f73c502c11a5003ec5c00a233 100644 (file)
@@ -113,14 +113,18 @@ int bo_inject(struct channel *chn, const char *msg, int len)
  * input is closed, -2 is returned. If there is not enough room left in the
  * buffer, -1 is returned. Otherwise the number of bytes copied is returned
  * (1). Channel flag READ_PARTIAL is updated if some data can be transferred.
+ * Channel flag CF_WAKE_WRITE is set if the write fails because the buffer is
+ * full.
  */
 int bi_putchr(struct channel *chn, char c)
 {
        if (unlikely(channel_input_closed(chn)))
                return -2;
 
-       if (channel_full(chn))
+       if (channel_full(chn)) {
+               chn->flags |= CF_WAKE_WRITE;
                return -1;
+       }
 
        *bi_end(chn->buf) = c;
 
@@ -143,7 +147,8 @@ int bi_putchr(struct channel *chn, char c)
  * -3 is returned. If there is not enough room left in the buffer, -1 is
  * returned. Otherwise the number of bytes copied is returned (0 being a valid
  * number). Channel flag READ_PARTIAL is updated if some data can be
- * transferred.
+ * transferred. Channel flag CF_WAKE_WRITE is set if the write fails because
+ * the buffer is full.
  */
 int bi_putblk(struct channel *chn, const char *blk, int len)
 {
@@ -161,6 +166,7 @@ int bi_putblk(struct channel *chn, const char *blk, int len)
                if (len > max)
                        return -3;
 
+               chn->flags |= CF_WAKE_WRITE;
                return -1;
        }
 
index eeb662b0b9b05226cb14eb2e24a60683a2452c39..e313b5a96a92daba953931fd3c79816e193bd08b 100644 (file)
@@ -1965,8 +1965,10 @@ static void cli_io_handler(struct stream_interface *si)
                        /* ensure we have some output room left in the event we
                         * would want to return some info right after parsing.
                         */
-                       if (buffer_almost_full(si->ib->buf))
+                       if (buffer_almost_full(si->ib->buf)) {
+                               si->ib->flags |= CF_WAKE_WRITE;
                                break;
+                       }
 
                        reql = bo_getline(si->ob, trash.str, trash.size);
                        if (reql <= 0) { /* closed or EOL not found */
@@ -3360,8 +3362,10 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy
        case STAT_PX_ST_LI:
                /* stats.l has been initialized above */
                for (; appctx->ctx.stats.l != &px->conf.listeners; appctx->ctx.stats.l = l->by_fe.n) {
-                       if (buffer_almost_full(rep->buf))
+                       if (buffer_almost_full(rep->buf)) {
+                               rep->flags |= CF_WAKE_WRITE;
                                return 0;
+                       }
 
                        l = LIST_ELEM(appctx->ctx.stats.l, struct listener *, by_fe);
                        if (!l->counters)
@@ -3390,8 +3394,10 @@ static int stats_dump_proxy_to_buffer(struct stream_interface *si, struct proxy
                for (; appctx->ctx.stats.sv != NULL; appctx->ctx.stats.sv = sv->next) {
                        int sv_state; /* 0=DOWN, 1=going up, 2=going down, 3=UP, 4,5=NOLB, 6=unchecked */
 
-                       if (buffer_almost_full(rep->buf))
+                       if (buffer_almost_full(rep->buf)) {
+                               rep->flags |= CF_WAKE_WRITE;
                                return 0;
+                       }
 
                        sv = appctx->ctx.stats.sv;
 
@@ -3874,8 +3880,10 @@ static int stats_dump_stat_to_buffer(struct stream_interface *si, struct uri_aut
        case STAT_ST_LIST:
                /* dump proxies */
                while (appctx->ctx.stats.px) {
-                       if (buffer_almost_full(rep->buf))
+                       if (buffer_almost_full(rep->buf)) {
+                               rep->flags |= CF_WAKE_WRITE;
                                return 0;
+                       }
 
                        px = appctx->ctx.stats.px;
                        /* skip the disabled proxies, global frontend and non-networked ones */
index 18da38f7c57600732f5a9eccedcade3fd0152e50..16db90a41233e0542ba1a150f81b296f5c066b95 100644 (file)
@@ -2307,6 +2307,7 @@ int http_wait_for_request(struct session *s, struct channel *req, int an_bit)
                                /* some data has still not left the buffer, wake us once that's done */
                                channel_dont_connect(req);
                                req->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+                               req->flags |= CF_WAKE_WRITE;
                                return 0;
                        }
                        if (unlikely(bi_end(req->buf) < b_ptr(req->buf, msg->next) ||
@@ -2331,6 +2332,7 @@ int http_wait_for_request(struct session *s, struct channel *req, int an_bit)
                                /* don't let a connection request be initiated */
                                channel_dont_connect(req);
                                s->rep->flags &= ~CF_EXPECT_MORE; /* speed up sending a previous response */
+                               s->rep->flags |= CF_WAKE_WRITE;
                                s->rep->analysers |= an_bit; /* wake us up once it changes */
                                return 0;
                        }
@@ -5115,6 +5117,7 @@ int http_wait_for_response(struct session *s, struct channel *rep, int an_bit)
                                goto abort_response;
                        channel_dont_close(rep);
                        rep->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+                       rep->flags |= CF_WAKE_WRITE;
                        return 0;
                }
 
index f06d06bbf2ae4ca66d1c0bc7629dc0072a99a168..18d670ebd6e9b508a9519efc85b9558f3bb436b4 100644 (file)
@@ -1605,7 +1605,8 @@ struct task *process_session(struct task *t)
        memset(&s->txn.auth, 0, sizeof(s->txn.auth));
 
        /* This flag must explicitly be set every time */
-       s->req->flags &= ~CF_READ_NOEXP;
+       s->req->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
+       s->rep->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
 
        /* Keep a copy of req/rep flags so that we can detect shutdowns */
        rqf_last = s->req->flags & ~CF_MASK_ANALYSER;
index 9aa19072c76a372210bcf787689293b379ccf663..f38ddc13cf4f281a1c63459bf1fb390c8b0cb800 100644 (file)
@@ -205,9 +205,7 @@ static void stream_int_update_embedded(struct stream_interface *si)
            /* changes on the consumption side */
            (si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
            ((si->ob->flags & CF_WRITE_ACTIVITY) &&
-            ((si->ob->flags & CF_SHUTW) ||
-             si->ob->prod->state != SI_ST_EST ||
-             (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
+            (si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
                if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
                        task_wakeup(si->owner, TASK_WOKEN_IO);
        }
@@ -630,9 +628,7 @@ static int si_conn_wake_cb(struct connection *conn)
            /* changes on the consumption side */
            (si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
            ((si->ob->flags & CF_WRITE_ACTIVITY) &&
-            ((si->ob->flags & CF_SHUTW) ||
-             si->ob->prod->state != SI_ST_EST ||
-             (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
+            (si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
                task_wakeup(si->owner, TASK_WOKEN_IO);
        }
        if (si->ib->flags & CF_READ_ACTIVITY)
@@ -1045,9 +1041,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
        /* in case of special condition (error, shutdown, end of write...), we
         * have to notify the task.
         */
-       if (likely((ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
-                  (channel_is_empty(ob) && !ob->to_forward) ||
-                  si->state != SI_ST_EST)) {
+       if (likely(ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW|CF_WAKE_WRITE))) {
        out_wakeup:
                if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
                        task_wakeup(si->owner, TASK_WOKEN_IO);