]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
[MAJOR] implemented support for speculative I/O processing
authorWilly Tarreau <w@1wt.eu>
Sun, 15 Apr 2007 18:56:27 +0000 (20:56 +0200)
committerWilly Tarreau <w@1wt.eu>
Sun, 15 Apr 2007 18:56:27 +0000 (20:56 +0200)
The pollers will now be able to speculatively call the I/O
processing functions and decide whether or not they want to
poll on those FDs. The changes primarily consist in teaching
those functions how to pass the info they got an EAGAIN.

include/common/defaults.h
src/checks.c
src/stream_sock.c

index e6552de8058145fb24ee3c5feddb42037fb4a63c..c99aafe4feb65e4278585d270af81624b18f00be 100644 (file)
 #define MAX_READ_POLL_LOOPS 4
 #endif
 
+// same, but for writes. Generally, it's enough to write twice: one time for
+// first half of the buffer, and a second time for the last half after a
+// wrap-around.
+#ifndef MAX_WRITE_POLL_LOOPS
+#define MAX_WRITE_POLL_LOOPS 2
+#endif
+
 // the number of bytes returned by a read below which we will not try to
 // poll the socket again. Generally, return values below the MSS are worthless
 // to try again.
index 2ae01db56123ec965ebf3a4157ea75f2de836b9f..309d0c4a76902b45be2a78d55131aa625834dc13 100644 (file)
@@ -22,6 +22,7 @@
 #include <common/compat.h>
 #include <common/config.h>
 #include <common/mini-clist.h>
+#include <common/standard.h>
 #include <common/time.h>
 
 #include <types/global.h>
@@ -47,7 +48,7 @@
  * remaining servers on the proxy and transfers queued sessions whenever
  * possible to other servers.
  */
-void set_server_down(struct server *s)
+static void set_server_down(struct server *s)
 {
        struct pendconn *pc, *pc_bck, *pc_end;
        struct session *sess;
@@ -102,25 +103,31 @@ void set_server_down(struct server *s)
 /*
  * This function is used only for server health-checks. It handles
  * the connection acknowledgement. If the proxy requires HTTP health-checks,
- * it sends the request. In other cases, it returns 1 if the socket is OK,
- * or -1 if an error occured.
+ * it sends the request. In other cases, it returns 1 in s->result if the
+ * socket is OK, or -1 if an error occured.
+ * The function itself returns 0 if it needs some polling before being called
+ * again, otherwise 1.
  */
-int event_srv_chk_w(int fd)
+static int event_srv_chk_w(int fd)
 {
+       __label__ out_wakeup, out_nowake;
        struct task *t = fdtab[fd].owner;
        struct server *s = t->context;
        int skerr;
        socklen_t lskerr = sizeof(skerr);
 
        skerr = 1;
-       if ((getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1)
-           || (skerr != 0)) {
+       if (unlikely(fdtab[fd].state == FD_STERROR ||
+                    (fdtab[fd].ev & FD_POLL_ERR) ||
+                    (getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) ||
+                    (skerr != 0))) {
                /* in case of TCP only, this tells us if the connection failed */
                s->result = -1;
                fdtab[fd].state = FD_STERROR;
-               EV_FD_CLR(fd, DIR_WR);
+               goto out_wakeup;
        }
-       else if (s->result != -1) {
+
+       if (s->result != -1) {
                /* we don't want to mark 'UP' a server on which we detected an error earlier */
                if ((s->proxy->options & PR_O_HTTP_CHK) ||
                    (s->proxy->options & PR_O_SSL3_CHK)) {
@@ -142,7 +149,11 @@ int event_srv_chk_w(int fd)
 #endif
                        if (ret == s->proxy->check_len) {
                                EV_FD_SET(fd, DIR_RD);   /* prepare for reading reply */
-                               EV_FD_CLR(fd, DIR_WR);   /* nothing more to write */
+                               goto out_nowake;
+                       }
+                       else if (ret == 0 || errno == EAGAIN) {
+                               /* we want some polling to happen first */
+                               fdtab[fd].ev &= ~FD_POLL_WR;
                                return 0;
                        }
                        else {
@@ -155,9 +166,12 @@ int event_srv_chk_w(int fd)
                        s->result = 1;
                }
        }
-
+ out_wakeup:
        task_wakeup(&rq, t);
-       return 0;
+ out_nowake:
+       EV_FD_CLR(fd, DIR_WR);   /* nothing more to write */
+       fdtab[fd].ev &= ~FD_POLL_WR;
+       return 1;
 }
 
 
@@ -167,10 +181,12 @@ int event_srv_chk_w(int fd)
  * server replies HTTP 2xx or 3xx (valid responses), or if it returns at least
  * 5 bytes in response to SSL HELLO. The principle is that this is enough to
  * distinguish between an SSL server and a pure TCP relay. All other cases will
- * return -1. The function returns 0.
+ * return -1. The function returns 0 if it needs to be called again after some
+ * polling, otherwise non-zero..
  */
-int event_srv_chk_r(int fd)
+static int event_srv_chk_r(int fd)
 {
+       __label__ out_wakeup;
        char reply[64];
        int len, result;
        struct task *t = fdtab[fd].owner;
@@ -179,34 +195,51 @@ int event_srv_chk_r(int fd)
        socklen_t lskerr = sizeof(skerr);
 
        result = len = -1;
-       if (!getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) && !skerr) {
+
+       if (unlikely(fdtab[fd].state == FD_STERROR ||
+                    (fdtab[fd].ev & FD_POLL_ERR) ||
+                    (getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) ||
+                    (skerr != 0))) {
+               /* in case of TCP only, this tells us if the connection failed */
+               s->result = -1;
+               fdtab[fd].state = FD_STERROR;
+               goto out_wakeup;
+       }
+
 #ifndef MSG_NOSIGNAL
-               len = recv(fd, reply, sizeof(reply), 0);
+       len = recv(fd, reply, sizeof(reply), 0);
 #else
-               /* Warning! Linux returns EAGAIN on SO_ERROR if data are still available
-                * but the connection was closed on the remote end. Fortunately, recv still
-                * works correctly and we don't need to do the getsockopt() on linux.
-                */
-               len = recv(fd, reply, sizeof(reply), MSG_NOSIGNAL);
+       /* Warning! Linux returns EAGAIN on SO_ERROR if data are still available
+        * but the connection was closed on the remote end. Fortunately, recv still
+        * works correctly and we don't need to do the getsockopt() on linux.
+        */
+       len = recv(fd, reply, sizeof(reply), MSG_NOSIGNAL);
 #endif
-               if (((s->proxy->options & PR_O_HTTP_CHK) &&
-                    (len >= sizeof("HTTP/1.0 000")) &&
-                   !memcmp(reply, "HTTP/1.", 7) &&
-                   (reply[9] == '2' || reply[9] == '3')) /* 2xx or 3xx */
-                   || ((s->proxy->options & PR_O_SSL3_CHK) && (len >= 5) &&
-                       (reply[0] == 0x15 || reply[0] == 0x16))) /* alert or handshake */
-                       result = 1;
+       if (unlikely(len < 0 && errno == EAGAIN)) {
+               /* we want some polling to happen first */
+               fdtab[fd].ev &= ~FD_POLL_RD;
+               return 0;
        }
 
+       if (((s->proxy->options & PR_O_HTTP_CHK) &&
+            (len >= sizeof("HTTP/1.0 000")) &&
+            !memcmp(reply, "HTTP/1.", 7) &&
+            (reply[9] == '2' || reply[9] == '3')) /* 2xx or 3xx */
+           || ((s->proxy->options & PR_O_SSL3_CHK) && (len >= 5) &&
+               (reply[0] == 0x15 || reply[0] == 0x16))) /* alert or handshake */
+               result = 1;
+
        if (result == -1)
                fdtab[fd].state = FD_STERROR;
 
        if (s->result != -1)
                s->result = result;
 
+ out_wakeup:
        EV_FD_CLR(fd, DIR_RD);
        task_wakeup(&rq, t);
-       return 0;
+       fdtab[fd].ev &= ~FD_POLL_RD;
+       return 1;
 }
 
 /*
index a150a08de331d406618eca7796747f3f86a6f520..7d2aa3035a5ea3d11c1593e4e8d6e4932340f04f 100644 (file)
@@ -21,6 +21,7 @@
 
 #include <common/compat.h>
 #include <common/config.h>
+#include <common/standard.h>
 #include <common/time.h>
 
 #include <types/buffers.h>
 
 /*
  * this function is called on a read event from a stream socket.
- * It returns 0.
+ * It returns 0 if we have a high confidence that we will not be
+ * able to read more data without polling first. Returns non-zero
+ * otherwise.
  */
 int stream_sock_read(int fd) {
+       __label__ out_wakeup;
        struct buffer *b = fdtab[fd].cb[DIR_RD].b;
-       int ret, max;
+       int ret, max, retval;
        int read_poll = MAX_READ_POLL_LOOPS;
 
 #ifdef DEBUG_FULL
        fprintf(stderr,"stream_sock_read : fd=%d, owner=%p\n", fd, fdtab[fd].owner);
 #endif
 
-       if (fdtab[fd].state != FD_STERROR) {
-               while (read_poll-- > 0)
-               {
-                       if (b->l == 0) { /* let's realign the buffer to optimize I/O */
-                               b->r = b->w = b->lr  = b->data;
+       retval = 1;
+
+       if (unlikely(fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR))) {
+               /* read/write error */
+               b->flags |= BF_READ_ERROR;
+               fdtab[fd].state = FD_STERROR;
+               goto out_wakeup;
+       }
+
+       if (unlikely(fdtab[fd].ev & FD_POLL_HUP)) {
+               /* connection closed */
+               b->flags |= BF_READ_NULL;
+               goto out_wakeup;
+       }
+
+       retval = 0;
+       while (read_poll-- > 0) {
+               if (b->l == 0) { /* let's realign the buffer to optimize I/O */
+                       b->r = b->w = b->lr  = b->data;
+                       max = b->rlim - b->data;
+               }
+               else if (b->r > b->w) {
+                       max = b->rlim - b->r;
+               }
+               else {
+                       max = b->w - b->r;
+                       /* FIXME: theorically, if w>0, we shouldn't have rlim < data+size anymore
+                        * since it means that the rewrite protection has been removed. This
+                        * implies that the if statement can be removed.
+                        */
+                       if (max > b->rlim - b->data)
                                max = b->rlim - b->data;
-                       }
-                       else if (b->r > b->w) {
-                               max = b->rlim - b->r;
-                       }
-                       else {
-                               max = b->w - b->r;
-                               /* FIXME: theorically, if w>0, we shouldn't have rlim < data+size anymore
-                                * since it means that the rewrite protection has been removed. This
-                                * implies that the if statement can be removed.
-                                */
-                               if (max > b->rlim - b->data)
-                                       max = b->rlim - b->data;
-                       }
+               }
            
-                       if (max == 0) {  /* not anymore room to store data */
-                               EV_FD_CLR(fd, DIR_RD);
-                               break;
-                       }
+               if (max == 0) {  /* not anymore room to store data */
+                       EV_FD_CLR(fd, DIR_RD);
+                       break;
+               }
 
 #ifndef MSG_NOSIGNAL
-                       {
-                               int skerr;
-                               socklen_t lskerr = sizeof(skerr);
-       
-                               ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
-                               if (ret == -1 || skerr)
-                                       ret = -1;
-                               else
-                                       ret = recv(fd, b->r, max, 0);
-                       }
+               {
+                       int skerr;
+                       socklen_t lskerr = sizeof(skerr);
+
+                       ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
+                       if (ret == -1 || skerr)
+                               ret = -1;
+                       else
+                               ret = recv(fd, b->r, max, 0);
+               }
 #else
-                       ret = recv(fd, b->r, max, MSG_NOSIGNAL);
+               ret = recv(fd, b->r, max, MSG_NOSIGNAL);
 #endif
-                       if (ret > 0) {
-                               b->r += ret;
-                               b->l += ret;
-                               b->flags |= BF_PARTIAL_READ;
+               if (ret > 0) {
+                       b->r += ret;
+                       b->l += ret;
+                       b->flags |= BF_PARTIAL_READ;
+                       retval = 1;
        
-                               if (b->r == b->data + BUFSIZE) {
-                                       b->r = b->data; /* wrap around the buffer */
-                               }
-
-                               b->total += ret;
+                       if (b->r == b->data + BUFSIZE) {
+                               b->r = b->data; /* wrap around the buffer */
+                       }
 
-                               /* generally if we read something smaller than the 1 or 2 MSS,
-                                * it means that it's not worth trying to read again.
-                                */
-                               if (ret < MIN_RET_FOR_READ_LOOP)
-                                       break;
-                               if (!read_poll)
-                                       break;
+                       b->total += ret;
 
-                               /* we hope to read more data or to get a close on next round */
-                               continue;
-                       }
-                       else if (ret == 0) {
-                               b->flags |= BF_READ_NULL;
-                               break;
-                       }
-                       else if (errno == EAGAIN) {/* ignore EAGAIN */
+                       /* generally if we read something smaller than the 1 or 2 MSS,
+                        * it means that it's not worth trying to read again. It may
+                        * also happen on headers, but the application then can stop
+                        * reading before we start polling.
+                        */
+                       if (ret < MIN_RET_FOR_READ_LOOP)
                                break;
-                       }
-                       else {
-                               b->flags |= BF_READ_ERROR;
-                               fdtab[fd].state = FD_STERROR;
+
+                       if (!read_poll)
                                break;
-                       }
-               } /* while(1) */
-       }
-       else {
-               b->flags |= BF_READ_ERROR;
-               fdtab[fd].state = FD_STERROR;
-       }
+
+                       /* we hope to read more data or to get a close on next round */
+                       continue;
+               }
+               else if (ret == 0) {
+                       b->flags |= BF_READ_NULL;
+                       retval = 1;     // connection closed
+                       break;
+               }
+               else if (errno == EAGAIN) {/* ignore EAGAIN */
+                       retval = 0;
+                       break;
+               }
+               else {
+                       retval = 1;
+                       b->flags |= BF_READ_ERROR;
+                       fdtab[fd].state = FD_STERROR;
+                       break;
+               }
+       } /* while (read_poll) */
 
        if (b->flags & BF_READ_STATUS) {
+       out_wakeup:
                if (b->rto && EV_FD_ISSET(fd, DIR_RD))
                        tv_delayfrom(&b->rex, &now, b->rto);
                else
@@ -135,55 +156,71 @@ int stream_sock_read(int fd) {
                task_wakeup(&rq, fdtab[fd].owner);
        }
 
-       return 0;
+       fdtab[fd].ev &= ~FD_POLL_RD;
+       return retval;
 }
 
 
 /*
  * this function is called on a write event from a stream socket.
- * It returns 0.
+ * It returns 0 if we have a high confidence that we will not be
+ * able to write more data without polling first. Returns non-zero
+ * otherwise.
  */
 int stream_sock_write(int fd) {
+       __label__ out_eternity;
        struct buffer *b = fdtab[fd].cb[DIR_WR].b;
-       int ret, max;
+       int ret, max, retval;
+       int write_poll = MAX_WRITE_POLL_LOOPS;
 
 #ifdef DEBUG_FULL
        fprintf(stderr,"stream_sock_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner);
 #endif
 
-       if (b->l == 0) { /* let's realign the buffer to optimize I/O */
-               b->r = b->w = b->lr  = b->data;
-               max = 0;
-       }
-       else if (b->r > b->w) {
-               max = b->r - b->w;
+       retval = 1;
+
+       if (unlikely(fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR))) {
+               /* read/write error */
+               b->flags |= BF_WRITE_ERROR;
+               fdtab[fd].state = FD_STERROR;
+               EV_FD_CLR(fd, DIR_WR);
+               goto out_eternity;
        }
-       else
-               max = b->data + BUFSIZE - b->w;
-    
-       if (fdtab[fd].state != FD_STERROR) {
+
+       retval = 0;
+       while (write_poll-- > 0) {
+               if (b->l == 0) { /* let's realign the buffer to optimize I/O */
+                       b->r = b->w = b->lr  = b->data;
+                       max = 0;
+               }
+               else if (b->r > b->w) {
+                       max = b->r - b->w;
+               }
+               else {
+                       max = b->data + BUFSIZE - b->w;
+               }
+
                if (max == 0) {
                        /* may be we have received a connection acknowledgement in TCP mode without data */
-                       if (fdtab[fd].state == FD_STCONN) {
+                       if (!(b->flags & BF_PARTIAL_WRITE)
+                           && fdtab[fd].state == FD_STCONN) {
                                int skerr;
                                socklen_t lskerr = sizeof(skerr);
                                ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
                                if (ret == -1 || skerr) {
                                        b->flags |= BF_WRITE_ERROR;
                                        fdtab[fd].state = FD_STERROR;
-                                       task_wakeup(&rq, fdtab[fd].owner);
-                                       tv_eternity(&b->wex);
                                        EV_FD_CLR(fd, DIR_WR);
-                                       return 0;
+                                       retval = 1;
+                                       goto out_eternity;
                                }
                        }
 
                        b->flags |= BF_WRITE_NULL;
-                       task_wakeup(&rq, fdtab[fd].owner);
                        fdtab[fd].state = FD_STREADY;
-                       tv_eternity(&b->wex);
                        EV_FD_CLR(fd, DIR_WR);
-                       return 0;
+                       retval = 1;
+                       goto out_eternity;
                }
 
 #ifndef MSG_NOSIGNAL
@@ -206,41 +243,54 @@ int stream_sock_write(int fd) {
                        b->w += ret;
            
                        b->flags |= BF_PARTIAL_WRITE;
+                       retval = 1;
            
                        if (b->w == b->data + BUFSIZE) {
                                b->w = b->data; /* wrap around the buffer */
                        }
+
+                       if (!write_poll)
+                               break;
+
+                       /* we hope to be able to write more data */
+                       continue;
                }
                else if (ret == 0) {
                        /* nothing written, just pretend we were never called */
-                       // b->flags |= BF_WRITE_NULL;
-                       return 0;
+                       retval = 0;
+                       break;
+               }
+               else if (errno == EAGAIN) {/* ignore EAGAIN */
+                       retval = 0;
+                       break;
                }
-               else if (errno == EAGAIN) /* ignore EAGAIN */
-                       return 0;
                else {
                        b->flags |= BF_WRITE_ERROR;
                        fdtab[fd].state = FD_STERROR;
+                       EV_FD_CLR(fd, DIR_WR);
+                       retval = 1;
+                       goto out_eternity;
                }
-       }
-       else {
-               b->flags |= BF_WRITE_ERROR;
-               fdtab[fd].state = FD_STERROR;
-       }
+       } /* while (write_poll) */
 
-       if (b->wto) {
-               tv_delayfrom(&b->wex, &now, b->wto);
-               /* FIXME: to prevent the client from expiring read timeouts during writes,
-                * we refresh it. A solution would be to merge read+write timeouts into a
-                * unique one, although that needs some study particularly on full-duplex
-                * TCP connections. */
-               b->rex = b->wex;
+       if (b->flags & BF_WRITE_STATUS) {
+               if (b->wto) {
+                       tv_delayfrom(&b->wex, &now, b->wto);
+                       /* FIXME: to prevent the client from expiring read timeouts during writes,
+                        * we refresh it. A solution would be to merge read+write timeouts into a
+                        * unique one, although that needs some study particularly on full-duplex
+                        * TCP connections. */
+                       b->rex = b->wex;
+               }
+               else {
+               out_eternity:
+                       tv_eternity(&b->wex);
+               }
        }
-       else
-               tv_eternity(&b->wex);
 
        task_wakeup(&rq, fdtab[fd].owner);
-       return 0;
+       fdtab[fd].ev &= ~FD_POLL_WR;
+       return retval;
 }