]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: stream-int-conn-stream: Move si_update_* in conn-stream scope
authorChristopher Faulet <cfaulet@haproxy.com>
Fri, 1 Apr 2022 12:23:38 +0000 (14:23 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 13 Apr 2022 13:10:15 +0000 (15:10 +0200)
si_update_rx(), si_update_tx() and si_update() are renamed cs_update_rx(),
cs_upate_tx() and cs_update() and updated to manipulate a conn-stream
instead of a stream-interface.

include/haproxy/cs_utils.h
include/haproxy/stream_interface-t.h
include/haproxy/stream_interface.h
src/conn_stream.c
src/hlua.c
src/stream.c
src/stream_interface.c

index a3a44a9aa3546b6ea0a345f94352c311bc6ae992..a9ed6a31f358f96a4351c4ef384713f94f603212 100644 (file)
 #include <haproxy/stream.h>
 #include <haproxy/stream_interface.h>
 
+void cs_update_rx(struct conn_stream *cs);
+void cs_update_tx(struct conn_stream *cs);
+void cs_update_both(struct conn_stream *csf, struct conn_stream *csb);
+
 /* returns the channel which receives data from this conn-stream (input channel) */
 static inline struct channel *cs_ic(struct conn_stream *cs)
 {
@@ -268,6 +272,13 @@ static inline void cs_chk_snd(struct conn_stream *cs)
        cs->ops->chk_snd(cs);
 }
 
+/* Combines both cs_update_rx() and cs_update_tx() at once */
+static inline void cs_update(struct conn_stream *cs)
+{
+       cs_update_rx(cs);
+       cs_update_tx(cs);
+}
+
 /* for debugging, reports the stream interface state name */
 static inline const char *cs_state_str(int state)
 {
index ea02d6c17b0819623496a55e298eca46f17d16e1..7d5e50c372b0171dc7d8ff70840f89c75e2c799f 100644 (file)
@@ -55,7 +55,7 @@ enum {
 
 /* Note that if an applet is registered, the update function will not be called
  * by the session handler, so it may be used to resync flags at the end of the
- * applet handler. See si_update() for reference.
+ * applet handler.
  */
 struct stream_interface {
        /* struct members used by the "buffer" side */
index 3875385c8d2a26332109130a25824cde3a62d612..45a8bacd3f8963ca38def10ef58602ddd848bfc3 100644 (file)
@@ -37,10 +37,7 @@ void si_free(struct stream_interface *si);
 
 /* main event functions used to move data between sockets and buffers */
 void si_applet_wake_cb(struct stream_interface *si);
-void si_update_rx(struct stream_interface *si);
-void si_update_tx(struct stream_interface *si);
 struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state);
-void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b);
 int si_sync_recv(struct stream_interface *si);
 void si_sync_send(struct stream_interface *si);
 
@@ -263,13 +260,6 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait
        return ret;
 }
 
-/* Combines both si_update_rx() and si_update_tx() at once */
-static inline void si_update(struct stream_interface *si)
-{
-       si_update_rx(si);
-       si_update_tx(si);
-}
-
 /* The stream interface is only responsible for the connection during the early
  * states, before plugging a mux. Thus it should only care about CO_FL_ERROR
  * before CS_ST_EST, and after that it must absolutely ignore it since the mux
index 736cfb2da53b0fd256b1935e81f53cd38abc81f8..b3738f6dfb650fd10ed20c65567f60237aba1f29 100644 (file)
@@ -940,3 +940,134 @@ static void cs_app_chk_snd_applet(struct conn_stream *cs)
                appctx_wakeup(__cs_appctx(cs));
        }
 }
+
+
+/* This function is designed to be called from within the stream handler to
+ * update the input channel's expiration timer and the conn-stream's
+ * Rx flags based on the channel's flags. It needs to be called only once
+ * after the channel's flags have settled down, and before they are cleared,
+ * though it doesn't harm to call it as often as desired (it just slightly
+ * hurts performance). It must not be called from outside of the stream
+ * handler, as what it does will be used to compute the stream task's
+ * expiration.
+ */
+void cs_update_rx(struct conn_stream *cs)
+{
+       struct channel *ic = cs_ic(cs);
+
+       if (ic->flags & CF_SHUTR) {
+               si_rx_shut_blk(cs->si);
+               return;
+       }
+
+       /* Read not closed, update FD status and timeout for reads */
+       if (ic->flags & CF_DONT_READ)
+               si_rx_chan_blk(cs->si);
+       else
+               si_rx_chan_rdy(cs->si);
+
+       if (!channel_is_empty(ic) || !channel_may_recv(ic)) {
+               /* stop reading, imposed by channel's policy or contents */
+               si_rx_room_blk(cs->si);
+       }
+       else {
+               /* (re)start reading and update timeout. Note: we don't recompute the timeout
+                * every time we get here, otherwise it would risk never to expire. We only
+                * update it if is was not yet set. The stream socket handler will already
+                * have updated it if there has been a completed I/O.
+                */
+               si_rx_room_rdy(cs->si);
+       }
+       if (cs->si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
+               ic->rex = TICK_ETERNITY;
+       else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
+               ic->rex = tick_add_ifset(now_ms, ic->rto);
+
+       cs_chk_rcv(cs);
+}
+
+/* This function is designed to be called from within the stream handler to
+ * update the output channel's expiration timer and the conn-stream's
+ * Tx flags based on the channel's flags. It needs to be called only once
+ * after the channel's flags have settled down, and before they are cleared,
+ * though it doesn't harm to call it as often as desired (it just slightly
+ * hurts performance). It must not be called from outside of the stream
+ * handler, as what it does will be used to compute the stream task's
+ * expiration.
+ */
+void cs_update_tx(struct conn_stream *cs)
+{
+       struct channel *oc = cs_oc(cs);
+       struct channel *ic = cs_ic(cs);
+
+       if (oc->flags & CF_SHUTW)
+               return;
+
+       /* Write not closed, update FD status and timeout for writes */
+       if (channel_is_empty(oc)) {
+               /* stop writing */
+               if (!(cs->si->flags & SI_FL_WAIT_DATA)) {
+                       if ((oc->flags & CF_SHUTW_NOW) == 0)
+                               cs->si->flags |= SI_FL_WAIT_DATA;
+                       oc->wex = TICK_ETERNITY;
+               }
+               return;
+       }
+
+       /* (re)start writing and update timeout. Note: we don't recompute the timeout
+        * every time we get here, otherwise it would risk never to expire. We only
+        * update it if is was not yet set. The stream socket handler will already
+        * have updated it if there has been a completed I/O.
+        */
+       cs->si->flags &= ~SI_FL_WAIT_DATA;
+       if (!tick_isset(oc->wex)) {
+               oc->wex = tick_add_ifset(now_ms, oc->wto);
+               if (tick_isset(ic->rex) && !(cs->flags & CS_FL_INDEP_STR)) {
+                       /* Note: depending on the protocol, we don't know if we're waiting
+                        * for incoming data or not. So in order to prevent the socket from
+                        * expiring read timeouts during writes, we refresh the read timeout,
+                        * except if it was already infinite or if we have explicitly setup
+                        * independent streams.
+                        */
+                       ic->rex = tick_add_ifset(now_ms, ic->rto);
+               }
+       }
+}
+
+/* Updates at once the channel flags, and timers of both conn-streams of a
+ * same stream, to complete the work after the analysers, then updates the data
+ * layer below. This will ensure that any synchronous update performed at the
+ * data layer will be reflected in the channel flags and/or conn-stream.
+ * Note that this does not change the conn-stream's current state, though
+ * it updates the previous state to the current one.
+ */
+void cs_update_both(struct conn_stream *csf, struct conn_stream *csb)
+{
+       struct channel *req = cs_ic(csf);
+       struct channel *res = cs_oc(csf);
+
+       req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
+       res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
+
+       __cs_strm(csb)->prev_conn_state = csb->state;
+
+       /* let's recompute both sides states */
+       if (cs_state_in(csf->state, CS_SB_RDY|CS_SB_EST))
+               cs_update(csf);
+
+       if (cs_state_in(csb->state, CS_SB_RDY|CS_SB_EST))
+               cs_update(csb);
+
+       /* stream ints are processed outside of process_stream() and must be
+        * handled at the latest moment.
+        */
+       if (cs_appctx(csf) &&
+           ((si_rx_endp_ready(csf->si) && !si_rx_blocked(csf->si)) ||
+            (si_tx_endp_ready(csf->si) && !si_tx_blocked(csf->si))))
+               appctx_wakeup(__cs_appctx(csf));
+
+       if (cs_appctx(csb) &&
+           ((si_rx_endp_ready(csb->si) && !si_rx_blocked(csb->si)) ||
+            (si_tx_endp_ready(csb->si) && !si_tx_blocked(csb->si))))
+               appctx_wakeup(__cs_appctx(csb));
+}
index 449452576fbba55c263e617a27b89e9d05bc033c..fa0a211df11eb05fc8c7ae3e345b04292b3117ea 100644 (file)
@@ -1953,7 +1953,7 @@ static void hlua_socket_handler(struct appctx *appctx)
         * interface.
         */
        if (!channel_is_empty(cs_ic(cs)))
-               si_update(cs->si);
+               cs_update(cs);
 
        /* If write notifications are registered, we considers we want
         * to write, so we clear the blocking flag.
index 44ed445ef5179aa77a38eb87ff45206b84c78bc3..30c347bb099ae286ea9296ae1c2c739d8e892954 100644 (file)
@@ -2455,7 +2455,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
                if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) && !(s->flags & SF_IGNORE))
                        stream_process_counters(s);
 
-               si_update_both(si_f, si_b);
+               cs_update_both(s->csf, s->csb);
 
                /* Trick: if a request is being waiting for the server to respond,
                 * and if we know the server can timeout, we don't want the timeout
index e252cd5d29816f5dbdd172b0d73d7145e6a702b8..0f060ec05be8f022eeb4caada99f8f7604a038a0 100644 (file)
@@ -77,14 +77,14 @@ void si_free(struct stream_interface *si)
        pool_free(pool_head_streaminterface, si);
 }
 
-/* This function is the equivalent to si_update() except that it's
+/* This function is the equivalent to cs_update() except that it's
  * designed to be called from outside the stream handlers, typically the lower
  * layers (applets, connections) after I/O completion. After updating the stream
  * interface and timeouts, it will try to forward what can be forwarded, then to
  * wake the associated task up if an important event requires special handling.
  * It may update SI_FL_WAIT_DATA and/or SI_FL_RXBLK_ROOM, that the callers are
  * encouraged to watch to take appropriate action.
- * It should not be called from within the stream itself, si_update()
+ * It should not be called from within the stream itself, cs_update()
  * is designed for this.
  */
 static void stream_int_notify(struct stream_interface *si)
@@ -474,98 +474,6 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state)
        return t;
 }
 
-/* This function is designed to be called from within the stream handler to
- * update the input channel's expiration timer and the stream interface's
- * Rx flags based on the channel's flags. It needs to be called only once
- * after the channel's flags have settled down, and before they are cleared,
- * though it doesn't harm to call it as often as desired (it just slightly
- * hurts performance). It must not be called from outside of the stream
- * handler, as what it does will be used to compute the stream task's
- * expiration.
- */
-void si_update_rx(struct stream_interface *si)
-{
-       struct channel *ic = si_ic(si);
-
-       if (ic->flags & CF_SHUTR) {
-               si_rx_shut_blk(si);
-               return;
-       }
-
-       /* Read not closed, update FD status and timeout for reads */
-       if (ic->flags & CF_DONT_READ)
-               si_rx_chan_blk(si);
-       else
-               si_rx_chan_rdy(si);
-
-       if (!channel_is_empty(ic) || !channel_may_recv(ic)) {
-               /* stop reading, imposed by channel's policy or contents */
-               si_rx_room_blk(si);
-       }
-       else {
-               /* (re)start reading and update timeout. Note: we don't recompute the timeout
-                * every time we get here, otherwise it would risk never to expire. We only
-                * update it if is was not yet set. The stream socket handler will already
-                * have updated it if there has been a completed I/O.
-                */
-               si_rx_room_rdy(si);
-       }
-       if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
-               ic->rex = TICK_ETERNITY;
-       else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
-               ic->rex = tick_add_ifset(now_ms, ic->rto);
-
-       cs_chk_rcv(si->cs);
-}
-
-/* This function is designed to be called from within the stream handler to
- * update the output channel's expiration timer and the stream interface's
- * Tx flags based on the channel's flags. It needs to be called only once
- * after the channel's flags have settled down, and before they are cleared,
- * though it doesn't harm to call it as often as desired (it just slightly
- * hurts performance). It must not be called from outside of the stream
- * handler, as what it does will be used to compute the stream task's
- * expiration.
- */
-void si_update_tx(struct stream_interface *si)
-{
-       struct channel *oc = si_oc(si);
-       struct channel *ic = si_ic(si);
-
-       if (oc->flags & CF_SHUTW)
-               return;
-
-       /* Write not closed, update FD status and timeout for writes */
-       if (channel_is_empty(oc)) {
-               /* stop writing */
-               if (!(si->flags & SI_FL_WAIT_DATA)) {
-                       if ((oc->flags & CF_SHUTW_NOW) == 0)
-                               si->flags |= SI_FL_WAIT_DATA;
-                       oc->wex = TICK_ETERNITY;
-               }
-               return;
-       }
-
-       /* (re)start writing and update timeout. Note: we don't recompute the timeout
-        * every time we get here, otherwise it would risk never to expire. We only
-        * update it if is was not yet set. The stream socket handler will already
-        * have updated it if there has been a completed I/O.
-        */
-       si->flags &= ~SI_FL_WAIT_DATA;
-       if (!tick_isset(oc->wex)) {
-               oc->wex = tick_add_ifset(now_ms, oc->wto);
-               if (tick_isset(ic->rex) && !(si->cs->flags & CS_FL_INDEP_STR)) {
-                       /* Note: depending on the protocol, we don't know if we're waiting
-                        * for incoming data or not. So in order to prevent the socket from
-                        * expiring read timeouts during writes, we refresh the read timeout,
-                        * except if it was already infinite or if we have explicitly setup
-                        * independent streams.
-                        */
-                       ic->rex = tick_add_ifset(now_ms, ic->rto);
-               }
-       }
-}
-
 /* This tries to perform a synchronous receive on the stream interface to
  * try to collect last arrived data. In practice it's only implemented on
  * conn_streams. Returns 0 if nothing was done, non-zero if new data or a
@@ -615,44 +523,6 @@ void si_sync_send(struct stream_interface *si)
        si_cs_send(si->cs);
 }
 
-/* Updates at once the channel flags, and timers of both stream interfaces of a
- * same stream, to complete the work after the analysers, then updates the data
- * layer below. This will ensure that any synchronous update performed at the
- * data layer will be reflected in the channel flags and/or stream-interface.
- * Note that this does not change the stream interface's current state, though
- * it updates the previous state to the current one.
- */
-void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b)
-{
-       struct channel *req = si_ic(si_f);
-       struct channel *res = si_oc(si_f);
-
-       req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
-       res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
-
-       si_strm(si_b)->prev_conn_state = si_b->cs->state;
-
-       /* let's recompute both sides states */
-       if (cs_state_in(si_f->cs->state, CS_SB_RDY|CS_SB_EST))
-               si_update(si_f);
-
-       if (cs_state_in(si_b->cs->state, CS_SB_RDY|CS_SB_EST))
-               si_update(si_b);
-
-       /* stream ints are processed outside of process_stream() and must be
-        * handled at the latest moment.
-        */
-       if (cs_appctx(si_f->cs) &&
-           ((si_rx_endp_ready(si_f) && !si_rx_blocked(si_f)) ||
-            (si_tx_endp_ready(si_f) && !si_tx_blocked(si_f))))
-               appctx_wakeup(__cs_appctx(si_f->cs));
-
-       if (cs_appctx(si_b->cs) &&
-           ((si_rx_endp_ready(si_b) && !si_rx_blocked(si_b)) ||
-            (si_tx_endp_ready(si_b) && !si_tx_blocked(si_b))))
-               appctx_wakeup(__cs_appctx(si_b->cs));
-}
-
 /*
  * This is the callback which is called by the connection layer to receive data
  * into the buffer from the connection. It iterates over the mux layer's