]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
[MAJOR] implement autonomous inter-socket forwarding
authorWilly Tarreau <w@1wt.eu>
Sun, 14 Dec 2008 16:31:54 +0000 (17:31 +0100)
committerWilly Tarreau <w@1wt.eu>
Fri, 9 Jan 2009 09:15:02 +0000 (10:15 +0100)
If an analyser sets buf->to_forward to a given value, that many
data will be forwarded between the two stream interfaces attached
to a buffer without waking the task up. The same applies once all
analysers have been released. This saves a large amount of calls
to process_session() and a number of task_dequeue/queue.

include/common/defaults.h
include/proto/buffers.h
include/types/buffers.h
src/proto_uxst.c
src/session.c
src/stream_sock.c

index 05628e176393630d6b82e4a903b2cc8a0017b997..acba5c6eed8209457b641dc437e828888c688233 100644 (file)
@@ -2,8 +2,8 @@
   include/common/defaults.h
   Miscellaneous default values.
 
-  Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
-  
+  Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
+
   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation, version 2.1
 #define MAXREWRITE      (BUFSIZE / 2)
 #endif
 
+/* FORWARD_DEFAULT_SIZE
+ * Indicates how many bytes may be forwarded at once in low-level stream-socks
+ * without waking the owner task up. This should be much larger than the buffer
+ * size. A few megabytes seem appropriate.
+ */
+#ifndef FORWARD_DEFAULT_SIZE
+#define FORWARD_DEFAULT_SIZE (16*1024*1024)
+#endif
+
+
 #define REQURI_LEN      1024
 #define CAPTURE_LEN     64
 
index 64427963d2a02ad0f7f262a51fd7c782beb9c45b..ab0de118a9a4a95ce089ad07333303add39ffe34 100644 (file)
@@ -46,6 +46,7 @@ int init_buffer();
 static inline void buffer_init(struct buffer *buf)
 {
        buf->send_max = 0;
+       buf->to_forward = 0;
        buf->l = buf->total = 0;
        buf->analysers = 0;
        buf->cons = NULL;
@@ -92,6 +93,7 @@ static inline void buffer_check_timeouts(struct buffer *b)
 static inline void buffer_flush(struct buffer *buf)
 {
        buf->send_max = 0;
+       buf->to_forward = 0;
        buf->r = buf->lr = buf->w = buf->data;
        buf->l = 0;
        buf->flags |= BF_EMPTY | BF_FULL;
index d8f71188b4f1b12a4b327aefea1556be239cd2c2..f15d33daaacd9a176518dd9b7443531cf19bfa7d 100644 (file)
@@ -130,6 +130,7 @@ struct buffer {
        char *r, *w, *lr;               /* read ptr, write ptr, last read */
        char *rlim;                     /* read limit, used for header rewriting */
        unsigned int send_max;          /* number of bytes the sender can consume */
+       unsigned int to_forward;        /* number of bytes that can send without a wake-up, >= send_max */
        unsigned int analysers;         /* bit field indicating what to do on the buffer */
        int analyse_exp;                /* expiration date for current analysers (if set) */
        void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */
index 6c1367936e9788e056d6623e07e7f9baf13e15cd..a6eed1332e6162bfb099f93fa8a9d8c20a48c38d 100644 (file)
@@ -809,6 +809,17 @@ void uxst_process_session(struct task *t, int *next)
        if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
                s->req->send_max = s->req->l;
 
+       /* if noone is interested in analysing data, let's forward everything
+        * and only wake up every 1-2 MB. We still wake up when send_max is
+        * reached though.
+        */
+       if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
+           !s->req->analysers && !(s->req->flags & BF_HIJACK)) {
+               if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
+                       s->req->to_forward += FORWARD_DEFAULT_SIZE;
+               s->req->send_max = s->req->l;
+       }
+
        /* reflect what the L7 analysers have seen last */
        rqf_last = s->req->flags;
 
@@ -879,9 +890,17 @@ void uxst_process_session(struct task *t, int *next)
                        resync = 1;
        }
 
-       /* if noone is interested in analysing data, let's forward everything */
-       if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK))
+       /* if noone is interested in analysing data, let's forward everything
+        * and only wake up every 1-2 MB. We still wake up when send_max is
+        * reached though.
+        */
+       if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
+           !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
+               if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) {
+                       s->rep->to_forward += FORWARD_DEFAULT_SIZE;
+               }
                s->rep->send_max = s->rep->l;
+       }
 
        /* reflect what the L7 analysers have seen last */
        rpf_last = s->rep->flags;
index a775ba5319226cf1d3508d1ee395940c0bc59c28..97ce822a1e636b8ea2e47de813abf37e12bd19af 100644 (file)
@@ -746,9 +746,16 @@ resync_stream_interface:
                        resync = 1;
        }
 
-       /* if noone is interested in analysing data, let's forward everything */
-       if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
+       /* if noone is interested in analysing data, let's forward everything
+        * and only wake up every 1-2 MB. We still wake up when send_max is
+        * reached though.
+        */
+       if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
+           !s->req->analysers && !(s->req->flags & BF_HIJACK)) {
+               if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
+                       s->req->to_forward += FORWARD_DEFAULT_SIZE;
                s->req->send_max = s->req->l;
+       }
 
        /* reflect what the L7 analysers have seen last */
        rqf_last = s->req->flags;
@@ -855,9 +862,17 @@ resync_stream_interface:
                        resync = 1;
        }
 
-       /* if noone is interested in analysing data, let's forward everything */
-       if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK))
+       /* if noone is interested in analysing data, let's forward everything
+        * and only wake up every 1-2 MB. We still wake up when send_max is
+        * reached though.
+        */
+       if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
+           !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
+               if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) {
+                       s->rep->to_forward += FORWARD_DEFAULT_SIZE;
+               }
                s->rep->send_max = s->rep->l;
+       }
 
        /* reflect what the L7 analysers have seen last */
        rpf_last = s->rep->flags;
@@ -870,7 +885,7 @@ resync_stream_interface:
         * FIXME: this is probably where we should produce error responses.
         */
 
-       /* first, let's check if the request buffer needs to shutdown(write) */
+       /* first, let's check if the response buffer needs to shutdown(write) */
        if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) ==
                     (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)))
                buffer_shutw_now(s->rep);
index fdd0dbdf3a9b46968f1c4741d4e8c3581f5a7ce4..82e105555499bcc8474c4084e50443d3b786bcc0 100644 (file)
@@ -116,8 +116,8 @@ int stream_sock_read(int fd) {
                        cur_read += ret;
 
                        /* if noone is interested in analysing data, let's forward everything */
-                       if (!b->analysers)
-                               b->send_max += ret;
+                       if (b->to_forward > b->send_max)
+                               b->send_max = MIN(b->to_forward, b->l);
 
                        if (fdtab[fd].state == FD_STCONN)
                                fdtab[fd].state = FD_STREADY;
@@ -251,10 +251,17 @@ int stream_sock_read(int fd) {
                goto out_skip_wakeup;
  out_wakeup:
        /* the consumer might be waiting for data */
-       if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL))
+       if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL) && !(b->flags & BF_EMPTY))
                b->cons->chk_snd(b->cons);
 
-       task_wakeup(si->owner, TASK_WOKEN_IO);
+       /* we have to wake up if there is a special event or if we don't have
+        * any more data to forward.
+        */
+       if ((b->flags & (BF_READ_NULL|BF_READ_ERROR|BF_SHUTR)) ||
+           !b->to_forward ||
+           si->state != SI_ST_EST ||
+           b->cons->state != SI_ST_EST)
+               task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
        fdtab[fd].ev &= ~FD_POLL_IN;
@@ -379,6 +386,13 @@ int stream_sock_write(int fd) {
                        b->l -= ret;
                        b->w += ret;
                        b->send_max -= ret;
+                       /* we can send up to send_max, we just want to know when
+                        * to_forward has been reached.
+                        */
+                       if ((signed)(b->to_forward - ret) >= 0)
+                               b->to_forward -= ret;
+                       else
+                               b->to_forward = 0;
 
                        if (fdtab[fd].state == FD_STCONN)
                                fdtab[fd].state = FD_STREADY;
@@ -453,10 +467,17 @@ int stream_sock_write(int fd) {
                goto out_skip_wakeup;
  out_wakeup:
        /* the producer might be waiting for more room to store data */
-       if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL))
+       if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL) && !(b->flags & BF_FULL))
                b->prod->chk_rcv(b->prod);
 
-       task_wakeup(si->owner, TASK_WOKEN_IO);
+       /* we have to wake up if there is a special event or if we don't have
+        * any more data to forward.
+        */
+       if ((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
+           !b->to_forward ||
+           si->state != SI_ST_EST ||
+           b->prod->state != SI_ST_EST)
+               task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
        fdtab[fd].ev &= ~FD_POLL_OUT;