From fa8aa867b9159297e4061d4970b167851c259f5b Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Wed, 10 Oct 2018 18:25:41 +0200 Subject: [PATCH] MEDIUM: connections: Change struct wait_list to wait_event. When subscribing, we don't need to provide a list element, only the h2 mux needs it. So instead, Add a list element to struct h2s, and use it when a list is needed. This forces us to use the unsubscribe method, since we can't just unsubscribe by using LIST_DEL anymore. This patch is larger than it should be because it includes some renaming. --- include/proto/connection.h | 24 +-- include/proto/stream_interface.h | 11 +- include/types/checks.h | 2 +- include/types/connection.h | 8 +- include/types/stream_interface.h | 2 +- src/checks.c | 1 - src/connection.c | 85 ++------- src/mux_h2.c | 286 +++++++++++++++---------------- src/stream.c | 23 ++- src/stream_interface.c | 14 +- 10 files changed, 195 insertions(+), 261 deletions(-) diff --git a/include/proto/connection.h b/include/proto/connection.h index 028dc232c1..2d4c4c73d2 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -627,9 +627,8 @@ static inline void conn_init(struct connection *conn) conn->destroy_cb = NULL; conn->proxy_netns = NULL; LIST_INIT(&conn->list); - LIST_INIT(&conn->send_wait_list); - LIST_INIT(&conn->recv_wait_list); - LIST_INIT(&conn->sendrecv_wait_list); + conn->send_wait = NULL; + conn->recv_wait = NULL; } /* sets as the connection's owner */ @@ -705,19 +704,10 @@ static inline struct conn_stream *cs_new(struct connection *conn) /* Releases a connection previously allocated by conn_new() */ static inline void conn_free(struct connection *conn) { - struct wait_list *sw, *sw_back; - list_for_each_entry_safe(sw, sw_back, &conn->recv_wait_list, list) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - } - list_for_each_entry_safe(sw, sw_back, &conn->send_wait_list, list) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - } - list_for_each_entry_safe(sw, sw_back, &conn->sendrecv_wait_list, list) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - } + if (conn->recv_wait) + conn->recv_wait->wait_reason &= ~SUB_CAN_RECV; + if (conn->send_wait) + conn->send_wait->wait_reason &= ~SUB_CAN_SEND; pool_free(pool_head_connection, conn); } @@ -786,7 +776,7 @@ static inline void cs_attach(struct conn_stream *cs, void *data, const struct da cs->data = data; } -static inline struct wait_list *wl_set_waitcb(struct wait_list *wl, struct task *(*cb)(struct task *, void *, unsigned short), void *ctx) +static inline struct wait_event *wl_set_waitcb(struct wait_event *wl, struct task *(*cb)(struct task *, void *, unsigned short), void *ctx) { if (!wl->task->process) { wl->task->process = cb; diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 4a38de81bb..e83187b4de 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -127,13 +127,12 @@ static inline int si_reset(struct stream_interface *si) si->end = NULL; si->state = si->prev_state = SI_ST_INI; si->ops = &si_embedded_ops; - si->wait_list.task = tasklet_new(); - if (!si->wait_list.task) + si->wait_event.task = tasklet_new(); + if (!si->wait_event.task) return -1; - si->wait_list.task->process = si_cs_io_cb; - si->wait_list.task->context = si; - si->wait_list.wait_reason = 0; - LIST_INIT(&si->wait_list.list); + si->wait_event.task->process = si_cs_io_cb; + si->wait_event.task->context = si; + si->wait_event.wait_reason = 0; return 0; } diff --git a/include/types/checks.h b/include/types/checks.h index d148e3b932..e0a4bd64be 100644 --- a/include/types/checks.h +++ b/include/types/checks.h @@ -184,7 +184,7 @@ struct check { char **envp; /* the environment to use if running a process-based check */ struct pid_list *curpid; /* entry in pid_list used for current process-based test, or -1 if not in test */ struct sockaddr_storage addr; /* the address to check */ - struct wait_list wait_list; /* Waiting for I/O events */ + struct wait_event wait_list; /* Waiting for I/O events */ char *sni; /* Server name */ }; diff --git a/include/types/connection.h b/include/types/connection.h index 26f9bed588..20daa43e1e 100644 --- a/include/types/connection.h +++ b/include/types/connection.h @@ -50,9 +50,8 @@ enum sub_event_type { SUB_CAN_RECV = 0x00000002, /* Schedule the tasklet when we can recv more */ }; -struct wait_list { +struct wait_event { struct tasklet *task; - struct list list; void *handle; /* To be used by the callee */ int wait_reason; }; @@ -404,9 +403,8 @@ struct connection { enum obj_type *target; /* the target to connect to (server, proxy, applet, ...) */ /* second cache line */ - struct list send_wait_list; /* list of tasks to wake when we're ready to send */ - struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */ - struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */ + struct wait_event *send_wait; /* Task to wake when we're ready to send */ + struct wait_event *recv_wait; /* Task to wake when we're ready to recv */ struct list list; /* attach point to various connection lists (idle, ...) */ int xprt_st; /* transport layer state, initialized to zero */ int tmp_early_data; /* 1st byte of early data, if any */ diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index eae192666a..76ed72eea4 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -101,7 +101,7 @@ struct stream_interface { unsigned int err_type; /* first error detected, one of SI_ET_* */ int conn_retries; /* number of connect retries left */ unsigned int hcto; /* half-closed timeout (0 = unset) */ - struct wait_list wait_list; /* We're in a wait list */ + struct wait_event wait_event; /* We're in a wait list */ }; /* operations available on a stream-interface */ diff --git a/src/checks.c b/src/checks.c index 5772c4f28d..a3110e73ca 100644 --- a/src/checks.c +++ b/src/checks.c @@ -3142,7 +3142,6 @@ const char *init_check(struct check *check, int type) check->wait_list.task = tasklet_new(); if (!check->wait_list.task) return "out of memroy while allocating check tasklet"; - LIST_INIT(&check->wait_list.list); check->wait_list.wait_reason = 0; check->wait_list.task->process = event_srv_chk_io; check->wait_list.task->context = check; diff --git a/src/connection.c b/src/connection.c index c8f1df1163..b62cccecda 100644 --- a/src/connection.c +++ b/src/connection.c @@ -128,25 +128,12 @@ void conn_fd_handler(int fd) * both of which will be detected below. */ flags = 0; - io_available = (LIST_ISEMPTY(&conn->send_wait_list) && - LIST_ISEMPTY(&conn->sendrecv_wait_list));; - while (!LIST_ISEMPTY(&conn->send_wait_list)) { - struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n, - struct wait_list *, list); - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - sw->wait_reason &= ~SUB_CAN_SEND; - tasklet_wakeup(sw->task); - } - while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) { - struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n, - struct wait_list *, list); - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - LIST_ADDQ(&conn->recv_wait_list, &sw->list); - sw->wait_reason &= ~SUB_CAN_SEND; - tasklet_wakeup(sw->task); - } + if (conn->send_wait != NULL) { + conn->send_wait->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(conn->send_wait->task); + conn->send_wait = NULL; + } else + io_available = 1; } /* The data transfer starts here and stops on error and handshakes. Note @@ -160,26 +147,12 @@ void conn_fd_handler(int fd) * both of which will be detected below. */ flags = 0; - io_available |= (LIST_ISEMPTY(&conn->recv_wait_list) && - LIST_ISEMPTY(&conn->sendrecv_wait_list)); - while (!LIST_ISEMPTY(&conn->recv_wait_list)) { - struct wait_list *sw = LIST_ELEM(conn->recv_wait_list.n, - struct wait_list *, list); - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - sw->wait_reason &= ~SUB_CAN_RECV; - tasklet_wakeup(sw->task); - } - while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) { - struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n, - struct wait_list *, list); - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - LIST_ADDQ(&conn->send_wait_list, &sw->list); - sw->wait_reason &= ~SUB_CAN_RECV; - tasklet_wakeup(sw->task); - } - + if (conn->recv_wait) { + conn->recv_wait->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(conn->recv_wait->task); + conn->recv_wait = NULL; + } else + io_available = 1; } /* It may happen during the data phase that a handshake is @@ -360,26 +333,20 @@ int conn_sock_send(struct connection *conn, const void *buf, int len, int flags) int conn_unsubscribe(struct connection *conn, int event_type, void *param) { - struct wait_list *sw; + struct wait_event *sw; if (event_type & SUB_CAN_RECV) { sw = param; if (sw->wait_reason & SUB_CAN_RECV) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); + conn->recv_wait = NULL; sw->wait_reason &= ~SUB_CAN_RECV; - if (sw->wait_reason & SUB_CAN_SEND) - LIST_ADDQ(&conn->send_wait_list, &sw->list); } } if (event_type & SUB_CAN_SEND) { sw = param; if (sw->wait_reason & SUB_CAN_SEND) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); + conn->send_wait = NULL; sw->wait_reason &= ~SUB_CAN_SEND; - if (sw->wait_reason & SUB_CAN_RECV) - LIST_ADDQ(&conn->recv_wait_list, &sw->list); } } return 0; @@ -387,21 +354,13 @@ int conn_unsubscribe(struct connection *conn, int event_type, void *param) int conn_subscribe(struct connection *conn, int event_type, void *param) { - struct wait_list *sw; + struct wait_event *sw; if (event_type & SUB_CAN_RECV) { sw = param; if (!(sw->wait_reason & SUB_CAN_RECV)) { sw->wait_reason |= SUB_CAN_RECV; - /* If we're already subscribed for send(), move it - * to the send+recv list - */ - if (sw->wait_reason & SUB_CAN_SEND) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list); - } else - LIST_ADDQ(&conn->recv_wait_list, &sw->list); + conn->recv_wait = sw; } event_type &= ~SUB_CAN_RECV; } @@ -409,15 +368,7 @@ int conn_subscribe(struct connection *conn, int event_type, void *param) sw = param; if (!(sw->wait_reason & SUB_CAN_SEND)) { sw->wait_reason |= SUB_CAN_SEND; - /* If we're already subscribed for recv(), move it - * to the send+recv list - */ - if (sw->wait_reason & SUB_CAN_RECV) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list); - } else - LIST_ADDQ(&conn->send_wait_list, &sw->list); + conn->send_wait = sw; } event_type &= ~SUB_CAN_SEND; } diff --git a/src/mux_h2.c b/src/mux_h2.c index 8620151462..568168f532 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -120,7 +120,7 @@ struct h2c { struct list send_list; /* list of blocked streams requesting to send */ struct list fctl_list; /* list of streams blocked by connection's fctl */ struct buffer_wait buf_wait; /* wait list for buffer allocations */ - struct wait_list wait_list; /* We're in a wait list, to send */ + struct wait_event wait_event; /* To be used if we're waiting for I/Os */ }; /* H2 stream state, in h2s->st */ @@ -183,8 +183,10 @@ struct h2s { enum h2_ss st; uint16_t status; /* HTTP response status */ struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */ - struct wait_list wait_list; /* Wait list, when we're attempting to send a RST but we can't send */ - struct wait_list *recv_wait_list; /* Address of the wait_list the conn_stream associated is waiting on */ + struct wait_event wait_event; /* Wait list, when we're attempting to send a RST but we can't send */ + struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */ + struct wait_event *send_wait; /* The streeam is waiting for flow control */ + struct list list; /* To be used when adding in h2c->send_list or h2c->fctl_lsit */ }; /* descriptor for an h2 frame header */ @@ -284,7 +286,7 @@ static int h2_buf_available(void *target) h2c->flags &= ~H2_CF_DEM_DALLOC; if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(h2c->conn); - tasklet_wakeup(h2c->wait_list.task); + tasklet_wakeup(h2c->wait_event.task); } return 1; } @@ -298,7 +300,7 @@ static int h2_buf_available(void *target) h2c->flags &= ~H2_CF_DEM_MROOM; if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(h2c->conn); - tasklet_wakeup(h2c->wait_list.task); + tasklet_wakeup(h2c->wait_event.task); } } return 1; @@ -310,7 +312,7 @@ static int h2_buf_available(void *target) h2c->flags &= ~H2_CF_DEM_SALLOC; if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(h2c->conn); - tasklet_wakeup(h2c->wait_list.task); + tasklet_wakeup(h2c->wait_event.task); } return 1; } @@ -375,13 +377,12 @@ static int h2c_frt_init(struct connection *conn) t->expire = tick_add(now_ms, h2c->timeout); } - h2c->wait_list.task = tasklet_new(); - if (!h2c->wait_list.task) + h2c->wait_event.task = tasklet_new(); + if (!h2c->wait_event.task) goto fail; - h2c->wait_list.task->process = h2_io_cb; - h2c->wait_list.task->context = h2c; - h2c->wait_list.wait_reason = 0; - LIST_INIT(&h2c->wait_list.list); + h2c->wait_event.task->process = h2_io_cb; + h2c->wait_event.task->context = h2c; + h2c->wait_event.wait_reason = 0; h2c->ddht = hpack_dht_alloc(h2_settings_header_table_size); if (!h2c->ddht) @@ -418,13 +419,13 @@ static int h2c_frt_init(struct connection *conn) /* prepare to read something */ conn_xprt_want_recv(conn); - tasklet_wakeup(h2c->wait_list.task); + tasklet_wakeup(h2c->wait_event.task); return 0; fail: if (t) task_free(t); - if (h2c->wait_list.task) - tasklet_free(h2c->wait_list.task); + if (h2c->wait_event.task) + tasklet_free(h2c->wait_event.task); pool_free(pool_head_h2c, h2c); fail_no_h2c: return -1; @@ -484,23 +485,11 @@ static void h2_release(struct connection *conn) task_wakeup(h2c->task, TASK_WOKEN_OTHER); h2c->task = NULL; } - if (h2c->wait_list.task) - tasklet_free(h2c->wait_list.task); - LIST_DEL(&h2c->wait_list.list); - LIST_INIT(&h2c->wait_list.list); - while (!LIST_ISEMPTY(&h2c->send_list)) { - struct wait_list *sw = LIST_ELEM(h2c->send_list.n, - struct wait_list *, list); - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - } - while (!LIST_ISEMPTY(&h2c->fctl_list)) { - struct wait_list *sw = LIST_ELEM(h2c->fctl_list.n, - struct wait_list *, list); - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - } - + if (h2c->wait_event.task) + tasklet_free(h2c->wait_event.task); + if (h2c->wait_event.wait_reason != 0) + conn->xprt->unsubscribe(conn, h2c->wait_event.wait_reason, + &h2c->wait_event); pool_free(pool_head_h2c, h2c); } @@ -665,9 +654,17 @@ static void h2s_destroy(struct h2s *h2s) b_free(&h2s->rxbuf); offer_buffers(NULL, tasks_run_queue); } - LIST_DEL(&h2s->wait_list.list); - LIST_INIT(&h2s->wait_list.list); - tasklet_free(h2s->wait_list.task); + if (h2s->send_wait != NULL) + h2s->send_wait->wait_reason &= ~SUB_CAN_SEND; + if (h2s->recv_wait != NULL) + h2s->recv_wait->wait_reason &= ~SUB_CAN_RECV; + /* There's no need to explicitely call unsubscribe here, the only + * reference left would be in the h2c send_list/fctl_list, and if + * we're in it, we're getting out anyway + */ + LIST_DEL(&h2s->list); + LIST_INIT(&h2s->list); + tasklet_free(h2s->wait_event.task); pool_free(pool_head_h2s, h2s); } @@ -684,16 +681,18 @@ static struct h2s *h2c_stream_new(struct h2c *h2c, int id) if (!h2s) goto out; - h2s->wait_list.task = tasklet_new(); - if (!h2s->wait_list.task) - goto out_free_h2s; - - LIST_INIT(&h2s->wait_list.list); - h2s->recv_wait_list = NULL; - h2s->wait_list.task->process = h2_deferred_shut; - h2s->wait_list.task->context = h2s; - h2s->wait_list.handle = NULL; - h2s->wait_list.wait_reason = 0; + h2s->wait_event.task = tasklet_new(); + if (!h2s->wait_event.task) { + pool_free(pool_head_h2s, h2s); + goto out; + } + h2s->send_wait = NULL; + h2s->recv_wait = NULL; + h2s->wait_event.task->process = h2_deferred_shut; + h2s->wait_event.task->context = h2s; + h2s->wait_event.handle = NULL; + h2s->wait_event.wait_reason = 0; + LIST_INIT(&h2s->list); h2s->h2c = h2c; h2s->mws = h2c->miw; h2s->flags = H2_SF_NONE; @@ -1130,11 +1129,11 @@ static void h2_wake_some_streams(struct h2c *h2c, int last, uint32_t flags) } h2s->cs->flags |= flags; - if (h2s->recv_wait_list) { - struct wait_list *sw = h2s->recv_wait_list; + if (h2s->recv_wait) { + struct wait_event *sw = h2s->recv_wait; sw->wait_reason &= ~SUB_CAN_RECV; tasklet_wakeup(sw->task); - h2s->recv_wait_list = NULL; + h2s->recv_wait = NULL; } else if (h2s->cs->data_cb->wake != NULL) h2s->cs->data_cb->wake(h2s->cs); @@ -1608,12 +1607,12 @@ static int h2c_handle_rst_stream(struct h2c *h2c, struct h2s *h2s) if (h2s->cs) { h2s->cs->flags |= CS_FL_REOS | CS_FL_ERROR; - if (h2s->recv_wait_list) { - struct wait_list *sw = h2s->recv_wait_list; + if (h2s->recv_wait) { + struct wait_event *sw = h2s->recv_wait; sw->wait_reason &= ~SUB_CAN_RECV; tasklet_wakeup(sw->task); - h2s->recv_wait_list = NULL; + h2s->recv_wait = NULL; } } @@ -1899,10 +1898,10 @@ static void h2_process_demux(struct h2c *h2c) if (tmp_h2s != h2s && h2s && h2s->cs && b_data(&h2s->rxbuf)) { /* we may have to signal the upper layers */ h2s->cs->flags |= CS_FL_RCV_MORE; - if (h2s->recv_wait_list) { - h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV; - tasklet_wakeup(h2s->recv_wait_list->task); - h2s->recv_wait_list = NULL; + if (h2s->recv_wait) { + h2s->recv_wait->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(h2s->recv_wait->task); + h2s->recv_wait = NULL; } if (h2c->st0 >= H2_CS_ERROR) goto strm_err; @@ -2143,10 +2142,10 @@ static void h2_process_demux(struct h2c *h2c) if (h2s && h2s->cs && b_data(&h2s->rxbuf)) { /* we may have to signal the upper layers */ h2s->cs->flags |= CS_FL_RCV_MORE; - if (h2s->recv_wait_list) { - h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV; - tasklet_wakeup(h2s->recv_wait_list->task); - h2s->recv_wait_list = NULL; + if (h2s->recv_wait) { + h2s->recv_wait->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(h2s->recv_wait->task); + h2s->recv_wait = NULL; } } return; @@ -2157,8 +2156,7 @@ static void h2_process_demux(struct h2c *h2c) */ static int h2_process_mux(struct h2c *h2c) { - struct h2s *h2s; - struct wait_list *sw, *sw_back; + struct h2s *h2s, *h2s_back; /* start by sending possibly pending window updates */ if (h2c->rcvd_c > 0 && @@ -2171,48 +2169,29 @@ static int h2_process_mux(struct h2c *h2c) * blocked just on this. */ - list_for_each_entry_safe(sw, sw_back, &h2c->fctl_list, list) { - h2s = sw->handle; + list_for_each_entry_safe(h2s, h2s_back, &h2c->fctl_list, list) { if (h2c->mws <= 0 || h2c->flags & H2_CF_MUX_BLOCK_ANY || h2c->st0 >= H2_CS_ERROR) break; - /* If the tasklet was added to finish shutr/shutw, just wake the task */ - if ((long)(h2s) & 3) { - sw->wait_reason &= ~SUB_CAN_SEND; - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - tasklet_wakeup(sw->task); - } else if (!(h2s->flags & H2_SF_BLK_SFCTL)) { - h2s->flags &= ~H2_SF_BLK_ANY; - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - sw->wait_reason &= ~SUB_CAN_SEND; - tasklet_wakeup(sw->task); - } + h2s->flags &= ~H2_SF_BLK_ANY; + h2s->send_wait->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(h2s->send_wait->task); + h2s->send_wait = NULL; + LIST_DEL(&h2s->list); + LIST_INIT(&h2s->list); } - list_for_each_entry_safe(sw, sw_back, &h2c->send_list, list) { - h2s = sw->handle; - + list_for_each_entry_safe(h2s, h2s_back, &h2c->send_list, list) { if (h2c->st0 >= H2_CS_ERROR || h2c->flags & H2_CF_MUX_BLOCK_ANY) break; - /* If the tasklet was added to finish shutr/shutw, just wake the task */ - if ((long)(h2s) & 3) { - sw->wait_reason &= ~SUB_CAN_SEND; - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - tasklet_wakeup(sw->task); - } - else if (!(h2s->flags & H2_SF_BLK_SFCTL)) { - h2s->flags &= ~H2_SF_BLK_ANY; - - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - sw->wait_reason &= ~SUB_CAN_SEND; - tasklet_wakeup(sw->task); - } + h2s->flags &= ~H2_SF_BLK_ANY; + h2s->send_wait->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(h2s->send_wait->task); + h2s->send_wait = NULL; + LIST_DEL(&h2s->list); + LIST_INIT(&h2s->list); } fail: @@ -2240,7 +2219,7 @@ static int h2_recv(struct h2c *h2c) int max; size_t ret; - if (h2c->wait_list.wait_reason & SUB_CAN_RECV) + if (h2c->wait_event.wait_reason & SUB_CAN_RECV) return 0; if (!h2_recv_allowed(h2c)) @@ -2262,7 +2241,7 @@ static int h2_recv(struct h2c *h2c) if (h2_recv_allowed(h2c)) { conn_xprt_want_recv(conn); - conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_list); + conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_event); } if (!b_data(buf)) { h2_release_buf(h2c, &h2c->dbuf); @@ -2345,20 +2324,21 @@ static int h2_send(struct h2c *h2c) */ if (!(h2c->flags & (H2_CF_MUX_MFULL | H2_CF_DEM_MROOM))) { while (!LIST_ISEMPTY(&h2c->send_list)) { - struct wait_list *sw = LIST_ELEM(h2c->send_list.n, - struct wait_list *, list); - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); - sw->wait_reason &= ~SUB_CAN_SEND; - tasklet_wakeup(sw->task); + struct h2s *h2s = LIST_ELEM(h2c->send_list.n, + struct h2s *, list); + LIST_DEL(&h2s->list); + LIST_INIT(&h2s->list); + h2s->send_wait->wait_reason &= ~SUB_CAN_SEND; + tasklet_wakeup(h2s->send_wait->task); + h2s->send_wait = NULL; } } /* We're done, no more to send */ if (!b_data(&h2c->mbuf)) return sent; schedule: - if (LIST_ISEMPTY(&h2c->wait_list.list)) - conn->xprt->subscribe(conn, SUB_CAN_SEND, &h2c->wait_list); + if (!(h2c->wait_event.wait_reason & SUB_CAN_SEND)) + conn->xprt->subscribe(conn, SUB_CAN_SEND, &h2c->wait_event); return sent; } @@ -2367,9 +2347,9 @@ static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short status) struct h2c *h2c = ctx; int ret = 0; - if (!(h2c->wait_list.wait_reason & SUB_CAN_SEND)) + if (!(h2c->wait_event.wait_reason & SUB_CAN_SEND)) ret = h2_send(h2c); - if (!(h2c->wait_list.wait_reason & SUB_CAN_RECV)) + if (!(h2c->wait_event.wait_reason & SUB_CAN_RECV)) ret |= h2_recv(h2c); if (ret) h2_process(h2c); @@ -2423,11 +2403,11 @@ static int h2_process(struct h2c *h2c) while (node) { h2s = container_of(node, struct h2s, by_id); if ((h2s->cs->flags & CS_FL_WAIT_FOR_HS) && - h2s->recv_wait_list) { - struct wait_list *sw = h2s->recv_wait_list; + h2s->recv_wait) { + struct wait_event *sw = h2s->recv_wait; sw->wait_reason &= ~SUB_CAN_RECV; tasklet_wakeup(sw->task); - h2s->recv_wait_list = NULL; + h2s->recv_wait = NULL; } node = eb32_next(node); } @@ -2575,7 +2555,7 @@ static void h2_update_poll(struct conn_stream *cs) h2s->h2c->flags &= ~H2_CF_DEM_SFULL; if (h2s->h2c->dsi == h2s->id) { conn_xprt_want_recv(cs->conn); - tasklet_wakeup(h2s->h2c->wait_list.task); + tasklet_wakeup(h2s->h2c->wait_event.task); conn_xprt_want_send(cs->conn); } } @@ -2590,7 +2570,7 @@ static void h2_update_poll(struct conn_stream *cs) if (cs->flags & CS_FL_DATA_WR_ENA) { if (!b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_SOCK_WR_SH)) conn_xprt_want_send(cs->conn); - tasklet_wakeup(h2s->h2c->wait_list.task); + tasklet_wakeup(h2s->h2c->wait_event.task); } /* We don't support unsubscribing from here, it shouldn't be a problem */ @@ -2612,9 +2592,6 @@ static void h2_detach(struct conn_stream *cs) return; h2c = h2s->h2c; - /* If the stream we're detaching waited for more data, unsubscribe it now */ - if (h2s->recv_wait_list && !((long)h2s->recv_wait_list->handle & 3)) - h2s->recv_wait_list = NULL; h2s->cs = NULL; h2c->nb_cs--; if (h2c->flags & H2_CF_DEM_TOOMANY && @@ -2622,7 +2599,7 @@ static void h2_detach(struct conn_stream *cs) h2c->flags &= ~H2_CF_DEM_TOOMANY; if (h2_recv_allowed(h2c)) { __conn_xprt_want_recv(h2c->conn); - tasklet_wakeup(h2c->wait_list.task); + tasklet_wakeup(h2c->wait_event.task); conn_xprt_want_send(h2c->conn); } } @@ -2643,7 +2620,7 @@ static void h2_detach(struct conn_stream *cs) h2c->flags &= ~H2_CF_DEM_BLOCK_ANY; h2c->flags &= ~H2_CF_MUX_BLOCK_ANY; conn_xprt_want_recv(cs->conn); - tasklet_wakeup(h2c->wait_list.task); + tasklet_wakeup(h2c->wait_event.task); conn_xprt_want_send(cs->conn); } @@ -2677,7 +2654,7 @@ static void h2_detach(struct conn_stream *cs) static void h2_do_shutr(struct h2s *h2s) { struct h2c *h2c = h2s->h2c; - struct wait_list *sw = &h2s->wait_list; + struct wait_event *sw = &h2s->wait_event; if (h2s->st == H2_SS_HLOC || h2s->st == H2_SS_ERROR || h2s->st == H2_SS_CLOSED) return; @@ -2702,12 +2679,15 @@ static void h2_do_shutr(struct h2s *h2s) return; add_to_list: - if (LIST_ISEMPTY(&sw->list)) { + if (LIST_ISEMPTY(&h2s->list)) { sw->wait_reason |= SUB_CAN_SEND; - if (h2s->flags & H2_SF_BLK_MFCTL) - LIST_ADDQ(&h2c->fctl_list, &sw->list); - else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM)) - LIST_ADDQ(&h2c->send_list, &sw->list); + if (h2s->flags & H2_SF_BLK_MFCTL) { + LIST_ADDQ(&h2c->fctl_list, &h2s->list); + h2s->send_wait = sw; + } else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM)) { + h2s->send_wait = sw; + LIST_ADDQ(&h2c->send_list, &h2s->list); + } } /* Let the handler know we want shutr */ sw->handle = (void *)((long)sw->handle | 1); @@ -2717,7 +2697,7 @@ add_to_list: static void h2_do_shutw(struct h2s *h2s) { struct h2c *h2c = h2s->h2c; - struct wait_list *sw = &h2s->wait_list; + struct wait_event *sw = &h2s->wait_event; if (h2s->st == H2_SS_HLOC || h2s->st == H2_SS_ERROR || h2s->st == H2_SS_CLOSED) return; @@ -2755,23 +2735,25 @@ static void h2_do_shutw(struct h2s *h2s) conn_xprt_want_send(h2c->conn); add_to_list: - sw = &h2s->wait_list; - - if (LIST_ISEMPTY(&sw->list)) { + if (LIST_ISEMPTY(&h2s->list)) { sw->wait_reason |= SUB_CAN_SEND; - if (h2s->flags & H2_SF_BLK_MFCTL) - LIST_ADDQ(&h2s->h2c->fctl_list, &sw->list); - else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM)) - LIST_ADDQ(&h2s->h2c->send_list, &sw->list); + if (h2s->flags & H2_SF_BLK_MFCTL) { + LIST_ADDQ(&h2c->fctl_list, &h2s->list); + h2s->send_wait = sw; + } else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM)) { + h2s->send_wait = sw; + LIST_ADDQ(&h2c->send_list, &h2s->list); + } } - /* let the handler know we want to shutr */ - sw->handle = (void *)((long)(sw->handle) | 2); + /* let the handler know we want to shutw */ + sw->handle = (void *)((long)(sw->handle) | 2); + } static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short state) { struct h2s *h2s = ctx; - long reason = (long)h2s->wait_list.handle; + long reason = (long)h2s->wait_event.handle; if (reason & 1) h2_do_shutr(h2s); @@ -3537,7 +3519,7 @@ static size_t h2s_frt_make_resp_data(struct h2s *h2s, const struct buffer *buf, /* Called from the upper layer, to subscribe to events, such as being able to send */ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) { - struct wait_list *sw; + struct wait_event *sw; struct h2s *h2s = cs->ctx; struct h2c *h2c = h2s->h2c; @@ -3546,7 +3528,7 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) if (!(sw->wait_reason & SUB_CAN_RECV)) { sw->wait_reason |= SUB_CAN_RECV; sw->handle = h2s; - h2s->recv_wait_list = sw; + h2s->recv_wait = sw; } event_type &= ~SUB_CAN_RECV; } @@ -3555,10 +3537,13 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) if (!(sw->wait_reason & SUB_CAN_SEND)) { sw->wait_reason |= SUB_CAN_SEND; sw->handle = h2s; - if (h2s->flags & H2_SF_BLK_MFCTL) - LIST_ADDQ(&h2c->fctl_list, &sw->list); - else - LIST_ADDQ(&h2c->send_list, &sw->list); + h2s->send_wait = sw; + if (!(h2s->flags & H2_SF_BLK_SFCTL)) { + if (h2s->flags & H2_SF_BLK_MFCTL) + LIST_ADDQ(&h2c->fctl_list, &h2s->list); + else + LIST_ADDQ(&h2c->send_list, &h2s->list); + } } event_type &= ~SUB_CAN_SEND; } @@ -3571,21 +3556,23 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param) static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param) { - struct wait_list *sw; + struct wait_event *sw; struct h2s *h2s = cs->ctx; if (event_type & SUB_CAN_RECV) { sw = param; - if (h2s->recv_wait_list == sw) { + if (h2s->recv_wait == sw) { sw->wait_reason &= ~SUB_CAN_RECV; - h2s->recv_wait_list = NULL; + h2s->recv_wait = NULL; } } if (event_type & SUB_CAN_SEND) { sw = param; - if (sw->wait_reason & SUB_CAN_SEND) { - LIST_DEL(&sw->list); - LIST_INIT(&sw->list); + if (h2s->send_wait == sw) { + LIST_DEL(&h2s->list); + LIST_INIT(&h2s->list); + sw->wait_reason &= ~SUB_CAN_SEND; + h2s->send_wait = NULL; } } return 0; @@ -3681,8 +3668,8 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun b_del(buf, total); if (total > 0) { conn_xprt_want_send(h2s->h2c->conn); - if (!(h2s->h2c->wait_list.wait_reason & SUB_CAN_SEND)) - tasklet_wakeup(h2s->h2c->wait_list.task); + if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND)) + tasklet_wakeup(h2s->h2c->wait_event.task); } return total; } @@ -3692,7 +3679,6 @@ static void h2_show_fd(struct buffer *msg, struct connection *conn) { struct h2c *h2c = conn->mux_ctx; struct h2s *h2s; - struct wait_list *sw; struct eb32_node *node; int fctl_cnt = 0; int send_cnt = 0; @@ -3702,10 +3688,10 @@ static void h2_show_fd(struct buffer *msg, struct connection *conn) if (!h2c) return; - list_for_each_entry(sw, &h2c->fctl_list, list) + list_for_each_entry(h2s, &h2c->fctl_list, list) fctl_cnt++; - list_for_each_entry(sw, &h2c->send_list, list) + list_for_each_entry(h2s, &h2c->send_list, list) send_cnt++; node = eb32_first(&h2c->streams_by_id); diff --git a/src/stream.c b/src/stream.c index b616346741..97c4d9bb58 100644 --- a/src/stream.c +++ b/src/stream.c @@ -288,9 +288,9 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) out_fail_accept: flt_stream_release(s, 0); task_free(t); - tasklet_free(s->si[1].wait_list.task); + tasklet_free(s->si[1].wait_event.task); out_fail_alloc_si1: - tasklet_free(s->si[0].wait_list.task); + tasklet_free(s->si[0].wait_event.task); out_fail_alloc: LIST_DEL(&s->list); pool_free(pool_head_stream, s); @@ -406,10 +406,21 @@ static void stream_free(struct stream *s) if (must_free_sess) session_free(sess); - tasklet_free(s->si[0].wait_list.task); - LIST_DEL(&s->si[0].wait_list.list); - tasklet_free(s->si[1].wait_list.task); - LIST_DEL(&s->si[1].wait_list.list); + tasklet_free(s->si[0].wait_event.task); + if (s->si[0].wait_event.wait_reason != 0) { + struct conn_stream *cs = objt_cs(s->si[0].end); + if (cs) + cs->conn->mux->unsubscribe(cs, s->si[0].wait_event.wait_reason, + &s->si[0].wait_event); + } + tasklet_free(s->si[1].wait_event.task); + if (s->si[1].wait_event.wait_reason != 0) { + struct conn_stream *cs = objt_cs(s->si[1].end); + if (cs) + cs->conn->mux->unsubscribe(cs, s->si[1].wait_event.wait_reason, + &s->si[1].wait_event); + } + pool_free(pool_head_stream, s); /* We may want to free the maximum amount of pools if the proxy is stopping */ diff --git a/src/stream_interface.c b/src/stream_interface.c index 759b5f0a76..a0487ef5ab 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -632,7 +632,7 @@ int si_cs_send(struct conn_stream *cs) int did_send = 0; /* We're already waiting to be able to send, give up */ - if (si->wait_list.wait_reason & SUB_CAN_SEND) + if (si->wait_event.wait_reason & SUB_CAN_SEND) return 0; if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) @@ -641,7 +641,7 @@ int si_cs_send(struct conn_stream *cs) if (conn->flags & CO_FL_HANDSHAKE) { /* a handshake was requested */ /* Schedule ourself to be woken up once the handshake is done */ - conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_list); + conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_event); return 0; } @@ -722,7 +722,7 @@ int si_cs_send(struct conn_stream *cs) /* We couldn't send all of our data, let the mux know we'd like to send more */ if (co_data(oc)) { cs_want_send(cs); - conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_list); + conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_event); } return did_send; } @@ -736,9 +736,9 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state) if (!cs) return NULL; redo: - if (!(si->wait_list.wait_reason & SUB_CAN_SEND)) + if (!(si->wait_event.wait_reason & SUB_CAN_SEND)) ret = si_cs_send(cs); - if (!(si->wait_list.wait_reason & SUB_CAN_RECV)) + if (!(si->wait_event.wait_reason & SUB_CAN_RECV)) ret |= si_cs_recv(cs); if (ret != 0) si_cs_process(cs); @@ -1137,7 +1137,7 @@ int si_cs_recv(struct conn_stream *cs) /* If another call to si_cs_recv() failed, and we subscribed to * recv events already, give up now. */ - if (si->wait_list.wait_reason & SUB_CAN_RECV) + if (si->wait_event.wait_reason & SUB_CAN_RECV) return 0; /* maybe we were called immediately after an asynchronous shutr */ @@ -1347,7 +1347,7 @@ int si_cs_recv(struct conn_stream *cs) goto out_shutdown_r; /* Subscribe to receive events */ - conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_list); + conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event); return cur_read != 0; -- 2.39.5