From: Christopher Faulet Date: Fri, 1 Apr 2022 15:06:32 +0000 (+0200) Subject: MINOR: conn-stream: Move si_conn_cb in the conn-stream scope X-Git-Tag: v2.6-dev6~50 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=000ba3e613b31986d94b8e6eeeb76a22095b55ac;p=thirdparty%2Fhaproxy.git MINOR: conn-stream: Move si_conn_cb in the conn-stream scope si_conn_cb variable is renamed cs_data_conn_cb. In addtion, its associated functions are also renamed. si_cs_recv(), si_cs_send() and si_cs_process() are renamed cs_conn_recv(), cs_conn_send and cs_conn_process(). These functions are updated to manipulate conn-streams instead of stream-interfaces. --- diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h index 63e55ff26b..8f9c98fd19 100644 --- a/include/haproxy/stream_interface.h +++ b/include/haproxy/stream_interface.h @@ -29,7 +29,7 @@ #include #include -extern struct data_cb si_conn_cb; +extern struct data_cb cs_data_conn_cb; extern struct data_cb cs_data_applet_cb; extern struct data_cb check_conn_cb; @@ -45,9 +45,9 @@ void cs_conn_sync_send(struct conn_stream *cs); /* Functions used to communicate with a conn_stream. The first two may be used * directly, the last one is mostly a wake callback. */ -int si_cs_recv(struct conn_stream *cs); -int si_cs_send(struct conn_stream *cs); -int si_cs_process(struct conn_stream *cs); +int cs_conn_recv(struct conn_stream *cs); +int cs_conn_send(struct conn_stream *cs); +int cs_conn_process(struct conn_stream *cs); /* returns the channel which receives data from this stream interface (input channel) */ static inline struct channel *si_ic(struct stream_interface *si) diff --git a/src/conn_stream.c b/src/conn_stream.c index 7f72e1c616..0a2edf12d1 100644 --- a/src/conn_stream.c +++ b/src/conn_stream.c @@ -233,7 +233,7 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx) } cs->ops = &cs_app_conn_ops; - cs->data_cb = &si_conn_cb; + cs->data_cb = &cs_data_conn_cb; } else if (cs_check(cs)) cs->data_cb = &check_conn_cb; @@ -278,7 +278,7 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm) cs->wait_event.events = 0; cs->ops = &cs_app_conn_ops; - cs->data_cb = &si_conn_cb; + cs->data_cb = &cs_data_conn_cb; } else if (cs->endp->flags & CS_EP_T_APPLET) { cs->ops = &cs_app_applet_ops; @@ -728,7 +728,7 @@ static void cs_app_chk_snd_conn(struct conn_stream *cs) return; if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs))) - si_cs_send(cs); + cs_conn_send(cs); if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) { /* Write error on the file descriptor */ diff --git a/src/stream_interface.c b/src/stream_interface.c index 32b0d29190..11cd3395b2 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -47,17 +47,18 @@ static void cs_conn_read0(struct conn_stream *cs); /* post-IO notification callback */ static void cs_notify(struct conn_stream *cs); -struct data_cb si_conn_cb = { - .wake = si_cs_process, + +struct data_cb cs_data_conn_cb = { + .wake = cs_conn_process, .name = "STRM", }; - struct data_cb cs_data_applet_cb = { .wake = cs_applet_process, .name = "STRM", }; + struct stream_interface *si_new(struct conn_stream *cs) { struct stream_interface *si; @@ -222,18 +223,17 @@ static void cs_notify(struct conn_stream *cs) * connection's polling based on the channels and stream interface's final * states. The function always returns 0. */ -int si_cs_process(struct conn_stream *cs) +int cs_conn_process(struct conn_stream *cs) { struct connection *conn = __cs_conn(cs); - struct stream_interface *si = cs_si(cs); - struct channel *ic = si_ic(si); - struct channel *oc = si_oc(si); + struct channel *ic = cs_ic(cs); + struct channel *oc = cs_oc(cs); BUG_ON(!conn); /* If we have data to send, try it now */ - if (!channel_is_empty(oc) && !(si->cs->wait_event.events & SUB_RETRY_SEND)) - si_cs_send(cs); + if (!channel_is_empty(oc) && !(cs->wait_event.events & SUB_RETRY_SEND)) + cs_conn_send(cs); /* First step, report to the conn-stream what was detected at the * connection layer : errors and connection establishment. @@ -243,13 +243,13 @@ int si_cs_process(struct conn_stream *cs) * to retry to connect, the connection may still have CO_FL_ERROR, * and we don't want to add CS_EP_ERROR back * - * Note: This test is only required because si_cs_process is also the SI - * wake callback. Otherwise si_cs_recv()/si_cs_send() already take + * Note: This test is only required because cs_conn_process is also the SI + * wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take * care of it. */ - if (si->cs->state >= CS_ST_CON) { - if (si_is_conn_error(si)) + if (cs->state >= CS_ST_CON) { + if (si_is_conn_error(cs->si)) cs->endp->flags |= CS_EP_ERROR; } @@ -261,22 +261,22 @@ int si_cs_process(struct conn_stream *cs) if (!(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) && (cs->endp->flags & CS_EP_WAIT_FOR_HS)) { cs->endp->flags &= ~CS_EP_WAIT_FOR_HS; - task_wakeup(si_task(si), TASK_WOKEN_MSG); + task_wakeup(cs_strm_task(cs), TASK_WOKEN_MSG); } - if (!cs_state_in(si->cs->state, CS_SB_EST|CS_SB_DIS|CS_SB_CLO) && + if (!cs_state_in(cs->state, CS_SB_EST|CS_SB_DIS|CS_SB_CLO) && (conn->flags & CO_FL_WAIT_XPRT) == 0) { __cs_strm(cs)->conn_exp = TICK_ETERNITY; oc->flags |= CF_WRITE_NULL; - if (si->cs->state == CS_ST_CON) - si->cs->state = CS_ST_RDY; + if (cs->state == CS_ST_CON) + cs->state = CS_ST_RDY; } /* Report EOS on the channel if it was reached from the mux point of * view. * - * Note: This test is only required because si_cs_process is also the SI - * wake callback. Otherwise si_cs_recv()/si_cs_send() already take + * Note: This test is only required because cs_conn_process is also the SI + * wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take * care of it. */ if (cs->endp->flags & CS_EP_EOS && !(ic->flags & CF_SHUTR)) { @@ -290,8 +290,8 @@ int si_cs_process(struct conn_stream *cs) /* Report EOI on the channel if it was reached from the mux point of * view. * - * Note: This test is only required because si_cs_process is also the SI - * wake callback. Otherwise si_cs_recv()/si_cs_send() already take + * Note: This test is only required because cs_conn_process is also the SI + * wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take * care of it. */ if ((cs->endp->flags & CS_EP_EOI) && !(ic->flags & CF_EOI)) @@ -302,7 +302,7 @@ int si_cs_process(struct conn_stream *cs) * stream-int status. */ cs_notify(cs); - stream_release_buffers(si_strm(si)); + stream_release_buffers(__cs_strm(cs)); return 0; } @@ -312,30 +312,29 @@ int si_cs_process(struct conn_stream *cs) * caller to commit polling changes. The caller should check conn->flags * for errors. */ -int si_cs_send(struct conn_stream *cs) +int cs_conn_send(struct conn_stream *cs) { struct connection *conn = __cs_conn(cs); - struct stream_interface *si = cs_si(cs); - struct stream *s = si_strm(si); - struct channel *oc = si_oc(si); + struct stream *s = __cs_strm(cs); + struct channel *oc = cs_oc(cs); int ret; int did_send = 0; - if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) { + if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) { /* We're probably there because the tasklet was woken up, * but process_stream() ran before, detected there were an * error and put the si back to CS_ST_TAR. There's still * CO_FL_ERROR on the connection but we don't want to add * CS_EP_ERROR back, so give up */ - if (si->cs->state < CS_ST_CON) + if (cs->state < CS_ST_CON) return 0; cs->endp->flags |= CS_EP_ERROR; return 1; } /* We're already waiting to be able to send, give up */ - if (si->cs->wait_event.events & SUB_RETRY_SEND) + if (cs->wait_event.events & SUB_RETRY_SEND) return 0; /* we might have been called just after an asynchronous shutw */ @@ -383,7 +382,7 @@ int si_cs_send(struct conn_stream *cs) if ((!(oc->flags & (CF_NEVER_WAIT|CF_SEND_DONTWAIT)) && ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) || (oc->flags & CF_EXPECT_MORE) || - (IS_HTX_STRM(si_strm(si)) && + (IS_HTX_STRM(s) && (!(oc->flags & (CF_EOI|CF_SHUTR)) && htx_expect_more(htxbuf(&oc->buf)))))) || ((oc->flags & CF_ISRESP) && ((oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW)) == (CF_AUTO_CLOSE|CF_SHUTW_NOW)))) @@ -437,10 +436,10 @@ int si_cs_send(struct conn_stream *cs) end: if (did_send) { oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA; - if (si->cs->state == CS_ST_CON) - si->cs->state = CS_ST_RDY; + if (cs->state == CS_ST_CON) + cs->state = CS_ST_RDY; - si_rx_room_rdy(si_opposite(si)); + si_rx_room_rdy(cs_opposite(cs)->si); } if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING)) { @@ -450,7 +449,7 @@ int si_cs_send(struct conn_stream *cs) /* We couldn't send all of our data, let the mux know we'd like to send more */ if (!channel_is_empty(oc)) - conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->cs->wait_event); + conn->mux->subscribe(cs, SUB_RETRY_SEND, &cs->wait_event); return did_send; } @@ -468,11 +467,11 @@ struct task *cs_conn_io_cb(struct task *t, void *ctx, unsigned int state) return t; if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs))) - ret = si_cs_send(cs); + ret = cs_conn_send(cs); if (!(cs->wait_event.events & SUB_RETRY_RECV)) - ret |= si_cs_recv(cs); + ret |= cs_conn_recv(cs); if (ret != 0) - si_cs_process(cs); + cs_conn_process(cs); stream_release_buffers(__cs_strm(cs)); return t; @@ -499,7 +498,7 @@ int cs_conn_sync_recv(struct conn_stream *cs) if (!si_rx_endp_ready(cs->si) || si_rx_blocked(cs->si)) return 0; // already failed - return si_cs_recv(cs); + return cs_conn_recv(cs); } /* perform a synchronous send() for the stream interface. The CF_WRITE_NULL and @@ -524,7 +523,7 @@ void cs_conn_sync_send(struct conn_stream *cs) if (!cs_conn_mux(cs)) return; - si_cs_send(cs); + cs_conn_send(cs); } /* @@ -532,11 +531,10 @@ void cs_conn_sync_send(struct conn_stream *cs) * into the buffer from the connection. It iterates over the mux layer's * rcv_buf function. */ -int si_cs_recv(struct conn_stream *cs) +int cs_conn_recv(struct conn_stream *cs) { struct connection *conn = __cs_conn(cs); - struct stream_interface *si = cs_si(cs); - struct channel *ic = si_ic(si); + struct channel *ic = cs_ic(cs); int ret, max, cur_read = 0; int read_poll = MAX_READ_POLL_LOOPS; int flags = 0; @@ -545,10 +543,10 @@ int si_cs_recv(struct conn_stream *cs) if (cs->state != CS_ST_EST) return 0; - /* If another call to si_cs_recv() failed, and we subscribed to + /* If another call to cs_conn_recv() failed, and we subscribed to * recv events already, give up now. */ - if (si->cs->wait_event.events & SUB_RETRY_RECV) + if (cs->wait_event.events & SUB_RETRY_RECV) return 0; /* maybe we were called immediately after an asynchronous shutr */ @@ -636,7 +634,7 @@ int si_cs_recv(struct conn_stream *cs) /* the pipe is full or we have read enough data that it * could soon be full. Let's stop before needing to poll. */ - si_rx_room_blk(si); + si_rx_room_blk(cs->si); goto done_recv; } @@ -657,7 +655,7 @@ int si_cs_recv(struct conn_stream *cs) } /* now we'll need a input buffer for the stream */ - if (!si_alloc_ibuf(si, &(si_strm(si)->buffer_wait))) + if (!si_alloc_ibuf(cs->si, &(__cs_strm(cs)->buffer_wait))) goto end_recv; /* For an HTX stream, if the buffer is stuck (no output data with some @@ -669,15 +667,15 @@ int si_cs_recv(struct conn_stream *cs) * NOTE: A possible optim may be to let the mux decides if defrag is * required or not, depending on amount of data to be xferred. */ - if (IS_HTX_STRM(si_strm(si)) && !co_data(ic)) { + if (IS_HTX_STRM(__cs_strm(cs)) && !co_data(ic)) { struct htx *htx = htxbuf(&ic->buf); if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx))) - htx_defrag(htxbuf(&ic->buf), NULL, 0); + htx_defrag(htx, NULL, 0); } /* Instruct the mux it must subscribed for read events */ - flags |= ((!conn_is_back(conn) && (si_strm(si)->be->options & PR_O_ABRT_CLOSE)) ? CO_RFL_KEEP_RECV : 0); + flags |= ((!conn_is_back(conn) && (__cs_strm(cs)->be->options & PR_O_ABRT_CLOSE)) ? CO_RFL_KEEP_RECV : 0); /* Important note : if we're called with POLL_IN|POLL_HUP, it means the read polling * was enabled, which implies that the recv buffer was not full. So we have a guarantee @@ -706,7 +704,7 @@ int si_cs_recv(struct conn_stream *cs) */ BUG_ON(c_empty(ic)); - si_rx_room_blk(si); + si_rx_room_blk(cs->si); /* Add READ_PARTIAL because some data are pending but * cannot be xferred to the channel */ @@ -720,7 +718,7 @@ int si_cs_recv(struct conn_stream *cs) * here to proceed. */ if (flags & CO_RFL_BUF_FLUSH) - si_rx_room_blk(si); + si_rx_room_blk(cs->si); break; } @@ -750,7 +748,7 @@ int si_cs_recv(struct conn_stream *cs) if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) { /* we're stopped by the channel's policy */ - si_rx_chan_blk(si); + si_rx_chan_blk(cs->si); break; } @@ -765,7 +763,7 @@ int si_cs_recv(struct conn_stream *cs) */ if (ic->flags & CF_STREAMER) { /* we're stopped by the channel's policy */ - si_rx_chan_blk(si); + si_rx_chan_blk(cs->si); break; } @@ -774,7 +772,7 @@ int si_cs_recv(struct conn_stream *cs) */ if (ret >= global.tune.recv_enough) { /* we're stopped by the channel's policy */ - si_rx_chan_blk(si); + si_rx_chan_blk(cs->si); break; } } @@ -782,7 +780,7 @@ int si_cs_recv(struct conn_stream *cs) /* if we are waiting for more space, don't try to read more data * right now. */ - if (si_rx_blocked(si)) + if (si_rx_blocked(cs->si)) break; } /* while !flags */ @@ -846,12 +844,12 @@ int si_cs_recv(struct conn_stream *cs) cs_conn_read0(cs); ret = 1; } - else if (!si_rx_blocked(si)) { + else if (!si_rx_blocked(cs->si)) { /* Subscribe to receive events if we're blocking on I/O */ - conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->cs->wait_event); - si_rx_endp_done(si); + conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event); + si_rx_endp_done(cs->si); } else { - si_rx_endp_more(si); + si_rx_endp_more(cs->si); ret = 1; } return ret;