From: Willy Tarreau Date: Mon, 7 May 2012 14:50:03 +0000 (+0200) Subject: REORG: stream_interface: create a struct sock_ops to hold socket operations X-Git-Tag: v1.5-dev9~17 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=060781fb4a7958d45e51e4331e3ea2f47ababa20;p=thirdparty%2Fhaproxy.git REORG: stream_interface: create a struct sock_ops to hold socket operations These operators are used regardless of the socket protocol family. Move them to a "sock_ops" struct. ->read and ->write have been moved there too as they have no reason to remain at the protocol level. --- diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index 271abc75c0..1dc5052caa 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -91,6 +91,7 @@ enum { struct server; struct proxy; struct si_applet; +struct stream_interface; struct target { int type; @@ -103,6 +104,16 @@ struct target { } ptr; }; +struct sock_ops { + void (*update)(struct stream_interface *); /* I/O update function */ + void (*shutr)(struct stream_interface *); /* shutr function */ + void (*shutw)(struct stream_interface *); /* shutw function */ + void (*chk_rcv)(struct stream_interface *); /* chk_rcv function */ + void (*chk_snd)(struct stream_interface *); /* chk_snd function */ + int (*read)(int fd); /* read callback after poll() */ + int (*write)(int fd); /* wrtie callback after poll() */ +}; + /* A stream interface has 3 parts : * - the buffer side, which interfaces to the buffers. * - the remote side, which describes the state and address of the other side. @@ -125,17 +136,14 @@ struct stream_interface { unsigned int err_type; /* first error detected, one of SI_ET_* */ void *err_loc; /* commonly the server, NULL when SI_ET_NONE */ - /* these struct members are used by the buffer side to act on the remote side */ - void (*update)(struct stream_interface *); /* I/O update function */ - void (*shutr)(struct stream_interface *); /* shutr function */ - void (*shutw)(struct stream_interface *); /* shutw function */ - void (*chk_rcv)(struct stream_interface *);/* chk_rcv function */ - void (*chk_snd)(struct stream_interface *);/* chk_snd function */ + struct sock_ops sock; /* socket level operations */ + int (*connect)(struct stream_interface *); /* connect function if any */ - void (*release)(struct stream_interface *); /* handler to call after the last close() */ int (*get_src)(int, struct sockaddr *, socklen_t *); /* syscall used to retrieve src addr */ int (*get_dst)(int, struct sockaddr *, socklen_t *); /* syscall used to retrieve dst addr */ + void (*release)(struct stream_interface *); /* handler to call after the last close() */ + /* struct members below are the "remote" part, as seen from the buffer side */ struct target target; /* the target to connect to (server, proxy, applet, ...) */ int conn_retries; /* number of connect retries left */ diff --git a/src/dumpstats.c b/src/dumpstats.c index cddbc3ed7b..465844f848 100644 --- a/src/dumpstats.c +++ b/src/dumpstats.c @@ -1376,7 +1376,7 @@ static void cli_io_handler(struct stream_interface *si) /* Let's close for real now. We just close the request * side, the conditions below will complete if needed. */ - si->shutw(si); + si->sock.shutw(si); break; } else if (si->applet.st0 == STAT_CLI_GETREQ) { @@ -1518,7 +1518,7 @@ static void cli_io_handler(struct stream_interface *si) * we forward the close to the request side so that it flows upstream to * the client. */ - si->shutw(si); + si->sock.shutw(si); } if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && (si->applet.st0 < STAT_CLI_OUTPUT)) { @@ -1528,12 +1528,12 @@ static void cli_io_handler(struct stream_interface *si) * the client side has closed. So we'll forward this state downstream * on the response buffer. */ - si->shutr(si); + si->sock.shutr(si); res->flags |= BF_READ_NULL; } /* update all other flags and resync with the other side */ - si->update(si); + si->sock.update(si); /* we don't want to expire timeouts while we're processing requests */ si->ib->rex = TICK_ETERNITY; @@ -1735,26 +1735,26 @@ static void http_stats_io_handler(struct stream_interface *si) if (s->txn.meth == HTTP_METH_POST) { if (stats_http_redir(si, s->be->uri_auth)) { si->applet.st0 = 1; - si->shutw(si); + si->sock.shutw(si); } } else { if (stats_dump_http(si, s->be->uri_auth)) { si->applet.st0 = 1; - si->shutw(si); + si->sock.shutw(si); } } } if ((res->flags & BF_SHUTR) && (si->state == SI_ST_EST)) - si->shutw(si); + si->sock.shutw(si); if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && si->applet.st0) { - si->shutr(si); + si->sock.shutr(si); res->flags |= BF_READ_NULL; } /* update all other flags and resync with the other side */ - si->update(si); + si->sock.update(si); /* we don't want to expire timeouts while we're processing requests */ si->ib->rex = TICK_ETERNITY; diff --git a/src/peers.c b/src/peers.c index ab6f7ed038..8c8795af27 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1023,15 +1023,15 @@ incomplete: si->applet.st0 = PEER_SESSION_END; /* fall through */ case PEER_SESSION_END: { - si->shutw(si); - si->shutr(si); + si->sock.shutw(si); + si->sock.shutr(si); si->ib->flags |= BF_READ_NULL; goto quit; } } } out: - si->update(si); + si->sock.update(si); si->ob->flags |= BF_READ_DONTWAIT; /* we don't want to expire timeouts while we're processing requests */ si->ib->rex = TICK_ETERNITY; diff --git a/src/proto_http.c b/src/proto_http.c index 0fffad4d9d..2c1ff566cf 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -803,8 +803,8 @@ void perform_http_redirect(struct session *s, struct stream_interface *si) } /* prepare to return without error. */ - si->shutr(si); - si->shutw(si); + si->sock.shutr(si); + si->sock.shutw(si); si->err_type = SI_ET_NONE; si->err_loc = NULL; si->state = SI_ST_CLO; @@ -3713,8 +3713,8 @@ void http_end_txn_clean_session(struct session *s) http_silent_debug(__LINE__, s); s->req->cons->flags |= SI_FL_NOLINGER; - s->req->cons->shutr(s->req->cons); - s->req->cons->shutw(s->req->cons); + s->req->cons->sock.shutr(s->req->cons); + s->req->cons->sock.shutw(s->req->cons); http_silent_debug(__LINE__, s); diff --git a/src/session.c b/src/session.c index f8b651682c..fabba1f201 100644 --- a/src/session.c +++ b/src/session.c @@ -200,8 +200,8 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr) s->si[1].get_src = NULL; s->si[1].get_dst = NULL; clear_target(&s->si[1].target); - s->si[1].shutr = stream_int_shutr; - s->si[1].shutw = stream_int_shutw; + s->si[1].sock.shutr= stream_int_shutr; + s->si[1].sock.shutw= stream_int_shutw; s->si[1].exp = TICK_ETERNITY; s->si[1].flags = SI_FL_NONE; @@ -573,7 +573,7 @@ static int sess_update_st_con_tcp(struct session *s, struct stream_interface *si (((req->flags & (BF_OUT_EMPTY|BF_WRITE_ACTIVITY)) == BF_OUT_EMPTY) || s->be->options & PR_O_ABRT_CLOSE)))) { /* give up */ - si->shutw(si); + si->sock.shutw(si); si->err_type |= SI_ET_CONN_ABRT; si->err_loc = target_srv(&s->target); si->flags &= ~SI_FL_CAP_SPLICE; @@ -634,7 +634,7 @@ static int sess_update_st_cer(struct session *s, struct stream_interface *si) process_srv_queue(target_srv(&s->target)); /* shutw is enough so stop a connecting socket */ - si->shutw(si); + si->sock.shutw(si); si->ob->flags |= BF_WRITE_ERROR; si->ib->flags |= BF_READ_ERROR; @@ -773,8 +773,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s process_srv_queue(srv); /* Failed and not retryable. */ - si->shutr(si); - si->shutw(si); + si->sock.shutr(si); + si->sock.shutw(si); si->ob->flags |= BF_WRITE_ERROR; s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now); @@ -822,8 +822,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s if (srv) srv->counters.failed_conns++; s->be->be_counters.failed_conns++; - si->shutr(si); - si->shutw(si); + si->sock.shutr(si); + si->sock.shutw(si); si->ob->flags |= BF_WRITE_TIMEOUT; if (!si->err_type) si->err_type = SI_ET_QUEUE_TO; @@ -840,8 +840,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s /* give up */ si->exp = TICK_ETERNITY; s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now); - si->shutr(si); - si->shutw(si); + si->sock.shutr(si); + si->sock.shutw(si); si->err_type |= SI_ET_QUEUE_ABRT; si->state = SI_ST_CLO; if (s->srv_error) @@ -859,8 +859,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s (si->ob->flags & BF_OUT_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) { /* give up */ si->exp = TICK_ETERNITY; - si->shutr(si); - si->shutw(si); + si->sock.shutr(si); + si->sock.shutw(si); si->err_type |= SI_ET_CONN_ABRT; si->state = SI_ST_CLO; if (s->srv_error) @@ -938,8 +938,8 @@ static void sess_prepare_conn_req(struct session *s, struct stream_interface *si return; /* we did not get any server, let's check the cause */ - si->shutr(si); - si->shutw(si); + si->sock.shutr(si); + si->sock.shutw(si); si->ob->flags |= BF_WRITE_ERROR; if (!si->err_type) si->err_type = SI_ET_CONN_OTHER; @@ -1352,21 +1352,21 @@ struct task *process_session(struct task *t) if (unlikely((s->req->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) { s->req->cons->flags |= SI_FL_NOLINGER; - s->req->cons->shutw(s->req->cons); + s->req->cons->sock.shutw(s->req->cons); } if (unlikely((s->req->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT)) - s->req->prod->shutr(s->req->prod); + s->req->prod->sock.shutr(s->req->prod); buffer_check_timeouts(s->rep); if (unlikely((s->rep->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) { s->rep->cons->flags |= SI_FL_NOLINGER; - s->rep->cons->shutw(s->rep->cons); + s->rep->cons->sock.shutw(s->rep->cons); } if (unlikely((s->rep->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT)) - s->rep->prod->shutr(s->rep->prod); + s->rep->prod->sock.shutr(s->rep->prod); } /* 1b: check for low-level errors reported at the stream interface. @@ -1379,8 +1379,8 @@ struct task *process_session(struct task *t) srv = target_srv(&s->target); if (unlikely(s->si[0].flags & SI_FL_ERR)) { if (s->si[0].state == SI_ST_EST || s->si[0].state == SI_ST_DIS) { - s->si[0].shutr(&s->si[0]); - s->si[0].shutw(&s->si[0]); + s->si[0].sock.shutr(&s->si[0]); + s->si[0].sock.shutw(&s->si[0]); stream_int_report_error(&s->si[0]); if (!(s->req->analysers) && !(s->rep->analysers)) { s->be->be_counters.cli_aborts++; @@ -1397,8 +1397,8 @@ struct task *process_session(struct task *t) if (unlikely(s->si[1].flags & SI_FL_ERR)) { if (s->si[1].state == SI_ST_EST || s->si[1].state == SI_ST_DIS) { - s->si[1].shutr(&s->si[1]); - s->si[1].shutw(&s->si[1]); + s->si[1].sock.shutr(&s->si[1]); + s->si[1].sock.shutw(&s->si[1]); stream_int_report_error(&s->si[1]); s->be->be_counters.failed_resp++; if (srv) @@ -1895,7 +1895,7 @@ struct task *process_session(struct task *t) /* shutdown(write) pending */ if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_OUT_EMPTY)) == (BF_SHUTW_NOW|BF_OUT_EMPTY))) - s->req->cons->shutw(s->req->cons); + s->req->cons->sock.shutw(s->req->cons); /* shutdown(write) done on server side, we must stop the client too */ if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW && @@ -1904,7 +1904,7 @@ struct task *process_session(struct task *t) /* shutdown(read) pending */ if (unlikely((s->req->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW)) - s->req->prod->shutr(s->req->prod); + s->req->prod->sock.shutr(s->req->prod); /* it's possible that an upper layer has requested a connection setup or abort. * There are 2 situations where we decide to establish a new connection : @@ -2026,7 +2026,7 @@ struct task *process_session(struct task *t) /* shutdown(write) pending */ if (unlikely((s->rep->flags & (BF_SHUTW|BF_OUT_EMPTY|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW))) - s->rep->cons->shutw(s->rep->cons); + s->rep->cons->sock.shutw(s->rep->cons); /* shutdown(write) done on the client side, we must stop the server too */ if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW) && @@ -2035,7 +2035,7 @@ struct task *process_session(struct task *t) /* shutdown(read) pending */ if (unlikely((s->rep->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW)) - s->rep->prod->shutr(s->rep->prod); + s->rep->prod->sock.shutr(s->rep->prod); if (s->req->prod->state == SI_ST_DIS || s->req->cons->state == SI_ST_DIS) goto resync_stream_interface; @@ -2086,10 +2086,10 @@ struct task *process_session(struct task *t) session_process_counters(s); if (s->rep->cons->state == SI_ST_EST && s->rep->cons->target.type != TARG_TYPE_APPLET) - s->rep->cons->update(s->rep->cons); + s->rep->cons->sock.update(s->rep->cons); if (s->req->cons->state == SI_ST_EST && s->req->cons->target.type != TARG_TYPE_APPLET) - s->req->cons->update(s->req->cons); + s->req->cons->sock.update(s->req->cons); s->req->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED); s->rep->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED); diff --git a/src/stream_interface.c b/src/stream_interface.c index cea1a19b50..209762382e 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -108,7 +108,7 @@ void stream_int_update_embedded(struct stream_interface *si) return; if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW)) - si->shutw(si); + si->sock.shutw(si); if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0) si->flags |= SI_FL_WAIT_DATA; @@ -134,11 +134,11 @@ void stream_int_update_embedded(struct stream_interface *si) old_flags = si->flags; if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL && (si->ob->prod->flags & SI_FL_WAIT_ROOM))) - si->ob->prod->chk_rcv(si->ob->prod); + si->ob->prod->sock.chk_rcv(si->ob->prod); if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) && (si->ib->cons->flags & SI_FL_WAIT_DATA)) { - si->ib->cons->chk_snd(si->ib->cons); + si->ib->cons->sock.chk_snd(si->ib->cons); /* check if the consumer has freed some space */ if (!(si->ib->flags & BF_FULL)) si->flags &= ~SI_FL_WAIT_ROOM; @@ -308,11 +308,11 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_ { DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si->owner); - si->update = stream_int_update_embedded; - si->shutr = stream_int_shutr; - si->shutw = stream_int_shutw; - si->chk_rcv = stream_int_chk_rcv; - si->chk_snd = stream_int_chk_snd; + si->sock.update = stream_int_update_embedded; + si->sock.shutr = stream_int_shutr; + si->sock.shutw = stream_int_shutw; + si->sock.chk_rcv = stream_int_chk_rcv; + si->sock.chk_snd = stream_int_chk_snd; si->connect = NULL; set_target_applet(&si->target, app); si->applet.state = 0; @@ -335,11 +335,11 @@ struct task *stream_int_register_handler_task(struct stream_interface *si, DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner); - si->update = stream_int_update; - si->shutr = stream_int_shutr; - si->shutw = stream_int_shutw; - si->chk_rcv = stream_int_chk_rcv; - si->chk_snd = stream_int_chk_snd; + si->sock.update = stream_int_update; + si->sock.shutr = stream_int_shutr; + si->sock.shutw = stream_int_shutw; + si->sock.chk_rcv = stream_int_chk_rcv; + si->sock.chk_snd = stream_int_chk_snd; si->connect = NULL; clear_target(&si->target); si->release = NULL; diff --git a/src/stream_sock.c b/src/stream_sock.c index 1b543d0070..3862ac8630 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -94,7 +94,7 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si) si->flags |= SI_FL_WAIT_ROOM; EV_FD_CLR(fd, DIR_RD); b->rex = TICK_ETERNITY; - b->cons->chk_snd(b->cons); + b->cons->sock.chk_snd(b->cons); return 1; } @@ -443,7 +443,7 @@ int stream_sock_read(int fd) { (b->i == 0 && (b->cons->flags & SI_FL_WAIT_DATA))) { int last_len = b->pipe ? b->pipe->data : 0; - b->cons->chk_snd(b->cons); + b->cons->sock.chk_snd(b->cons); /* check if the consumer has freed some space */ if (!(b->flags & BF_FULL) && @@ -790,7 +790,7 @@ int stream_sock_write(int fd) /* the producer might be waiting for more room to store data */ if (likely((b->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL && (b->prod->flags & SI_FL_WAIT_ROOM))) - b->prod->chk_rcv(b->prod); + b->prod->sock.chk_rcv(b->prod); /* we have to wake up if there is a special event or if we don't have * any more data to forward and it's not planned to send any more. @@ -1300,11 +1300,11 @@ int stream_sock_accept(int fd) /* Prepare a stream interface to be used in socket mode. */ void stream_sock_prepare_interface(struct stream_interface *si) { - si->update = stream_sock_data_finish; - si->shutr = stream_sock_shutr; - si->shutw = stream_sock_shutw; - si->chk_rcv = stream_sock_chk_rcv; - si->chk_snd = stream_sock_chk_snd; + si->sock.update = stream_sock_data_finish; + si->sock.shutr = stream_sock_shutr; + si->sock.shutw = stream_sock_shutw; + si->sock.chk_rcv = stream_sock_chk_rcv; + si->sock.chk_snd = stream_sock_chk_snd; }