]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: connection: make use of the new polling functions
authorWilly Tarreau <wtarreau@exceliance.fr>
Fri, 17 Aug 2012 15:33:53 +0000 (17:33 +0200)
committerWilly Tarreau <w@1wt.eu>
Sun, 2 Sep 2012 19:53:11 +0000 (21:53 +0200)
Now the connection handler, the handshake callbacks and the I/O callbacks
make use of the connection-layer polling functions to enable or disable
polling on a file descriptor.

Some changes still need to be done to avoid using the FD_WAIT_* constants.

include/proto/stream_interface.h
src/connection.c
src/proto_tcp.c
src/session.c
src/sock_raw.c
src/stream_interface.c

index 3ddc56dd836fce16fe6bc1a65ebf81d898b4ab11..224ce8499b9a93c89f63f475cc3f4624ac8aa9a4 100644 (file)
@@ -27,6 +27,7 @@
 #include <common/config.h>
 #include <types/session.h>
 #include <types/stream_interface.h>
+#include <proto/connection.h>
 
 
 /* main event functions used to move data between sockets and buffers */
@@ -167,14 +168,14 @@ static inline void si_get_to_addr(struct stream_interface *si)
 static inline void si_shutr(struct stream_interface *si)
 {
        if (stream_int_shutr(si))
-               fd_stop_recv(si_fd(si));
+               conn_data_stop_recv(&si->conn);
 }
 
 /* Sends a shutw to the connection using the data layer */
 static inline void si_shutw(struct stream_interface *si)
 {
        if (stream_int_shutw(si))
-               fd_stop_send(si_fd(si));
+               conn_data_stop_send(&si->conn);
 }
 
 /* Calls the data state update on the stream interfaace */
index 712dfbaa30c73b4a05a2e464ef5f129082b712b7..a3f38eb84f576e8efe8f0a96844a7cad6dca09a4 100644 (file)
@@ -31,6 +31,12 @@ int conn_fd_handler(int fd)
                goto leave;
 
  process_handshake:
+       /* The handshake callbacks are called in sequence. If either of them is
+        * missing something, it must enable the required polling at the socket
+        * layer of the connection. Polling state is not guaranteed when entering
+        * these handlers, so any handshake handler which does not complete its
+        * work must explicitly disable events it's not interested in.
+        */
        while (unlikely(conn->flags & CO_FL_HANDSHAKE)) {
                if (unlikely(conn->flags & CO_FL_ERROR))
                        goto leave;
@@ -40,7 +46,9 @@ int conn_fd_handler(int fd)
                                goto leave;
        }
 
-       /* OK now we're in the data phase now */
+       /* Once we're purely in the data phase, we disable handshake polling */
+       if (!(conn->flags & CO_FL_POLL_SOCK))
+               __conn_sock_stop_both(conn);
 
        if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
                if (!conn->data->read(conn))
@@ -86,6 +94,9 @@ int conn_fd_handler(int fd)
 
        /* remove the events before leaving */
        fdtab[fd].ev &= ~(FD_POLL_IN | FD_POLL_OUT | FD_POLL_HUP | FD_POLL_ERR);
+
+       /* commit polling changes */
+       conn_cond_update_polling(conn);
        return ret;
 }
 
index 7de238a2181f2d01abf76870962e1ba0c4733d8e..2f56797141aee566d5f1a004e7e1b193d54edcb9 100644 (file)
@@ -475,7 +475,9 @@ int tcp_connect_server(struct stream_interface *si)
 
        fdtab[fd].iocb = conn_fd_handler;
        fd_insert(fd);
-       fd_want_send(fd);  /* for connect status */
+       conn_sock_want_send(&si->conn);  /* for connect status */
+       if (!(si->ob->flags & BF_OUT_EMPTY))
+               conn_data_want_send(&si->conn);  /* prepare to send data if any */
 
        si->state = SI_ST_CON;
        si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
@@ -548,8 +550,11 @@ int tcp_connect_probe(struct connection *conn)
         *  - connected (EISCONN, 0)
         */
        if ((connect(fd, conn->peeraddr, conn->peerlen) < 0)) {
-               if (errno == EALREADY || errno == EINPROGRESS)
+               if (errno == EALREADY || errno == EINPROGRESS) {
+                       conn_sock_stop_recv(conn);
+                       conn_sock_poll_send(conn);
                        return 0;
+               }
 
                if (errno && errno != EISCONN)
                        goto out_error;
@@ -570,7 +575,7 @@ int tcp_connect_probe(struct connection *conn)
         */
 
        conn->flags |= CO_FL_ERROR;
-       fd_stop_both(fd);
+       conn_sock_stop_both(conn);
        return 1;
 }
 
index ffb683ef3abf0ae5156ab38e7da256a17988b084..6612f0a43a1a019df806d8b46f03362d8c1e273e 100644 (file)
@@ -284,7 +284,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
        fdtab[cfd].owner = &s->si[0].conn;
        fdtab[cfd].flags = 0;
        fdtab[cfd].iocb = conn_fd_handler;
-       fd_want_recv(cfd);
+       conn_data_want_recv(&s->si[0].conn);
 
        if (p->accept && (ret = p->accept(s)) <= 0) {
                /* Either we had an unrecoverable error (<0) or work is
index eb2bfbdcc9f4551c2fdfcfc9a44c8a95824d7a42..c48480a81c5908859cb649076c5edbf35a6bbd31 100644 (file)
@@ -102,7 +102,7 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si)
                 * place and ask the consumer to hurry.
                 */
                si->flags |= SI_FL_WAIT_ROOM;
-               fd_stop_recv(fd);
+               conn_data_stop_recv(&si->conn);
                b->rex = TICK_ETERNITY;
                si_chk_snd(b->cons);
                return 1;
@@ -467,7 +467,7 @@ static int sock_raw_read(struct connection *conn)
         */
 
        conn->flags |= CO_FL_ERROR;
-       fd_stop_both(fd);
+       conn_data_stop_both(conn);
        retval = 1;
        goto out_wakeup;
 }
@@ -628,7 +628,6 @@ static int sock_raw_write_loop(struct stream_interface *si, struct buffer *b)
  */
 static int sock_raw_write(struct connection *conn)
 {
-       int fd = conn->t.sock.fd;
        struct stream_interface *si = container_of(conn, struct stream_interface, conn);
        struct buffer *b = si->ob;
        int retval = 1;
@@ -660,7 +659,7 @@ static int sock_raw_write(struct connection *conn)
         */
 
        conn->flags |= CO_FL_ERROR;
-       fd_stop_both(fd);
+       conn_data_stop_both(conn);
        return 1;
 }
 
@@ -700,7 +699,7 @@ static void sock_raw_read0(struct stream_interface *si)
        }
 
        /* otherwise that's just a normal read shutdown */
-       fd_stop_recv(si_fd(si));
+       conn_data_stop_recv(&si->conn);
        return;
 
  do_close:
@@ -723,11 +722,10 @@ static void sock_raw_data_finish(struct stream_interface *si)
 {
        struct buffer *ib = si->ib;
        struct buffer *ob = si->ob;
-       int fd = si_fd(si);
 
        DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibh=%d ibt=%d obh=%d obd=%d si=%d\n",
                now_ms, __FUNCTION__,
-               fd, fdtab[fd].owner,
+               si_fd(si), fdtab[si_fd(fd)].owner,
                ib, ob,
                ib->rex, ob->wex,
                ib->flags, ob->flags,
@@ -741,7 +739,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
                        if (!(si->flags & SI_FL_WAIT_ROOM)) {
                                if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
                                        si->flags |= SI_FL_WAIT_ROOM;
-                               fd_stop_recv(fd);
+                               conn_data_stop_recv(&si->conn);
                                ib->rex = TICK_ETERNITY;
                        }
                }
@@ -752,7 +750,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
                         * have updated it if there has been a completed I/O.
                         */
                        si->flags &= ~SI_FL_WAIT_ROOM;
-                       fd_want_recv(fd);
+                       conn_data_want_recv(&si->conn);
                        if (!(ib->flags & (BF_READ_NOEXP|BF_DONT_READ)) && !tick_isset(ib->rex))
                                ib->rex = tick_add_ifset(now_ms, ib->rto);
                }
@@ -766,7 +764,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
                        if (!(si->flags & SI_FL_WAIT_DATA)) {
                                if ((ob->flags & (BF_FULL|BF_HIJACK|BF_SHUTW_NOW)) == 0)
                                        si->flags |= SI_FL_WAIT_DATA;
-                               fd_stop_send(fd);
+                               conn_data_stop_send(&si->conn);
                                ob->wex = TICK_ETERNITY;
                        }
                }
@@ -777,7 +775,7 @@ static void sock_raw_data_finish(struct stream_interface *si)
                         * have updated it if there has been a completed I/O.
                         */
                        si->flags &= ~SI_FL_WAIT_DATA;
-                       fd_want_send(fd);
+                       conn_data_want_send(&si->conn);
                        if (!tick_isset(ob->wex)) {
                                ob->wex = tick_add_ifset(now_ms, ob->wto);
                                if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
@@ -818,12 +816,12 @@ static void sock_raw_chk_rcv(struct stream_interface *si)
                /* stop reading */
                if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
                        si->flags |= SI_FL_WAIT_ROOM;
-               fd_stop_recv(si_fd(si));
+               conn_data_stop_recv(&si->conn);
        }
        else {
                /* (re)start reading */
                si->flags &= ~SI_FL_WAIT_ROOM;
-               fd_want_recv(si_fd(si));
+               conn_data_want_recv(&si->conn);
        }
 }
 
@@ -869,7 +867,7 @@ static void sock_raw_chk_snd(struct stream_interface *si)
                 */
                si->conn.flags |= CO_FL_ERROR;
                fdtab[si_fd(si)].ev &= ~FD_POLL_STICKY;
-               fd_stop_both(si_fd(si));
+               conn_data_stop_both(&si->conn);
                si->flags |= SI_FL_ERR;
                goto out_wakeup;
        }
@@ -899,7 +897,7 @@ static void sock_raw_chk_snd(struct stream_interface *si)
                /* Otherwise there are remaining data to be sent in the buffer,
                 * which means we have to poll before doing so.
                 */
-               fd_want_send(si_fd(si));
+               conn_data_want_send(&si->conn);
                si->flags &= ~SI_FL_WAIT_DATA;
                if (!tick_isset(ob->wex))
                        ob->wex = tick_add_ifset(now_ms, ob->wto);
index 45707dd2c0b714f0ae6ea64630a0630f827ae466..c5da3f11007e1dec0494ff8efdcb39ad2f589268 100644 (file)
@@ -550,10 +550,12 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
 
        conn->flags |= CO_FL_ERROR;
        fdtab[fd].ev &= ~FD_POLL_STICKY;
-       fd_stop_both(fd);
+       conn_sock_stop_both(conn);
        goto out_leave;
 
  out_wait:
+       conn_sock_stop_recv(conn);
+       conn_sock_poll_send(conn);
        return FD_WAIT_WRITE;
 }
 
@@ -582,7 +584,7 @@ void stream_sock_update_conn(struct connection *conn)
                        if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
                            (si->state == SI_ST_EST))
                                stream_int_shutw(si);
-                       fd_stop_send(fd);
+                       conn_data_stop_send(conn);
                        si->ob->wex = TICK_ETERNITY;
                }
 
@@ -627,7 +629,7 @@ void stream_sock_update_conn(struct connection *conn)
                }
 
                if (si->flags & SI_FL_WAIT_ROOM) {
-                       fd_stop_recv(fd);
+                       conn_data_stop_recv(conn);
                        si->ib->rex = TICK_ETERNITY;
                }
                else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {