From 3eba98aa57adc794a1b38ab5082bd08babb4a7f3 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Sun, 25 Jan 2009 13:56:13 +0100 Subject: [PATCH] [MEDIUM] splice: make use of pipe pools Using pipe pools makes pipe management a lot easier. It also allows removing quite a few #ifdefs in areas which depended on whether support for kernel splicing was present. The buffer now holds a pointer to a pipe structure which is always NULL unless there is still data in the pipe. When the buffer needs a pipe, one is dynamically allocated from the pipe pool. When the data is consumed, the pipe is immediately released. That way, there is no longer any need to care about pipe closure upon session termination, nor about pipe creation when trying to use splice(). Another immediate advantage of this method is that it considerably reduces the number of pipes needed to use splice(). Tests have shown that even with 0.2 pipes per connection, almost all sessions can use splice(), because the same pipe may be used by several consecutive calls to splice(). 
--- include/proto/buffers.h | 5 +--- include/types/buffers.h | 14 +++------ include/types/global.h | 1 - src/fd.c | 3 -- src/haproxy.c | 2 +- src/session.c | 27 +++++------------ src/stream_sock.c | 66 ++++++++++++++++++++++++----------------- 7 files changed, 53 insertions(+), 65 deletions(-) diff --git a/include/proto/buffers.h b/include/proto/buffers.h index 9ebc6ffc0a..6b5771b6ab 100644 --- a/include/proto/buffers.h +++ b/include/proto/buffers.h @@ -48,15 +48,12 @@ static inline void buffer_init(struct buffer *buf) buf->send_max = 0; buf->to_forward = 0; buf->l = buf->total = 0; - buf->splice_len = 0; + buf->pipe = NULL; buf->analysers = 0; buf->cons = NULL; buf->flags = BF_EMPTY; buf->r = buf->lr = buf->w = buf->data; buf->max_len = BUFSIZE; -#if defined(CONFIG_HAP_LINUX_SPLICE) - buf->splice.prod = buf->splice.cons = -1; /* closed */ -#endif } /* returns 1 if the buffer is empty, 0 otherwise */ diff --git a/include/types/buffers.h b/include/types/buffers.h index ca80809548..3b0c3a12ae 100644 --- a/include/types/buffers.h +++ b/include/types/buffers.h @@ -128,7 +128,6 @@ struct buffer { int wto; /* write timeout, in ticks */ int cto; /* connect timeout, in ticks */ unsigned int l; /* data length */ - unsigned int splice_len; /* number of bytes remaining in splice, out of buffer */ char *r, *w, *lr; /* read ptr, write ptr, last read */ unsigned int max_len; /* read limit, used to keep room for header rewriting */ unsigned int send_max; /* number of bytes the sender can consume om this buffer, <= l */ @@ -141,12 +140,7 @@ struct buffer { unsigned long long total; /* total data read */ struct stream_interface *prod; /* producer attached to this buffer */ struct stream_interface *cons; /* consumer attached to this buffer */ - struct { -#if defined(CONFIG_HAP_LINUX_SPLICE) - int prod; /* -1 or fd of the pipe's end towards the producer */ - int cons; /* -1 or fd of the pipe's end towards the consumer */ -#endif - } splice; + struct pipe *pipe; /* non-NULL only when 
data present */ char data[BUFSIZE]; }; @@ -158,7 +152,7 @@ struct buffer { split in two parts : - the visible data (->data, for ->l bytes) - the invisible data, typically in kernel buffers forwarded directly from - the source stream sock to the destination stream sock (->splice_len + the source stream sock to the destination stream sock (->pipe->data bytes). Those are used only during forward. In order not to mix data streams, the producer may only feed the invisible @@ -173,7 +167,7 @@ struct buffer { buffer is expected to reach the destination file descriptor, by any means. However, it's the consumer's responsibility to ensure that the invisible data has been entirely consumed before consuming visible data. This must be - reflected by ->splice_len. This is very important as this and only this can + reflected by ->pipe->data. This is very important as this and only this can ensure strict ordering of data between buffers. The producer is responsible for decreasing ->to_forward and increasing @@ -184,7 +178,7 @@ struct buffer { well as any data forwarded through the visible buffer. The consumer is responsible for decreasing ->send_max when it sends data - from the visible buffer, and ->splice_len when it sends data from the + from the visible buffer, and ->pipe->data when it sends data from the invisible buffer. 
A real-world example consists in part in an HTTP response waiting in a diff --git a/include/types/global.h b/include/types/global.h index 8c2893095c..1209779065 100644 --- a/include/types/global.h +++ b/include/types/global.h @@ -75,7 +75,6 @@ extern char *progname; /* program name */ extern int pid; /* current process id */ extern int relative_pid; /* process id starting at 1 */ extern int actconn; /* # of active sessions */ -extern int usedpipes; /* # of used pipes */ extern int listeners; extern char trash[BUFSIZE]; extern const int zero; diff --git a/src/fd.c b/src/fd.c index c0fb712878..daa347f31d 100644 --- a/src/fd.c +++ b/src/fd.c @@ -18,15 +18,12 @@ #include #include -//#include - #include struct fdtab *fdtab = NULL; /* array of all the file descriptors */ int maxfd; /* # of the highest fd + 1 */ int totalconn; /* total # of terminated sessions */ int actconn; /* # of active sessions */ -int usedpipes; /* # of pipes in use (2 fds each) */ int cfg_polling_mechanism = 0; /* POLL_USE_{SELECT|POLL|EPOLL} */ diff --git a/src/haproxy.c b/src/haproxy.c index 9b83146e63..fd4b050288 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -393,7 +393,7 @@ void init(int argc, char **argv) * Initialize the previously static variables. 
*/ - usedpipes = totalconn = actconn = maxfd = listeners = stopping = 0; + totalconn = actconn = maxfd = listeners = stopping = 0; #ifdef HAPROXY_MEMMAX diff --git a/src/session.c b/src/session.c index 3f18ad9ce2..c7dbf6f8a9 100644 --- a/src/session.c +++ b/src/session.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -64,23 +65,11 @@ void session_free(struct session *s) sess_change_server(s, NULL); } -#if defined(CONFIG_HAP_LINUX_SPLICE) - if (s->req->splice.prod >= 0) - close(s->req->splice.prod); - if (s->req->splice.cons >= 0) - close(s->req->splice.cons); - - if (s->req->splice.prod >= 0 || s->req->splice.cons >= 0) - usedpipes--; - - if (s->rep->splice.prod >= 0) - close(s->rep->splice.prod); - if (s->rep->splice.cons >= 0) - close(s->rep->splice.cons); - - if (s->rep->splice.prod >= 0 || s->rep->splice.cons >= 0) - usedpipes--; -#endif + if (s->req->pipe) + put_pipe(s->req->pipe); + + if (s->rep->pipe) + put_pipe(s->rep->pipe); pool_free2(pool2_buffer, s->req); pool_free2(pool2_buffer, s->rep); @@ -772,7 +761,7 @@ resync_stream_interface: !s->req->analysers && !(s->req->flags & BF_HIJACK)) { /* check if it is wise to enable kernel splicing on the request buffer */ if (!(s->req->flags & BF_KERN_SPLICING) && - (usedpipes < global.maxpipes) && + (pipes_used < global.maxpipes) && (((s->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) || (((s->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && (s->req->flags & BF_STREAMER_FAST)))) @@ -895,7 +884,7 @@ resync_stream_interface: !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) { /* check if it is wise to enable kernel splicing on the response buffer */ if (!(s->rep->flags & BF_KERN_SPLICING) && - (usedpipes < global.maxpipes) && + (pipes_used < global.maxpipes) && (((s->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) || (((s->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) && (s->rep->flags & BF_STREAMER_FAST)))) diff --git a/src/stream_sock.c b/src/stream_sock.c index 
47fd8fa698..ae4eb9880e 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -1,7 +1,7 @@ /* * Functions operating on SOCK_STREAM and buffers. * - * Copyright 2000-2008 Willy Tarreau + * Copyright 2000-2009 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -95,6 +96,9 @@ _syscall6(int, splice, int, fdin, loff_t *, off_in, int, fdout, loff_t *, off_ou * SI_FL_ERR * SI_FL_WAIT_ROOM * (SI_FL_WAIT_RECV) + * + * This function automatically allocates a pipe from the pipe pool. It also + * carefully ensures to clear b->pipe whenever it leaves the pipe empty. */ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si) { @@ -121,17 +125,15 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si) return 1; } - if (unlikely(b->splice.prod == -1)) { - int pipefd[2]; - if (usedpipes >= global.maxpipes || pipe(pipefd) < 0) { + if (unlikely(b->pipe == NULL)) { + if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) { b->flags &= ~BF_KERN_SPLICING; return -1; } - usedpipes++; - b->splice.prod = pipefd[1]; - b->splice.cons = pipefd[0]; } + /* At this point, b->pipe is valid */ + while (1) { max = b->to_forward; if (max <= 0) { @@ -144,7 +146,7 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si) break; } - ret = splice(fd, NULL, b->splice.prod, NULL, max, + ret = splice(fd, NULL, b->pipe->prod, NULL, max, SPLICE_F_MOVE|SPLICE_F_NONBLOCK); if (ret <= 0) { @@ -167,7 +169,7 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si) * will almost always fill/empty the pipe. 
*/ - if (b->splice_len > 0) { + if (b->pipe->data) { si->flags |= SI_FL_WAIT_ROOM; retval = 1; break; @@ -191,17 +193,22 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si) b->to_forward -= ret; total += ret; b->total += ret; - b->splice_len += ret; + b->pipe->data += ret; b->flags |= BF_READ_PARTIAL; b->flags &= ~BF_EMPTY; /* to prevent shutdowns */ - if (b->splice_len >= SPLICE_FULL_HINT) { + if (b->pipe->data >= SPLICE_FULL_HINT) { /* We've read enough of it for this time. */ retval = 1; break; } } /* while */ + if (unlikely(!b->pipe->data)) { + put_pipe(b->pipe); + b->pipe = NULL; + } + return retval; } @@ -431,13 +438,14 @@ int stream_sock_read(int fd) { out_wakeup: /* We might have some data the consumer is waiting for */ - if ((b->send_max || b->splice_len) && (b->cons->flags & SI_FL_WAIT_DATA)) { - int last_len = b->splice_len; + if ((b->send_max || b->pipe) && (b->cons->flags & SI_FL_WAIT_DATA)) { + int last_len = b->pipe ? b->pipe->data : 0; b->cons->chk_snd(b->cons); /* check if the consumer has freed some space */ - if (!(b->flags & BF_FULL) && (!last_len || b->splice_len < last_len)) + if (!(b->flags & BF_FULL) && + (!last_len || !b->pipe || b->pipe->data < last_len)) si->flags &= ~SI_FL_WAIT_ROOM; } @@ -487,7 +495,8 @@ int stream_sock_read(int fd) { /* * This function is called to send buffer data to a stream socket. * It returns -1 in case of unrecoverable error, 0 if the caller needs to poll - * before calling it again, otherwise 1. + * before calling it again, otherwise 1. If a pipe was associated with the + * buffer and it empties it, it releases it as well. 
*/ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b) { @@ -496,8 +505,8 @@ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b) int ret, max; #if defined(CONFIG_HAP_LINUX_SPLICE) - while (b->splice_len) { - ret = splice(b->splice.cons, NULL, si->fd, NULL, b->splice_len, + while (b->pipe) { + ret = splice(b->pipe->cons, NULL, si->fd, NULL, b->pipe->data, SPLICE_F_MOVE|SPLICE_F_NONBLOCK); if (ret <= 0) { if (ret == 0 || errno == EAGAIN) { @@ -510,10 +519,13 @@ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b) } b->flags |= BF_WRITE_PARTIAL; - b->splice_len -= ret; + b->pipe->data -= ret; - if (!b->splice_len) + if (!b->pipe->data) { + put_pipe(b->pipe); + b->pipe = NULL; break; + } if (--write_poll <= 0) return retval; @@ -575,7 +587,7 @@ static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b) if (likely(!b->l)) { /* optimize data alignment in the buffer */ b->r = b->w = b->lr = b->data; - if (likely(!b->splice_len)) + if (likely(!b->pipe)) b->flags |= BF_EMPTY; } @@ -669,7 +681,7 @@ int stream_sock_write(int fd) */ } - if (!b->splice_len && !b->send_max) { + if (!b->pipe && !b->send_max) { /* the connection is established but we can't write. Either the * buffer is empty, or we just refrain from sending because the * send_max limit was reached. Maybe we just wrote the last @@ -692,7 +704,7 @@ int stream_sock_write(int fd) out_may_wakeup: if (b->flags & BF_WRITE_ACTIVITY) { /* update timeout if we have written something */ - if ((b->send_max || b->splice_len) && + if ((b->send_max || b->pipe) && (b->flags & (BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL) b->wex = tick_add_ifset(now_ms, b->wto); @@ -716,7 +728,7 @@ int stream_sock_write(int fd) * any more data to forward and it's not planned to send any more. 
*/ if (likely((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) || - (!b->to_forward && !b->send_max && !b->splice_len) || + (!b->to_forward && !b->send_max && !b->pipe) || si->state != SI_ST_EST || b->prod->state != SI_ST_EST)) task_wakeup(si->owner, TASK_WOKEN_IO); @@ -850,7 +862,7 @@ void stream_sock_data_finish(struct stream_interface *si) /* Check if we need to close the write side */ if (!(ob->flags & BF_SHUTW)) { /* Write not closed, update FD status and timeout for writes */ - if ((ob->send_max == 0 && ob->splice_len == 0) || + if ((ob->send_max == 0 && !ob->pipe) || (ob->flags & BF_EMPTY) || (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) { /* stop writing */ @@ -938,7 +950,7 @@ void stream_sock_chk_snd(struct stream_interface *si) if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */ (fdtab[si->fd].ev & FD_POLL_OUT) || /* we'll be called anyway */ - !(ob->send_max || ob->splice_len) || /* called with nothing to send ! */ + !(ob->send_max || ob->pipe) || /* called with nothing to send ! */ !(ob->flags & (BF_HIJACK|BF_WRITE_ENA))) /* we may not write */ return; @@ -953,7 +965,7 @@ void stream_sock_chk_snd(struct stream_interface *si) goto out_wakeup; } - if (retval > 0 || (ob->send_max == 0 && ob->splice_len == 0)) { + if (retval > 0 || (ob->send_max == 0 && !ob->pipe)) { /* the connection is established but we can't write. Either the * buffer is empty, or we just refrain from sending because the * send_max limit was reached. Maybe we just wrote the last @@ -980,7 +992,7 @@ void stream_sock_chk_snd(struct stream_interface *si) * have to notify the task. */ if (likely((ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) || - (!ob->to_forward && !ob->send_max && !ob->splice_len) || + (!ob->to_forward && !ob->send_max && !ob->pipe) || si->state != SI_ST_EST)) { out_wakeup: task_wakeup(si->owner, TASK_WOKEN_IO); -- 2.39.5