From: Willy Tarreau
Date: Mon, 20 Aug 2012 15:30:32 +0000 (+0200)
Subject: MAJOR: raw_sock: extract raw_sock_to_buf() from raw_sock_read()
X-Git-Tag: v1.5-dev12~79
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=2ba44650863ceef2ffb0f3bb59caf25cc296a417;p=thirdparty%2Fhaproxy.git

MAJOR: raw_sock: extract raw_sock_to_buf() from raw_sock_read()

This is the start of the stream connection iterator which calls the
data-layer reader. This still looks a bit tricky but is OK. Splicing is
not handled at all at the moment.
---

diff --git a/include/types/connection.h b/include/types/connection.h
index 8dd4f20a31..6d402cff56 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -73,6 +73,13 @@ enum {
 	 */
 	CO_FL_POLL_SOCK     = CO_FL_HANDSHAKE | CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN,
 
+	/* These flags are used by data layers to indicate to their iterators
+	 * whether they had to stop due to missing data or missing room. Their
+	 * callers must reset them before calling the data layer handlers.
+	 */
+	CO_FL_WAIT_DATA     = 0x00004000,  /* data source is empty */
+	CO_FL_WAIT_ROOM     = 0x00008000,  /* data sink is full */
+
 	/* flags used to remember what shutdown have been performed/reported */
 	CO_FL_DATA_RD_SH    = 0x00010000,  /* DATA layer was notified about shutr/read0 */
 	CO_FL_DATA_WR_SH    = 0x00020000,  /* DATA layer asked for shutw */
diff --git a/src/raw_sock.c b/src/raw_sock.c
index 28ee222358..2c96cbbce2 100644
--- a/src/raw_sock.c
+++ b/src/raw_sock.c
@@ -209,19 +209,91 @@ static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
 #endif /* CONFIG_HAP_LINUX_SPLICE */
 
 
+/* Receive up to <count> bytes from connection <conn>'s socket and store them
+ * into buffer <buf>. The caller must ensure that <count> is always smaller
+ * than the buffer's size. Only one call to recv() is performed, unless the
+ * buffer wraps, in which case a second call may be performed. The connection's
+ * flags are updated with whatever special event is detected (error, read0,
+ * empty). The caller is responsible for taking care of those events and
+ * avoiding the call if inappropriate. The function does not call the
+ * connection's polling update function, so the caller is responsible for this.
+ */
+static int raw_sock_to_buf(struct connection *conn, struct buffer *buf, int count)
+{
+	int ret, done = 0;
+	int try = count;
+
+	/* stop here if we reached the end of data */
+	if ((fdtab[conn->t.sock.fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
+		goto read0;
+
+	/* compute the maximum block size we can read at once. */
+	if (buffer_empty(buf)) {
+		/* let's realign the buffer to optimize I/O */
+		buf->p = buf->data;
+	}
+	else if (buf->data + buf->o < buf->p &&
+		 buf->p + buf->i < buf->data + buf->size) {
+		/* remaining space wraps at the end, with a moving limit */
+		if (try > buf->data + buf->size - (buf->p + buf->i))
+			try = buf->data + buf->size - (buf->p + buf->i);
+	}
+
+	/* read the largest possible block. For this, we perform only one call
+	 * to recv() unless the buffer wraps and we exactly fill the first hunk,
+	 * in which case we accept to do it once again. A new attempt is made on
+	 * EINTR too.
+	 */
+	while (try) {
+		ret = recv(conn->t.sock.fd, bi_end(buf), try, 0);
+
+		if (ret > 0) {
+			buf->i += ret;
+			done += ret;
+			if (ret < try) {
+				/* unfortunately, on level-triggered events, POLL_HUP
+				 * is generally delivered AFTER the system buffer is
+				 * empty, so this one might never match.
+				 */
+				if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
+					goto read0;
+				break;
+			}
+			count -= ret;
+			try = count;
+		}
+		else if (ret == 0) {
+			goto read0;
+		}
+		else if (errno == EAGAIN) {
+			conn->flags |= CO_FL_WAIT_DATA;
+			break;
+		}
+		else if (errno != EINTR) {
+			conn->flags |= CO_FL_ERROR;
+			break;
+		}
+	}
+	return done;
+
+ read0:
+	conn_sock_read0(conn);
+	return done;
+}
+
+
 /*
  * this function is called on a read event from a stream socket.
  */
 static void sock_raw_read(struct connection *conn)
 {
-	int fd = conn->t.sock.fd;
 	struct stream_interface *si = container_of(conn, struct stream_interface, conn);
 	struct channel *b = si->ib;
 	int ret, max, cur_read;
 	int read_poll = MAX_READ_POLL_LOOPS;
 
 #ifdef DEBUG_FULL
-	fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", fd, fdtab[fd].ev, fdtab[fd].owner);
+	fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", conn->t.sock.fd, fdtab[conn->t.sock.fd].ev, fdtab[conn->t.sock.fd].owner);
 #endif
 	/* stop immediately on errors. Note that we DON'T want to stop on
 	 * POLL_ERR, as the poller might report a write error while there
@@ -233,7 +305,7 @@ static void sock_raw_read(struct connection *conn)
 		goto out_error;
 
 	/* stop here if we reached the end of data */
-	if ((fdtab[fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
+	if (conn_data_read0_pending(conn))
 		goto out_shutdown_r;
 
 	/* maybe we were called immediately after an asynchronous shutr */
@@ -247,7 +319,7 @@ static void sock_raw_read(struct connection *conn)
 		 * Since older splice() implementations were buggy and returned
 		 * EAGAIN on end of read, let's bypass the call to splice() now.
 		 */
-		if (fdtab[fd].ev & FD_POLL_HUP)
+		if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
 			goto out_shutdown_r;
 
 		if (sock_raw_splice_in(b, si) >= 0) {
@@ -261,7 +333,8 @@ static void sock_raw_read(struct connection *conn)
 	}
 #endif
 	cur_read = 0;
-	while (1) {
+	conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
+	while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
 		max = bi_avail(b);
 
 		if (!max) {
@@ -270,172 +343,133 @@ static void sock_raw_read(struct connection *conn)
 			break;
 		}
 
-		/*
-		 * 1. compute the maximum block size we can read at once.
-		 */
-		if (buffer_empty(&b->buf)) {
-			/* let's realign the buffer to optimize I/O */
-			b->buf.p = b->buf.data;
-		}
-		else if (b->buf.data + b->buf.o < b->buf.p &&
-			 b->buf.p + b->buf.i < b->buf.data + b->buf.size) {
-			/* remaining space wraps at the end, with a moving limit */
-			if (max > b->buf.data + b->buf.size - (b->buf.p + b->buf.i))
-				max = b->buf.data + b->buf.size - (b->buf.p + b->buf.i);
-		}
-		/* else max is already OK */
+		ret = raw_sock_to_buf(conn, &b->buf, max);
+		if (ret <= 0)
+			break;
 
-		/*
-		 * 2. read the largest possible block
-		 */
-		ret = recv(fd, bi_end(&b->buf), max, 0);
+		cur_read += ret;
 
-		if (ret > 0) {
-			b->buf.i += ret;
-			cur_read += ret;
-
-			/* if we're allowed to directly forward data, we must update ->o */
-			if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
-				unsigned long fwd = ret;
-				if (b->to_forward != BUF_INFINITE_FORWARD) {
-					if (fwd > b->to_forward)
-						fwd = b->to_forward;
-					b->to_forward -= fwd;
-				}
-				b_adv(b, fwd);
+		/* if we're allowed to directly forward data, we must update ->o */
+		if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
+			unsigned long fwd = ret;
+			if (b->to_forward != BUF_INFINITE_FORWARD) {
+				if (fwd > b->to_forward)
+					fwd = b->to_forward;
+				b->to_forward -= fwd;
 			}
+			b_adv(b, fwd);
+		}
 
-			if (conn->flags & CO_FL_WAIT_L4_CONN) {
-				conn->flags &= ~CO_FL_WAIT_L4_CONN;
-				si->exp = TICK_ETERNITY;
-			}
+		if (conn->flags & CO_FL_WAIT_L4_CONN) {
+			conn->flags &= ~CO_FL_WAIT_L4_CONN;
+			si->exp = TICK_ETERNITY;
+		}
 
-			b->flags |= BF_READ_PARTIAL;
-			b->total += ret;
+		b->flags |= BF_READ_PARTIAL;
+		b->total += ret;
 
-			if (bi_full(b)) {
-				/* The buffer is now full, there's no point in going through
-				 * the loop again.
-				 */
-				if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
-					b->xfer_small = 0;
-					b->xfer_large++;
-					if (b->xfer_large >= 3) {
-						/* we call this buffer a fast streamer if it manages
-						 * to be filled in one call 3 consecutive times.
-						 */
-						b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
-						//fputc('+', stderr);
-					}
-				}
-				else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
-					 (cur_read <= b->buf.size / 2)) {
-					b->xfer_large = 0;
-					b->xfer_small++;
-					if (b->xfer_small >= 2) {
-						/* if the buffer has been at least half full twice,
-						 * we receive faster than we send, so at least it
-						 * is not a "fast streamer".
-						 */
-						b->flags &= ~BF_STREAMER_FAST;
-						//fputc('-', stderr);
-					}
-				}
-				else {
-					b->xfer_small = 0;
-					b->xfer_large = 0;
+		if (bi_full(b)) {
+			/* The buffer is now full, there's no point in going through
+			 * the loop again.
+			 */
+			if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
+				b->xfer_small = 0;
+				b->xfer_large++;
+				if (b->xfer_large >= 3) {
+					/* we call this buffer a fast streamer if it manages
+					 * to be filled in one call 3 consecutive times.
+					 */
+					b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
+					//fputc('+', stderr);
 				}
-
-				b->flags |= BF_FULL;
-				si->flags |= SI_FL_WAIT_ROOM;
-				break;
 			}
-
-			/* if too many bytes were missing from last read, it means that
-			 * it's pointless trying to read again because the system does
-			 * not have them in buffers. BTW, if FD_POLL_HUP was present,
-			 * it means that we have reached the end and that the connection
-			 * is closed.
-			 */
-			if (ret < max) {
-				if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
-				    (cur_read <= b->buf.size / 2)) {
-					b->xfer_large = 0;
-					b->xfer_small++;
-					if (b->xfer_small >= 3) {
-						/* we have read less than half of the buffer in
-						 * one pass, and this happened at least 3 times.
-						 * This is definitely not a streamer.
-						 */
-						b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
-						//fputc('!', stderr);
-					}
+			else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
+				 (cur_read <= b->buf.size / 2)) {
+				b->xfer_large = 0;
+				b->xfer_small++;
+				if (b->xfer_small >= 2) {
+					/* if the buffer has been at least half full twice,
+					 * we receive faster than we send, so at least it
+					 * is not a "fast streamer".
+					 */
+					b->flags &= ~BF_STREAMER_FAST;
+					//fputc('-', stderr);
 				}
-				/* unfortunately, on level-triggered events, POLL_HUP
-				 * is generally delivered AFTER the system buffer is
-				 * empty, so this one might never match.
-				 */
-				if (fdtab[fd].ev & FD_POLL_HUP)
-					goto out_shutdown_r;
+			}
+			else {
+				b->xfer_small = 0;
+				b->xfer_large = 0;
+			}
 
-				/* if a streamer has read few data, it may be because we
-				 * have exhausted system buffers. It's not worth trying
-				 * again.
-				 */
-				if (b->flags & BF_STREAMER)
-					break;
+			b->flags |= BF_FULL;
+			si->flags |= SI_FL_WAIT_ROOM;
+			break;
+		}
 
-				/* generally if we read something smaller than 1 or 2 MSS,
-				 * it means that either we have exhausted the system's
-				 * buffers (streamer or question-response protocol) or
-				 * that the connection will be closed. Streamers are
-				 * easily detected so we return early. For other cases,
-				 * it's still better to perform a last read to be sure,
-				 * because it may save one complete poll/read/wakeup cycle
-				 * in case of shutdown.
-				 */
-				if (ret < MIN_RET_FOR_READ_LOOP && b->flags & BF_STREAMER)
-					break;
+		if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
+			break;
 
-				/* if we read a large block smaller than what we requested,
-				 * it's almost certain we'll never get anything more.
-				 */
-				if (ret >= global.tune.recv_enough)
-					break;
+		/* if too many bytes were missing from last read, it means that
+		 * it's pointless trying to read again because the system does
+		 * not have them in buffers.
+		 */
+		if (ret < max) {
+			if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
+			    (cur_read <= b->buf.size / 2)) {
+				b->xfer_large = 0;
+				b->xfer_small++;
+				if (b->xfer_small >= 3) {
+					/* we have read less than half of the buffer in
+					 * one pass, and this happened at least 3 times.
+					 * This is definitely not a streamer.
+					 */
+					b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
+					//fputc('!', stderr);
+				}
 			}
 
-			if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
+			/* if a streamer has read few data, it may be because we
+			 * have exhausted system buffers. It's not worth trying
+			 * again.
+			 */
+			if (b->flags & BF_STREAMER)
 				break;
-		}
-		else if (ret == 0) {
-			/* connection closed */
-			goto out_shutdown_r;
-		}
-		else if (errno == EAGAIN) {
-			/* Ignore EAGAIN but inform the poller that there is
-			 * nothing to read left if we did not read much, ie
-			 * less than what we were still expecting to read.
-			 * But we may have done some work justifying to notify
-			 * the task.
+
+			/* if we read a large block smaller than what we requested,
+			 * it's almost certain we'll never get anything more.
 			 */
-			if (cur_read < MIN_RET_FOR_READ_LOOP)
-				conn_data_poll_recv(conn);
-			break;
-		}
-		else {
-			goto out_error;
+			if (ret >= global.tune.recv_enough)
+				break;
 		}
-	} /* while (1) */
+	} /* while !flags */
+
+	if (conn->flags & CO_FL_ERROR)
+		goto out_error;
+
+	if (conn->flags & CO_FL_WAIT_DATA) {
+		/* we don't automatically ask for polling if we have
+		 * read enough data, as it saves some syscalls with
+		 * speculative pollers.
+		 */
+		if (cur_read < MIN_RET_FOR_READ_LOOP)
+			__conn_data_poll_recv(conn);
+		else
+			__conn_data_want_recv(conn);
+	}
+
+	if (conn_data_read0_pending(conn))
+		/* connection closed */
+		goto out_shutdown_r;
 	return;
 
  out_shutdown_r:
 	/* we received a shutdown */
-	fdtab[fd].ev &= ~FD_POLL_HUP;
 	b->flags |= BF_READ_NULL;
 	if (b->flags & BF_AUTO_CLOSE)
 		buffer_shutw_now(b);
 	stream_sock_read0(si);
+	conn_data_read0(conn);
 	return;
 
  out_error:
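Editor's note: the program below is not part of the patch. It is a small, self-contained sketch of the iterator contract this commit introduces: the caller clears CO_FL_WAIT_DATA/CO_FL_WAIT_ROOM, then repeatedly calls the data-layer reader until the reader raises one of those flags (or an error) to say why it stopped. Only the two CO_FL_WAIT_* values come from the connection.h hunk above; the sketch_* names, the CO_FL_ERROR value and the string standing in for the socket are invented for illustration.

#include <stdio.h>
#include <string.h>

#define CO_FL_WAIT_DATA 0x00004000   /* data source is empty (value from the patch) */
#define CO_FL_WAIT_ROOM 0x00008000   /* data sink is full (value from the patch) */
#define CO_FL_ERROR     0x00100000   /* arbitrary value chosen for the sketch */

struct sketch_conn {
	unsigned int flags;
	const char *src;        /* stands in for the socket */
	size_t src_len;
	size_t src_ofs;
};

/* Stand-in for raw_sock_to_buf(): copies up to <count> bytes into <buf> and
 * raises CO_FL_WAIT_DATA when the source is exhausted, the way the real
 * reader does when recv() reports EAGAIN.
 */
static int sketch_to_buf(struct sketch_conn *conn, char *buf, int count)
{
	int avail = (int)(conn->src_len - conn->src_ofs);
	int n = avail < count ? avail : count;

	if (n <= 0) {
		conn->flags |= CO_FL_WAIT_DATA;
		return 0;
	}
	memcpy(buf, conn->src + conn->src_ofs, n);
	conn->src_ofs += n;
	return n;
}

int main(void)
{
	struct sketch_conn conn = { .flags = 0, .src = "hello, world", .src_len = 12, .src_ofs = 0 };
	char buf[5];

	/* the caller resets the iterator flags, then pulls data until the
	 * reader reports why it had to stop (no data, no room, or an error)
	 */
	conn.flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
	while (!(conn.flags & (CO_FL_ERROR | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM))) {
		int ret = sketch_to_buf(&conn, buf, sizeof(buf));
		if (ret <= 0)
			break;
		printf("read %d bytes: %.*s\n", ret, ret, buf);
	}
	return 0;
}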