From 35a0994984950337127d9aea49c474e01fcb1db3 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Tue, 1 Jun 2010 17:12:40 +0200 Subject: [PATCH] [MAJOR] frontend: reorder the session initialization upon accept This will be needed for the last factoring step which adds support for application-level accept(). The tcp/http accept() code has now been isolated and will have to move to a separate function. --- src/frontend.c | 274 ++++++++++++++++++++++++++----------------------- 1 file changed, 148 insertions(+), 126 deletions(-) diff --git a/src/frontend.c b/src/frontend.c index fc40929f45..0e4d24fa59 100644 --- a/src/frontend.c +++ b/src/frontend.c @@ -69,15 +69,13 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) goto out_close; } - LIST_ADDQ(&sessions, &s->list); - LIST_INIT(&s->back_refs); - + /* minimum session initialization required for monitor mode below */ s->flags = 0; - s->term_trace = 0; - s->cli_addr = *addr; + s->logs.logwait = p->to_log; /* if this session comes from a known monitoring system, we want to ignore - * it as soon as possible, which means closing it immediately for TCP. + * it as soon as possible, which means closing it immediately for TCP, but + * cleanly. */ if (unlikely((l->options & LI_O_CHK_MONNET) && addr->ss_family == AF_INET && @@ -87,16 +85,29 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) return 0; } s->flags |= SN_MONITOR; + s->logs.logwait = 0; } + /* OK, we're keeping the session, so let's properly initialize the session */ + LIST_ADDQ(&sessions, &s->list); + LIST_INIT(&s->back_refs); + if (unlikely((t = task_new()) == NULL)) { /* disable this proxy for a while */ Alert("out of memory in event_accept().\n"); goto out_free_session; } + s->term_trace = 0; + s->cli_addr = *addr; + s->logs.accept_date = date; /* user-visible date for logging */ + s->logs.tv_accept = now; /* corrected date for internal use */ + s->uniq_id = totalconn; + proxy_inc_fe_ctr(l, p); /* note: cum_beconn will be increased once assigned */ + t->process = l->handler; t->context = s; t->nice = l->nice; + t->expire = TICK_ETERNITY; s->task = t; s->listener = l; @@ -105,34 +116,13 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) * This changes later when switching rules are executed or * when the default backend is assigned. */ - s->be = s->fe = p; - + s->be = s->fe = p; s->req = s->rep = NULL; /* will be allocated later */ - /* this part should be common with other protocols */ - s->si[0].state = s->si[0].prev_state = SI_ST_EST; - s->si[0].err_type = SI_ET_NONE; - s->si[0].err_loc = NULL; - s->si[0].owner = t; - s->si[0].update = stream_sock_data_finish; - s->si[0].shutr = stream_sock_shutr; - s->si[0].shutw = stream_sock_shutw; - s->si[0].chk_rcv = stream_sock_chk_rcv; - s->si[0].chk_snd = stream_sock_chk_snd; - s->si[0].connect = NULL; - s->si[0].iohandler = NULL; - s->si[0].fd = cfd; - s->si[0].flags = SI_FL_NONE | SI_FL_CAP_SPLTCP; /* TCP splicing capable */ - if (likely(s->fe->options2 & PR_O2_INDEPSTR)) - s->si[0].flags |= SI_FL_INDEP_STR; - s->si[0].exp = TICK_ETERNITY; - - s->logs.accept_date = date; /* user-visible date for logging */ - s->logs.tv_accept = now; /* corrected date for internal use */ - s->uniq_id = totalconn; - proxy_inc_fe_ctr(l, p); /* note: cum_beconn will be increased once assigned */ - - /* now evaluate the tcp-request rules */ + /* now evaluate the tcp-request layer4 rules. Since we expect to be able + * to abort right here as soon as possible, we check the rules before + * even initializing the stream interfaces. + */ if ((l->options & LI_O_TCP_RULES) && !tcp_exec_req_rules(s)) { task_free(t); LIST_DEL(&s->list); @@ -142,16 +132,41 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) return 0; } - /* pre-initialize the other side's stream interface to an INIT state */ - s->si[1].owner = t; - s->si[1].state = s->si[1].prev_state = SI_ST_INI; - s->si[1].err_type = SI_ET_NONE; - s->si[1].err_loc = NULL; + /* this part should be common with other protocols */ + s->si[0].fd = cfd; + s->si[0].owner = t; + s->si[0].state = s->si[0].prev_state = SI_ST_EST; + s->si[0].err_type = SI_ET_NONE; + s->si[0].err_loc = NULL; + s->si[0].connect = NULL; + s->si[0].iohandler = NULL; + s->si[0].exp = TICK_ETERNITY; + s->si[0].flags = SI_FL_NONE; + + if (likely(s->fe->options2 & PR_O2_INDEPSTR)) + s->si[0].flags |= SI_FL_INDEP_STR; + + if (addr->ss_family == AF_INET || addr->ss_family == AF_INET6) + s->si[0].flags = SI_FL_CAP_SPLTCP; /* TCP/TCPv6 splicing possible */ + + /* add the various callbacks */ + stream_sock_prepare_interface(&s->si[0]); + + /* pre-initialize the other side's stream interface to an INIT state. The + * callbacks will be initialized before attempting to connect. + */ + s->si[1].fd = -1; /* just to help with debugging */ + s->si[1].owner = t; + s->si[1].state = s->si[1].prev_state = SI_ST_INI; + s->si[1].err_type = SI_ET_NONE; + s->si[1].err_loc = NULL; + s->si[1].connect = NULL; s->si[1].iohandler = NULL; - s->si[1].exp = TICK_ETERNITY; - s->si[1].fd = -1; /* just to help with debugging */ + s->si[1].shutr = stream_int_shutr; + s->si[1].shutw = stream_int_shutw; + s->si[1].exp = TICK_ETERNITY; + s->si[1].flags = SI_FL_NONE; - s->si[1].flags = SI_FL_NONE; if (likely(s->fe->options2 & PR_O2_INDEPSTR)) s->si[1].flags |= SI_FL_INDEP_STR; @@ -161,22 +176,77 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) /* init store persistence */ s->store_count = 0; - /* FIXME: the logs are horribly complicated now, because they are - * defined in

,

, and later and . + /* Adjust some socket options */ + if (unlikely(fcntl(cfd, F_SETFL, O_NONBLOCK) == -1)) { + Alert("accept(): cannot set the socket in non blocking mode. Giving up\n"); + goto out_free_task; + } + + txn = &s->txn; + /* Those variables will be checked and freed if non-NULL in + * session.c:session_free(). It is important that they are + * properly initialized. */ + txn->sessid = NULL; + txn->srv_cookie = NULL; + txn->cli_cookie = NULL; + txn->uri = NULL; + txn->req.cap = NULL; + txn->rsp.cap = NULL; + txn->hdr_idx.v = NULL; + txn->hdr_idx.size = txn->hdr_idx.used = 0; - if (s->flags & SN_MONITOR) - s->logs.logwait = 0; - else - s->logs.logwait = p->to_log; + if (unlikely((s->req = pool_alloc2(pool2_buffer)) == NULL)) + goto out_free_task; /* no memory */ - if (s->logs.logwait & LW_REQ) - s->do_log = http_sess_log; - else - s->do_log = tcp_sess_log; + if (unlikely((s->rep = pool_alloc2(pool2_buffer)) == NULL)) + goto out_free_req; /* no memory */ - /* default error reporting function, may be changed by analysers */ - s->srv_error = default_srv_error; + /* initialize the request buffer */ + s->req->size = global.tune.bufsize; + buffer_init(s->req); + s->req->prod = &s->si[0]; + s->req->cons = &s->si[1]; + s->si[0].ib = s->si[1].ob = s->req; + s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */ + + /* activate default analysers enabled for this listener */ + s->req->analysers = l->analysers; + + s->req->wto = TICK_ETERNITY; + s->req->rto = TICK_ETERNITY; + s->req->rex = TICK_ETERNITY; + s->req->wex = TICK_ETERNITY; + s->req->analyse_exp = TICK_ETERNITY; + + /* initialize response buffer */ + s->rep->size = global.tune.bufsize; + buffer_init(s->rep); + s->rep->prod = &s->si[1]; + s->rep->cons = &s->si[0]; + s->si[0].ob = s->si[1].ib = s->rep; + s->rep->analysers = 0; + + s->rep->rto = TICK_ETERNITY; + s->rep->wto = TICK_ETERNITY; + s->rep->rex = TICK_ETERNITY; + s->rep->wex = TICK_ETERNITY; + s->rep->analyse_exp = TICK_ETERNITY; + + /* finish initialization of the accepted file descriptor */ + fd_insert(cfd); + fdtab[cfd].owner = &s->si[0]; + fdtab[cfd].state = FD_STREADY; + fdtab[cfd].flags = 0; + fdtab[cfd].cb[DIR_RD].f = l->proto->read; + fdtab[cfd].cb[DIR_RD].b = s->req; + fdtab[cfd].cb[DIR_WR].f = l->proto->write; + fdtab[cfd].cb[DIR_WR].b = s->rep; + fdinfo[cfd].peeraddr = (struct sockaddr *)&s->cli_addr; + fdinfo[cfd].peerlen = sizeof(s->cli_addr); + EV_FD_SET(cfd, DIR_RD); + + /***************** to be moved to the TCP/HTTP frontend's accept() **************/ tv_zero(&s->logs.tv_request); s->logs.t_queue = -1; @@ -187,28 +257,24 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) s->logs.prx_queue_size = 0; /* we get the number of pending conns before us */ s->logs.srv_queue_size = 0; /* we will get this number soon */ + s->data_state = DATA_ST_INIT; s->data_source = DATA_SRC_NONE; - txn = &s->txn; - /* Those variables will be checked and freed if non-NULL in - * session.c:session_free(). It is important that they are - * properly initialized. + /* FIXME: the logs are horribly complicated now, because they are + * defined in

,

, and later and . */ - txn->sessid = NULL; - txn->srv_cookie = NULL; - txn->cli_cookie = NULL; - txn->uri = NULL; - txn->req.cap = NULL; - txn->rsp.cap = NULL; - txn->hdr_idx.v = NULL; - txn->hdr_idx.size = txn->hdr_idx.used = 0; + if (s->logs.logwait & LW_REQ) + s->do_log = http_sess_log; + else + s->do_log = tcp_sess_log; + + /* default error reporting function, may be changed by analysers */ + s->srv_error = default_srv_error; /* Adjust some socket options */ - if (unlikely(fcntl(cfd, F_SETFL, O_NONBLOCK) == -1 || - setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, - (char *) &one, sizeof(one)) == -1)) { + if (unlikely(setsockopt(cfd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one)) == -1)) { Alert("accept(): cannot set the socket in non blocking mode. Giving up\n"); - goto out_free_task; + goto out_delete_cfd; } if (p->options & PR_O_TCP_CLI_KA) @@ -227,11 +293,11 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) /* the captures are only used in HTTP frontends */ if (unlikely(p->nb_req_cap > 0 && (txn->req.cap = pool_alloc2(p->req_cap_pool)) == NULL)) - goto out_fail_reqcap; /* no memory */ + goto out_delete_cfd; /* no memory */ if (unlikely(p->nb_rsp_cap > 0 && (txn->rsp.cap = pool_alloc2(p->rsp_cap_pool)) == NULL)) - goto out_fail_rspcap; /* no memory */ + goto out_free_reqcap; /* no memory */ } if (p->acl_requires & ACL_USE_L7_ANY) { @@ -242,7 +308,7 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) txn->hdr_idx.size = MAX_HTTP_HDR; if (unlikely((txn->hdr_idx.v = pool_alloc2(p->hdr_idx_pool)) == NULL)) - goto out_fail_idx; /* no memory */ + goto out_free_rspcap; /* no memory */ /* and now initialize the HTTP transaction state */ http_init_txn(s); @@ -320,23 +386,9 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) write(1, trash, len); } - if (unlikely((s->req = pool_alloc2(pool2_buffer)) == NULL)) - goto out_fail_req; /* no memory */ - - s->req->size = global.tune.bufsize; - buffer_init(s->req); - s->req->prod = &s->si[0]; - s->req->cons = &s->si[1]; - s->si[0].ib = s->si[1].ob = s->req; - - s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */ - if (p->mode == PR_MODE_HTTP) s->req->flags |= BF_READ_DONTWAIT; /* one read is usually enough */ - /* activate default analysers enabled for this listener */ - s->req->analysers = l->analysers; - /* note: this should not happen anymore since there's always at least the switching rules */ if (!s->req->analysers) { buffer_auto_connect(s->req); /* don't wait to establish connection */ @@ -344,43 +396,12 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) } s->req->rto = s->fe->timeout.client; - s->req->wto = TICK_ETERNITY; - - if (unlikely((s->rep = pool_alloc2(pool2_buffer)) == NULL)) - goto out_fail_rep; /* no memory */ - - s->rep->size = global.tune.bufsize; - buffer_init(s->rep); - s->rep->prod = &s->si[1]; - s->rep->cons = &s->si[0]; - s->si[0].ob = s->si[1].ib = s->rep; - s->rep->analysers = 0; - - s->rep->rto = TICK_ETERNITY; s->rep->wto = s->fe->timeout.client; - s->req->rex = TICK_ETERNITY; - s->req->wex = TICK_ETERNITY; - s->req->analyse_exp = TICK_ETERNITY; - s->rep->rex = TICK_ETERNITY; - s->rep->wex = TICK_ETERNITY; - s->rep->analyse_exp = TICK_ETERNITY; - t->expire = TICK_ETERNITY; - - fd_insert(cfd); - fdtab[cfd].owner = &s->si[0]; - fdtab[cfd].state = FD_STREADY; fdtab[cfd].flags = FD_FL_TCP | FD_FL_TCP_NODELAY; if (p->options & PR_O_TCP_NOLING) fdtab[cfd].flags |= FD_FL_TCP_NOLING; - fdtab[cfd].cb[DIR_RD].f = l->proto->read; - fdtab[cfd].cb[DIR_RD].b = s->req; - fdtab[cfd].cb[DIR_WR].f = l->proto->write; - fdtab[cfd].cb[DIR_WR].b = s->rep; - fdinfo[cfd].peeraddr = (struct sockaddr *)&s->cli_addr; - fdinfo[cfd].peerlen = sizeof(s->cli_addr); - if (unlikely((p->mode == PR_MODE_HTTP && (s->flags & SN_MONITOR)) || (p->mode == PR_MODE_HEALTH && (p->options & PR_O_HTTP_CHK)))) { /* Either we got a request from a monitoring system on an HTTP instance, @@ -392,6 +413,7 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) stream_int_retnclose(&s->si[0], &msg); /* forge a 200 response */ s->req->analysers = 0; t->expire = s->rep->wex; + EV_FD_CLR(cfd, DIR_RD); } else if (unlikely(p->mode == PR_MODE_HEALTH)) { /* health check mode, no client reading */ struct chunk msg; @@ -399,10 +421,10 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) stream_int_retnclose(&s->si[0], &msg); /* forge an "OK" response */ s->req->analysers = 0; t->expire = s->rep->wex; + EV_FD_CLR(cfd, DIR_RD); } - else { - EV_FD_SET(cfd, DIR_RD); - } + + /**********************************************/ /* it is important not to call the wakeup function directly but to * pass through task_wakeup(), because this one knows how to apply @@ -413,15 +435,15 @@ int frontend_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) return 1; /* Error unrolling */ - out_fail_rep: - pool_free2(pool2_buffer, s->req); - out_fail_req: - pool_free2(p->hdr_idx_pool, txn->hdr_idx.v); - out_fail_idx: + out_free_rspcap: pool_free2(p->rsp_cap_pool, txn->rsp.cap); - out_fail_rspcap: + out_free_reqcap: pool_free2(p->req_cap_pool, txn->req.cap); - out_fail_reqcap: + out_delete_cfd: + fd_delete(cfd); + pool_free2(pool2_buffer, s->rep); + out_free_req: + pool_free2(pool2_buffer, s->req); out_free_task: task_free(t); out_free_session: -- 2.47.3