From: Olivier Houchard Date: Thu, 2 Aug 2018 16:21:38 +0000 (+0200) Subject: MINOR: stream_interface: Give stream_interface its own wait_list. X-Git-Tag: v1.9-dev2~159 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8f0b4c66f59156a73cf32fc587f6e8c693956a1d;p=thirdparty%2Fhaproxy.git MINOR: stream_interface: Give stream_interface its own wait_list. Instead of just using the conn_stream wait_list, give the stream_interface its own. When the conn_stream will have its own buffers, the stream_interface may have to wait on it. --- diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index a102ac1761..a9dca82538 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -116,7 +116,7 @@ static inline struct stream_interface *si_opposite(struct stream_interface *si) * any endpoint and only keeps its side which is expected to have already been * set. */ -static inline void si_reset(struct stream_interface *si) +static inline int si_reset(struct stream_interface *si) { si->err_type = SI_ET_NONE; si->conn_retries = 0; /* used for logging too */ @@ -125,6 +125,14 @@ static inline void si_reset(struct stream_interface *si) si->end = NULL; si->state = si->prev_state = SI_ST_INI; si->ops = &si_embedded_ops; + si->wait_list.task = tasklet_new(); + if (!si->wait_list.task) + return -1; + si->wait_list.task->process = si_cs_io_cb; + si->wait_list.task->context = si; + si->wait_list.wait_reason = 0; + LIST_INIT(&si->wait_list.list); + return 0; } /* sets the current and previous state of a stream interface to . This diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index 0c83759da2..eae192666a 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -101,6 +101,7 @@ struct stream_interface { unsigned int err_type; /* first error detected, one of SI_ET_* */ int conn_retries; /* number of connect retries left */ unsigned int hcto; /* half-closed timeout (0 = unset) */ + struct wait_list wait_list; /* We're in a wait list */ }; /* operations available on a stream-interface */ diff --git a/src/stream.c b/src/stream.c index 9c427d4c9d..bc0f1ac708 100644 --- a/src/stream.c +++ b/src/stream.c @@ -192,7 +192,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) vars_init(&s->vars_reqres, SCOPE_REQ); /* this part should be common with other protocols */ - si_reset(&s->si[0]); + if (si_reset(&s->si[0]) < 0) + goto out_fail_alloc; si_set_state(&s->si[0], SI_ST_EST); s->si[0].hcto = sess->fe->timeout.clientfin; @@ -211,7 +212,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) /* pre-initialize the other side's stream interface to an INIT state. The * callbacks will be initialized before attempting to connect. */ - si_reset(&s->si[1]); + if (si_reset(&s->si[1]) < 0) + goto out_fail_alloc_si1; s->si[1].hcto = TICK_ETERNITY; if (likely(sess->fe->options2 & PR_O2_INDEPSTR)) @@ -288,6 +290,9 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) out_fail_accept: flt_stream_release(s, 0); task_free(t); + tasklet_free(s->si[1].wait_list.task); +out_fail_alloc_si1: + tasklet_free(s->si[0].wait_list.task); out_fail_alloc: LIST_DEL(&s->list); pool_free(pool_head_stream, s); @@ -403,6 +408,8 @@ static void stream_free(struct stream *s) if (must_free_sess) session_free(sess); + tasklet_free(s->si[0].wait_list.task); + tasklet_free(s->si[1].wait_list.task); pool_free(pool_head_stream, s); /* We may want to free the maximum amount of pools if the proxy is stopping */ diff --git a/src/stream_interface.c b/src/stream_interface.c index 290459ab31..4b5b760c85 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -651,7 +651,7 @@ static struct task * si_cs_send(struct conn_stream *cs) int did_send = 0; /* We're already waiting to be able to send, give up */ - if (cs->wait_list.wait_reason & SUB_CAN_SEND) + if (si->wait_list.wait_reason & SUB_CAN_SEND) return NULL; if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) @@ -660,7 +660,7 @@ static struct task * si_cs_send(struct conn_stream *cs) if (conn->flags & CO_FL_HANDSHAKE) { /* a handshake was requested */ /* Schedule ourself to be woken up once the handshake is done */ - conn->xprt->subscribe(conn, SUB_CAN_SEND, wl_set_waitcb(&cs->wait_list, si_cs_io_cb, cs)); + conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_list); return NULL; } @@ -740,7 +740,7 @@ static struct task * si_cs_send(struct conn_stream *cs) } /* We couldn't send all of our data, let the mux know we'd like to send more */ if (co_data(oc)) - conn->mux->subscribe(cs, SUB_CAN_SEND, wl_set_waitcb(&cs->wait_list, si_cs_io_cb, cs)); + conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_list); wake_others: /* Maybe somebody was waiting for this conn_stream, wake them */ @@ -759,7 +759,9 @@ wake_others: struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state) { - si_cs_send(ctx); + struct stream_interface *si = ctx; + if (!(si->wait_list.wait_reason & SUB_CAN_SEND)) + si_cs_send(__objt_cs(si->end)); return (NULL); }