git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: raw_sock: extract raw_sock_to_buf() from raw_sock_read()
author Willy Tarreau <wtarreau@exceliance.fr>
Mon, 20 Aug 2012 15:30:32 +0000 (17:30 +0200)
committer Willy Tarreau <w@1wt.eu>
Mon, 3 Sep 2012 18:47:30 +0000 (20:47 +0200)
This is the start of the stream connection iterator which calls the
data-layer reader. This still looks a bit tricky but is OK. Splicing
is not handled at all at the moment.

include/types/connection.h
src/raw_sock.c

index 8dd4f20a310be8c3bbe04c3697b820f8bb2ab156..6d402cff565cdbdb9e7c34112fb970feb34104f0 100644 (file)
@@ -73,6 +73,13 @@ enum {
         */
        CO_FL_POLL_SOCK     = CO_FL_HANDSHAKE | CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN,
 
+       /* These flags are used by data layers to indicate to their iterators
+        * whether they had to stop due to missing data or missing room. Their
+        * callers must reset them before calling the data layer handlers.
+        */
+       CO_FL_WAIT_DATA     = 0x00004000,  /* data source is empty */
+       CO_FL_WAIT_ROOM     = 0x00008000,  /* data sink is full */
+
        /* flags used to remember what shutdown have been performed/reported */
        CO_FL_DATA_RD_SH    = 0x00010000,  /* DATA layer was notified about shutr/read0 */
        CO_FL_DATA_WR_SH    = 0x00020000,  /* DATA layer asked for shutw */
index 28ee222358cd3ada8fb0efab63bff60ffda3c415..2c96cbbce26379d3a4f72ef24c7342be57adc184 100644 (file)
@@ -209,19 +209,91 @@ static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
 #endif /* CONFIG_HAP_LINUX_SPLICE */
 
 
+/* Receive up to <count> bytes from <conn>'s socket into buffer <buf> and return
+ * the number of bytes stored (possibly zero). The caller must ensure <count>
+ * is always smaller than the buffer's size. Only one call to recv() is made,
+ * unless the buffer wraps, in which case a second call may be performed.
+ * Events are reported on <conn>: CO_FL_WAIT_DATA on EAGAIN, CO_FL_ERROR on any
+ * error but EAGAIN/EINTR, conn_sock_read0() on end of data. The caller must
+ * handle them, avoid the call if inappropriate, and call the connection's
+ * polling update function itself since this function does not.
+ */
+static int raw_sock_to_buf(struct connection *conn, struct buffer *buf, int count)
+{
+       int ret, done = 0;
+       int try = count;
+
+       /* stop here if we reached the end of data: HUP reported without IN */
+       if ((fdtab[conn->t.sock.fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
+               goto read0;
+
+       /* compute the maximum block size <try> we can read at once. */
+       if (buffer_empty(buf)) {
+               /* let's realign the buffer to optimize I/O */
+               buf->p = buf->data;
+       }
+       else if (buf->data + buf->o < buf->p &&
+                buf->p + buf->i < buf->data + buf->size) {
+               /* free space wraps: stop at the end of the storage area for now */
+               if (try > buf->data + buf->size - (buf->p + buf->i))
+                       try = buf->data + buf->size - (buf->p + buf->i);
+       }
+
+       /* read the largest possible block. For this, we perform only one call
+        * to recv() unless the buffer wraps and we exactly fill the first hunk,
+        * in which case we accept to do it once again. A new attempt is made on
+        * EINTR too. <done> accumulates what was stored over all attempts.
+        */
+       while (try) {
+               ret = recv(conn->t.sock.fd, bi_end(buf), try, 0);
+
+               if (ret > 0) {
+                       buf->i += ret;
+                       done += ret;
+                       if (ret < try) {
+                               /* unfortunately, on level-triggered events, POLL_HUP
+                                * is generally delivered AFTER the system buffer is
+                                * empty, so this one might never match.
+                                */
+                               if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
+                                       goto read0;
+                               break;
+                       }
+                       count -= ret;
+                       try = count; /* hunk exactly filled: go on with what remains, if any */
+               }
+               else if (ret == 0) { /* recv() returned 0: handled as a read0 */
+                       goto read0;
+               }
+               else if (errno == EAGAIN) { /* nothing to read for now */
+                       conn->flags |= CO_FL_WAIT_DATA;
+                       break;
+               }
+               else if (errno != EINTR) { /* on EINTR we just loop and retry */
+                       conn->flags |= CO_FL_ERROR;
+                       break;
+               }
+       }
+       return done;
+
+ read0:
+       conn_sock_read0(conn);
+       return done;
+}
+
+
 /*
  * this function is called on a read event from a stream socket.
  */
 static void sock_raw_read(struct connection *conn)
 {
-       int fd = conn->t.sock.fd;
        struct stream_interface *si = container_of(conn, struct stream_interface, conn);
        struct channel *b = si->ib;
        int ret, max, cur_read;
        int read_poll = MAX_READ_POLL_LOOPS;
 
 #ifdef DEBUG_FULL
-       fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", fd, fdtab[fd].ev, fdtab[fd].owner);
+       fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", conn->t.sock.fd, fdtab[conn->t.sock.fd].ev, fdtab[conn->t.sock.fd].owner);
 #endif
        /* stop immediately on errors. Note that we DON'T want to stop on
         * POLL_ERR, as the poller might report a write error while there
@@ -233,7 +305,7 @@ static void sock_raw_read(struct connection *conn)
                goto out_error;
 
        /* stop here if we reached the end of data */
-       if ((fdtab[fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
+       if (conn_data_read0_pending(conn))
                goto out_shutdown_r;
 
        /* maybe we were called immediately after an asynchronous shutr */
@@ -247,7 +319,7 @@ static void sock_raw_read(struct connection *conn)
                 * Since older splice() implementations were buggy and returned
                 * EAGAIN on end of read, let's bypass the call to splice() now.
                 */
-               if (fdtab[fd].ev & FD_POLL_HUP)
+               if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
                        goto out_shutdown_r;
 
                if (sock_raw_splice_in(b, si) >= 0) {
@@ -261,7 +333,8 @@ static void sock_raw_read(struct connection *conn)
        }
 #endif
        cur_read = 0;
-       while (1) {
+       conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
+       while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
                max = bi_avail(b);
 
                if (!max) {
@@ -270,172 +343,133 @@ static void sock_raw_read(struct connection *conn)
                        break;
                }
 
-               /*
-                * 1. compute the maximum block size we can read at once.
-                */
-               if (buffer_empty(&b->buf)) {
-                       /* let's realign the buffer to optimize I/O */
-                       b->buf.p = b->buf.data;
-               }
-               else if (b->buf.data + b->buf.o < b->buf.p &&
-                        b->buf.p + b->buf.i < b->buf.data + b->buf.size) {
-                       /* remaining space wraps at the end, with a moving limit */
-                       if (max > b->buf.data + b->buf.size - (b->buf.p + b->buf.i))
-                               max = b->buf.data + b->buf.size - (b->buf.p + b->buf.i);
-               }
-               /* else max is already OK */
+               ret = raw_sock_to_buf(conn, &b->buf, max);
+               if (ret <= 0)
+                       break;
 
-               /*
-                * 2. read the largest possible block
-                */
-               ret = recv(fd, bi_end(&b->buf), max, 0);
+               cur_read += ret;
 
-               if (ret > 0) {
-                       b->buf.i += ret;
-                       cur_read += ret;
-
-                       /* if we're allowed to directly forward data, we must update ->o */
-                       if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
-                               unsigned long fwd = ret;
-                               if (b->to_forward != BUF_INFINITE_FORWARD) {
-                                       if (fwd > b->to_forward)
-                                               fwd = b->to_forward;
-                                       b->to_forward -= fwd;
-                               }
-                               b_adv(b, fwd);
+               /* if we're allowed to directly forward data, we must update ->o */
+               if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
+                       unsigned long fwd = ret;
+                       if (b->to_forward != BUF_INFINITE_FORWARD) {
+                               if (fwd > b->to_forward)
+                                       fwd = b->to_forward;
+                               b->to_forward -= fwd;
                        }
+                       b_adv(b, fwd);
+               }
 
-                       if (conn->flags & CO_FL_WAIT_L4_CONN) {
-                               conn->flags &= ~CO_FL_WAIT_L4_CONN;
-                               si->exp = TICK_ETERNITY;
-                       }
+               if (conn->flags & CO_FL_WAIT_L4_CONN) {
+                       conn->flags &= ~CO_FL_WAIT_L4_CONN;
+                       si->exp = TICK_ETERNITY;
+               }
 
-                       b->flags |= BF_READ_PARTIAL;
-                       b->total += ret;
+               b->flags |= BF_READ_PARTIAL;
+               b->total += ret;
 
-                       if (bi_full(b)) {
-                               /* The buffer is now full, there's no point in going through
-                                * the loop again.
-                                */
-                               if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
-                                       b->xfer_small = 0;
-                                       b->xfer_large++;
-                                       if (b->xfer_large >= 3) {
-                                               /* we call this buffer a fast streamer if it manages
-                                                * to be filled in one call 3 consecutive times.
-                                                */
-                                               b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
-                                               //fputc('+', stderr);
-                                       }
-                               }
-                               else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
-                                        (cur_read <= b->buf.size / 2)) {
-                                       b->xfer_large = 0;
-                                       b->xfer_small++;
-                                       if (b->xfer_small >= 2) {
-                                               /* if the buffer has been at least half full twice,
-                                                * we receive faster than we send, so at least it
-                                                * is not a "fast streamer".
-                                                */
-                                               b->flags &= ~BF_STREAMER_FAST;
-                                               //fputc('-', stderr);
-                                       }
-                               }
-                               else {
-                                       b->xfer_small = 0;
-                                       b->xfer_large = 0;
+               if (bi_full(b)) {
+                       /* The buffer is now full, there's no point in going through
+                        * the loop again.
+                        */
+                       if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
+                               b->xfer_small = 0;
+                               b->xfer_large++;
+                               if (b->xfer_large >= 3) {
+                                       /* we call this buffer a fast streamer if it manages
+                                        * to be filled in one call 3 consecutive times.
+                                        */
+                                       b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
+                                       //fputc('+', stderr);
                                }
-
-                               b->flags |= BF_FULL;
-                               si->flags |= SI_FL_WAIT_ROOM;
-                               break;
                        }
-
-                       /* if too many bytes were missing from last read, it means that
-                        * it's pointless trying to read again because the system does
-                        * not have them in buffers. BTW, if FD_POLL_HUP was present,
-                        * it means that we have reached the end and that the connection
-                        * is closed.
-                        */
-                       if (ret < max) {
-                               if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
-                                   (cur_read <= b->buf.size / 2)) {
-                                       b->xfer_large = 0;
-                                       b->xfer_small++;
-                                       if (b->xfer_small >= 3) {
-                                               /* we have read less than half of the buffer in
-                                                * one pass, and this happened at least 3 times.
-                                                * This is definitely not a streamer.
-                                                */
-                                               b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
-                                               //fputc('!', stderr);
-                                       }
+                       else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
+                                (cur_read <= b->buf.size / 2)) {
+                               b->xfer_large = 0;
+                               b->xfer_small++;
+                               if (b->xfer_small >= 2) {
+                                       /* if the buffer has been at least half full twice,
+                                        * we receive faster than we send, so at least it
+                                        * is not a "fast streamer".
+                                        */
+                                       b->flags &= ~BF_STREAMER_FAST;
+                                       //fputc('-', stderr);
                                }
-                               /* unfortunately, on level-triggered events, POLL_HUP
-                                * is generally delivered AFTER the system buffer is
-                                * empty, so this one might never match.
-                                */
-                               if (fdtab[fd].ev & FD_POLL_HUP)
-                                       goto out_shutdown_r;
+                       }
+                       else {
+                               b->xfer_small = 0;
+                               b->xfer_large = 0;
+                       }
 
-                               /* if a streamer has read few data, it may be because we
-                                * have exhausted system buffers. It's not worth trying
-                                * again.
-                                */
-                               if (b->flags & BF_STREAMER)
-                                       break;
+                       b->flags |= BF_FULL;
+                       si->flags |= SI_FL_WAIT_ROOM;
+                       break;
+               }
 
-                               /* generally if we read something smaller than 1 or 2 MSS,
-                                * it means that either we have exhausted the system's
-                                * buffers (streamer or question-response protocol) or
-                                * that the connection will be closed. Streamers are
-                                * easily detected so we return early. For other cases,
-                                * it's still better to perform a last read to be sure,
-                                * because it may save one complete poll/read/wakeup cycle
-                                * in case of shutdown.
-                                */
-                               if (ret < MIN_RET_FOR_READ_LOOP && b->flags & BF_STREAMER)
-                                       break;
+               if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
+                       break;
 
-                               /* if we read a large block smaller than what we requested,
-                                * it's almost certain we'll never get anything more.
-                                */
-                               if (ret >= global.tune.recv_enough)
-                                       break;
+               /* if too many bytes were missing from last read, it means that
+                * it's pointless trying to read again because the system does
+                * not have them in buffers.
+                */
+               if (ret < max) {
+                       if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
+                           (cur_read <= b->buf.size / 2)) {
+                               b->xfer_large = 0;
+                               b->xfer_small++;
+                               if (b->xfer_small >= 3) {
+                                       /* we have read less than half of the buffer in
+                                        * one pass, and this happened at least 3 times.
+                                        * This is definitely not a streamer.
+                                        */
+                                       b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
+                                       //fputc('!', stderr);
+                               }
                        }
 
-                       if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
+                       /* if a streamer has read few data, it may be because we
+                        * have exhausted system buffers. It's not worth trying
+                        * again.
+                        */
+                       if (b->flags & BF_STREAMER)
                                break;
-               }
-               else if (ret == 0) {
-                       /* connection closed */
-                       goto out_shutdown_r;
-               }
-               else if (errno == EAGAIN) {
-                       /* Ignore EAGAIN but inform the poller that there is
-                        * nothing to read left if we did not read much, ie
-                        * less than what we were still expecting to read.
-                        * But we may have done some work justifying to notify
-                        * the task.
+
+                       /* if we read a large block smaller than what we requested,
+                        * it's almost certain we'll never get anything more.
                         */
-                       if (cur_read < MIN_RET_FOR_READ_LOOP)
-                               conn_data_poll_recv(conn);
-                       break;
-               }
-               else {
-                       goto out_error;
+                       if (ret >= global.tune.recv_enough)
+                               break;
                }
-       } /* while (1) */
+       } /* while !flags */
+
+       if (conn->flags & CO_FL_ERROR)
+               goto out_error;
+
+       if (conn->flags & CO_FL_WAIT_DATA) {
+               /* we don't automatically ask for polling if we have
+                * read enough data, as it saves some syscalls with
+                * speculative pollers.
+                */
+               if (cur_read < MIN_RET_FOR_READ_LOOP)
+                       __conn_data_poll_recv(conn);
+               else
+                       __conn_data_want_recv(conn);
+       }
+
+       if (conn_data_read0_pending(conn))
+               /* connection closed */
+               goto out_shutdown_r;
 
        return;
 
  out_shutdown_r:
        /* we received a shutdown */
-       fdtab[fd].ev &= ~FD_POLL_HUP;
        b->flags |= BF_READ_NULL;
        if (b->flags & BF_AUTO_CLOSE)
                buffer_shutw_now(b);
        stream_sock_read0(si);
+       conn_data_read0(conn);
        return;
 
  out_error: