]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-quic: implement subscribe on stream
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 6 Dec 2021 10:24:00 +0000 (11:24 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 7 Dec 2021 14:44:45 +0000 (15:44 +0100)
Implement the subscription in the mux on the qcs instance.

Subscribe is now used by the h3 layer when receiving an incomplete frame
on the H3 control stream. It is also used when attaching the remote
uni-directional streams on the h3 layer.

In the qc_send, the mux wakes up the qcs for each new transfer executed.
This is done via the method qcs_notify_send().

The xprt wakes up the qcs when receiving data on unidirectional streams.
This is done via the method qcs_notify_recv().

include/haproxy/mux_quic.h
src/h3.c
src/mux_quic.c
src/xprt_quic.c

index aa8572c14711e52e5c79a8decee122e14b22a7d6..9e7ef175f05c88e921799a9b99053b975c716e87 100644 (file)
@@ -15,6 +15,10 @@ void uni_qcs_free(struct qcs *qcs);
 
 struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
 
+int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
+void qcs_notify_recv(struct qcs *qcs);
+void qcs_notify_send(struct qcs *qcs);
+
 /* Bit shift to get the stream sub ID for internal use which is obtained
  * shifting the stream IDs by this value, knowing that the
  * QCS_ID_TYPE_SHIFT less significant bits identify the stream ID
index ecaa02d499e3e158cc2d569a601d5b777621915b..359b27676ca41b2e197ad3b5279d1a52d73dd3fc 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -322,9 +322,12 @@ static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
                b_del(rxbuf, flen);
        }
 
-       /* TODO handle the case when the buffer is not empty. This can happens
-        * if there is an incomplete frame.
+       /* Handle the case where remaining data are present in the buffer. This
+        * can happen if there is an incomplete frame. In this case, subscribe
+        * on the lower layer to restart receive operation.
         */
+       if (b_data(rxbuf))
+               qcs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3_uqs->wait_event);
 
        return 1;
 }
@@ -658,7 +661,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
 
                h3->rctrl.qcs = qcs;
                h3->rctrl.cb = h3_control_recv;
-               // TODO wake-up rctrl tasklet on reception
+               qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rctrl.wait_event);
                break;
        case H3_UNI_STRM_TP_PUSH_STREAM:
                /* NOT SUPPORTED */
@@ -671,7 +674,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
 
                h3->rqpack_enc.qcs = qcs;
                h3->rqpack_enc.cb = qpack_decode_enc;
-               // TODO wake-up rqpack_enc tasklet on reception
+               qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_enc.wait_event);
                break;
        case H3_UNI_STRM_TP_QPACK_DECODER:
                if (h3->rqpack_dec.qcs) {
@@ -681,7 +684,7 @@ static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
 
                h3->rqpack_dec.qcs = qcs;
                h3->rqpack_dec.cb = qpack_decode_dec;
-               // TODO wake-up rqpack_dec tasklet on reception
+               qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_dec.wait_event);
                break;
        default:
                /* Error */
index df586863730d86e46cfec76256871806c6ce984e..3b194f687fc973265ebdd9098a4b94fc00f06bab 100644 (file)
@@ -84,6 +84,39 @@ struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr)
        return buf;
 }
 
+int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
+{
+       fprintf(stderr, "%s\n", __func__);
+
+       BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
+       BUG_ON(qcs->subs && qcs->subs != es);
+
+       es->events |= event_type;
+       qcs->subs = es;
+
+       return 0;
+}
+
+void qcs_notify_recv(struct qcs *qcs)
+{
+       if (qcs->subs && qcs->subs->events & SUB_RETRY_RECV) {
+               tasklet_wakeup(qcs->subs->tasklet);
+               qcs->subs->events &= ~SUB_RETRY_RECV;
+               if (!qcs->subs->events)
+                       qcs->subs = NULL;
+       }
+}
+
+void qcs_notify_send(struct qcs *qcs)
+{
+       if (qcs->subs && qcs->subs->events & SUB_RETRY_SEND) {
+               tasklet_wakeup(qcs->subs->tasklet);
+               qcs->subs->events &= ~SUB_RETRY_SEND;
+               if (!qcs->subs->events)
+                       qcs->subs = NULL;
+       }
+}
+
 static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
 {
        struct quic_frame *frm;
@@ -157,6 +190,9 @@ static int qc_send(struct qcc *qcc)
                        if (ret < 0)
                                ABORT_NOW();
 
+                       if (ret > 0)
+                               qcs_notify_send(qcs);
+
                        /* TODO wake-up xprt if data were transfered */
 
                        fprintf(stderr, "%s ret=%d\n", __func__, ret);
@@ -323,8 +359,7 @@ static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf,
 static int qc_subscribe(struct conn_stream *cs, int event_type,
                         struct wait_event *es)
 {
-       /* XXX TODO XXX */
-       return 0;
+       return qcs_subscribe(cs->ctx, event_type, es);
 }
 
 /* Called from the upper layer, to unsubscribe <es> from events <event_type>.
index 96d409fa7df58ef5d9168756a768fd2a24875b77..d70be2cc51261223a7e2f54326fd005de73006e5 100644 (file)
@@ -2149,6 +2149,9 @@ static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
                        return 0;
                }
 
+               if (ret)
+                       qcs_notify_recv(strm);
+
                strm_frm->offset.key += ret;
        }
        /* Take this frame into an account for the stream flow control */