]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
[MEDIUM] splice: make use of pipe pools
authorWilly Tarreau <w@1wt.eu>
Sun, 25 Jan 2009 12:56:13 +0000 (13:56 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 25 Jan 2009 12:56:13 +0000 (13:56 +0100)
Using pipe pools makes pipe management a lot easier. It also allows to
remove quite a bunch of #ifdefs in areas which depended on the presence
or not of support for kernel splicing.

The buffer now holds a pointer to a pipe structure which is always NULL
except if there are still data in the pipe. When it needs to use that
pipe, it dynamically allocates it from the pipe pool. When the data is
consumed, the pipe is immediately released.

That way, there is no need anymore to care about pipe closure upon
session termination, nor about pipe creation when trying to use
splice().

Another immediate advantage of this method is that it considerably
reduces the number of pipes needed to use splice(). Tests have shown
that even with 0.2 pipe per connection, almost all sessions can use
splice(), because the same pipe may be used by several consecutive
calls to splice().

include/proto/buffers.h
include/types/buffers.h
include/types/global.h
src/fd.c
src/haproxy.c
src/session.c
src/stream_sock.c

index 9ebc6ffc0a6e737efec5515e507c86bec4b6a52b..6b5771b6ab6263f6435c4825fb58453fc3a1b629 100644 (file)
@@ -48,15 +48,12 @@ static inline void buffer_init(struct buffer *buf)
        buf->send_max = 0;
        buf->to_forward = 0;
        buf->l = buf->total = 0;
-       buf->splice_len = 0;
+       buf->pipe = NULL;
        buf->analysers = 0;
        buf->cons = NULL;
        buf->flags = BF_EMPTY;
        buf->r = buf->lr = buf->w = buf->data;
        buf->max_len = BUFSIZE;
-#if defined(CONFIG_HAP_LINUX_SPLICE)
-       buf->splice.prod = buf->splice.cons = -1; /* closed */
-#endif
 }
 
 /* returns 1 if the buffer is empty, 0 otherwise */
index ca808095482ed8469829e3a0d1648dcc6bfa1cca..3b0c3a12ae9e8be40e0894cf773ee672e85a587b 100644 (file)
@@ -128,7 +128,6 @@ struct buffer {
        int wto;                        /* write timeout, in ticks */
        int cto;                        /* connect timeout, in ticks */
        unsigned int l;                 /* data length */
-       unsigned int splice_len;        /* number of bytes remaining in splice, out of buffer */
        char *r, *w, *lr;               /* read ptr, write ptr, last read */
        unsigned int max_len;           /* read limit, used to keep room for header rewriting */
        unsigned int send_max;          /* number of bytes the sender can consume om this buffer, <= l */
@@ -141,12 +140,7 @@ struct buffer {
        unsigned long long total;       /* total data read */
        struct stream_interface *prod;  /* producer attached to this buffer */
        struct stream_interface *cons;  /* consumer attached to this buffer */
-       struct {
-#if defined(CONFIG_HAP_LINUX_SPLICE)
-               int prod;               /* -1 or fd of the pipe's end towards the producer */
-               int cons;               /* -1 or fd of the pipe's end towards the consumer */
-#endif
-       } splice;
+       struct pipe *pipe;              /* non-NULL only when data present */
        char data[BUFSIZE];
 };
 
@@ -158,7 +152,7 @@ struct buffer {
    split in two parts :
      - the visible data (->data, for ->l bytes)
      - the invisible data, typically in kernel buffers forwarded directly from
-       the source stream sock to the destination stream sock (->splice_len
+       the source stream sock to the destination stream sock (->pipe->data
        bytes). Those are used only during forward.
 
    In order not to mix data streams, the producer may only feed the invisible
@@ -173,7 +167,7 @@ struct buffer {
    buffer is expected to reach the destination file descriptor, by any means.
    However, it's the consumer's responsibility to ensure that the invisible
    data has been entirely consumed before consuming visible data. This must be
-   reflected by ->splice_len. This is very important as this and only this can
+   reflected by ->pipe->data. This is very important as this and only this can
    ensure strict ordering of data between buffers.
 
    The producer is responsible for decreasing ->to_forward and increasing
@@ -184,7 +178,7 @@ struct buffer {
    well as any data forwarded through the visible buffer.
 
    The consumer is responsible for decreasing ->send_max when it sends data
-   from the visible buffer, and ->splice_len when it sends data from the
+   from the visible buffer, and ->pipe->data when it sends data from the
    invisible buffer.
 
    A real-world example consists in part in an HTTP response waiting in a
index 8c2893095c0c5f655ea9287ba0fa5056d9db893c..120977906545c7514238c65d0a2fd5b67d227ab5 100644 (file)
@@ -75,7 +75,6 @@ extern char *progname;          /* program name */
 extern int  pid;                /* current process id */
 extern int  relative_pid;       /* process id starting at 1 */
 extern int  actconn;            /* # of active sessions */
-extern int  usedpipes;          /* # of used pipes */
 extern int listeners;
 extern char trash[BUFSIZE];
 extern const int zero;
index c0fb71287872ea49f620c3e46e9daed1904dd4ac..daa347f31d467a15895bb675e824921a2bb151a0 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
 #include <common/compat.h>
 #include <common/config.h>
 
-//#include <types/global.h>
-
 #include <proto/fd.h>
 
 struct fdtab *fdtab = NULL;     /* array of all the file descriptors */
 int maxfd;                      /* # of the highest fd + 1 */
 int totalconn;                  /* total # of terminated sessions */
 int actconn;                    /* # of active sessions */
-int usedpipes;                  /* # of pipes in use (2 fds each) */
 
 int cfg_polling_mechanism = 0;  /* POLL_USE_{SELECT|POLL|EPOLL} */
 
index 9b83146e63772e07755239bb97224d52ac9193ea..fd4b050288d27e5986f37c450fc2f8b4b1d3db4c 100644 (file)
@@ -393,7 +393,7 @@ void init(int argc, char **argv)
         * Initialize the previously static variables.
         */
     
-       usedpipes = totalconn = actconn = maxfd = listeners = stopping = 0;
+       totalconn = actconn = maxfd = listeners = stopping = 0;
     
 
 #ifdef HAPROXY_MEMMAX
index 3f18ad9ce2b871c675261fa85eb5b244372ad761..c7dbf6f8a9f09de50f3dbf737aca749b9f3c2e48 100644 (file)
@@ -24,6 +24,7 @@
 #include <proto/hdr_idx.h>
 #include <proto/log.h>
 #include <proto/session.h>
+#include <proto/pipe.h>
 #include <proto/proto_http.h>
 #include <proto/proto_tcp.h>
 #include <proto/queue.h>
@@ -64,23 +65,11 @@ void session_free(struct session *s)
                sess_change_server(s, NULL);
        }
 
-#if defined(CONFIG_HAP_LINUX_SPLICE)
-       if (s->req->splice.prod >= 0)
-               close(s->req->splice.prod);
-       if (s->req->splice.cons >= 0)
-               close(s->req->splice.cons);
-       
-       if (s->req->splice.prod >= 0 || s->req->splice.cons >= 0)
-               usedpipes--;
-
-       if (s->rep->splice.prod >= 0)
-               close(s->rep->splice.prod);
-       if (s->rep->splice.cons >= 0)
-               close(s->rep->splice.cons);
-
-       if (s->rep->splice.prod >= 0 || s->rep->splice.cons >= 0)
-               usedpipes--;
-#endif
+       if (s->req->pipe)
+               put_pipe(s->req->pipe);
+
+       if (s->rep->pipe)
+               put_pipe(s->rep->pipe);
 
        pool_free2(pool2_buffer, s->req);
        pool_free2(pool2_buffer, s->rep);
@@ -772,7 +761,7 @@ resync_stream_interface:
            !s->req->analysers && !(s->req->flags & BF_HIJACK)) {
                /* check if it is wise to enable kernel splicing on the request buffer */
                if (!(s->req->flags & BF_KERN_SPLICING) &&
-                   (usedpipes < global.maxpipes) &&
+                   (pipes_used < global.maxpipes) &&
                    (((s->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
                     (((s->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
                      (s->req->flags & BF_STREAMER_FAST))))
@@ -895,7 +884,7 @@ resync_stream_interface:
            !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
                /* check if it is wise to enable kernel splicing on the response buffer */
                if (!(s->rep->flags & BF_KERN_SPLICING) &&
-                   (usedpipes < global.maxpipes) &&
+                   (pipes_used < global.maxpipes) &&
                    (((s->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
                     (((s->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
                      (s->rep->flags & BF_STREAMER_FAST))))
index 47fd8fa698a55b2b1c26cd512de40affdc2aec66..ae4eb9880eec5f62f1e76ce9e1088791281cd58b 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Functions operating on SOCK_STREAM and buffers.
  *
- * Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2009 Willy Tarreau <w@1wt.eu>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -30,6 +30,7 @@
 #include <proto/buffers.h>
 #include <proto/client.h>
 #include <proto/fd.h>
+#include <proto/pipe.h>
 #include <proto/stream_sock.h>
 #include <proto/task.h>
 
@@ -95,6 +96,9 @@ _syscall6(int, splice, int, fdin, loff_t *, off_in, int, fdout, loff_t *, off_ou
  *   SI_FL_ERR
  *   SI_FL_WAIT_ROOM
  *   (SI_FL_WAIT_RECV)
+ *
+ * This function automatically allocates a pipe from the pipe pool. It also
+ * carefully ensures to clear b->pipe whenever it leaves the pipe empty.
  */
 static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
 {
@@ -121,17 +125,15 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
                return 1;
        }
 
-       if (unlikely(b->splice.prod == -1)) {
-               int pipefd[2];
-               if (usedpipes >= global.maxpipes || pipe(pipefd) < 0) {
+       if (unlikely(b->pipe == NULL)) {
+               if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
                        b->flags &= ~BF_KERN_SPLICING;
                        return -1;
                }
-               usedpipes++;
-               b->splice.prod = pipefd[1];
-               b->splice.cons = pipefd[0];
        }
 
+       /* At this point, b->pipe is valid */
+
        while (1) {
                max = b->to_forward;
                if (max <= 0) {
@@ -144,7 +146,7 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
                        break;
                }
 
-               ret = splice(fd, NULL, b->splice.prod, NULL, max,
+               ret = splice(fd, NULL, b->pipe->prod, NULL, max,
                             SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
 
                if (ret <= 0) {
@@ -167,7 +169,7 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
                                 * will almost always fill/empty the pipe.
                                 */
 
-                               if (b->splice_len > 0) {
+                               if (b->pipe->data) {
                                        si->flags |= SI_FL_WAIT_ROOM;
                                        retval = 1;
                                        break;
@@ -191,17 +193,22 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
                b->to_forward -= ret;
                total += ret;
                b->total += ret;
-               b->splice_len += ret;
+               b->pipe->data += ret;
                b->flags |= BF_READ_PARTIAL;
                b->flags &= ~BF_EMPTY; /* to prevent shutdowns */
 
-               if (b->splice_len >= SPLICE_FULL_HINT) {
+               if (b->pipe->data >= SPLICE_FULL_HINT) {
                        /* We've read enough of it for this time. */
                        retval = 1;
                        break;
                }
        } /* while */
 
+       if (unlikely(!b->pipe->data)) {
+               put_pipe(b->pipe);
+               b->pipe = NULL;
+       }
+
        return retval;
 }
 
@@ -431,13 +438,14 @@ int stream_sock_read(int fd) {
 
  out_wakeup:
        /* We might have some data the consumer is waiting for */
-       if ((b->send_max || b->splice_len) && (b->cons->flags & SI_FL_WAIT_DATA)) {
-               int last_len = b->splice_len;
+       if ((b->send_max || b->pipe) && (b->cons->flags & SI_FL_WAIT_DATA)) {
+               int last_len = b->pipe ? b->pipe->data : 0;
 
                b->cons->chk_snd(b->cons);
 
                /* check if the consumer has freed some space */
-               if (!(b->flags & BF_FULL) && (!last_len || b->splice_len < last_len))
+               if (!(b->flags & BF_FULL) &&
+                   (!last_len || !b->pipe || b->pipe->data < last_len))
                        si->flags &= ~SI_FL_WAIT_ROOM;
        }
 
@@ -487,7 +495,8 @@ int stream_sock_read(int fd) {
 /*
  * This function is called to send buffer data to a stream socket.
  * It returns -1 in case of unrecoverable error, 0 if the caller needs to poll
- * before calling it again, otherwise 1.
+ * before calling it again, otherwise 1. If a pipe was associated with the
+ * buffer and it empties it, it releases it as well.
  */
 static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
 {
@@ -496,8 +505,8 @@ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
        int ret, max;
 
 #if defined(CONFIG_HAP_LINUX_SPLICE)
-       while (b->splice_len) {
-               ret = splice(b->splice.cons, NULL, si->fd, NULL, b->splice_len,
+       while (b->pipe) {
+               ret = splice(b->pipe->cons, NULL, si->fd, NULL, b->pipe->data,
                             SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
                if (ret <= 0) {
                        if (ret == 0 || errno == EAGAIN) {
@@ -510,10 +519,13 @@ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
                }
 
                b->flags |= BF_WRITE_PARTIAL;
-               b->splice_len -= ret;
+               b->pipe->data -= ret;
 
-               if (!b->splice_len)
+               if (!b->pipe->data) {
+                       put_pipe(b->pipe);
+                       b->pipe = NULL;
                        break;
+               }
 
                if (--write_poll <= 0)
                        return retval;
@@ -575,7 +587,7 @@ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
                        if (likely(!b->l)) {
                                /* optimize data alignment in the buffer */
                                b->r = b->w = b->lr = b->data;
-                               if (likely(!b->splice_len))
+                               if (likely(!b->pipe))
                                        b->flags |= BF_EMPTY;
                        }
 
@@ -669,7 +681,7 @@ int stream_sock_write(int fd)
                 */
        }
 
-       if (!b->splice_len && !b->send_max) {
+       if (!b->pipe && !b->send_max) {
                /* the connection is established but we can't write. Either the
                 * buffer is empty, or we just refrain from sending because the
                 * send_max limit was reached. Maybe we just wrote the last
@@ -692,7 +704,7 @@ int stream_sock_write(int fd)
  out_may_wakeup:
        if (b->flags & BF_WRITE_ACTIVITY) {
                /* update timeout if we have written something */
-               if ((b->send_max || b->splice_len) &&
+               if ((b->send_max || b->pipe) &&
                    (b->flags & (BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
                        b->wex = tick_add_ifset(now_ms, b->wto);
 
@@ -716,7 +728,7 @@ int stream_sock_write(int fd)
                 * any more data to forward and it's not planned to send any more.
                 */
                if (likely((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
-                          (!b->to_forward && !b->send_max && !b->splice_len) ||
+                          (!b->to_forward && !b->send_max && !b->pipe) ||
                           si->state != SI_ST_EST ||
                           b->prod->state != SI_ST_EST))
                        task_wakeup(si->owner, TASK_WOKEN_IO);
@@ -850,7 +862,7 @@ void stream_sock_data_finish(struct stream_interface *si)
        /* Check if we need to close the write side */
        if (!(ob->flags & BF_SHUTW)) {
                /* Write not closed, update FD status and timeout for writes */
-               if ((ob->send_max == 0 && ob->splice_len == 0) ||
+               if ((ob->send_max == 0 && !ob->pipe) ||
                    (ob->flags & BF_EMPTY) ||
                    (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
                        /* stop writing */
@@ -938,7 +950,7 @@ void stream_sock_chk_snd(struct stream_interface *si)
 
        if (!(si->flags & SI_FL_WAIT_DATA) ||        /* not waiting for data */
            (fdtab[si->fd].ev & FD_POLL_OUT) ||      /* we'll be called anyway */
-           !(ob->send_max || ob->splice_len) ||     /* called with nothing to send ! */
+           !(ob->send_max || ob->pipe) ||           /* called with nothing to send ! */
            !(ob->flags & (BF_HIJACK|BF_WRITE_ENA))) /* we may not write */
                return;
 
@@ -953,7 +965,7 @@ void stream_sock_chk_snd(struct stream_interface *si)
                goto out_wakeup;
        }
 
-       if (retval > 0 || (ob->send_max == 0 && ob->splice_len == 0)) {
+       if (retval > 0 || (ob->send_max == 0 && !ob->pipe)) {
                /* the connection is established but we can't write. Either the
                 * buffer is empty, or we just refrain from sending because the
                 * send_max limit was reached. Maybe we just wrote the last
@@ -980,7 +992,7 @@ void stream_sock_chk_snd(struct stream_interface *si)
         * have to notify the task.
         */
        if (likely((ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
-                  (!ob->to_forward && !ob->send_max && !ob->splice_len) ||
+                  (!ob->to_forward && !ob->send_max && !ob->pipe) ||
                   si->state != SI_ST_EST)) {
        out_wakeup:
                task_wakeup(si->owner, TASK_WOKEN_IO);