]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: stream-interface: restore splicing mechanism
author Willy Tarreau <w@1wt.eu>
Thu, 23 Aug 2012 22:46:52 +0000 (00:46 +0200)
committer Willy Tarreau <w@1wt.eu>
Mon, 3 Sep 2012 18:47:31 +0000 (20:47 +0200)
The splicing is now provided by the data-layer rcv_pipe/snd_pipe functions
which in turn are called by the stream interface's recv and send callbacks.

The presence of the rcv_pipe/snd_pipe functions is used to attest support
for splicing at the data layer. It looks like the stream-interface's
SI_FL_CAP_SPLICE flag does not make sense anymore as it's used as a proxy
for the pointers above.

It also appears that we call chk_snd() from the recv callback and then
try to call it again in update_conn(). It is very likely that this last
function will progressively slip into the recv/send callbacks in order
to avoid duplicate check code.

The code works right now with and without splicing. Only raw_sock provides
support for it and it is automatically selected when the various splice
options are set. However it looks like splice-auto doesn't enable it, which
possibly means that the streamer detection code does not work anymore, or
that it's only called at a time where it's too late to enable splicing (in
process_session).

include/types/stream_interface.h
src/proto_tcp.c
src/raw_sock.c
src/session.c
src/stream_interface.c

index ee552c2b93462b96ab9cf2fb81416055b987004a..921363ff0c9d6a302e873d0a8ad401f070bc5125 100644 (file)
@@ -96,6 +96,7 @@ struct server;
 struct proxy;
 struct si_applet;
 struct stream_interface;
+struct pipe;
 
 struct target {
        int type;
@@ -120,6 +121,8 @@ struct sock_ops {
        void (*close)(struct connection *);         /* close the data channel on the connection */
        int  (*rcv_buf)(struct connection *conn, struct buffer *buf, int count); /* recv callback */
        int  (*snd_buf)(struct connection *conn, struct buffer *buf, int flags); /* send callback */
+       int  (*rcv_pipe)(struct connection *conn, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */
+       int  (*snd_pipe)(struct connection *conn, struct pipe *pipe); /* send-from-pipe callback */
 };
 
 /* A stream interface has 3 parts :
index 86fd6bd512da5a8a5fd53cb362f270a9d94c9752..9ff5b666bd11ebd620e316807a9472f28474cfd1 100644 (file)
@@ -479,7 +479,8 @@ int tcp_connect_server(struct stream_interface *si)
                conn_data_want_send(&si->conn);  /* prepare to send data if any */
 
        si->state = SI_ST_CON;
-       si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
+       if (si->conn.data->rcv_pipe && si->conn.data->snd_pipe)
+               si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
        si->exp = tick_add_ifset(now_ms, be->timeout.connect);
 
        return SN_ERR_NONE;  /* connection is OK */
index 60c7ed0e8bffc048a59d5f929b7c0dc88672e7e9..035d3386061a735de2b59bd4ec7ea4c3792f4507 100644 (file)
@@ -43,7 +43,7 @@
 #include <types/global.h>
 
 
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
+#if defined(CONFIG_HAP_LINUX_SPLICE)
 #include <common/splice.h>
 
 /* A pipe contains 16 segments max, and it's common to see segments of 1448 bytes
 #define MAX_SPLICE_AT_ONCE     (1<<30)
 
 /* Returns :
- *   -1 if splice is not possible or not possible anymore and we must switch to
- *      user-land copy (eg: to_forward reached)
- *    0 otherwise, including errors and close.
- * Sets :
- *   BF_READ_NULL
- *   BF_READ_PARTIAL
- *   BF_WRITE_PARTIAL (during copy)
- *   BF_OUT_EMPTY (during copy)
- *   SI_FL_ERR
- *   SI_FL_WAIT_ROOM
- *   (SI_FL_WAIT_RECV)
- *
- * This function automatically allocates a pipe from the pipe pool. It also
- * carefully ensures to clear b->pipe whenever it leaves the pipe empty.
+ *   -1 if splice() is not supported
+ *   >= 0 to report the amount of spliced bytes.
+ *   connection flags are updated (error, read0, wait_room, wait_data).
+ *   The caller must have previously allocated the pipe.
  */
-static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
+int raw_sock_to_pipe(struct connection *conn, struct pipe *pipe, unsigned int count)
 {
        static int splice_detects_close;
-       int fd = si_fd(si);
        int ret;
-       unsigned long max;
        int retval = 0;
 
-       if (!b->to_forward)
-               return -1;
-
-       if (!(b->flags & BF_KERN_SPLICING))
-               return -1;
-
-       if (buffer_not_empty(&b->buf)) {
-               /* We're embarrassed, there are already data pending in
-                * the buffer and we don't want to have them at two
-                * locations at a time. Let's indicate we need some
-                * place and ask the consumer to hurry.
-                */
-               si->flags |= SI_FL_WAIT_ROOM;
-               conn_data_stop_recv(&si->conn);
-               b->rex = TICK_ETERNITY;
-               si_chk_snd(b->cons);
-               return 0;
-       }
-
-       if (unlikely(b->pipe == NULL)) {
-               if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
-                       b->flags &= ~BF_KERN_SPLICING;
-                       return -1;
-               }
-       }
-
-       /* At this point, b->pipe is valid */
+       /* Under Linux, if FD_POLL_HUP is set, we have reached the end.
+        * Since older splice() implementations were buggy and returned
+        * EAGAIN on end of read, let's bypass the call to splice() now.
+        */
+       if ((fdtab[conn->t.sock.fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
+               goto out_read0;
 
-       while (1) {
-               if (b->to_forward == BUF_INFINITE_FORWARD)
-                       max = MAX_SPLICE_AT_ONCE;
-               else
-                       max = b->to_forward;
+       while (count) {
+               if (count > MAX_SPLICE_AT_ONCE)
+                       count = MAX_SPLICE_AT_ONCE;
 
-               if (!max) {
-                       /* It looks like the buffer + the pipe already contain
-                        * the maximum amount of data to be transferred. Try to
-                        * send those data immediately on the other side if it
-                        * is currently waiting.
-                        */
-                       retval = -1; /* end of forwarding */
-                       break;
-               }
-
-               ret = splice(fd, NULL, b->pipe->prod, NULL, max,
+               ret = splice(conn->t.sock.fd, NULL, pipe->prod, NULL, count,
                             SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
 
                if (ret <= 0) {
@@ -133,8 +88,7 @@ static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
                                 * it works, we store the info for later use.
                                 */
                                splice_detects_close = 1;
-                               b->flags |= BF_READ_NULL;
-                               break;
+                               goto out_read0;
                        }
 
                        if (errno == EAGAIN) {
@@ -142,13 +96,16 @@ static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
                                 *   - nothing in the socket buffer (standard)
                                 *   - pipe is full
                                 *   - the connection is closed (kernel < 2.6.27.13)
-                                * Since we don't know if pipe is full, we'll
-                                * stop if the pipe is not empty. Anyway, we
-                                * will almost always fill/empty the pipe.
+                                * The last case is annoying but we know if we can detect it
+                                * and if we can't then we rely on the call to recv() to
+                                * get a valid verdict. The difference between the first
+                                * two situations is problematic. Since we don't know if
+                                * the pipe is full, we'll stop if the pipe is not empty.
+                                * Anyway, we will almost always fill/empty the pipe.
                                 */
-
-                               if (b->pipe->data) {
-                                       si->flags |= SI_FL_WAIT_ROOM;
+                               if (pipe->data) {
+                                       /* always stop reading until the pipe is flushed */
+                                       conn->flags |= CO_FL_WAIT_ROOM;
                                        break;
                                }
 
@@ -161,48 +118,73 @@ static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
                                 * which will be able to deal with the situation.
                                 */
                                if (splice_detects_close)
-                                       conn_data_poll_recv(&si->conn); /* we know for sure that it's EAGAIN */
-                               else
-                                       retval = -1;
+                                       conn->flags |= CO_FL_WAIT_DATA; /* we know for sure that it's EAGAIN */
                                break;
                        }
-
-                       if (errno == ENOSYS || errno == EINVAL) {
-                               /* splice not supported on this end, disable it */
-                               b->flags &= ~BF_KERN_SPLICING;
-                               si->flags &= ~SI_FL_CAP_SPLICE;
-                               put_pipe(b->pipe);
-                               b->pipe = NULL;
+                       else if (errno == ENOSYS || errno == EINVAL) {
+                               /* splice not supported on this end, disable it.
+                                * We can safely return -1 since there is no
+                                * chance that any data has been piped yet.
+                                */
                                return -1;
                        }
-
+                       else if (errno == EINTR) {
+                               /* try again */
+                               continue;
+                       }
                        /* here we have another error */
-                       si->flags |= SI_FL_ERR;
+                       conn->flags |= CO_FL_ERROR;
                        break;
                } /* ret <= 0 */
 
-               if (b->to_forward != BUF_INFINITE_FORWARD)
-                       b->to_forward -= ret;
-               b->total += ret;
-               b->pipe->data += ret;
-               b->flags |= BF_READ_PARTIAL;
-               b->flags &= ~BF_OUT_EMPTY;
+               retval += ret;
+               pipe->data += ret;
 
-               if (b->pipe->data >= SPLICE_FULL_HINT ||
-                   ret >= global.tune.recv_enough) {
-                       /* We've read enough of it for this time. */
+               if (pipe->data >= SPLICE_FULL_HINT || ret >= global.tune.recv_enough) {
+                       /* We've read enough of it for this time, let's stop before
+                        * being asked to poll.
+                        */
                        break;
                }
        } /* while */
 
-       if (unlikely(!b->pipe->data)) {
-               put_pipe(b->pipe);
-               b->pipe = NULL;
-       }
+       return retval;
 
+ out_read0:
+       conn_sock_read0(conn);
        return retval;
 }
 
+/* Send as many bytes as possible from the pipe to the connection's socket.
+ */
+int raw_sock_from_pipe(struct connection *conn, struct pipe *pipe)
+{
+       int ret, done;
+
+       done = 0;
+       while (pipe->data) {
+               ret = splice(pipe->cons, NULL, conn->t.sock.fd, NULL, pipe->data,
+                            SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
+
+               if (ret <= 0) {
+                       if (ret == 0 || errno == EAGAIN) {
+                               conn->flags |= CO_FL_WAIT_ROOM;
+                               break;
+                       }
+                       else if (errno == EINTR)
+                               continue;
+
+                       /* here we have another error */
+                       conn->flags |= CO_FL_ERROR;
+                       break;
+               }
+
+               done += ret;
+               pipe->data -= ret;
+       }
+       return done;
+}
+
 #endif /* CONFIG_HAP_LINUX_SPLICE */
 
 
@@ -347,6 +329,10 @@ struct sock_ops raw_sock = {
        .write   = si_conn_send_cb,
        .snd_buf = raw_sock_from_buf,
        .rcv_buf = raw_sock_to_buf,
+#if defined(CONFIG_HAP_LINUX_SPLICE)
+       .rcv_pipe = raw_sock_to_pipe,
+       .snd_pipe = raw_sock_from_pipe,
+#endif
        .close   = NULL,
 };
 
index fa3c4dcb52def4e5179142ae154f6d10f79269d6..80415b3be5dfbdc0e94268845f6005325311085a 100644 (file)
@@ -179,12 +179,13 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
        if (likely(s->fe->options2 & PR_O2_INDEPSTR))
                s->si[0].flags |= SI_FL_INDEP_STR;
 
-       if (addr->ss_family == AF_INET || addr->ss_family == AF_INET6)
-               s->si[0].flags = SI_FL_CAP_SPLTCP; /* TCP/TCPv6 splicing possible */
-
        /* add the various callbacks */
        stream_interface_prepare(&s->si[0], l->sock);
 
+       if ((s->si[0].conn.data->rcv_pipe && s->si[0].conn.data->snd_pipe) &&
+           (addr->ss_family == AF_INET || addr->ss_family == AF_INET6))
+               s->si[0].flags = SI_FL_CAP_SPLTCP; /* TCP/TCPv6 splicing possible */
+
        /* pre-initialize the other side's stream interface to an INIT state. The
         * callbacks will be initialized before attempting to connect.
         */
index eef6decfc588e6b67ad50c431b96111121073c03..2f24c18a6ed151992e261e9c0e2417c8ca07bd25 100644 (file)
@@ -30,6 +30,7 @@
 #include <proto/connection.h>
 #include <proto/fd.h>
 #include <proto/frontend.h>
+#include <proto/pipe.h>
 #include <proto/stream_interface.h>
 #include <proto/task.h>
 
@@ -666,42 +667,30 @@ static int si_conn_send_loop(struct connection *conn)
        int write_poll = MAX_WRITE_POLL_LOOPS;
        int ret;
 
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
-       while (b->pipe) {
-               ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data,
-                            SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
-               if (ret <= 0) {
-                       if (ret == 0 || errno == EAGAIN) {
-                               conn_data_poll_send(&si->conn);
-                               return 0;
-                       }
-                       /* here we have another error */
-                       return -1;
-               }
+       conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
 
-               b->flags |= BF_WRITE_PARTIAL;
-               b->pipe->data -= ret;
+       if (b->pipe && conn->data->snd_pipe) {
+               ret = conn->data->snd_pipe(conn, b->pipe);
+               if (ret > 0)
+                       b->flags |= BF_WRITE_PARTIAL;
 
                if (!b->pipe->data) {
                        put_pipe(b->pipe);
                        b->pipe = NULL;
-                       break;
                }
 
-               if (--write_poll <= 0)
-                       return 0;
+               if (conn->flags & CO_FL_ERROR)
+                       return -1;
 
-               /* The only reason we did not empty the pipe is that the output
-                * buffer is full.
-                */
-               conn_data_poll_send(&si->conn);
-               return 0;
+               if (conn->flags & CO_FL_WAIT_ROOM) {
+                       conn_data_poll_send(conn);
+                       return 0;
+               }
        }
 
        /* At this point, the pipe is empty, but we may still have data pending
         * in the normal buffer.
         */
-#endif
        if (!b->buf.o) {
                b->flags |= BF_OUT_EMPTY;
                return 0;
@@ -710,7 +699,6 @@ static int si_conn_send_loop(struct connection *conn)
        /* when we're in this loop, we already know that there is no spliced
         * data left, and that there are sendable buffered data.
         */
-       conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
        while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
                /* check if we want to inform the kernel that we're interested in
                 * sending more data after this call. We want this if :
@@ -1004,34 +992,69 @@ void si_conn_recv_cb(struct connection *conn)
        if (b->flags & BF_SHUTR)
                return;
 
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
-       if (b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {
+       cur_read = 0;
+       conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
+
+       /* First, let's see if we may splice data across the channel without
+        * using a buffer.
+        */
+       if (conn->data->rcv_pipe &&
+           b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {
+               if (buffer_not_empty(&b->buf)) {
+                       /* We're embarrassed, there are already data pending in
+                        * the buffer and we don't want to have them at two
+                        * locations at a time. Let's indicate we need some
+                        * place and ask the consumer to hurry.
+                        */
+                       goto abort_splice;
+               }
 
-               /* Under Linux, if FD_POLL_HUP is set, we have reached the end.
-                * Since older splice() implementations were buggy and returned
-                * EAGAIN on end of read, let's bypass the call to splice() now.
-                */
-               if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
-                       goto out_shutdown_r;
+               if (unlikely(b->pipe == NULL)) {
+                       if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
+                               b->flags &= ~BF_KERN_SPLICING;
+                               goto abort_splice;
+                       }
+               }
+
+               ret = conn->data->rcv_pipe(conn, b->pipe, b->to_forward);
+               if (ret < 0) {
+                       /* splice not supported on this end, let's disable it */
+                       b->flags &= ~BF_KERN_SPLICING;
+                       si->flags &= ~SI_FL_CAP_SPLICE;
+                       goto abort_splice;
+               }
 
-               if (sock_raw_splice_in(b, si) >= 0) {
-                       if (si->flags & SI_FL_ERR)
-                               goto out_error;
-                       if (b->flags & BF_READ_NULL)
-                               goto out_shutdown_r;
-                       return;
+               if (ret > 0) {
+                       if (b->to_forward != BUF_INFINITE_FORWARD)
+                               b->to_forward -= ret;
+                       b->total += ret;
+                       cur_read += ret;
+                       b->flags |= BF_READ_PARTIAL;
+                       b->flags &= ~BF_OUT_EMPTY;
                }
+
+               if (conn_data_read0_pending(conn))
+                       goto out_shutdown_r;
+
+               if (conn->flags & CO_FL_ERROR)
+                       goto out_error;
+
                /* splice not possible (anymore), let's go on on standard copy */
        }
-#endif
-       cur_read = 0;
-       conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
-       while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
+
+ abort_splice:
+       /* release the pipe if we can, which is almost always the case */
+       if (b->pipe && !b->pipe->data) {
+               put_pipe(b->pipe);
+               b->pipe = NULL;
+       }
+
+       while (!b->pipe && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
                max = bi_avail(b);
 
                if (!max) {
                        b->flags |= BF_FULL;
-                       si->flags |= SI_FL_WAIT_ROOM;
+                       conn->flags |= CO_FL_WAIT_ROOM;
                        break;
                }
 
@@ -1133,7 +1156,39 @@ void si_conn_recv_cb(struct connection *conn)
                }
        } /* while !flags */
 
-       if (conn->flags & CO_FL_WAIT_DATA) {
+       if (conn->flags & CO_FL_ERROR)
+               goto out_error;
+
+       if (conn->flags & CO_FL_WAIT_ROOM) {
+               /* We might have some data the consumer is waiting for.
+                * We can do fast-forwarding, but we avoid doing this for partial
+                * buffers, because it is very likely that it will be done again
+                * immediately afterwards once the following data is parsed (eg:
+                * HTTP chunking).
+                */
+               if (((b->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
+                   (b->pipe /* always try to send spliced data */ ||
+                    (b->buf.i == 0 && (b->cons->flags & SI_FL_WAIT_DATA)))) {
+                       int last_len = b->pipe ? b->pipe->data : 0;
+
+                       si_chk_snd(b->cons);
+
+                       /* check if the consumer has freed some space */
+                       if (!(b->flags & BF_FULL) &&
+                           (!last_len || !b->pipe || b->pipe->data < last_len))
+                               si->flags &= ~SI_FL_WAIT_ROOM;
+               }
+
+               if (si->flags & SI_FL_WAIT_ROOM) {
+                       conn_data_stop_recv(conn);
+                       b->rex = TICK_ETERNITY;
+               }
+               else if ((b->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
+                       if (tick_isset(b->rex))
+                               b->rex = tick_add_ifset(now_ms, b->rto);
+               }
+       }
+       else if (conn->flags & CO_FL_WAIT_DATA) {
                /* we don't automatically ask for polling if we have
                 * read enough data, as it saves some syscalls with
                 * speculative pollers.
@@ -1144,9 +1199,6 @@ void si_conn_recv_cb(struct connection *conn)
                        __conn_data_want_recv(conn);
        }
 
-       if (conn->flags & CO_FL_ERROR)
-               goto out_error;
-
        if (conn_data_read0_pending(conn))
                /* connection closed */
                goto out_shutdown_r;