Implement the subscription in the mux on the qcs instance.
Subscribe is now used by the h3 layer when receiving an incomplete frame
on the H3 control stream. It is also used when attaching the remote
uni-directional streams on the h3 layer.
In the qc_send, the mux wakes up the qcs for each new transfer executed.
This is done via the method qcs_notify_send().
The xprt wakes up the qcs when receiving data on unidirectional streams.
This is done via the method qcs_notify_recv().
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
+int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
+void qcs_notify_recv(struct qcs *qcs);
+void qcs_notify_send(struct qcs *qcs);
+
/* Bit shift to get the stream sub ID for internal use which is obtained
* shifting the stream IDs by this value, knowing that the
* QCS_ID_TYPE_SHIFT less significant bits identify the stream ID
b_del(rxbuf, flen);
}
- /* TODO handle the case when the buffer is not empty. This can happens
- * if there is an incomplete frame.
+ /* Handle the case where remaining data are present in the buffer. This
+ * can happen if there is an incomplete frame. In this case, subscribe
+ * on the lower layer to restart receive operation.
*/
+ if (b_data(rxbuf))
+ qcs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3_uqs->wait_event);
return 1;
}
h3->rctrl.qcs = qcs;
h3->rctrl.cb = h3_control_recv;
- // TODO wake-up rctrl tasklet on reception
+ qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rctrl.wait_event);
break;
case H3_UNI_STRM_TP_PUSH_STREAM:
/* NOT SUPPORTED */
h3->rqpack_enc.qcs = qcs;
h3->rqpack_enc.cb = qpack_decode_enc;
- // TODO wake-up rqpack_enc tasklet on reception
+ qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_enc.wait_event);
break;
case H3_UNI_STRM_TP_QPACK_DECODER:
if (h3->rqpack_dec.qcs) {
h3->rqpack_dec.qcs = qcs;
h3->rqpack_dec.cb = qpack_decode_dec;
- // TODO wake-up rqpack_dec tasklet on reception
+ qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_dec.wait_event);
break;
default:
/* Error */
return buf;
}
+int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
+{
+ fprintf(stderr, "%s\n", __func__);
+
+ BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
+ BUG_ON(qcs->subs && qcs->subs != es);
+
+ es->events |= event_type;
+ qcs->subs = es;
+
+ return 0;
+}
+
+void qcs_notify_recv(struct qcs *qcs)
+{
+ if (qcs->subs && qcs->subs->events & SUB_RETRY_RECV) {
+ tasklet_wakeup(qcs->subs->tasklet);
+ qcs->subs->events &= ~SUB_RETRY_RECV;
+ if (!qcs->subs->events)
+ qcs->subs = NULL;
+ }
+}
+
+void qcs_notify_send(struct qcs *qcs)
+{
+ if (qcs->subs && qcs->subs->events & SUB_RETRY_SEND) {
+ tasklet_wakeup(qcs->subs->tasklet);
+ qcs->subs->events &= ~SUB_RETRY_SEND;
+ if (!qcs->subs->events)
+ qcs->subs = NULL;
+ }
+}
+
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
{
struct quic_frame *frm;
if (ret < 0)
ABORT_NOW();
+ if (ret > 0)
+ qcs_notify_send(qcs);
+
/* TODO wake-up xprt if data were transfered */
fprintf(stderr, "%s ret=%d\n", __func__, ret);
static int qc_subscribe(struct conn_stream *cs, int event_type,
struct wait_event *es)
{
- /* XXX TODO XXX */
- return 0;
+ return qcs_subscribe(cs->ctx, event_type, es);
}
/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
return 0;
}
+ if (ret)
+ qcs_notify_recv(strm);
+
strm_frm->offset.key += ret;
}
/* Take this frame into an account for the stream flow control */