From: Willy Tarreau Date: Mon, 23 Jul 2012 16:24:25 +0000 (+0200) Subject: MAJOR: remove the stream interface and task management code from sock_* X-Git-Tag: v1.5-dev12~107 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=fd31e53139fa8d55c68ecf6f5a82443e8715f354;p=thirdparty%2Fhaproxy.git MAJOR: remove the stream interface and task management code from sock_* The socket data layer code must only focus on moving data between a socket and a buffer. We need a special stream interface handler to update the stream interface and the file descriptor status. At the moment the code works but suffers from a race condition caused by its API : the read/write callbacks still make use of the fd instead of using the connection. And when a double shutdown is performed, a call to ->write() after ->read() processed an error results in dereferencing a NULL fdtab[]->owner. This is only a temporary issue which doesn't need to be fixed now since this will automatically go away when the functions change to use the connection instead. --- diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index f36af89662..c7ae117cc0 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -34,6 +34,7 @@ int stream_int_check_timeouts(struct stream_interface *si); void stream_int_report_error(struct stream_interface *si); void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg); int conn_si_send_proxy(struct connection *conn, unsigned int flag); +void stream_sock_update_conn(struct connection *conn); extern struct sock_ops stream_int_embedded; extern struct sock_ops stream_int_task; diff --git a/include/types/connection.h b/include/types/connection.h index 6e0b545101..2a7a9f5a0d 100644 --- a/include/types/connection.h +++ b/include/types/connection.h @@ -38,6 +38,7 @@ enum { CO_FL_WAIT_L4_CONN = 0x00000002, /* waiting for L4 to be connected */ /* flags below are used for connection handshakes */ CO_FL_SI_SEND_PROXY = 0x00000004, /* send a valid PROXY protocol header */ + CO_FL_NOTIFY_SI = 0x00000008, /* notify stream interface about changes */ }; /* This structure describes a connection with its methods and data. diff --git a/src/connection.c b/src/connection.c index 679bc89444..6b84c94e1a 100644 --- a/src/connection.c +++ b/src/connection.c @@ -60,6 +60,9 @@ int conn_fd_handler(int fd) } leave: + if (conn->flags & CO_FL_NOTIFY_SI) + stream_sock_update_conn(conn); + /* remove the events before leaving */ fdtab[fd].ev &= ~(FD_POLL_IN | FD_POLL_OUT | FD_POLL_HUP | FD_POLL_ERR); return ret; diff --git a/src/proto_tcp.c b/src/proto_tcp.c index c1a3e6ff0c..9fb03c44fe 100644 --- a/src/proto_tcp.c +++ b/src/proto_tcp.c @@ -467,6 +467,7 @@ int tcp_connect_server(struct stream_interface *si) fdtab[fd].owner = &si->conn; fdtab[fd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY; si->conn.flags = CO_FL_WAIT_L4_CONN; /* connection in progress */ + si->conn.flags |= CO_FL_NOTIFY_SI; /* we're on a stream_interface */ /* Prepare to send a few handshakes related to the on-wire protocol. */ if (si->send_proxy_ofs) @@ -574,11 +575,8 @@ int tcp_connect_probe(int fd) */ conn->flags &= ~CO_FL_WAIT_L4_CONN; si->exp = TICK_ETERNITY; - return si_data(si)->write(fd); out_wakeup: - task_wakeup(si->owner, TASK_WOKEN_IO); - out_ignore: return retval; diff --git a/src/session.c b/src/session.c index 8c649e7483..eca06d934d 100644 --- a/src/session.c +++ b/src/session.c @@ -87,7 +87,7 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) s->term_trace = 0; s->si[0].conn.t.sock.fd = cfd; s->si[0].conn.ctrl = l->proto; - s->si[0].conn.flags = CO_FL_NONE; + s->si[0].conn.flags = CO_FL_NONE | CO_FL_NOTIFY_SI; /* we're on a stream_interface */ s->si[0].addr.from = *addr; s->si[0].conn.peeraddr = (struct sockaddr *)&s->si[0].addr.from; s->si[0].conn.peerlen = sizeof(s->si[0].addr.from); diff --git a/src/sock_raw.c b/src/sock_raw.c index ac0dbcde29..74ad127751 100644 --- a/src/sock_raw.c +++ b/src/sock_raw.c @@ -240,6 +240,9 @@ static int sock_raw_read(int fd) retval = 1; + if (!conn) + goto out_wakeup; + /* stop immediately on errors. Note that we DON'T want to stop on * POLL_ERR, as the poller might report a write error while there * are still data available in the recv buffer. This typically @@ -447,43 +450,6 @@ static int sock_raw_read(int fd) } /* while (1) */ out_wakeup: - /* We might have some data the consumer is waiting for. - * We can do fast-forwarding, but we avoid doing this for partial - * buffers, because it is very likely that it will be done again - * immediately afterwards once the following data is parsed (eg: - * HTTP chunking). - */ - if (b->pipe || /* always try to send spliced data */ - (b->i == 0 && (b->cons->flags & SI_FL_WAIT_DATA))) { - int last_len = b->pipe ? b->pipe->data : 0; - - si_chk_snd(b->cons); - - /* check if the consumer has freed some space */ - if (!(b->flags & BF_FULL) && - (!last_len || !b->pipe || b->pipe->data < last_len)) - si->flags &= ~SI_FL_WAIT_ROOM; - } - - if (si->flags & SI_FL_WAIT_ROOM) { - EV_FD_CLR(fd, DIR_RD); - b->rex = TICK_ETERNITY; - } - else if ((b->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) - b->rex = tick_add_ifset(now_ms, b->rto); - - /* we have to wake up if there is a special event or if we don't have - * any more data to forward. - */ - if ((b->flags & (BF_READ_NULL|BF_READ_ERROR)) || - si->state != SI_ST_EST || - (si->flags & SI_FL_ERR) || - ((b->flags & BF_READ_PARTIAL) && (!b->to_forward || b->cons->state != SI_ST_EST))) - task_wakeup(si->owner, TASK_WOKEN_IO); - - if (b->flags & BF_READ_ACTIVITY) - b->flags &= ~BF_READ_DONTWAIT; - return retval; out_shutdown_r: @@ -677,6 +643,9 @@ static int sock_raw_write(int fd) #endif retval = 1; + if (!conn) + goto out_wakeup; + if (conn->flags & CO_FL_ERROR) goto out_error; @@ -688,58 +657,7 @@ static int sock_raw_write(int fd) if (retval < 0) goto out_error; - if (b->flags & BF_OUT_EMPTY) { - /* the connection is established but we can't write. Either the - * buffer is empty, or we just refrain from sending because the - * ->o limit was reached. Maybe we just wrote the last - * chunk and need to close. - */ - if (((b->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) && - (si->state == SI_ST_EST)) { - sock_raw_shutw(si); - goto out_wakeup; - } - - if ((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_FULL|BF_HIJACK)) == 0) - si->flags |= SI_FL_WAIT_DATA; - - EV_FD_CLR(fd, DIR_WR); - b->wex = TICK_ETERNITY; - } - - if (b->flags & BF_WRITE_ACTIVITY) { - /* update timeout if we have written something */ - if ((b->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL) - b->wex = tick_add_ifset(now_ms, b->wto); - - out_wakeup: - if (tick_isset(si->ib->rex) && !(si->flags & SI_FL_INDEP_STR)) { - /* Note: to prevent the client from expiring read timeouts - * during writes, we refresh it. We only do this if the - * interface is not configured for "independent streams", - * because for some applications it's better not to do this, - * for instance when continuously exchanging small amounts - * of data which can full the socket buffers long before a - * write timeout is detected. - */ - si->ib->rex = tick_add_ifset(now_ms, si->ib->rto); - } - - /* the producer might be waiting for more room to store data */ - if (likely((b->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL && - (b->prod->flags & SI_FL_WAIT_ROOM))) - si_chk_rcv(b->prod); - - /* we have to wake up if there is a special event or if we don't have - * any more data to forward and it's not planned to send any more. - */ - if (likely((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) || - ((b->flags & BF_OUT_EMPTY) && !b->to_forward) || - si->state != SI_ST_EST || - b->prod->state != SI_ST_EST)) - task_wakeup(si->owner, TASK_WOKEN_IO); - } - + out_wakeup: return retval; out_error: @@ -754,7 +672,6 @@ static int sock_raw_write(int fd) fdtab[fd].ev &= ~FD_POLL_STICKY; EV_FD_REM(fd); si->flags |= SI_FL_ERR; - task_wakeup(si->owner, TASK_WOKEN_IO); return 1; } diff --git a/src/stream_interface.c b/src/stream_interface.c index 59eca1bd3d..9d6f9616c4 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -33,6 +33,8 @@ #include #include +#include + /* socket functions used when running a stream interface as a task */ static void stream_int_update(struct stream_interface *si); static void stream_int_update_embedded(struct stream_interface *si); @@ -486,6 +488,95 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) return FD_WAIT_WRITE; } +/* function to be called on stream sockets after all I/O handlers */ +void stream_sock_update_conn(struct connection *conn) +{ + int fd = conn->t.sock.fd; + struct stream_interface *si = container_of(conn, struct stream_interface, conn); + + DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n", + __FUNCTION__, + si, si->state, si->ib->flags, si->ob->flags); + + /* process consumer side, only once if possible */ + if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) { + if (si->ob->flags & BF_OUT_EMPTY) { + if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) && + (si->state == SI_ST_EST)) + si_shutw(si); + EV_FD_CLR(fd, DIR_WR); + si->ob->wex = TICK_ETERNITY; + } + + if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0) + si->flags |= SI_FL_WAIT_DATA; + + if (si->ob->flags & BF_WRITE_ACTIVITY) { + /* update timeouts if we have written something */ + if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL) + if (tick_isset(si->ob->wex)) + si->ob->wex = tick_add_ifset(now_ms, si->ob->wto); + + if (!(si->flags & SI_FL_INDEP_STR)) + if (tick_isset(si->ib->rex)) + si->ib->rex = tick_add_ifset(now_ms, si->ib->rto); + + if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL && + (si->ob->prod->flags & SI_FL_WAIT_ROOM))) + si_chk_rcv(si->ob->prod); + } + } + + /* process producer side, only once if possible */ + if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) { + /* We might have some data the consumer is waiting for. + * We can do fast-forwarding, but we avoid doing this for partial + * buffers, because it is very likely that it will be done again + * immediately afterwards once the following data is parsed (eg: + * HTTP chunking). + */ + if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) && + (si->ib->pipe /* always try to send spliced data */ || + (si->ib->i == 0 && (si->ib->cons->flags & SI_FL_WAIT_DATA)))) { + int last_len = si->ib->pipe ? si->ib->pipe->data : 0; + + si_chk_snd(si->ib->cons); + + /* check if the consumer has freed some space */ + if (!(si->ib->flags & BF_FULL) && + (!last_len || !si->ib->pipe || si->ib->pipe->data < last_len)) + si->flags &= ~SI_FL_WAIT_ROOM; + } + + if (si->flags & SI_FL_WAIT_ROOM) { + EV_FD_CLR(fd, DIR_RD); + si->ib->rex = TICK_ETERNITY; + } + else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) { + if (tick_isset(si->ib->rex)) + si->ib->rex = tick_add_ifset(now_ms, si->ib->rto); + } + } + + /* wake the task up only when needed */ + if (/* changes on the production side */ + (si->ib->flags & (BF_READ_NULL|BF_READ_ERROR)) || + si->state != SI_ST_EST || + (si->flags & SI_FL_ERR) || + ((si->ib->flags & BF_READ_PARTIAL) && + (!si->ib->to_forward || si->ib->cons->state != SI_ST_EST)) || + + /* changes on the consumption side */ + (si->ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR)) || + ((si->ob->flags & BF_WRITE_ACTIVITY) && + ((si->ob->flags & BF_SHUTW) || + si->ob->prod->state != SI_ST_EST || + ((si->ob->flags & BF_OUT_EMPTY) && !si->ob->to_forward)))) { + task_wakeup(si->owner, TASK_WOKEN_IO); + } + if (si->ib->flags & BF_READ_ACTIVITY) + si->ib->flags &= ~BF_READ_DONTWAIT; +} /* * Local variables: