From: Willy Tarreau Date: Tue, 21 Aug 2012 16:22:06 +0000 (+0200) Subject: MAJOR: connection: split the send call into connection and stream interface X-Git-Tag: v1.5-dev12~75 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=5368d80ede227406d7346b324fc6ef5fcfa134b8;p=thirdparty%2Fhaproxy.git MAJOR: connection: split the send call into connection and stream interface Similar to what was done on the receive path, the data layer now provides only an snd_buf() callback that is iterated over by the stream interface's si_conn_send_loop() function. The data layer now has no knowledge about channels nor stream interfaces. The splice() code still needs to be ported, as it is currently disabled. --- diff --git a/include/proto/connection.h b/include/proto/connection.h index d97978ebf9..7205b2c4c5 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -37,16 +37,6 @@ static inline void conn_data_close(struct connection *conn) conn->data->close(conn); } -/* Calls the snd_buf() function of the data layer if any, otherwise - * returns 0. - */ -static inline int conn_data_snd_buf(struct connection *conn) -{ - if (!conn->data->snd_buf) - return 0; - return conn->data->snd_buf(conn); -} - /* set polling depending on the change between the CURR part of the * flags and the new flags in connection C. The connection flags are * updated with the new flags at the end of the operation. 
Only the bits diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index 018349bbcc..ee552c2b93 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -118,8 +118,8 @@ struct sock_ops { void (*read)(struct connection *conn); /* read callback after poll() */ void (*write)(struct connection *conn); /* write callback after poll() */ void (*close)(struct connection *); /* close the data channel on the connection */ - int (*snd_buf)(struct connection *conn); /* callback used to send a buffer contents */ int (*rcv_buf)(struct connection *conn, struct buffer *buf, int count); /* recv callback */ + int (*snd_buf)(struct connection *conn, struct buffer *buf, int flags); /* send callback */ }; /* A stream interface has 3 parts : diff --git a/src/raw_sock.c b/src/raw_sock.c index 1bc40424af..60c7ed0e8b 100644 --- a/src/raw_sock.c +++ b/src/raw_sock.c @@ -279,148 +279,60 @@ static int raw_sock_to_buf(struct connection *conn, struct buffer *buf, int coun } -/* - * This function is called to send buffer data to a stream socket. - * It returns -1 in case of unrecoverable error, otherwise zero. +/* Send all pending bytes from buffer <buf> to connection <conn>'s socket. + * <flags> may contain MSG_MORE to make the system hold on without sending + * data too fast. + * Only one call to send() is performed, unless the buffer wraps, in which case + * a second call may be performed. The connection's flags are updated with + * whatever special event is detected (error, empty). The caller is responsible + * for taking care of those events and avoiding the call if inappropriate. The + * function does not call the connection's polling update function, so the caller + * is responsible for this. 
*/ -static int sock_raw_write_loop(struct connection *conn) +static int raw_sock_from_buf(struct connection *conn, struct buffer *buf, int flags) { - struct stream_interface *si = container_of(conn, struct stream_interface, conn); - struct channel *b = si->ob; - int write_poll = MAX_WRITE_POLL_LOOPS; - int ret, max; - -#if 0 && defined(CONFIG_HAP_LINUX_SPLICE) - while (b->pipe) { - ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data, - SPLICE_F_MOVE|SPLICE_F_NONBLOCK); - if (ret <= 0) { - if (ret == 0 || errno == EAGAIN) { - conn_data_poll_send(&si->conn); - return 0; - } - /* here we have another error */ - return -1; - } + int ret, try, done, send_flag; - b->flags |= BF_WRITE_PARTIAL; - b->pipe->data -= ret; - - if (!b->pipe->data) { - put_pipe(b->pipe); - b->pipe = NULL; - break; - } - - if (--write_poll <= 0) - return 0; - - /* The only reason we did not empty the pipe is that the output - * buffer is full. - */ - conn_data_poll_send(&si->conn); - return 0; - } - - /* At this point, the pipe is empty, but we may still have data pending - * in the normal buffer. + done = 0; + /* send the largest possible block. For this we perform only one call + * to send() unless the buffer wraps and we exactly fill the first hunk, + * in which case we accept to do it once again. */ -#endif - if (!b->buf.o) { - b->flags |= BF_OUT_EMPTY; - return 0; - } - - /* when we're in this loop, we already know that there is no spliced - * data left, and that there are sendable buffered data. - */ - while (1) { - max = b->buf.o; - + while (buf->o) { + try = buf->o; /* outgoing data may wrap at the end */ - if (b->buf.data + max > b->buf.p) - max = b->buf.data + max - b->buf.p; - - /* check if we want to inform the kernel that we're interested in - * sending more data after this call. We want this if : - * - we're about to close after this last send and want to merge - * the ongoing FIN with the last segment. 
- * - we know we can't send everything at once and must get back - * here because of unaligned data - * - there is still a finite amount of data to forward - * The test is arranged so that the most common case does only 2 - * tests. - */ - - if (MSG_NOSIGNAL && MSG_MORE) { - unsigned int send_flag = MSG_DONTWAIT | MSG_NOSIGNAL; - - if ((!(b->flags & BF_NEVER_WAIT) && - ((b->to_forward && b->to_forward != BUF_INFINITE_FORWARD) || - (b->flags & BF_EXPECT_MORE))) || - ((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == BF_SHUTW_NOW && (max == b->buf.o)) || - (max != b->buf.o)) { - send_flag |= MSG_MORE; - } - - /* this flag has precedence over the rest */ - if (b->flags & BF_SEND_DONTWAIT) - send_flag &= ~MSG_MORE; + if (buf->data + try > buf->p) + try = buf->data + try - buf->p; - ret = send(si_fd(si), bo_ptr(&b->buf), max, send_flag); - } else { - int skerr; - socklen_t lskerr = sizeof(skerr); + send_flag = MSG_DONTWAIT | MSG_NOSIGNAL; + if (try < buf->o) + send_flag = MSG_MORE; - ret = getsockopt(si_fd(si), SOL_SOCKET, SO_ERROR, &skerr, &lskerr); - if (ret == -1 || skerr) - ret = -1; - else - ret = send(si_fd(si), bo_ptr(&b->buf), max, MSG_DONTWAIT); - } + ret = send(conn->t.sock.fd, bo_ptr(buf), try, send_flag | flags); if (ret > 0) { - if (si->conn.flags & CO_FL_WAIT_L4_CONN) { - si->conn.flags &= ~CO_FL_WAIT_L4_CONN; - si->exp = TICK_ETERNITY; - } - - b->flags |= BF_WRITE_PARTIAL; + buf->o -= ret; + done += ret; - b->buf.o -= ret; - if (likely(!buffer_len(&b->buf))) + if (likely(!buffer_len(buf))) /* optimize data alignment in the buffer */ - b->buf.p = b->buf.data; - - if (likely(!bi_full(b))) - b->flags &= ~BF_FULL; - - if (!b->buf.o) { - /* Always clear both flags once everything has been sent, they're one-shot */ - b->flags &= ~(BF_EXPECT_MORE | BF_SEND_DONTWAIT); - if (likely(!b->pipe)) - b->flags |= BF_OUT_EMPTY; - break; - } + buf->p = buf->data; /* if the system buffer is full, don't insist */ - if (ret < max) - break; - - if (--write_poll <= 0) + if (ret 
< try) break; } else if (ret == 0 || errno == EAGAIN) { /* nothing written, we need to poll for write first */ - conn_data_poll_send(&si->conn); - return 0; + conn->flags |= CO_FL_WAIT_ROOM; + break; } - else { - /* bad, we got an error */ - return -1; + else if (errno != EINTR) { + conn->flags |= CO_FL_ERROR; + break; } - } /* while (1) */ - return 0; + } + return done; } @@ -433,7 +345,7 @@ struct sock_ops raw_sock = { .chk_snd = stream_int_chk_snd_conn, .read = si_conn_recv_cb, .write = si_conn_send_cb, - .snd_buf = sock_raw_write_loop, + .snd_buf = raw_sock_from_buf, .rcv_buf = raw_sock_to_buf, .close = NULL, }; diff --git a/src/stream_interface.c b/src/stream_interface.c index 2cc1962d19..eef6decfc5 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -654,6 +654,118 @@ void conn_notify_si(struct connection *conn) si->ib->flags &= ~BF_READ_DONTWAIT; } +/* + * This function is called to send buffer data to a stream socket. + * It returns -1 in case of unrecoverable error, otherwise zero. + * It iterates the data layer's snd_buf function. + */ +static int si_conn_send_loop(struct connection *conn) +{ + struct stream_interface *si = container_of(conn, struct stream_interface, conn); + struct channel *b = si->ob; + int write_poll = MAX_WRITE_POLL_LOOPS; + int ret; + +#if 0 && defined(CONFIG_HAP_LINUX_SPLICE) + while (b->pipe) { + ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data, + SPLICE_F_MOVE|SPLICE_F_NONBLOCK); + if (ret <= 0) { + if (ret == 0 || errno == EAGAIN) { + conn_data_poll_send(&si->conn); + return 0; + } + /* here we have another error */ + return -1; + } + + b->flags |= BF_WRITE_PARTIAL; + b->pipe->data -= ret; + + if (!b->pipe->data) { + put_pipe(b->pipe); + b->pipe = NULL; + break; + } + + if (--write_poll <= 0) + return 0; + + /* The only reason we did not empty the pipe is that the output + * buffer is full. 
+ */ + conn_data_poll_send(&si->conn); + return 0; + } + + /* At this point, the pipe is empty, but we may still have data pending + * in the normal buffer. + */ +#endif + if (!b->buf.o) { + b->flags |= BF_OUT_EMPTY; + return 0; + } + + /* when we're in this loop, we already know that there is no spliced + * data left, and that there are sendable buffered data. + */ + conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM); + while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) { + /* check if we want to inform the kernel that we're interested in + * sending more data after this call. We want this if : + * - we're about to close after this last send and want to merge + * the ongoing FIN with the last segment. + * - we know we can't send everything at once and must get back + * here because of unaligned data + * - there is still a finite amount of data to forward + * The test is arranged so that the most common case does only 2 + * tests. 
+ */ + unsigned int send_flag = MSG_DONTWAIT | MSG_NOSIGNAL; + + if ((!(b->flags & (BF_NEVER_WAIT|BF_SEND_DONTWAIT)) && + ((b->to_forward && b->to_forward != BUF_INFINITE_FORWARD) || + (b->flags & BF_EXPECT_MORE))) || + ((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == BF_SHUTW_NOW)) + send_flag |= MSG_MORE; + + ret = conn->data->snd_buf(conn, &b->buf, send_flag); + if (ret <= 0) + break; + + if (si->conn.flags & CO_FL_WAIT_L4_CONN) + si->conn.flags &= ~CO_FL_WAIT_L4_CONN; + + b->flags |= BF_WRITE_PARTIAL; + + if (likely(!bi_full(b))) + b->flags &= ~BF_FULL; + + if (!b->buf.o) { + /* Always clear both flags once everything has been sent, they're one-shot */ + b->flags &= ~(BF_EXPECT_MORE | BF_SEND_DONTWAIT); + if (likely(!b->pipe)) + b->flags |= BF_OUT_EMPTY; + break; + } + + if (--write_poll <= 0) + break; + } /* while */ + + if (conn->flags & CO_FL_ERROR) + return -1; + + if (conn->flags & CO_FL_WAIT_ROOM) { + /* we need to poll before going on */ + conn_data_poll_send(&si->conn); + return 0; + } + return 0; +} + + /* Updates the timers and flags of a stream interface attached to a connection, * depending on the buffers' flags. It should only be called once after the * buffer flags have settled down, and before they are cleared. It doesn't @@ -792,7 +904,7 @@ void stream_int_chk_snd_conn(struct stream_interface *si) (fdtab[si_fd(si)].ev & FD_POLL_OUT))) /* we'll be called anyway */ return; - if (conn_data_snd_buf(&si->conn) < 0) { + if (si_conn_send_loop(&si->conn) < 0) { /* Write error on the file descriptor. We mark the FD as STERROR so * that we don't use it anymore and we notify the task. */ @@ -1078,7 +1190,7 @@ void si_conn_send_cb(struct connection *conn) return; /* OK there are data waiting to be sent */ - if (conn_data_snd_buf(conn) < 0) + if (si_conn_send_loop(conn) < 0) goto out_error; /* OK all done */