]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: stream_interface: Give stream_interface its own wait_list.
authorOlivier Houchard <ohouchard@haproxy.com>
Thu, 2 Aug 2018 16:21:38 +0000 (18:21 +0200)
committerWilly Tarreau <w@1wt.eu>
Thu, 16 Aug 2018 15:29:54 +0000 (17:29 +0200)
Instead of just using the conn_stream wait_list, give the stream_interface
its own. When the conn_stream will have its own buffers, the stream_interface
may have to wait on it.

include/proto/stream_interface.h
include/types/stream_interface.h
src/stream.c
src/stream_interface.c

index a102ac17613996870be854ad074d4fb4f4afd88e..a9dca8253893993dfea8a8765e5d0c20ddc4af51 100644 (file)
@@ -116,7 +116,7 @@ static inline struct stream_interface *si_opposite(struct stream_interface *si)
  * any endpoint and only keeps its side which is expected to have already been
  * set.
  */
-static inline void si_reset(struct stream_interface *si)
+static inline int si_reset(struct stream_interface *si)
 {
        si->err_type       = SI_ET_NONE;
        si->conn_retries   = 0;  /* used for logging too */
@@ -125,6 +125,14 @@ static inline void si_reset(struct stream_interface *si)
        si->end            = NULL;
        si->state          = si->prev_state = SI_ST_INI;
        si->ops            = &si_embedded_ops;
+       si->wait_list.task = tasklet_new();
+       if (!si->wait_list.task)
+               return -1;
+       si->wait_list.task->process    = si_cs_io_cb;
+       si->wait_list.task->context = si;
+       si->wait_list.wait_reason = 0;
+       LIST_INIT(&si->wait_list.list);
+       return 0;
 }
 
 /* sets the current and previous state of a stream interface to <state>. This
index 0c83759da2051bb19dded54dfb5c6cd2591ccb9c..eae192666a2fa771319b936e2a95e7cf41455dcb 100644 (file)
@@ -101,6 +101,7 @@ struct stream_interface {
        unsigned int err_type;  /* first error detected, one of SI_ET_* */
        int conn_retries;       /* number of connect retries left */
        unsigned int hcto;      /* half-closed timeout (0 = unset) */
+       struct wait_list wait_list; /* We're in a wait list */
 };
 
 /* operations available on a stream-interface */
index 9c427d4c9da6f2c8a2be488eb659dd1f78e2e02c..bc0f1ac7088f5d683589b9387b261f3929f81b5d 100644 (file)
@@ -192,7 +192,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
        vars_init(&s->vars_reqres, SCOPE_REQ);
 
        /* this part should be common with other protocols */
-       si_reset(&s->si[0]);
+       if (si_reset(&s->si[0]) < 0)
+               goto out_fail_alloc;
        si_set_state(&s->si[0], SI_ST_EST);
        s->si[0].hcto = sess->fe->timeout.clientfin;
 
@@ -211,7 +212,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
        /* pre-initialize the other side's stream interface to an INIT state. The
         * callbacks will be initialized before attempting to connect.
         */
-       si_reset(&s->si[1]);
+       if (si_reset(&s->si[1]) < 0)
+               goto out_fail_alloc_si1;
        s->si[1].hcto = TICK_ETERNITY;
 
        if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
@@ -288,6 +290,9 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
  out_fail_accept:
        flt_stream_release(s, 0);
        task_free(t);
+       tasklet_free(s->si[1].wait_list.task);
+out_fail_alloc_si1:
+       tasklet_free(s->si[0].wait_list.task);
  out_fail_alloc:
        LIST_DEL(&s->list);
        pool_free(pool_head_stream, s);
@@ -403,6 +408,8 @@ static void stream_free(struct stream *s)
        if (must_free_sess)
                session_free(sess);
 
+       tasklet_free(s->si[0].wait_list.task);
+       tasklet_free(s->si[1].wait_list.task);
        pool_free(pool_head_stream, s);
 
        /* We may want to free the maximum amount of pools if the proxy is stopping */
index 290459ab31856339dbb11b248630fff8a68b7519..4b5b760c853b488dc96db36a7cebd668ce00cdff 100644 (file)
@@ -651,7 +651,7 @@ static struct task * si_cs_send(struct conn_stream *cs)
        int did_send = 0;
 
        /* We're already waiting to be able to send, give up */
-       if (cs->wait_list.wait_reason & SUB_CAN_SEND)
+       if (si->wait_list.wait_reason & SUB_CAN_SEND)
                return NULL;
 
        if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
@@ -660,7 +660,7 @@ static struct task * si_cs_send(struct conn_stream *cs)
        if (conn->flags & CO_FL_HANDSHAKE) {
                /* a handshake was requested */
                /* Schedule ourself to be woken up once the handshake is done */
-               conn->xprt->subscribe(conn, SUB_CAN_SEND,  wl_set_waitcb(&cs->wait_list, si_cs_io_cb, cs));
+               conn->xprt->subscribe(conn, SUB_CAN_SEND,  &si->wait_list);
                return NULL;
        }
 
@@ -740,7 +740,7 @@ static struct task * si_cs_send(struct conn_stream *cs)
        }
        /* We couldn't send all of our data, let the mux know we'd like to send more */
        if (co_data(oc))
-               conn->mux->subscribe(cs, SUB_CAN_SEND, wl_set_waitcb(&cs->wait_list, si_cs_io_cb, cs));
+               conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_list);
 
 wake_others:
        /* Maybe somebody was waiting for this conn_stream, wake them */
@@ -759,7 +759,9 @@ wake_others:
 
 struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
 {
-       si_cs_send(ctx);
+       struct stream_interface *si = ctx;
+       if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
+               si_cs_send(__objt_cs(si->end));
        return (NULL);
 }