]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: remove the stream interface and task management code from sock_*
authorWilly Tarreau <wtarreau@exceliance.fr>
Mon, 23 Jul 2012 16:24:25 +0000 (18:24 +0200)
committerWilly Tarreau <w@1wt.eu>
Sun, 2 Sep 2012 19:53:08 +0000 (21:53 +0200)
The socket data layer code must only focus on moving data between a
socket and a buffer. We need a special stream interface handler to
update the stream interface and the file descriptor status.

At the moment the code works but suffers from a race condition caused
by its API : the read/write callbacks still make use of the fd instead
of using the connection. And when a double shutdown is performed, a call
to ->write() after ->read() processed an error results in dereferencing
a NULL fdtab[]->owner. This is only a temporary issue which doesn't need
to be fixed now since this will automatically go away when the functions
change to use the connection instead.

include/proto/stream_interface.h
include/types/connection.h
src/connection.c
src/proto_tcp.c
src/session.c
src/sock_raw.c
src/stream_interface.c

index f36af89662c125789fc6863fe7fcebd6a1335b75..c7ae117cc0f4a520486a99957287787d9400ff77 100644 (file)
@@ -34,6 +34,7 @@ int stream_int_check_timeouts(struct stream_interface *si);
 void stream_int_report_error(struct stream_interface *si);
 void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg);
 int conn_si_send_proxy(struct connection *conn, unsigned int flag);
+void stream_sock_update_conn(struct connection *conn);
 
 extern struct sock_ops stream_int_embedded;
 extern struct sock_ops stream_int_task;
index 6e0b54510102bd18333a82b88a39fefa9b8b4ba8..2a7a9f5a0d5c78227446bb84988cb39eecc4a45b 100644 (file)
@@ -38,6 +38,7 @@ enum {
        CO_FL_WAIT_L4_CONN  = 0x00000002,  /* waiting for L4 to be connected */
        /* flags below are used for connection handshakes */
        CO_FL_SI_SEND_PROXY = 0x00000004,  /* send a valid PROXY protocol header */
+       CO_FL_NOTIFY_SI     = 0x00000008,  /* notify stream interface about changes */
 };
 
 /* This structure describes a connection with its methods and data.
index 679bc8944414cbf97ac8924748493a5ce6adcfab..6b84c94e1a1cf6a498da5e03122d1d1678c09889 100644 (file)
@@ -60,6 +60,9 @@ int conn_fd_handler(int fd)
        }
 
  leave:
+       if (conn->flags & CO_FL_NOTIFY_SI)
+               stream_sock_update_conn(conn);
+
        /* remove the events before leaving */
        fdtab[fd].ev &= ~(FD_POLL_IN | FD_POLL_OUT | FD_POLL_HUP | FD_POLL_ERR);
        return ret;
index c1a3e6ff0c67db7f5d2aab5b270e3e1ebb725b19..9fb03c44fecfd61a61310d192d23d08961127d99 100644 (file)
@@ -467,6 +467,7 @@ int tcp_connect_server(struct stream_interface *si)
        fdtab[fd].owner = &si->conn;
        fdtab[fd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY;
        si->conn.flags  = CO_FL_WAIT_L4_CONN; /* connection in progress */
+       si->conn.flags |= CO_FL_NOTIFY_SI; /* we're on a stream_interface */
 
        /* Prepare to send a few handshakes related to the on-wire protocol. */
        if (si->send_proxy_ofs)
@@ -574,11 +575,8 @@ int tcp_connect_probe(int fd)
         */
        conn->flags &= ~CO_FL_WAIT_L4_CONN;
        si->exp = TICK_ETERNITY;
-       return si_data(si)->write(fd);
 
  out_wakeup:
-       task_wakeup(si->owner, TASK_WOKEN_IO);
-
  out_ignore:
        return retval;
 
index 8c649e74833da5227c1ced4ac549488ab565ea94..eca06d934d199e3a58a1200e3cc1cd592b7fc9d4 100644 (file)
@@ -87,7 +87,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
        s->term_trace = 0;
        s->si[0].conn.t.sock.fd = cfd;
        s->si[0].conn.ctrl = l->proto;
-       s->si[0].conn.flags = CO_FL_NONE;
+       s->si[0].conn.flags = CO_FL_NONE | CO_FL_NOTIFY_SI; /* we're on a stream_interface */
        s->si[0].addr.from = *addr;
        s->si[0].conn.peeraddr = (struct sockaddr *)&s->si[0].addr.from;
        s->si[0].conn.peerlen  = sizeof(s->si[0].addr.from);
index ac0dbcde29dc6d8f421988ba4dbe35e6eb744d24..74ad1277519d41fa4ee8c60262de43019ca183d0 100644 (file)
@@ -240,6 +240,9 @@ static int sock_raw_read(int fd)
 
        retval = 1;
 
+       if (!conn)
+               goto out_wakeup;
+
        /* stop immediately on errors. Note that we DON'T want to stop on
         * POLL_ERR, as the poller might report a write error while there
         * are still data available in the recv buffer. This typically
@@ -447,43 +450,6 @@ static int sock_raw_read(int fd)
        } /* while (1) */
 
  out_wakeup:
-       /* We might have some data the consumer is waiting for.
-        * We can do fast-forwarding, but we avoid doing this for partial
-        * buffers, because it is very likely that it will be done again
-        * immediately afterwards once the following data is parsed (eg:
-        * HTTP chunking).
-        */
-       if (b->pipe || /* always try to send spliced data */
-           (b->i == 0 && (b->cons->flags & SI_FL_WAIT_DATA))) {
-               int last_len = b->pipe ? b->pipe->data : 0;
-
-               si_chk_snd(b->cons);
-
-               /* check if the consumer has freed some space */
-               if (!(b->flags & BF_FULL) &&
-                   (!last_len || !b->pipe || b->pipe->data < last_len))
-                       si->flags &= ~SI_FL_WAIT_ROOM;
-       }
-
-       if (si->flags & SI_FL_WAIT_ROOM) {
-               EV_FD_CLR(fd, DIR_RD);
-               b->rex = TICK_ETERNITY;
-       }
-       else if ((b->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL)
-               b->rex = tick_add_ifset(now_ms, b->rto);
-
-       /* we have to wake up if there is a special event or if we don't have
-        * any more data to forward.
-        */
-       if ((b->flags & (BF_READ_NULL|BF_READ_ERROR)) ||
-           si->state != SI_ST_EST ||
-           (si->flags & SI_FL_ERR) ||
-           ((b->flags & BF_READ_PARTIAL) && (!b->to_forward || b->cons->state != SI_ST_EST)))
-               task_wakeup(si->owner, TASK_WOKEN_IO);
-
-       if (b->flags & BF_READ_ACTIVITY)
-               b->flags &= ~BF_READ_DONTWAIT;
-
        return retval;
 
  out_shutdown_r:
@@ -677,6 +643,9 @@ static int sock_raw_write(int fd)
 #endif
 
        retval = 1;
+       if (!conn)
+               goto out_wakeup;
+
        if (conn->flags & CO_FL_ERROR)
                goto out_error;
 
@@ -688,58 +657,7 @@ static int sock_raw_write(int fd)
        if (retval < 0)
                goto out_error;
 
-       if (b->flags & BF_OUT_EMPTY) {
-               /* the connection is established but we can't write. Either the
-                * buffer is empty, or we just refrain from sending because the
-                * ->o limit was reached. Maybe we just wrote the last
-                * chunk and need to close.
-                */
-               if (((b->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
-                   (si->state == SI_ST_EST)) {
-                       sock_raw_shutw(si);
-                       goto out_wakeup;
-               }
-               
-               if ((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_FULL|BF_HIJACK)) == 0)
-                       si->flags |= SI_FL_WAIT_DATA;
-
-               EV_FD_CLR(fd, DIR_WR);
-               b->wex = TICK_ETERNITY;
-       }
-
-       if (b->flags & BF_WRITE_ACTIVITY) {
-               /* update timeout if we have written something */
-               if ((b->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
-                       b->wex = tick_add_ifset(now_ms, b->wto);
-
-       out_wakeup:
-               if (tick_isset(si->ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
-                       /* Note: to prevent the client from expiring read timeouts
-                        * during writes, we refresh it. We only do this if the
-                        * interface is not configured for "independent streams",
-                        * because for some applications it's better not to do this,
-                        * for instance when continuously exchanging small amounts
-                        * of data which can full the socket buffers long before a
-                        * write timeout is detected.
-                        */
-                       si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
-               }
-
-               /* the producer might be waiting for more room to store data */
-               if (likely((b->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
-                          (b->prod->flags & SI_FL_WAIT_ROOM)))
-                       si_chk_rcv(b->prod);
-
-               /* we have to wake up if there is a special event or if we don't have
-                * any more data to forward and it's not planned to send any more.
-                */
-               if (likely((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
-                          ((b->flags & BF_OUT_EMPTY) && !b->to_forward) ||
-                          si->state != SI_ST_EST ||
-                          b->prod->state != SI_ST_EST))
-                       task_wakeup(si->owner, TASK_WOKEN_IO);
-       }
-
+ out_wakeup:
        return retval;
 
  out_error:
@@ -754,7 +672,6 @@ static int sock_raw_write(int fd)
        fdtab[fd].ev &= ~FD_POLL_STICKY;
        EV_FD_REM(fd);
        si->flags |= SI_FL_ERR;
-       task_wakeup(si->owner, TASK_WOKEN_IO);
        return 1;
 }
 
index 59eca1bd3d341a8ac595553163cf5d4f820daaa3..9d6f9616c47f56daa8a08e9d0cf6ce8f53ee7c82 100644 (file)
@@ -33,6 +33,8 @@
 #include <proto/stream_interface.h>
 #include <proto/task.h>
 
+#include <types/pipe.h>
+
 /* socket functions used when running a stream interface as a task */
 static void stream_int_update(struct stream_interface *si);
 static void stream_int_update_embedded(struct stream_interface *si);
@@ -486,6 +488,95 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
        return FD_WAIT_WRITE;
 }
 
+/* function to be called on stream sockets after all I/O handlers */
+void stream_sock_update_conn(struct connection *conn)
+{
+       int fd = conn->t.sock.fd;
+       struct stream_interface *si = container_of(conn, struct stream_interface, conn);
+
+       DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
+               __FUNCTION__,
+               si, si->state, si->ib->flags, si->ob->flags);
+
+       /* process consumer side, only once if possible */
+       if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) {
+               if (si->ob->flags & BF_OUT_EMPTY) {
+                       if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
+                           (si->state == SI_ST_EST))
+                               si_shutw(si);
+                       EV_FD_CLR(fd, DIR_WR);
+                       si->ob->wex = TICK_ETERNITY;
+               }
+
+               if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
+                       si->flags |= SI_FL_WAIT_DATA;
+
+               if (si->ob->flags & BF_WRITE_ACTIVITY) {
+                       /* update timeouts if we have written something */
+                       if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
+                               if (tick_isset(si->ob->wex))
+                                       si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
+
+                       if (!(si->flags & SI_FL_INDEP_STR))
+                               if (tick_isset(si->ib->rex))
+                                       si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
+
+                       if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
+                                  (si->ob->prod->flags & SI_FL_WAIT_ROOM)))
+                               si_chk_rcv(si->ob->prod);
+               }
+       }
+
+       /* process producer side, only once if possible */
+       if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) {
+               /* We might have some data the consumer is waiting for.
+                * We can do fast-forwarding, but we avoid doing this for partial
+                * buffers, because it is very likely that it will be done again
+                * immediately afterwards once the following data is parsed (eg:
+                * HTTP chunking).
+                */
+               if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
+                   (si->ib->pipe /* always try to send spliced data */ ||
+                    (si->ib->i == 0 && (si->ib->cons->flags & SI_FL_WAIT_DATA)))) {
+                       int last_len = si->ib->pipe ? si->ib->pipe->data : 0;
+
+                       si_chk_snd(si->ib->cons);
+
+                       /* check if the consumer has freed some space */
+                       if (!(si->ib->flags & BF_FULL) &&
+                           (!last_len || !si->ib->pipe || si->ib->pipe->data < last_len))
+                               si->flags &= ~SI_FL_WAIT_ROOM;
+               }
+
+               if (si->flags & SI_FL_WAIT_ROOM) {
+                       EV_FD_CLR(fd, DIR_RD);
+                       si->ib->rex = TICK_ETERNITY;
+               }
+               else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
+                       if (tick_isset(si->ib->rex))
+                               si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
+               }
+       }
+
+       /* wake the task up only when needed */
+       if (/* changes on the production side */
+           (si->ib->flags & (BF_READ_NULL|BF_READ_ERROR)) ||
+           si->state != SI_ST_EST ||
+           (si->flags & SI_FL_ERR) ||
+           ((si->ib->flags & BF_READ_PARTIAL) &&
+            (!si->ib->to_forward || si->ib->cons->state != SI_ST_EST)) ||
+
+           /* changes on the consumption side */
+           (si->ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR)) ||
+           ((si->ob->flags & BF_WRITE_ACTIVITY) &&
+            ((si->ob->flags & BF_SHUTW) ||
+             si->ob->prod->state != SI_ST_EST ||
+             ((si->ob->flags & BF_OUT_EMPTY) && !si->ob->to_forward)))) {
+               task_wakeup(si->owner, TASK_WOKEN_IO);
+       }
+       if (si->ib->flags & BF_READ_ACTIVITY)
+               si->ib->flags &= ~BF_READ_DONTWAIT;
+}
 
 /*
  * Local variables: