]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: conn-stream: Move si_conn_cb in the conn-stream scope
authorChristopher Faulet <cfaulet@haproxy.com>
Fri, 1 Apr 2022 15:06:32 +0000 (17:06 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 13 Apr 2022 13:10:15 +0000 (15:10 +0200)
si_conn_cb variable is renamed cs_data_conn_cb. In addtion, its associated
functions are also renamed. si_cs_recv(), si_cs_send() and si_cs_process() are
renamed cs_conn_recv(), cs_conn_send and cs_conn_process(). These functions are
updated to manipulate conn-streams instead of stream-interfaces.

include/haproxy/stream_interface.h
src/conn_stream.c
src/stream_interface.c

index 63e55ff26b29cba51b97517d9f12e892cc0ba21f..8f9c98fd194df485c6de554463b4229557fdee29 100644 (file)
@@ -29,7 +29,7 @@
 #include <haproxy/conn_stream.h>
 #include <haproxy/obj_type.h>
 
-extern struct data_cb si_conn_cb;
+extern struct data_cb cs_data_conn_cb;
 extern struct data_cb cs_data_applet_cb;
 extern struct data_cb check_conn_cb;
 
@@ -45,9 +45,9 @@ void cs_conn_sync_send(struct conn_stream *cs);
 /* Functions used to communicate with a conn_stream. The first two may be used
  * directly, the last one is mostly a wake callback.
  */
-int si_cs_recv(struct conn_stream *cs);
-int si_cs_send(struct conn_stream *cs);
-int si_cs_process(struct conn_stream *cs);
+int cs_conn_recv(struct conn_stream *cs);
+int cs_conn_send(struct conn_stream *cs);
+int cs_conn_process(struct conn_stream *cs);
 
 /* returns the channel which receives data from this stream interface (input channel) */
 static inline struct channel *si_ic(struct stream_interface *si)
index 7f72e1c616c0de40b5e7f554eee073d5628eac9f..0a2edf12d1b33bf07fec0ea7792b1934d3cc61af 100644 (file)
@@ -233,7 +233,7 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
                }
 
                cs->ops = &cs_app_conn_ops;
-               cs->data_cb = &si_conn_cb;
+               cs->data_cb = &cs_data_conn_cb;
        }
        else if (cs_check(cs))
                cs->data_cb = &check_conn_cb;
@@ -278,7 +278,7 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
                cs->wait_event.events = 0;
 
                cs->ops = &cs_app_conn_ops;
-               cs->data_cb = &si_conn_cb;
+               cs->data_cb = &cs_data_conn_cb;
        }
        else if (cs->endp->flags & CS_EP_T_APPLET) {
                cs->ops = &cs_app_applet_ops;
@@ -728,7 +728,7 @@ static void cs_app_chk_snd_conn(struct conn_stream *cs)
                return;
 
        if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
-               si_cs_send(cs);
+               cs_conn_send(cs);
 
        if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
                /* Write error on the file descriptor */
index 32b0d291909a571608cda3cc2d17d066b176fa4e..11cd3395b2d2920698325463052ed1f421089f10 100644 (file)
@@ -47,17 +47,18 @@ static void cs_conn_read0(struct conn_stream *cs);
 /* post-IO notification callback */
 static void cs_notify(struct conn_stream *cs);
 
-struct data_cb si_conn_cb = {
-       .wake    = si_cs_process,
+
+struct data_cb cs_data_conn_cb = {
+       .wake    = cs_conn_process,
        .name    = "STRM",
 };
 
-
 struct data_cb cs_data_applet_cb = {
        .wake    = cs_applet_process,
        .name    = "STRM",
 };
 
+
 struct stream_interface *si_new(struct conn_stream *cs)
 {
        struct stream_interface *si;
@@ -222,18 +223,17 @@ static void cs_notify(struct conn_stream *cs)
  * connection's polling based on the channels and stream interface's final
  * states. The function always returns 0.
  */
-int si_cs_process(struct conn_stream *cs)
+int cs_conn_process(struct conn_stream *cs)
 {
        struct connection *conn = __cs_conn(cs);
-       struct stream_interface *si = cs_si(cs);
-       struct channel *ic = si_ic(si);
-       struct channel *oc = si_oc(si);
+       struct channel *ic = cs_ic(cs);
+       struct channel *oc = cs_oc(cs);
 
        BUG_ON(!conn);
 
        /* If we have data to send, try it now */
-       if (!channel_is_empty(oc) && !(si->cs->wait_event.events & SUB_RETRY_SEND))
-               si_cs_send(cs);
+       if (!channel_is_empty(oc) && !(cs->wait_event.events & SUB_RETRY_SEND))
+               cs_conn_send(cs);
 
        /* First step, report to the conn-stream what was detected at the
         * connection layer : errors and connection establishment.
@@ -243,13 +243,13 @@ int si_cs_process(struct conn_stream *cs)
         * to retry to connect, the connection may still have CO_FL_ERROR,
         * and we don't want to add CS_EP_ERROR back
         *
-        * Note: This test is only required because si_cs_process is also the SI
-        *       wake callback. Otherwise si_cs_recv()/si_cs_send() already take
+        * Note: This test is only required because cs_conn_process is also the SI
+        *       wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take
         *       care of it.
         */
 
-       if (si->cs->state >= CS_ST_CON) {
-               if (si_is_conn_error(si))
+       if (cs->state >= CS_ST_CON) {
+               if (si_is_conn_error(cs->si))
                        cs->endp->flags |= CS_EP_ERROR;
        }
 
@@ -261,22 +261,22 @@ int si_cs_process(struct conn_stream *cs)
        if (!(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) &&
            (cs->endp->flags & CS_EP_WAIT_FOR_HS)) {
                cs->endp->flags &= ~CS_EP_WAIT_FOR_HS;
-               task_wakeup(si_task(si), TASK_WOKEN_MSG);
+               task_wakeup(cs_strm_task(cs), TASK_WOKEN_MSG);
        }
 
-       if (!cs_state_in(si->cs->state, CS_SB_EST|CS_SB_DIS|CS_SB_CLO) &&
+       if (!cs_state_in(cs->state, CS_SB_EST|CS_SB_DIS|CS_SB_CLO) &&
            (conn->flags & CO_FL_WAIT_XPRT) == 0) {
                __cs_strm(cs)->conn_exp = TICK_ETERNITY;
                oc->flags |= CF_WRITE_NULL;
-               if (si->cs->state == CS_ST_CON)
-                       si->cs->state = CS_ST_RDY;
+               if (cs->state == CS_ST_CON)
+                       cs->state = CS_ST_RDY;
        }
 
        /* Report EOS on the channel if it was reached from the mux point of
         * view.
         *
-        * Note: This test is only required because si_cs_process is also the SI
-        *       wake callback. Otherwise si_cs_recv()/si_cs_send() already take
+        * Note: This test is only required because cs_conn_process is also the SI
+        *       wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take
         *       care of it.
         */
        if (cs->endp->flags & CS_EP_EOS && !(ic->flags & CF_SHUTR)) {
@@ -290,8 +290,8 @@ int si_cs_process(struct conn_stream *cs)
        /* Report EOI on the channel if it was reached from the mux point of
         * view.
         *
-        * Note: This test is only required because si_cs_process is also the SI
-        *       wake callback. Otherwise si_cs_recv()/si_cs_send() already take
+        * Note: This test is only required because cs_conn_process is also the SI
+        *       wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take
         *       care of it.
         */
        if ((cs->endp->flags & CS_EP_EOI) && !(ic->flags & CF_EOI))
@@ -302,7 +302,7 @@ int si_cs_process(struct conn_stream *cs)
         * stream-int status.
         */
        cs_notify(cs);
-       stream_release_buffers(si_strm(si));
+       stream_release_buffers(__cs_strm(cs));
        return 0;
 }
 
@@ -312,30 +312,29 @@ int si_cs_process(struct conn_stream *cs)
  * caller to commit polling changes. The caller should check conn->flags
  * for errors.
  */
-int si_cs_send(struct conn_stream *cs)
+int cs_conn_send(struct conn_stream *cs)
 {
        struct connection *conn = __cs_conn(cs);
-       struct stream_interface *si = cs_si(cs);
-       struct stream *s = si_strm(si);
-       struct channel *oc = si_oc(si);
+       struct stream *s = __cs_strm(cs);
+       struct channel *oc = cs_oc(cs);
        int ret;
        int did_send = 0;
 
-       if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) {
+       if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
                /* We're probably there because the tasklet was woken up,
                 * but process_stream() ran before, detected there were an
                 * error and put the si back to CS_ST_TAR. There's still
                 * CO_FL_ERROR on the connection but we don't want to add
                 * CS_EP_ERROR back, so give up
                 */
-               if (si->cs->state < CS_ST_CON)
+               if (cs->state < CS_ST_CON)
                        return 0;
                cs->endp->flags |= CS_EP_ERROR;
                return 1;
        }
 
        /* We're already waiting to be able to send, give up */
-       if (si->cs->wait_event.events & SUB_RETRY_SEND)
+       if (cs->wait_event.events & SUB_RETRY_SEND)
                return 0;
 
        /* we might have been called just after an asynchronous shutw */
@@ -383,7 +382,7 @@ int si_cs_send(struct conn_stream *cs)
                if ((!(oc->flags & (CF_NEVER_WAIT|CF_SEND_DONTWAIT)) &&
                     ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
                      (oc->flags & CF_EXPECT_MORE) ||
-                     (IS_HTX_STRM(si_strm(si)) &&
+                     (IS_HTX_STRM(s) &&
                       (!(oc->flags & (CF_EOI|CF_SHUTR)) && htx_expect_more(htxbuf(&oc->buf)))))) ||
                    ((oc->flags & CF_ISRESP) &&
                     ((oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW)) == (CF_AUTO_CLOSE|CF_SHUTW_NOW))))
@@ -437,10 +436,10 @@ int si_cs_send(struct conn_stream *cs)
  end:
        if (did_send) {
                oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
-               if (si->cs->state == CS_ST_CON)
-                       si->cs->state = CS_ST_RDY;
+               if (cs->state == CS_ST_CON)
+                       cs->state = CS_ST_RDY;
 
-               si_rx_room_rdy(si_opposite(si));
+               si_rx_room_rdy(cs_opposite(cs)->si);
        }
 
        if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING)) {
@@ -450,7 +449,7 @@ int si_cs_send(struct conn_stream *cs)
 
        /* We couldn't send all of our data, let the mux know we'd like to send more */
        if (!channel_is_empty(oc))
-               conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->cs->wait_event);
+               conn->mux->subscribe(cs, SUB_RETRY_SEND, &cs->wait_event);
        return did_send;
 }
 
@@ -468,11 +467,11 @@ struct task *cs_conn_io_cb(struct task *t, void *ctx, unsigned int state)
                return t;
 
        if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
-               ret = si_cs_send(cs);
+               ret = cs_conn_send(cs);
        if (!(cs->wait_event.events & SUB_RETRY_RECV))
-               ret |= si_cs_recv(cs);
+               ret |= cs_conn_recv(cs);
        if (ret != 0)
-               si_cs_process(cs);
+               cs_conn_process(cs);
 
        stream_release_buffers(__cs_strm(cs));
        return t;
@@ -499,7 +498,7 @@ int cs_conn_sync_recv(struct conn_stream *cs)
        if (!si_rx_endp_ready(cs->si) || si_rx_blocked(cs->si))
                return 0; // already failed
 
-       return si_cs_recv(cs);
+       return cs_conn_recv(cs);
 }
 
 /* perform a synchronous send() for the stream interface. The CF_WRITE_NULL and
@@ -524,7 +523,7 @@ void cs_conn_sync_send(struct conn_stream *cs)
        if (!cs_conn_mux(cs))
                return;
 
-       si_cs_send(cs);
+       cs_conn_send(cs);
 }
 
 /*
@@ -532,11 +531,10 @@ void cs_conn_sync_send(struct conn_stream *cs)
  * into the buffer from the connection. It iterates over the mux layer's
  * rcv_buf function.
  */
-int si_cs_recv(struct conn_stream *cs)
+int cs_conn_recv(struct conn_stream *cs)
 {
        struct connection *conn = __cs_conn(cs);
-       struct stream_interface *si = cs_si(cs);
-       struct channel *ic = si_ic(si);
+       struct channel *ic = cs_ic(cs);
        int ret, max, cur_read = 0;
        int read_poll = MAX_READ_POLL_LOOPS;
        int flags = 0;
@@ -545,10 +543,10 @@ int si_cs_recv(struct conn_stream *cs)
        if (cs->state != CS_ST_EST)
                return 0;
 
-       /* If another call to si_cs_recv() failed, and we subscribed to
+       /* If another call to cs_conn_recv() failed, and we subscribed to
         * recv events already, give up now.
         */
-       if (si->cs->wait_event.events & SUB_RETRY_RECV)
+       if (cs->wait_event.events & SUB_RETRY_RECV)
                return 0;
 
        /* maybe we were called immediately after an asynchronous shutr */
@@ -636,7 +634,7 @@ int si_cs_recv(struct conn_stream *cs)
                        /* the pipe is full or we have read enough data that it
                         * could soon be full. Let's stop before needing to poll.
                         */
-                       si_rx_room_blk(si);
+                       si_rx_room_blk(cs->si);
                        goto done_recv;
                }
 
@@ -657,7 +655,7 @@ int si_cs_recv(struct conn_stream *cs)
        }
 
        /* now we'll need a input buffer for the stream */
-       if (!si_alloc_ibuf(si, &(si_strm(si)->buffer_wait)))
+       if (!si_alloc_ibuf(cs->si, &(__cs_strm(cs)->buffer_wait)))
                goto end_recv;
 
        /* For an HTX stream, if the buffer is stuck (no output data with some
@@ -669,15 +667,15 @@ int si_cs_recv(struct conn_stream *cs)
         * NOTE: A possible optim may be to let the mux decides if defrag is
         *       required or not, depending on amount of data to be xferred.
         */
-       if (IS_HTX_STRM(si_strm(si)) && !co_data(ic)) {
+       if (IS_HTX_STRM(__cs_strm(cs)) && !co_data(ic)) {
                struct htx *htx = htxbuf(&ic->buf);
 
                if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
-                       htx_defrag(htxbuf(&ic->buf), NULL, 0);
+                       htx_defrag(htx, NULL, 0);
        }
 
        /* Instruct the mux it must subscribed for read events */
-       flags |= ((!conn_is_back(conn) && (si_strm(si)->be->options & PR_O_ABRT_CLOSE)) ? CO_RFL_KEEP_RECV : 0);
+       flags |= ((!conn_is_back(conn) && (__cs_strm(cs)->be->options & PR_O_ABRT_CLOSE)) ? CO_RFL_KEEP_RECV : 0);
 
        /* Important note : if we're called with POLL_IN|POLL_HUP, it means the read polling
         * was enabled, which implies that the recv buffer was not full. So we have a guarantee
@@ -706,7 +704,7 @@ int si_cs_recv(struct conn_stream *cs)
                         */
                        BUG_ON(c_empty(ic));
 
-                       si_rx_room_blk(si);
+                       si_rx_room_blk(cs->si);
                        /* Add READ_PARTIAL because some data are pending but
                         * cannot be xferred to the channel
                         */
@@ -720,7 +718,7 @@ int si_cs_recv(struct conn_stream *cs)
                         * here to proceed.
                         */
                        if (flags & CO_RFL_BUF_FLUSH)
-                               si_rx_room_blk(si);
+                               si_rx_room_blk(cs->si);
                        break;
                }
 
@@ -750,7 +748,7 @@ int si_cs_recv(struct conn_stream *cs)
 
                if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
                        /* we're stopped by the channel's policy */
-                       si_rx_chan_blk(si);
+                       si_rx_chan_blk(cs->si);
                        break;
                }
 
@@ -765,7 +763,7 @@ int si_cs_recv(struct conn_stream *cs)
                         */
                        if (ic->flags & CF_STREAMER) {
                                /* we're stopped by the channel's policy */
-                               si_rx_chan_blk(si);
+                               si_rx_chan_blk(cs->si);
                                break;
                        }
 
@@ -774,7 +772,7 @@ int si_cs_recv(struct conn_stream *cs)
                         */
                        if (ret >= global.tune.recv_enough) {
                                /* we're stopped by the channel's policy */
-                               si_rx_chan_blk(si);
+                               si_rx_chan_blk(cs->si);
                                break;
                        }
                }
@@ -782,7 +780,7 @@ int si_cs_recv(struct conn_stream *cs)
                /* if we are waiting for more space, don't try to read more data
                 * right now.
                 */
-               if (si_rx_blocked(si))
+               if (si_rx_blocked(cs->si))
                        break;
        } /* while !flags */
 
@@ -846,12 +844,12 @@ int si_cs_recv(struct conn_stream *cs)
                cs_conn_read0(cs);
                ret = 1;
        }
-       else if (!si_rx_blocked(si)) {
+       else if (!si_rx_blocked(cs->si)) {
                /* Subscribe to receive events if we're blocking on I/O */
-               conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->cs->wait_event);
-               si_rx_endp_done(si);
+               conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event);
+               si_rx_endp_done(cs->si);
        } else {
-               si_rx_endp_more(si);
+               si_rx_endp_more(cs->si);
                ret = 1;
        }
        return ret;