From: Willy Tarreau Date: Mon, 7 May 2012 19:22:09 +0000 (+0200) Subject: REORG/MEDIUM: move the default accept function from sockstream to protocols.c X-Git-Tag: v1.5-dev9~13 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bbebbbff83255af915efa7dfe2e40a01676e519c;p=thirdparty%2Fhaproxy.git REORG/MEDIUM: move the default accept function from sockstream to protocols.c The previous sockstream_accept() function uses nothing from sockstream, and is totally irrelevant to stream interfaces. Move this to the protocols.c file which handles listeners and protocols, and call it listener_accept(). It now makes much more sense that the code dealing with listen() also handles accept() and passes it to upper layers. --- diff --git a/include/proto/protocols.h b/include/proto/protocols.h index cebe560913..91334dab99 100644 --- a/include/proto/protocols.h +++ b/include/proto/protocols.h @@ -99,6 +99,12 @@ int unbind_all_listeners(struct protocol *proto); */ void delete_listener(struct listener *listener); +/* This function is called on a read event from a listening socket, corresponding + * to an accept. It tries to accept as many connections as possible, and for each + * calls the listener's accept handler (generally the frontend's accept handler). + */ +int listener_accept(int fd); + /* Registers the protocol */ void protocol_register(struct protocol *proto); diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h index 577dad8540..c93e483a30 100644 --- a/include/proto/stream_sock.h +++ b/include/proto/stream_sock.h @@ -31,7 +31,6 @@ /* main event functions used to move data between sockets and buffers */ -int stream_sock_accept(int fd); int stream_sock_read(int fd); int stream_sock_write(int fd); void stream_sock_data_finish(struct stream_interface *si); diff --git a/src/proto_tcp.c b/src/proto_tcp.c index 478bff5729..0c389d87c3 100644 --- a/src/proto_tcp.c +++ b/src/proto_tcp.c @@ -69,7 +69,7 @@ static struct protocol proto_tcpv4 = { .sock_family = AF_INET, .sock_addrlen = sizeof(struct sockaddr_in), .l3_addrlen = 32/8, - .accept = &stream_sock_accept, + .accept = &listener_accept, .connect = tcp_connect_server, .bind = tcp_bind_listener, .bind_all = tcp_bind_listeners, @@ -88,7 +88,7 @@ static struct protocol proto_tcpv6 = { .sock_family = AF_INET6, .sock_addrlen = sizeof(struct sockaddr_in6), .l3_addrlen = 128/8, - .accept = &stream_sock_accept, + .accept = &listener_accept, .connect = tcp_connect_server, .bind = tcp_bind_listener, .bind_all = tcp_bind_listeners, diff --git a/src/proto_uxst.c b/src/proto_uxst.c index 2140ae6cae..1a808ead89 100644 --- a/src/proto_uxst.c +++ b/src/proto_uxst.c @@ -55,7 +55,7 @@ static struct protocol proto_unix = { .sock_family = AF_UNIX, .sock_addrlen = sizeof(struct sockaddr_un), .l3_addrlen = sizeof(((struct sockaddr_un*)0)->sun_path),/* path len */ - .accept = &stream_sock_accept, + .accept = &listener_accept, .bind = uxst_bind_listener, .bind_all = uxst_bind_listeners, .unbind_all = uxst_unbind_listeners, diff --git a/src/protocols.c b/src/protocols.c index 155636f4fd..cc7b3ce312 100644 --- a/src/protocols.c +++ b/src/protocols.c @@ -10,6 +10,7 @@ * */ +#include #include #include @@ -17,9 +18,15 @@ #include #include #include +#include + +#include #include #include +#include +#include +#include /* List head of all registered protocols */ static struct list protocols = LIST_HEAD_INIT(protocols); @@ -230,6 +237,164 @@ void delete_listener(struct listener *listener) listener->proto->nb_listeners--; } +/* This function is called on a read event from a listening socket, corresponding + * to an accept. It tries to accept as many connections as possible, and for each + * calls the listener's accept handler (generally the frontend's accept handler). + */ +int listener_accept(int fd) +{ + struct listener *l = fdtab[fd].owner; + struct proxy *p = l->frontend; + int max_accept = global.tune.maxaccept; + int cfd; + int ret; + + if (unlikely(l->nbconn >= l->maxconn)) { + listener_full(l); + return 0; + } + + if (global.cps_lim && !(l->options & LI_O_UNLIMITED)) { + int max = freq_ctr_remain(&global.conn_per_sec, global.cps_lim, 0); + + if (unlikely(!max)) { + /* frontend accept rate limit was reached */ + limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, next_event_delay(&global.conn_per_sec, global.cps_lim, 0))); + return 0; + } + + if (max_accept > max) + max_accept = max; + } + + if (p && p->fe_sps_lim) { + int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0); + + if (unlikely(!max)) { + /* frontend accept rate limit was reached */ + limit_listener(l, &p->listener_queue); + task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0))); + return 0; + } + + if (max_accept > max) + max_accept = max; + } + + /* Note: if we fail to allocate a connection because of configured + * limits, we'll schedule a new attempt worst 1 second later in the + * worst case. If we fail due to system limits or temporary resource + * shortage, we try again 100ms later in the worst case. + */ + while (max_accept--) { + struct sockaddr_storage addr; + socklen_t laddr = sizeof(addr); + + if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) { + limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ + return 0; + } + + if (unlikely(p && p->feconn >= p->maxconn)) { + limit_listener(l, &p->listener_queue); + return 0; + } + + cfd = accept(fd, (struct sockaddr *)&addr, &laddr); + if (unlikely(cfd == -1)) { + switch (errno) { + case EAGAIN: + case EINTR: + case ECONNABORTED: + return 0; /* nothing more to accept */ + case ENFILE: + if (p) + send_log(p, LOG_EMERG, + "Proxy %s reached system FD limit at %d. Please check system tunables.\n", + p->id, maxfd); + limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ + return 0; + case EMFILE: + if (p) + send_log(p, LOG_EMERG, + "Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n", + p->id, maxfd); + limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ + return 0; + case ENOBUFS: + case ENOMEM: + if (p) + send_log(p, LOG_EMERG, + "Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n", + p->id, maxfd); + limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ + return 0; + default: + return 0; + } + } + + if (unlikely(cfd >= global.maxsock)) { + send_log(p, LOG_EMERG, + "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n", + p->id); + close(cfd); + limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ + return 0; + } + + /* increase the per-process number of cumulated connections */ + if (!(l->options & LI_O_UNLIMITED)) { + update_freq_ctr(&global.conn_per_sec, 1); + if (global.conn_per_sec.curr_ctr > global.cps_max) + global.cps_max = global.conn_per_sec.curr_ctr; + actconn++; + } + + jobs++; + totalconn++; + l->nbconn++; + + if (l->counters) { + if (l->nbconn > l->counters->conn_max) + l->counters->conn_max = l->nbconn; + } + + ret = l->accept(l, cfd, &addr); + if (unlikely(ret <= 0)) { + /* The connection was closed by session_accept(). Either + * we just have to ignore it (ret == 0) or it's a critical + * error due to a resource shortage, and we must stop the + * listener (ret < 0). + */ + if (!(l->options & LI_O_UNLIMITED)) + actconn--; + jobs--; + l->nbconn--; + if (ret == 0) /* successful termination */ + continue; + + limit_listener(l, &global_listener_queue); + task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ + return 0; + } + + if (l->nbconn >= l->maxconn) { + listener_full(l); + return 0; + } + + } /* end of while (p->feconn < p->maxconn) */ + + return 0; +} + /* Registers the protocol */ void protocol_register(struct protocol *proto) { diff --git a/src/stream_sock.c b/src/stream_sock.c index 93e0a9283f..accd4a1f37 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -1138,164 +1138,6 @@ void stream_sock_chk_snd(struct stream_interface *si) } } -/* This function is called on a read event from a listening socket, corresponding - * to an accept. It tries to accept as many connections as possible, and for each - * calls the listener's accept handler (generally the frontend's accept handler). - */ -int stream_sock_accept(int fd) -{ - struct listener *l = fdtab[fd].owner; - struct proxy *p = l->frontend; - int max_accept = global.tune.maxaccept; - int cfd; - int ret; - - if (unlikely(l->nbconn >= l->maxconn)) { - listener_full(l); - return 0; - } - - if (global.cps_lim && !(l->options & LI_O_UNLIMITED)) { - int max = freq_ctr_remain(&global.conn_per_sec, global.cps_lim, 0); - - if (unlikely(!max)) { - /* frontend accept rate limit was reached */ - limit_listener(l, &global_listener_queue); - task_schedule(global_listener_queue_task, tick_add(now_ms, next_event_delay(&global.conn_per_sec, global.cps_lim, 0))); - return 0; - } - - if (max_accept > max) - max_accept = max; - } - - if (p && p->fe_sps_lim) { - int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0); - - if (unlikely(!max)) { - /* frontend accept rate limit was reached */ - limit_listener(l, &p->listener_queue); - task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0))); - return 0; - } - - if (max_accept > max) - max_accept = max; - } - - /* Note: if we fail to allocate a connection because of configured - * limits, we'll schedule a new attempt worst 1 second later in the - * worst case. If we fail due to system limits or temporary resource - * shortage, we try again 100ms later in the worst case. - */ - while (max_accept--) { - struct sockaddr_storage addr; - socklen_t laddr = sizeof(addr); - - if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) { - limit_listener(l, &global_listener_queue); - task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ - return 0; - } - - if (unlikely(p && p->feconn >= p->maxconn)) { - limit_listener(l, &p->listener_queue); - return 0; - } - - cfd = accept(fd, (struct sockaddr *)&addr, &laddr); - if (unlikely(cfd == -1)) { - switch (errno) { - case EAGAIN: - case EINTR: - case ECONNABORTED: - return 0; /* nothing more to accept */ - case ENFILE: - if (p) - send_log(p, LOG_EMERG, - "Proxy %s reached system FD limit at %d. Please check system tunables.\n", - p->id, maxfd); - limit_listener(l, &global_listener_queue); - task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ - return 0; - case EMFILE: - if (p) - send_log(p, LOG_EMERG, - "Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n", - p->id, maxfd); - limit_listener(l, &global_listener_queue); - task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ - return 0; - case ENOBUFS: - case ENOMEM: - if (p) - send_log(p, LOG_EMERG, - "Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n", - p->id, maxfd); - limit_listener(l, &global_listener_queue); - task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ - return 0; - default: - return 0; - } - } - - if (unlikely(cfd >= global.maxsock)) { - send_log(p, LOG_EMERG, - "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n", - p->id); - close(cfd); - limit_listener(l, &global_listener_queue); - task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ - return 0; - } - - /* increase the per-process number of cumulated connections */ - if (!(l->options & LI_O_UNLIMITED)) { - update_freq_ctr(&global.conn_per_sec, 1); - if (global.conn_per_sec.curr_ctr > global.cps_max) - global.cps_max = global.conn_per_sec.curr_ctr; - actconn++; - } - - jobs++; - totalconn++; - l->nbconn++; - - if (l->counters) { - if (l->nbconn > l->counters->conn_max) - l->counters->conn_max = l->nbconn; - } - - ret = l->accept(l, cfd, &addr); - if (unlikely(ret <= 0)) { - /* The connection was closed by session_accept(). Either - * we just have to ignore it (ret == 0) or it's a critical - * error due to a resource shortage, and we must stop the - * listener (ret < 0). - */ - if (!(l->options & LI_O_UNLIMITED)) - actconn--; - jobs--; - l->nbconn--; - if (ret == 0) /* successful termination */ - continue; - - limit_listener(l, &global_listener_queue); - task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */ - return 0; - } - - if (l->nbconn >= l->maxconn) { - listener_full(l); - return 0; - } - - } /* end of while (p->feconn < p->maxconn) */ - - return 0; -} - /* stream sock operations */ struct sock_ops stream_sock = { .update = stream_sock_data_finish,