From: Willy Tarreau Date: Thu, 9 Aug 2012 12:45:22 +0000 (+0200) Subject: MAJOR: make use of conn_{data|sock}_{poll|stop|want}* in connection handlers X-Git-Tag: v1.5-dev12~92 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=afad0e0f809adb3327f3c225028671b815caea94;p=thirdparty%2Fhaproxy.git MAJOR: make use of conn_{data|sock}_{poll|stop|want}* in connection handlers This is a second attempt at getting rid of FD_WAIT_*. Now the situation is much better since native I/O handlers can directly manipulate the FD using fd_{poll|want|stop}_* and the connection handlers manipulate connection-level flags using the conn_{data|sock}_* equivalent. Proceeding this way ensures that the connection flags always reflect the reality even after data<->handshake switches. --- diff --git a/include/proto/connection.h b/include/proto/connection.h index 6bef475cec..2ecdfd3c5a 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -26,7 +26,7 @@ #include /* I/O callback for fd-based connections. It calls the read/write handlers - * provided by the connection's sock_ops. Returns FD_WAIT_*. + * provided by the connection's sock_ops. Returns 0. */ int conn_fd_handler(int fd); diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index 20787e8547..6fbf94d04f 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -114,8 +114,8 @@ struct sock_ops { void (*shutw)(struct connection *, int); /* shutw function */ void (*chk_rcv)(struct stream_interface *); /* chk_rcv function */ void (*chk_snd)(struct stream_interface *); /* chk_snd function */ - int (*read)(struct connection *conn); /* read callback after poll() */ - int (*write)(struct connection *conn); /* write callback after poll() */ + void (*read)(struct connection *conn); /* read callback after poll() */ + void (*write)(struct connection *conn); /* write callback after poll() */ void (*close)(struct connection *); /* close the data channel on the connection */ }; diff --git a/src/connection.c b/src/connection.c index a3f38eb84f..1f0d8a98c4 100644 --- a/src/connection.c +++ b/src/connection.c @@ -19,13 +19,11 @@ #include /* I/O callback for fd-based connections. It calls the read/write handlers - * provided by the connection's sock_ops, which must be valid. It returns - * FD_WAIT_*. + * provided by the connection's sock_ops, which must be valid. It returns 0. */ int conn_fd_handler(int fd) { struct connection *conn = fdtab[fd].owner; - int ret = 0; if (unlikely(!conn)) goto leave; @@ -42,7 +40,7 @@ int conn_fd_handler(int fd) goto leave; if (conn->flags & CO_FL_SI_SEND_PROXY) - if ((ret = conn_si_send_proxy(conn, CO_FL_SI_SEND_PROXY))) + if (!conn_si_send_proxy(conn, CO_FL_SI_SEND_PROXY)) goto leave; } @@ -51,8 +49,7 @@ int conn_fd_handler(int fd) __conn_sock_stop_both(conn); if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) - if (!conn->data->read(conn)) - ret |= FD_WAIT_READ; + conn->data->read(conn); if (unlikely(conn->flags & CO_FL_ERROR)) goto leave; @@ -64,8 +61,7 @@ int conn_fd_handler(int fd) goto process_handshake; if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) - if (!conn->data->write(conn)) - ret |= FD_WAIT_WRITE; + conn->data->write(conn); if (unlikely(conn->flags & CO_FL_ERROR)) goto leave; @@ -81,7 +77,7 @@ int conn_fd_handler(int fd) * send in order to probe it ? Then let's retry the connect(). */ if (!tcp_connect_probe(conn)) - ret |= FD_WAIT_WRITE; + goto leave; } leave: @@ -97,7 +93,7 @@ int conn_fd_handler(int fd) /* commit polling changes */ conn_cond_update_polling(conn); - return ret; + return 0; } /* set polling depending on the change between the CURR part of the diff --git a/src/proto_tcp.c b/src/proto_tcp.c index 2f56797141..6c411afe63 100644 --- a/src/proto_tcp.c +++ b/src/proto_tcp.c @@ -523,15 +523,17 @@ int tcp_get_dst(int fd, struct sockaddr *sa, socklen_t salen, int dir) /* This is the callback which is set when a connection establishment is pending * and we have nothing to send, or if we have an init function we want to call - * once the connection is established. It returns zero if it needs some polling - * before being called again. + * once the connection is established. It updates the FD polling status. It + * returns 0 if it fails in a fatal way or needs to poll to go further, otherwise + * it returns non-zero and removes itself from the connection's flags (the bit is + * provided in by the caller). */ int tcp_connect_probe(struct connection *conn) { int fd = conn->t.sock.fd; if (conn->flags & CO_FL_ERROR) - return 1; + return 0; if (!(conn->flags & CO_FL_WAIT_L4_CONN)) return 1; /* strange we were called while ready */ @@ -576,7 +578,7 @@ int tcp_connect_probe(struct connection *conn) conn->flags |= CO_FL_ERROR; conn_sock_stop_both(conn); - return 1; + return 0; } diff --git a/src/protocols.c b/src/protocols.c index 2b2da06532..377c985d4e 100644 --- a/src/protocols.c +++ b/src/protocols.c @@ -240,9 +240,8 @@ void delete_listener(struct listener *listener) /* This function is called on a read event from a listening socket, corresponding * to an accept. It tries to accept as many connections as possible, and for each * calls the listener's accept handler (generally the frontend's accept handler). - * It returns FD_WAIT_READ or zero. */ -int listener_accept(int fd) +void listener_accept(int fd) { struct listener *l = fdtab[fd].owner; struct proxy *p = l->frontend; @@ -252,7 +251,7 @@ int listener_accept(int fd) if (unlikely(l->nbconn >= l->maxconn)) { listener_full(l); - return FD_WAIT_READ; + return; } if (global.cps_lim && !(l->options & LI_O_UNLIMITED)) { @@ -262,7 +261,7 @@ int listener_accept(int fd) /* frontend accept rate limit was reached */ limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_add(now_ms, next_event_delay(&global.conn_per_sec, global.cps_lim, 0))); - return FD_WAIT_READ; + return; } if (max_accept > max) @@ -276,7 +275,7 @@ int listener_accept(int fd) /* frontend accept rate limit was reached */ limit_listener(l, &p->listener_queue); task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0))); - return FD_WAIT_READ; + return; } if (max_accept > max) @@ -295,12 +294,12 @@ int listener_accept(int fd) if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) { limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ - return FD_WAIT_READ; + return; } if (unlikely(p && p->feconn >= p->maxconn)) { limit_listener(l, &p->listener_queue); - return FD_WAIT_READ; + return; } cfd = accept(fd, (struct sockaddr *)&addr, &laddr); @@ -309,7 +308,8 @@ int listener_accept(int fd) case EAGAIN: case EINTR: case ECONNABORTED: - return FD_WAIT_READ; /* nothing more to accept */ + fd_poll_recv(fd); + return; /* nothing more to accept */ case ENFILE: if (p) send_log(p, LOG_EMERG, @@ -317,7 +317,7 @@ int listener_accept(int fd) p->id, maxfd); limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ - return FD_WAIT_READ; + return; case EMFILE: if (p) send_log(p, LOG_EMERG, @@ -325,7 +325,7 @@ int listener_accept(int fd) p->id, maxfd); limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ - return FD_WAIT_READ; + return; case ENOBUFS: case ENOMEM: if (p) @@ -334,9 +334,11 @@ int listener_accept(int fd) p->id, maxfd); limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ - return FD_WAIT_READ; + return; default: - return FD_WAIT_READ; + /* unexpected result, let's go back to poll */ + fd_poll_recv(fd); + return; } } @@ -359,7 +361,7 @@ int listener_accept(int fd) close(cfd); limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ - return FD_WAIT_READ; + return; } /* increase the per-process number of cumulated connections */ @@ -395,18 +397,18 @@ int listener_accept(int fd) limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ - return FD_WAIT_READ; + return; } if (l->nbconn >= l->maxconn) { listener_full(l); - return FD_WAIT_READ; + return; } } /* end of while (max_accept--) */ /* we've exhausted max_accept, so there is no need to poll again */ - return 0; + return; } /* Registers the protocol */ diff --git a/src/sock_raw.c b/src/sock_raw.c index c48480a81c..6ce4c91181 100644 --- a/src/sock_raw.c +++ b/src/sock_raw.c @@ -43,8 +43,8 @@ #include /* main event functions used to move data between sockets and buffers */ -static int sock_raw_read(struct connection *conn); -static int sock_raw_write(struct connection *conn); +static void sock_raw_read(struct connection *conn); +static void sock_raw_write(struct connection *conn); static void sock_raw_data_finish(struct stream_interface *si); static void sock_raw_read0(struct stream_interface *si); static void sock_raw_chk_rcv(struct stream_interface *si); @@ -66,9 +66,7 @@ static void sock_raw_chk_snd(struct stream_interface *si); /* Returns : * -1 if splice is not possible or not possible anymore and we must switch to * user-land copy (eg: to_forward reached) - * 0 when we know that polling is required to get more data (EAGAIN) - * 1 for all other cases (we can safely try again, or if an activity has been - * detected (DATA/NULL/ERR)) + * 0 otherwise, including errors and close. * Sets : * BF_READ_NULL * BF_READ_PARTIAL @@ -87,7 +85,7 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) int fd = si_fd(si); int ret; unsigned long max; - int retval = 1; + int retval = 0; if (!b->to_forward) return -1; @@ -105,7 +103,7 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) conn_data_stop_recv(&si->conn); b->rex = TICK_ETERNITY; si_chk_snd(b->cons); - return 1; + return 0; } if (unlikely(b->pipe == NULL)) { @@ -144,7 +142,6 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) */ splice_detects_close = 1; b->flags |= BF_READ_NULL; - retval = 1; /* no need for further polling */ break; } @@ -160,7 +157,6 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) if (b->pipe->data) { si->flags |= SI_FL_WAIT_ROOM; - retval = 1; break; } @@ -173,7 +169,7 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) * which will be able to deal with the situation. */ if (splice_detects_close) - retval = 0; /* we know for sure that it's EAGAIN */ + conn_data_poll_recv(&si->conn); /* we know for sure that it's EAGAIN */ else retval = -1; break; @@ -190,7 +186,6 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) /* here we have another error */ si->flags |= SI_FL_ERR; - retval = 1; break; } /* ret <= 0 */ @@ -204,7 +199,6 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) if (b->pipe->data >= SPLICE_FULL_HINT || ret >= global.tune.recv_enough) { /* We've read enough of it for this time. */ - retval = 1; break; } } /* while */ @@ -222,24 +216,18 @@ static int sock_raw_splice_in(struct buffer *b, struct stream_interface *si) /* * this function is called on a read event from a stream socket. - * It returns 0 if we have a high confidence that we will not be - * able to read more data without polling first. Returns non-zero - * otherwise. */ -static int sock_raw_read(struct connection *conn) +static void sock_raw_read(struct connection *conn) { int fd = conn->t.sock.fd; struct stream_interface *si = container_of(conn, struct stream_interface, conn); struct buffer *b = si->ib; - int ret, max, retval, cur_read; + int ret, max, cur_read; int read_poll = MAX_READ_POLL_LOOPS; #ifdef DEBUG_FULL fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", fd, fdtab[fd].ev, fdtab[fd].owner); #endif - - retval = 1; - /* stop immediately on errors. Note that we DON'T want to stop on * POLL_ERR, as the poller might report a write error while there * are still data available in the recv buffer. This typically @@ -255,7 +243,7 @@ static int sock_raw_read(struct connection *conn) /* maybe we were called immediately after an asynchronous shutr */ if (b->flags & BF_SHUTR) - goto out_wakeup; + return; #if defined(CONFIG_HAP_LINUX_SPLICE) if (b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) { @@ -267,14 +255,12 @@ static int sock_raw_read(struct connection *conn) if (fdtab[fd].ev & FD_POLL_HUP) goto out_shutdown_r; - retval = sock_raw_splice_in(b, si); - - if (retval >= 0) { + if (sock_raw_splice_in(b, si) >= 0) { if (si->flags & SI_FL_ERR) goto out_error; if (b->flags & BF_READ_NULL) goto out_shutdown_r; - goto out_wakeup; + return; } /* splice not possible (anymore), let's go on on standard copy */ } @@ -438,7 +424,7 @@ static int sock_raw_read(struct connection *conn) * the task. */ if (cur_read < MIN_RET_FOR_READ_LOOP) - retval = 0; + conn_data_poll_recv(conn); break; } else { @@ -446,8 +432,7 @@ static int sock_raw_read(struct connection *conn) } } /* while (1) */ - out_wakeup: - return retval; + return; out_shutdown_r: /* we received a shutdown */ @@ -456,33 +441,22 @@ static int sock_raw_read(struct connection *conn) if (b->flags & BF_AUTO_CLOSE) buffer_shutw_now(b); sock_raw_read0(si); - goto out_wakeup; + return; out_error: - /* Read error on the file descriptor. We mark the FD as STERROR so - * that we don't use it anymore. The error is reported to the stream - * interface which will take proper action. We must not perturbate the - * buffer because the stream interface wants to ensure transparent - * connection retries. - */ - + /* Read error on the connection, report the error and stop I/O */ conn->flags |= CO_FL_ERROR; conn_data_stop_both(conn); - retval = 1; - goto out_wakeup; } /* * This function is called to send buffer data to a stream socket. - * It returns -1 in case of unrecoverable error, 0 if the caller needs to poll - * before calling it again, otherwise 1. If a pipe was associated with the - * buffer and it empties it, it releases it as well. + * It returns -1 in case of unrecoverable error, otherwise zero. */ static int sock_raw_write_loop(struct stream_interface *si, struct buffer *b) { int write_poll = MAX_WRITE_POLL_LOOPS; - int retval = 1; int ret, max; #if defined(CONFIG_HAP_LINUX_SPLICE) @@ -491,12 +465,11 @@ static int sock_raw_write_loop(struct stream_interface *si, struct buffer *b) SPLICE_F_MOVE|SPLICE_F_NONBLOCK); if (ret <= 0) { if (ret == 0 || errno == EAGAIN) { - retval = 0; - return retval; + conn_data_poll_send(&si->conn); + return 0; } /* here we have another error */ - retval = -1; - return retval; + return -1; } b->flags |= BF_WRITE_PARTIAL; @@ -509,11 +482,12 @@ static int sock_raw_write_loop(struct stream_interface *si, struct buffer *b) } if (--write_poll <= 0) - return retval; + return 0; /* The only reason we did not empty the pipe is that the output * buffer is full. */ + conn_data_poll_send(&si->conn); return 0; } @@ -523,7 +497,7 @@ static int sock_raw_write_loop(struct stream_interface *si, struct buffer *b) #endif if (!b->o) { b->flags |= BF_OUT_EMPTY; - return retval; + return 0; } /* when we're in this loop, we already know that there is no spliced @@ -607,30 +581,25 @@ static int sock_raw_write_loop(struct stream_interface *si, struct buffer *b) } else if (ret == 0 || errno == EAGAIN) { /* nothing written, we need to poll for write first */ - retval = 0; - break; + conn_data_poll_send(&si->conn); + return 0; } else { /* bad, we got an error */ - retval = -1; - break; + return -1; } } /* while (1) */ - - return retval; + return 0; } /* * This function is called on a write event from a stream socket. - * It returns 0 if the caller needs to poll before calling it again, otherwise - * non-zero. */ -static int sock_raw_write(struct connection *conn) +static void sock_raw_write(struct connection *conn) { struct stream_interface *si = container_of(conn, struct stream_interface, conn); struct buffer *b = si->ob; - int retval = 1; #ifdef DEBUG_FULL fprintf(stderr,"sock_raw_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner); @@ -641,26 +610,19 @@ static int sock_raw_write(struct connection *conn) /* we might have been called just after an asynchronous shutw */ if (b->flags & BF_SHUTW) - goto out_wakeup; + return; - retval = sock_raw_write_loop(si, b); - if (retval < 0) + if (sock_raw_write_loop(si, b) < 0) goto out_error; - out_wakeup: - return retval; + /* OK all done */ + return; out_error: - /* Write error on the file descriptor. We mark the FD as STERROR so - * that we don't use it anymore. The error is reported to the stream - * interface which will take proper action. We must not perturbate the - * buffer because the stream interface wants to ensure transparent - * connection retries. - */ + /* Write error on the connection, report the error and stop I/O */ conn->flags |= CO_FL_ERROR; conn_data_stop_both(conn); - return 1; } /* @@ -834,7 +796,6 @@ static void sock_raw_chk_rcv(struct stream_interface *si) static void sock_raw_chk_snd(struct stream_interface *si) { struct buffer *ob = si->ob; - int retval; DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibh=%d ibt=%d obh=%d obd=%d si=%d\n", now_ms, __FUNCTION__, @@ -855,13 +816,7 @@ static void sock_raw_chk_snd(struct stream_interface *si) (fdtab[si_fd(si)].ev & FD_POLL_OUT))) /* we'll be called anyway */ return; - retval = sock_raw_write_loop(si, ob); - /* here, we have : - * retval < 0 if an error was encountered during write. - * retval = 0 if we can't write anymore without polling - * retval = 1 if we're invited to come back when desired - */ - if (retval < 0) { + if (sock_raw_write_loop(si, ob) < 0) { /* Write error on the file descriptor. We mark the FD as STERROR so * that we don't use it anymore and we notify the task. */ @@ -872,9 +827,8 @@ static void sock_raw_chk_snd(struct stream_interface *si) goto out_wakeup; } - /* OK, so now we know that retval >= 0 means that some data might have - * been sent, and that we may have to poll first. We have to do that - * too if the buffer is not empty. + /* OK, so now we know that some data might have been sent, and that we may + * have to poll first. We have to do that too if the buffer is not empty. */ if (ob->flags & BF_OUT_EMPTY) { /* the connection is established but we can't write. Either the diff --git a/src/stream_interface.c b/src/stream_interface.c index c5da3f1100..5f32933f8c 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -474,9 +474,9 @@ void stream_int_unregister_handler(struct stream_interface *si) } /* This callback is used to send a valid PROXY protocol line to a socket being - * established. It returns a combination of FD_WAIT_* if it wants some polling - * before being called again, otherwise it returns zero and removes itself from - * the connection's flags (the bit is provided in by the caller). + * established. It returns 0 if it fails in a fatal way or needs to poll to go + * further, otherwise it returns non-zero and removes itself from the connection's + * flags (the bit is provided in by the caller). */ int conn_si_send_proxy(struct connection *conn, unsigned int flag) { @@ -535,28 +535,21 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag) conn->flags &= ~CO_FL_WAIT_L4_CONN; b->flags |= BF_WRITE_NULL; si->exp = TICK_ETERNITY; - - out_leave: conn->flags &= ~flag; - return 0; + return 1; out_error: - /* Write error on the file descriptor. We mark the FD as STERROR so - * that we don't use it anymore. The error is reported to the stream - * interface which will take proper action. We must not perturbate the - * buffer because the stream interface wants to ensure transparent - * connection retries. - */ - + /* Write error on the file descriptor */ conn->flags |= CO_FL_ERROR; + conn->flags &= ~flag; fdtab[fd].ev &= ~FD_POLL_STICKY; conn_sock_stop_both(conn); - goto out_leave; + return 0; out_wait: conn_sock_stop_recv(conn); conn_sock_poll_send(conn); - return FD_WAIT_WRITE; + return 0; } /* function to be called on stream sockets after all I/O handlers */