]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: connection: split the send call into connection and stream interface
authorWilly Tarreau <wtarreau@exceliance.fr>
Tue, 21 Aug 2012 16:22:06 +0000 (18:22 +0200)
committerWilly Tarreau <w@1wt.eu>
Mon, 3 Sep 2012 18:47:31 +0000 (20:47 +0200)
Similar to what was done on the receive path, the data layer now provides
only an snd_buf() callback that is iterated over by the stream interface's
si_conn_send_loop() function.

The data layer now has no knowledge about channels nor stream interfaces.

The splice() code still need to be ported as it currently is disabled.

include/proto/connection.h
include/types/stream_interface.h
src/raw_sock.c
src/stream_interface.c

index d97978ebf90461c22a0e850e051c949ede66af2d..7205b2c4c565089fb743eef9b5c9ce196aba9c15 100644 (file)
@@ -37,16 +37,6 @@ static inline void conn_data_close(struct connection *conn)
                conn->data->close(conn);
 }
 
-/* Calls the snd_buf() function of the data layer if any, otherwise
- * returns 0.
- */
-static inline int conn_data_snd_buf(struct connection *conn)
-{
-       if (!conn->data->snd_buf)
-               return 0;
-       return conn->data->snd_buf(conn);
-}
-
 /* set polling depending on the change between the CURR part of the
  * flags and the new flags in connection C. The connection flags are
  * updated with the new flags at the end of the operation. Only the bits
index 018349bbccdb8a7f71bb8e49182a5bae56d8b9dc..ee552c2b93462b96ab9cf2fb81416055b987004a 100644 (file)
@@ -118,8 +118,8 @@ struct sock_ops {
        void (*read)(struct connection *conn);      /* read callback after poll() */
        void (*write)(struct connection *conn);     /* write callback after poll() */
        void (*close)(struct connection *);         /* close the data channel on the connection */
-       int  (*snd_buf)(struct connection *conn);   /* callback used to send a buffer contents */
        int  (*rcv_buf)(struct connection *conn, struct buffer *buf, int count); /* recv callback */
+       int  (*snd_buf)(struct connection *conn, struct buffer *buf, int flags); /* send callback */
 };
 
 /* A stream interface has 3 parts :
index 1bc40424af9e18b05d5530f98dd63d83f0a6ea7f..60c7ed0e8bffc048a59d5f929b7c0dc88672e7e9 100644 (file)
@@ -279,148 +279,60 @@ static int raw_sock_to_buf(struct connection *conn, struct buffer *buf, int coun
 }
 
 
-/*
- * This function is called to send buffer data to a stream socket.
- * It returns -1 in case of unrecoverable error, otherwise zero.
+/* Send all pending bytes from buffer <buf> to connection <conn>'s socket.
+ * <flags> may contain MSG_MORE to make the system hold on without sending
+ * data too fast.
+ * Only one call to send() is performed, unless the buffer wraps, in which case
+ * a second call may be performed. The connection's flags are updated with
+ * whatever special event is detected (error, empty). The caller is responsible
+ * for taking care of those events and avoiding the call if inappropriate. The
+ * function does not call the connection's polling update function, so the caller
+ * is responsible for this.
  */
-static int sock_raw_write_loop(struct connection *conn)
+static int raw_sock_from_buf(struct connection *conn, struct buffer *buf, int flags)
 {
-       struct stream_interface *si = container_of(conn, struct stream_interface, conn);
-       struct channel *b = si->ob;
-       int write_poll = MAX_WRITE_POLL_LOOPS;
-       int ret, max;
-
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
-       while (b->pipe) {
-               ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data,
-                            SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
-               if (ret <= 0) {
-                       if (ret == 0 || errno == EAGAIN) {
-                               conn_data_poll_send(&si->conn);
-                               return 0;
-                       }
-                       /* here we have another error */
-                       return -1;
-               }
+       int ret, try, done, send_flag;
 
-               b->flags |= BF_WRITE_PARTIAL;
-               b->pipe->data -= ret;
-
-               if (!b->pipe->data) {
-                       put_pipe(b->pipe);
-                       b->pipe = NULL;
-                       break;
-               }
-
-               if (--write_poll <= 0)
-                       return 0;
-
-               /* The only reason we did not empty the pipe is that the output
-                * buffer is full.
-                */
-               conn_data_poll_send(&si->conn);
-               return 0;
-       }
-
-       /* At this point, the pipe is empty, but we may still have data pending
-        * in the normal buffer.
+       done = 0;
+       /* send the largest possible block. For this we perform only one call
+        * to send() unless the buffer wraps and we exactly fill the first hunk,
+        * in which case we accept to do it once again.
         */
-#endif
-       if (!b->buf.o) {
-               b->flags |= BF_OUT_EMPTY;
-               return 0;
-       }
-
-       /* when we're in this loop, we already know that there is no spliced
-        * data left, and that there are sendable buffered data.
-        */
-       while (1) {
-               max = b->buf.o;
-
+       while (buf->o) {
+               try = buf->o;
                /* outgoing data may wrap at the end */
-               if (b->buf.data + max > b->buf.p)
-                       max = b->buf.data + max - b->buf.p;
-
-               /* check if we want to inform the kernel that we're interested in
-                * sending more data after this call. We want this if :
-                *  - we're about to close after this last send and want to merge
-                *    the ongoing FIN with the last segment.
-                *  - we know we can't send everything at once and must get back
-                *    here because of unaligned data
-                *  - there is still a finite amount of data to forward
-                * The test is arranged so that the most common case does only 2
-                * tests.
-                */
-
-               if (MSG_NOSIGNAL && MSG_MORE) {
-                       unsigned int send_flag = MSG_DONTWAIT | MSG_NOSIGNAL;
-
-                       if ((!(b->flags & BF_NEVER_WAIT) &&
-                           ((b->to_forward && b->to_forward != BUF_INFINITE_FORWARD) ||
-                            (b->flags & BF_EXPECT_MORE))) ||
-                           ((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == BF_SHUTW_NOW && (max == b->buf.o)) ||
-                           (max != b->buf.o)) {
-                               send_flag |= MSG_MORE;
-                       }
-
-                       /* this flag has precedence over the rest */
-                       if (b->flags & BF_SEND_DONTWAIT)
-                               send_flag &= ~MSG_MORE;
+               if (buf->data + try > buf->p)
+                       try = buf->data + try - buf->p;
 
-                       ret = send(si_fd(si), bo_ptr(&b->buf), max, send_flag);
-               } else {
-                       int skerr;
-                       socklen_t lskerr = sizeof(skerr);
+               send_flag = MSG_DONTWAIT | MSG_NOSIGNAL;
+               if (try < buf->o)
+                       send_flag = MSG_MORE;
 
-                       ret = getsockopt(si_fd(si), SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
-                       if (ret == -1 || skerr)
-                               ret = -1;
-                       else
-                               ret = send(si_fd(si), bo_ptr(&b->buf), max, MSG_DONTWAIT);
-               }
+               ret = send(conn->t.sock.fd, bo_ptr(buf), try, send_flag | flags);
 
                if (ret > 0) {
-                       if (si->conn.flags & CO_FL_WAIT_L4_CONN) {
-                               si->conn.flags &= ~CO_FL_WAIT_L4_CONN;
-                               si->exp = TICK_ETERNITY;
-                       }
-
-                       b->flags |= BF_WRITE_PARTIAL;
+                       buf->o -= ret;
+                       done += ret;
 
-                       b->buf.o -= ret;
-                       if (likely(!buffer_len(&b->buf)))
+                       if (likely(!buffer_len(buf)))
                                /* optimize data alignment in the buffer */
-                               b->buf.p = b->buf.data;
-
-                       if (likely(!bi_full(b)))
-                               b->flags &= ~BF_FULL;
-
-                       if (!b->buf.o) {
-                               /* Always clear both flags once everything has been sent, they're one-shot */
-                               b->flags &= ~(BF_EXPECT_MORE | BF_SEND_DONTWAIT);
-                               if (likely(!b->pipe))
-                                       b->flags |= BF_OUT_EMPTY;
-                               break;
-                       }
+                               buf->p = buf->data;
 
                        /* if the system buffer is full, don't insist */
-                       if (ret < max)
-                               break;
-
-                       if (--write_poll <= 0)
+                       if (ret < try)
                                break;
                }
                else if (ret == 0 || errno == EAGAIN) {
                        /* nothing written, we need to poll for write first */
-                       conn_data_poll_send(&si->conn);
-                       return 0;
+                       conn->flags |= CO_FL_WAIT_ROOM;
+                       break;
                }
-               else {
-                       /* bad, we got an error */
-                       return -1;
+               else if (errno != EINTR) {
+                       conn->flags |= CO_FL_ERROR;
+                       break;
                }
-       } /* while (1) */
-       return 0;
+       }
+       return done;
 }
 
 
@@ -433,7 +345,7 @@ struct sock_ops raw_sock = {
        .chk_snd = stream_int_chk_snd_conn,
        .read    = si_conn_recv_cb,
        .write   = si_conn_send_cb,
-       .snd_buf = sock_raw_write_loop,
+       .snd_buf = raw_sock_from_buf,
        .rcv_buf = raw_sock_to_buf,
        .close   = NULL,
 };
index 2cc1962d196b1f87a7adda5a20939c6b9c65d8a9..eef6decfc588e6b67ad50c431b96111121073c03 100644 (file)
@@ -654,6 +654,118 @@ void conn_notify_si(struct connection *conn)
                si->ib->flags &= ~BF_READ_DONTWAIT;
 }
 
+/*
+ * This function is called to send buffer data to a stream socket.
+ * It returns -1 in case of unrecoverable error, otherwise zero.
+ * It iterates the data layer's snd_buf function.
+ */
+static int si_conn_send_loop(struct connection *conn)
+{
+       struct stream_interface *si = container_of(conn, struct stream_interface, conn);
+       struct channel *b = si->ob;
+       int write_poll = MAX_WRITE_POLL_LOOPS;
+       int ret;
+
+#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
+       while (b->pipe) {
+               ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data,
+                            SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
+               if (ret <= 0) {
+                       if (ret == 0 || errno == EAGAIN) {
+                               conn_data_poll_send(&si->conn);
+                               return 0;
+                       }
+                       /* here we have another error */
+                       return -1;
+               }
+
+               b->flags |= BF_WRITE_PARTIAL;
+               b->pipe->data -= ret;
+
+               if (!b->pipe->data) {
+                       put_pipe(b->pipe);
+                       b->pipe = NULL;
+                       break;
+               }
+
+               if (--write_poll <= 0)
+                       return 0;
+
+               /* The only reason we did not empty the pipe is that the output
+                * buffer is full.
+                */
+               conn_data_poll_send(&si->conn);
+               return 0;
+       }
+
+       /* At this point, the pipe is empty, but we may still have data pending
+        * in the normal buffer.
+        */
+#endif
+       if (!b->buf.o) {
+               b->flags |= BF_OUT_EMPTY;
+               return 0;
+       }
+
+       /* when we're in this loop, we already know that there is no spliced
+        * data left, and that there are sendable buffered data.
+        */
+       conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
+       while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
+               /* check if we want to inform the kernel that we're interested in
+                * sending more data after this call. We want this if :
+                *  - we're about to close after this last send and want to merge
+                *    the ongoing FIN with the last segment.
+                *  - we know we can't send everything at once and must get back
+                *    here because of unaligned data
+                *  - there is still a finite amount of data to forward
+                * The test is arranged so that the most common case does only 2
+                * tests.
+                */
+               unsigned int send_flag = MSG_DONTWAIT | MSG_NOSIGNAL;
+
+               if ((!(b->flags & (BF_NEVER_WAIT|BF_SEND_DONTWAIT)) &&
+                    ((b->to_forward && b->to_forward != BUF_INFINITE_FORWARD) ||
+                     (b->flags & BF_EXPECT_MORE))) ||
+                   ((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == BF_SHUTW_NOW))
+                       send_flag |= MSG_MORE;
+
+               ret = conn->data->snd_buf(conn, &b->buf, send_flag);
+               if (ret <= 0)
+                       break;
+
+               if (si->conn.flags & CO_FL_WAIT_L4_CONN)
+                       si->conn.flags &= ~CO_FL_WAIT_L4_CONN;
+
+               b->flags |= BF_WRITE_PARTIAL;
+
+               if (likely(!bi_full(b)))
+                       b->flags &= ~BF_FULL;
+
+               if (!b->buf.o) {
+                       /* Always clear both flags once everything has been sent, they're one-shot */
+                       b->flags &= ~(BF_EXPECT_MORE | BF_SEND_DONTWAIT);
+                       if (likely(!b->pipe))
+                               b->flags |= BF_OUT_EMPTY;
+                       break;
+               }
+
+               if (--write_poll <= 0)
+                       break;
+       } /* while */
+
+       if (conn->flags & CO_FL_ERROR)
+               return -1;
+
+       if (conn->flags & CO_FL_WAIT_ROOM) {
+               /* we need to poll before going on */
+               conn_data_poll_send(&si->conn);
+               return 0;
+       }
+       return 0;
+}
+
+
 /* Updates the timers and flags of a stream interface attached to a connection,
  * depending on the buffers' flags. It should only be called once after the
  * buffer flags have settled down, and before they are cleared. It doesn't
@@ -792,7 +904,7 @@ void stream_int_chk_snd_conn(struct stream_interface *si)
             (fdtab[si_fd(si)].ev & FD_POLL_OUT)))   /* we'll be called anyway */
                return;
 
-       if (conn_data_snd_buf(&si->conn) < 0) {
+       if (si_conn_send_loop(&si->conn) < 0) {
                /* Write error on the file descriptor. We mark the FD as STERROR so
                 * that we don't use it anymore and we notify the task.
                 */
@@ -1078,7 +1190,7 @@ void si_conn_send_cb(struct connection *conn)
                return;
 
        /* OK there are data waiting to be sent */
-       if (conn_data_snd_buf(conn) < 0)
+       if (si_conn_send_loop(conn) < 0)
                goto out_error;
 
        /* OK all done */