]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
REORG: stream_interface: create a struct sock_ops to hold socket operations
authorWilly Tarreau <w@1wt.eu>
Mon, 7 May 2012 14:50:03 +0000 (16:50 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 8 May 2012 19:28:14 +0000 (21:28 +0200)
These operators are used regardless of the socket protocol family. Move
them to a "sock_ops" struct. ->read and ->write have been moved there too
as they have no reason to remain at the protocol level.

include/types/stream_interface.h
src/dumpstats.c
src/peers.c
src/proto_http.c
src/session.c
src/stream_interface.c
src/stream_sock.c

index 271abc75c0ddf66f11080aef9dd96a03758edc81..1dc5052caab59ec5f6b1bdf917ae1a44979b6db9 100644 (file)
@@ -91,6 +91,7 @@ enum {
 struct server;
 struct proxy;
 struct si_applet;
+struct stream_interface;
 
 struct target {
        int type;
@@ -103,6 +104,16 @@ struct target {
        } ptr;
 };
 
+struct sock_ops {
+       void (*update)(struct stream_interface *);  /* I/O update function */
+       void (*shutr)(struct stream_interface *);   /* shutr function */
+       void (*shutw)(struct stream_interface *);   /* shutw function */
+       void (*chk_rcv)(struct stream_interface *); /* chk_rcv function */
+       void (*chk_snd)(struct stream_interface *); /* chk_snd function */
+       int (*read)(int fd);                        /* read callback after poll() */
+       int (*write)(int fd);                       /* wrtie callback after poll() */
+};
+
 /* A stream interface has 3 parts :
  *  - the buffer side, which interfaces to the buffers.
  *  - the remote side, which describes the state and address of the other side.
@@ -125,17 +136,14 @@ struct stream_interface {
        unsigned int err_type;  /* first error detected, one of SI_ET_* */
        void *err_loc;          /* commonly the server, NULL when SI_ET_NONE */
 
-       /* these struct members are used by the buffer side to act on the remote side */
-       void (*update)(struct stream_interface *); /* I/O update function */
-       void (*shutr)(struct stream_interface *);  /* shutr function */
-       void (*shutw)(struct stream_interface *);  /* shutw function */
-       void (*chk_rcv)(struct stream_interface *);/* chk_rcv function */
-       void (*chk_snd)(struct stream_interface *);/* chk_snd function */
+       struct sock_ops sock;   /* socket level operations */
+
        int  (*connect)(struct stream_interface *); /* connect function if any */
-       void (*release)(struct stream_interface *); /* handler to call after the last close() */
        int (*get_src)(int, struct sockaddr *, socklen_t *); /* syscall used to retrieve src addr */
        int (*get_dst)(int, struct sockaddr *, socklen_t *); /* syscall used to retrieve dst addr */
 
+       void (*release)(struct stream_interface *); /* handler to call after the last close() */
+
        /* struct members below are the "remote" part, as seen from the buffer side */
        struct target target;   /* the target to connect to (server, proxy, applet, ...) */
        int conn_retries;       /* number of connect retries left */
index cddbc3ed7b8a17374c4de016dad87059467c287b..465844f8483cc794261a167ab0328308229da50a 100644 (file)
@@ -1376,7 +1376,7 @@ static void cli_io_handler(struct stream_interface *si)
                        /* Let's close for real now. We just close the request
                         * side, the conditions below will complete if needed.
                         */
-                       si->shutw(si);
+                       si->sock.shutw(si);
                        break;
                }
                else if (si->applet.st0 == STAT_CLI_GETREQ) {
@@ -1518,7 +1518,7 @@ static void cli_io_handler(struct stream_interface *si)
                 * we forward the close to the request side so that it flows upstream to
                 * the client.
                 */
-               si->shutw(si);
+               si->sock.shutw(si);
        }
 
        if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && (si->applet.st0 < STAT_CLI_OUTPUT)) {
@@ -1528,12 +1528,12 @@ static void cli_io_handler(struct stream_interface *si)
                 * the client side has closed. So we'll forward this state downstream
                 * on the response buffer.
                 */
-               si->shutr(si);
+               si->sock.shutr(si);
                res->flags |= BF_READ_NULL;
        }
 
        /* update all other flags and resync with the other side */
-       si->update(si);
+       si->sock.update(si);
 
        /* we don't want to expire timeouts while we're processing requests */
        si->ib->rex = TICK_ETERNITY;
@@ -1735,26 +1735,26 @@ static void http_stats_io_handler(struct stream_interface *si)
                if (s->txn.meth == HTTP_METH_POST) {
                        if (stats_http_redir(si, s->be->uri_auth)) {
                                si->applet.st0 = 1;
-                               si->shutw(si);
+                               si->sock.shutw(si);
                        }
                } else {
                        if (stats_dump_http(si, s->be->uri_auth)) {
                                si->applet.st0 = 1;
-                               si->shutw(si);
+                               si->sock.shutw(si);
                        }
                }
        }
 
        if ((res->flags & BF_SHUTR) && (si->state == SI_ST_EST))
-               si->shutw(si);
+               si->sock.shutw(si);
 
        if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && si->applet.st0) {
-               si->shutr(si);
+               si->sock.shutr(si);
                res->flags |= BF_READ_NULL;
        }
 
        /* update all other flags and resync with the other side */
-       si->update(si);
+       si->sock.update(si);
 
        /* we don't want to expire timeouts while we're processing requests */
        si->ib->rex = TICK_ETERNITY;
index ab6f7ed0384d7a5c74eeabd9036d9d6682d8cc47..8c8795af2721de3f23fb17fd9e1b8a0dd23a5d1c 100644 (file)
@@ -1023,15 +1023,15 @@ incomplete:
                                si->applet.st0 = PEER_SESSION_END;
                                /* fall through */
                        case PEER_SESSION_END: {
-                               si->shutw(si);
-                               si->shutr(si);
+                               si->sock.shutw(si);
+                               si->sock.shutr(si);
                                si->ib->flags |= BF_READ_NULL;
                                goto quit;
                        }
                }
        }
 out:
-       si->update(si);
+       si->sock.update(si);
        si->ob->flags |= BF_READ_DONTWAIT;
        /* we don't want to expire timeouts while we're processing requests */
        si->ib->rex = TICK_ETERNITY;
index 0fffad4d9d21c0a687dfd2551ac2206038318faa..2c1ff566cf569abbb95a907797f23161144288f0 100644 (file)
@@ -803,8 +803,8 @@ void perform_http_redirect(struct session *s, struct stream_interface *si)
        }
 
        /* prepare to return without error. */
-       si->shutr(si);
-       si->shutw(si);
+       si->sock.shutr(si);
+       si->sock.shutw(si);
        si->err_type = SI_ET_NONE;
        si->err_loc  = NULL;
        si->state    = SI_ST_CLO;
@@ -3713,8 +3713,8 @@ void http_end_txn_clean_session(struct session *s)
        http_silent_debug(__LINE__, s);
 
        s->req->cons->flags |= SI_FL_NOLINGER;
-       s->req->cons->shutr(s->req->cons);
-       s->req->cons->shutw(s->req->cons);
+       s->req->cons->sock.shutr(s->req->cons);
+       s->req->cons->sock.shutw(s->req->cons);
 
        http_silent_debug(__LINE__, s);
 
index f8b651682ca48f02e62304b318a12a560c7b3a49..fabba1f2016590875164bd26234d0961602c8e0d 100644 (file)
@@ -200,8 +200,8 @@ int session_accept(struct listener *l, int cfd, struct sockaddr_storage *addr)
        s->si[1].get_src   = NULL;
        s->si[1].get_dst   = NULL;
        clear_target(&s->si[1].target);
-       s->si[1].shutr     = stream_int_shutr;
-       s->si[1].shutw     = stream_int_shutw;
+       s->si[1].sock.shutr= stream_int_shutr;
+       s->si[1].sock.shutw= stream_int_shutw;
        s->si[1].exp       = TICK_ETERNITY;
        s->si[1].flags     = SI_FL_NONE;
 
@@ -573,7 +573,7 @@ static int sess_update_st_con_tcp(struct session *s, struct stream_interface *si
                      (((req->flags & (BF_OUT_EMPTY|BF_WRITE_ACTIVITY)) == BF_OUT_EMPTY) ||
                       s->be->options & PR_O_ABRT_CLOSE)))) {
                /* give up */
-               si->shutw(si);
+               si->sock.shutw(si);
                si->err_type |= SI_ET_CONN_ABRT;
                si->err_loc  = target_srv(&s->target);
                si->flags &= ~SI_FL_CAP_SPLICE;
@@ -634,7 +634,7 @@ static int sess_update_st_cer(struct session *s, struct stream_interface *si)
                        process_srv_queue(target_srv(&s->target));
 
                /* shutw is enough so stop a connecting socket */
-               si->shutw(si);
+               si->sock.shutw(si);
                si->ob->flags |= BF_WRITE_ERROR;
                si->ib->flags |= BF_READ_ERROR;
 
@@ -773,8 +773,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s
                                process_srv_queue(srv);
 
                        /* Failed and not retryable. */
-                       si->shutr(si);
-                       si->shutw(si);
+                       si->sock.shutr(si);
+                       si->sock.shutw(si);
                        si->ob->flags |= BF_WRITE_ERROR;
 
                        s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
@@ -822,8 +822,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s
                        if (srv)
                                srv->counters.failed_conns++;
                        s->be->be_counters.failed_conns++;
-                       si->shutr(si);
-                       si->shutw(si);
+                       si->sock.shutr(si);
+                       si->sock.shutw(si);
                        si->ob->flags |= BF_WRITE_TIMEOUT;
                        if (!si->err_type)
                                si->err_type = SI_ET_QUEUE_TO;
@@ -840,8 +840,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s
                        /* give up */
                        si->exp = TICK_ETERNITY;
                        s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
-                       si->shutr(si);
-                       si->shutw(si);
+                       si->sock.shutr(si);
+                       si->sock.shutw(si);
                        si->err_type |= SI_ET_QUEUE_ABRT;
                        si->state = SI_ST_CLO;
                        if (s->srv_error)
@@ -859,8 +859,8 @@ static void sess_update_stream_int(struct session *s, struct stream_interface *s
                     (si->ob->flags & BF_OUT_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) {
                        /* give up */
                        si->exp = TICK_ETERNITY;
-                       si->shutr(si);
-                       si->shutw(si);
+                       si->sock.shutr(si);
+                       si->sock.shutw(si);
                        si->err_type |= SI_ET_CONN_ABRT;
                        si->state = SI_ST_CLO;
                        if (s->srv_error)
@@ -938,8 +938,8 @@ static void sess_prepare_conn_req(struct session *s, struct stream_interface *si
                        return;
 
                /* we did not get any server, let's check the cause */
-               si->shutr(si);
-               si->shutw(si);
+               si->sock.shutr(si);
+               si->sock.shutw(si);
                si->ob->flags |= BF_WRITE_ERROR;
                if (!si->err_type)
                        si->err_type = SI_ET_CONN_OTHER;
@@ -1352,21 +1352,21 @@ struct task *process_session(struct task *t)
 
                if (unlikely((s->req->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) {
                        s->req->cons->flags |= SI_FL_NOLINGER;
-                       s->req->cons->shutw(s->req->cons);
+                       s->req->cons->sock.shutw(s->req->cons);
                }
 
                if (unlikely((s->req->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT))
-                       s->req->prod->shutr(s->req->prod);
+                       s->req->prod->sock.shutr(s->req->prod);
 
                buffer_check_timeouts(s->rep);
 
                if (unlikely((s->rep->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT)) {
                        s->rep->cons->flags |= SI_FL_NOLINGER;
-                       s->rep->cons->shutw(s->rep->cons);
+                       s->rep->cons->sock.shutw(s->rep->cons);
                }
 
                if (unlikely((s->rep->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT))
-                       s->rep->prod->shutr(s->rep->prod);
+                       s->rep->prod->sock.shutr(s->rep->prod);
        }
 
        /* 1b: check for low-level errors reported at the stream interface.
@@ -1379,8 +1379,8 @@ struct task *process_session(struct task *t)
        srv = target_srv(&s->target);
        if (unlikely(s->si[0].flags & SI_FL_ERR)) {
                if (s->si[0].state == SI_ST_EST || s->si[0].state == SI_ST_DIS) {
-                       s->si[0].shutr(&s->si[0]);
-                       s->si[0].shutw(&s->si[0]);
+                       s->si[0].sock.shutr(&s->si[0]);
+                       s->si[0].sock.shutw(&s->si[0]);
                        stream_int_report_error(&s->si[0]);
                        if (!(s->req->analysers) && !(s->rep->analysers)) {
                                s->be->be_counters.cli_aborts++;
@@ -1397,8 +1397,8 @@ struct task *process_session(struct task *t)
 
        if (unlikely(s->si[1].flags & SI_FL_ERR)) {
                if (s->si[1].state == SI_ST_EST || s->si[1].state == SI_ST_DIS) {
-                       s->si[1].shutr(&s->si[1]);
-                       s->si[1].shutw(&s->si[1]);
+                       s->si[1].sock.shutr(&s->si[1]);
+                       s->si[1].sock.shutw(&s->si[1]);
                        stream_int_report_error(&s->si[1]);
                        s->be->be_counters.failed_resp++;
                        if (srv)
@@ -1895,7 +1895,7 @@ struct task *process_session(struct task *t)
 
        /* shutdown(write) pending */
        if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_OUT_EMPTY)) == (BF_SHUTW_NOW|BF_OUT_EMPTY)))
-               s->req->cons->shutw(s->req->cons);
+               s->req->cons->sock.shutw(s->req->cons);
 
        /* shutdown(write) done on server side, we must stop the client too */
        if (unlikely((s->req->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW &&
@@ -1904,7 +1904,7 @@ struct task *process_session(struct task *t)
 
        /* shutdown(read) pending */
        if (unlikely((s->req->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW))
-               s->req->prod->shutr(s->req->prod);
+               s->req->prod->sock.shutr(s->req->prod);
 
        /* it's possible that an upper layer has requested a connection setup or abort.
         * There are 2 situations where we decide to establish a new connection :
@@ -2026,7 +2026,7 @@ struct task *process_session(struct task *t)
 
        /* shutdown(write) pending */
        if (unlikely((s->rep->flags & (BF_SHUTW|BF_OUT_EMPTY|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW)))
-               s->rep->cons->shutw(s->rep->cons);
+               s->rep->cons->sock.shutw(s->rep->cons);
 
        /* shutdown(write) done on the client side, we must stop the server too */
        if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTW) &&
@@ -2035,7 +2035,7 @@ struct task *process_session(struct task *t)
 
        /* shutdown(read) pending */
        if (unlikely((s->rep->flags & (BF_SHUTR|BF_SHUTR_NOW)) == BF_SHUTR_NOW))
-               s->rep->prod->shutr(s->rep->prod);
+               s->rep->prod->sock.shutr(s->rep->prod);
 
        if (s->req->prod->state == SI_ST_DIS || s->req->cons->state == SI_ST_DIS)
                goto resync_stream_interface;
@@ -2086,10 +2086,10 @@ struct task *process_session(struct task *t)
                        session_process_counters(s);
 
                if (s->rep->cons->state == SI_ST_EST && s->rep->cons->target.type != TARG_TYPE_APPLET)
-                       s->rep->cons->update(s->rep->cons);
+                       s->rep->cons->sock.update(s->rep->cons);
 
                if (s->req->cons->state == SI_ST_EST && s->req->cons->target.type != TARG_TYPE_APPLET)
-                       s->req->cons->update(s->req->cons);
+                       s->req->cons->sock.update(s->req->cons);
 
                s->req->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED);
                s->rep->flags &= ~(BF_READ_NULL|BF_READ_PARTIAL|BF_WRITE_NULL|BF_WRITE_PARTIAL|BF_READ_ATTACHED);
index cea1a19b50cb392b7733e3d3d21dc31bfb40fc61..209762382ee2e31df20d15c2041a5b72694ce27f 100644 (file)
@@ -108,7 +108,7 @@ void stream_int_update_embedded(struct stream_interface *si)
                return;
 
        if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW))
-               si->shutw(si);
+               si->sock.shutw(si);
 
        if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
                si->flags |= SI_FL_WAIT_DATA;
@@ -134,11 +134,11 @@ void stream_int_update_embedded(struct stream_interface *si)
        old_flags = si->flags;
        if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
                   (si->ob->prod->flags & SI_FL_WAIT_ROOM)))
-               si->ob->prod->chk_rcv(si->ob->prod);
+               si->ob->prod->sock.chk_rcv(si->ob->prod);
 
        if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
            (si->ib->cons->flags & SI_FL_WAIT_DATA)) {
-               si->ib->cons->chk_snd(si->ib->cons);
+               si->ib->cons->sock.chk_snd(si->ib->cons);
                /* check if the consumer has freed some space */
                if (!(si->ib->flags & BF_FULL))
                        si->flags &= ~SI_FL_WAIT_ROOM;
@@ -308,11 +308,11 @@ struct task *stream_int_register_handler(struct stream_interface *si, struct si_
 {
        DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si->owner);
 
-       si->update  = stream_int_update_embedded;
-       si->shutr   = stream_int_shutr;
-       si->shutw   = stream_int_shutw;
-       si->chk_rcv = stream_int_chk_rcv;
-       si->chk_snd = stream_int_chk_snd;
+       si->sock.update  = stream_int_update_embedded;
+       si->sock.shutr   = stream_int_shutr;
+       si->sock.shutw   = stream_int_shutw;
+       si->sock.chk_rcv = stream_int_chk_rcv;
+       si->sock.chk_snd = stream_int_chk_snd;
        si->connect = NULL;
        set_target_applet(&si->target, app);
        si->applet.state = 0;
@@ -335,11 +335,11 @@ struct task *stream_int_register_handler_task(struct stream_interface *si,
 
        DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner);
 
-       si->update  = stream_int_update;
-       si->shutr   = stream_int_shutr;
-       si->shutw   = stream_int_shutw;
-       si->chk_rcv = stream_int_chk_rcv;
-       si->chk_snd = stream_int_chk_snd;
+       si->sock.update  = stream_int_update;
+       si->sock.shutr   = stream_int_shutr;
+       si->sock.shutw   = stream_int_shutw;
+       si->sock.chk_rcv = stream_int_chk_rcv;
+       si->sock.chk_snd = stream_int_chk_snd;
        si->connect = NULL;
        clear_target(&si->target);
        si->release   = NULL;
index 1b543d00701086662bd5f09d47c8aa10e67a034b..3862ac8630480552bf220f993eba62d962937ae4 100644 (file)
@@ -94,7 +94,7 @@ static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
                si->flags |= SI_FL_WAIT_ROOM;
                EV_FD_CLR(fd, DIR_RD);
                b->rex = TICK_ETERNITY;
-               b->cons->chk_snd(b->cons);
+               b->cons->sock.chk_snd(b->cons);
                return 1;
        }
 
@@ -443,7 +443,7 @@ int stream_sock_read(int fd) {
            (b->i == 0 && (b->cons->flags & SI_FL_WAIT_DATA))) {
                int last_len = b->pipe ? b->pipe->data : 0;
 
-               b->cons->chk_snd(b->cons);
+               b->cons->sock.chk_snd(b->cons);
 
                /* check if the consumer has freed some space */
                if (!(b->flags & BF_FULL) &&
@@ -790,7 +790,7 @@ int stream_sock_write(int fd)
                /* the producer might be waiting for more room to store data */
                if (likely((b->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
                           (b->prod->flags & SI_FL_WAIT_ROOM)))
-                       b->prod->chk_rcv(b->prod);
+                       b->prod->sock.chk_rcv(b->prod);
 
                /* we have to wake up if there is a special event or if we don't have
                 * any more data to forward and it's not planned to send any more.
@@ -1300,11 +1300,11 @@ int stream_sock_accept(int fd)
 /* Prepare a stream interface to be used in socket mode. */
 void stream_sock_prepare_interface(struct stream_interface *si)
 {
-       si->update = stream_sock_data_finish;
-       si->shutr = stream_sock_shutr;
-       si->shutw = stream_sock_shutw;
-       si->chk_rcv = stream_sock_chk_rcv;
-       si->chk_snd = stream_sock_chk_snd;
+       si->sock.update  = stream_sock_data_finish;
+       si->sock.shutr   = stream_sock_shutr;
+       si->sock.shutw   = stream_sock_shutw;
+       si->sock.chk_rcv = stream_sock_chk_rcv;
+       si->sock.chk_snd = stream_sock_chk_snd;
 }