return sc_app_shut_applet(sc);
}
+/*
+ * This is the callback which is called by the applet layer to receive data into
+ * the buffer from the appctx. It iterates over the applet's rcv_buf
+ * function. Please do not statify this function, it's often present in
+ * backtraces, it's useful to recognize it.
+ */
+int sc_applet_recv(struct stconn *sc)
+{
+ struct appctx *appctx = __sc_appctx(sc);
+ struct channel *ic = sc_ic(sc);
+ int ret, max, cur_read = 0;
+ int read_poll = MAX_READ_POLL_LOOPS;
+ int flags = 0;
+
+
+ /* If another call to sc_applet_recv() failed, give up now.
+ */
+ if (sc_waiting_room(sc))
+ return 0;
+
+ /* maybe we were called immediately after an asynchronous abort */
+ if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
+ return 1;
+
+ /* We must wait because the applet is not fully initialized */
+ if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
+ return 0;
+
+ /* stop immediately on errors. */
+ if (!sc_ep_test(sc, SE_FL_RCV_MORE)) {
+ // TODO: be sure SE_FL_RCV_MORE may be set for applet ?
+ if (sc_ep_test(sc, SE_FL_ERROR))
+ goto end_recv;
+ }
+
+ /* prepare to detect if the mux needs more room */
+ sc_ep_clr(sc, SE_FL_WANT_ROOM);
+
+ channel_check_idletimer(ic);
+
+ /* TODO: Handle fastfwd here be implement callback function first ! */
+
+ if (!sc_alloc_ibuf(sc, &appctx->buffer_wait))
+ goto end_recv;
+
+ /* For an HTX stream, if the buffer is stuck (no output data with some
+ * input data) and if the HTX message is fragmented or if its free space
+ * wraps, we force an HTX deframentation. It is a way to have a
+ * contiguous free space nad to let the mux to copy as much data as
+ * possible.
+ *
+ * NOTE: A possible optim may be to let the mux decides if defrag is
+ * required or not, depending on amount of data to be xferred.
+ */
+ if (IS_HTX_STRM(__sc_strm(sc)) && !co_data(ic)) {
+ struct htx *htx = htxbuf(&ic->buf);
+
+ if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
+ htx_defrag(htx, NULL, 0);
+ }
+
+ /* Compute transient CO_RFL_* flags */
+ if (co_data(ic)) {
+ flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK);
+ }
+
+ /* <max> may be null. This is the mux responsibility to set
+ * SE_FL_RCV_MORE on the SC if more space is needed.
+ */
+ max = channel_recv_max(ic);
+ ret = appctx->applet->rcv_buf(sc, &ic->buf, max, flags);
+ if (sc_ep_test(sc, SE_FL_WANT_ROOM)) {
+ /* SE_FL_WANT_ROOM must not be reported if the channel's
+ * buffer is empty.
+ */
+ BUG_ON(c_empty(ic));
+
+ sc_need_room(sc, channel_recv_max(ic) + 1);
+ /* Add READ_PARTIAL because some data are pending but
+ * cannot be xferred to the channel
+ */
+ ic->flags |= CF_READ_EVENT;
+ sc_ep_report_read_activity(sc);
+ }
+
+ if (ret <= 0) {
+ /* if we refrained from reading because we asked for a flush to
+ * satisfy rcv_pipe(), report that there's not enough room here
+ * to proceed.
+ */
+ if (flags & CO_RFL_BUF_FLUSH)
+ sc_need_room(sc, -1);
+ goto done_recv;
+ }
+
+ cur_read += ret;
+
+ /* if we're allowed to directly forward data, we must update ->o */
+ if (ic->to_forward && !(sc_opposite(sc)->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
+ unsigned long fwd = ret;
+ if (ic->to_forward != CHN_INFINITE_FORWARD) {
+ if (fwd > ic->to_forward)
+ fwd = ic->to_forward;
+ ic->to_forward -= fwd;
+ }
+ c_adv(ic, fwd);
+ }
+
+ ic->flags |= CF_READ_EVENT;
+ ic->total += ret;
+
+ /* End-of-input reached, we can leave. In this case, it is
+ * important to break the loop to not block the SC because of
+ * the channel's policies.This way, we are still able to receive
+ * shutdowns.
+ */
+ if (sc_ep_test(sc, SE_FL_EOI))
+ goto done_recv;
+
+ if ((sc->flags & SC_FL_RCV_ONCE) || --read_poll <= 0) {
+ /* we don't expect to read more data */
+ sc_wont_read(sc);
+ goto done_recv;
+ }
+
+ /* if too many bytes were missing from last read, it means that
+ * it's pointless trying to read again because the system does
+ * not have them in buffers.
+ */
+ if (ret < max) {
+ /* if a streamer has read few data, it may be because we
+ * have exhausted system buffers. It's not worth trying
+ * again.
+ */
+ if (ic->flags & CF_STREAMER) {
+ /* we're stopped by the channel's policy */
+ sc_wont_read(sc);
+ goto done_recv;
+ }
+
+ /* if we read a large block smaller than what we requested,
+ * it's almost certain we'll never get anything more.
+ */
+ if (ret >= global.tune.recv_enough) {
+ /* we're stopped by the channel's policy */
+ sc_wont_read(sc);
+ }
+ }
+
+ done_recv:
+ if (!cur_read)
+ se_have_no_more_data(sc->sedesc);
+ else {
+ channel_check_xfer(ic, cur_read);
+ sc_ep_report_read_activity(sc);
+ }
+
+ end_recv:
+ ret = (cur_read != 0);
+
+ /* Report EOI on the channel if it was reached from the mux point of
+ * view. */
+ if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
+ sc_ep_report_read_activity(sc);
+ sc->flags |= SC_FL_EOI;
+ ic->flags |= CF_READ_EVENT;
+ ret = 1;
+ }
+
+ if (sc_ep_test(sc, SE_FL_EOS)) {
+ /* we received a shutdown */
+ if (ic->flags & CF_AUTO_CLOSE)
+ sc_schedule_shutdown(sc_opposite(sc));
+ sc_applet_eos(sc);
+ ret = 1;
+ }
+
+ if (sc_ep_test(sc, SE_FL_ERROR)) {
+ sc->flags |= SC_FL_ERROR;
+ ret = 1;
+ }
+ else if (!cur_read &&
+ !(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) &&
+ !(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))) {
+ se_have_no_more_data(sc->sedesc);
+ }
+ else {
+ se_have_more_data(sc->sedesc);
+ ret = 1;
+ }
+
+ return ret;
+}
+
+/* This tries to perform a synchronous receive on the stream connector to
+ * try to collect last arrived data. In practice it's only implemented on
+ * stconns. Returns 0 if nothing was done, non-zero if new data or a
+ * shutdown were collected. This may result on some delayed receive calls
+ * to be programmed and performed later, though it doesn't provide any
+ * such guarantee.
+ */
+int sc_applet_sync_recv(struct stconn *sc)
+{
+ if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST))
+ return 0;
+
+ if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
+ return 0;
+
+ if (!sc_is_recv_allowed(sc))
+ return 0; // already failed
+
+ return sc_applet_recv(sc);
+}
+
+/*
+ * This function is called to send buffer data to an applet. It calls the
+ * applet's snd_buf function. Please do not statify this function, it's often
+ * present in backtraces, it's useful to recognize it.
+ */
+int sc_applet_send(struct stconn *sc)
+{
+ struct appctx *appctx = __sc_appctx(sc);
+ struct stconn *sco = sc_opposite(sc);
+ struct channel *oc = sc_oc(sc);
+ size_t ret;
+ int did_send = 0;
+
+ if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
+ BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
+ return 1;
+ }
+
+ if (sc_ep_test(sc, SE_FL_WONT_CONSUME))
+ return 0;
+
+ /* we might have been called just after an asynchronous shutw */
+ if (sc->flags & SC_FL_SHUT_DONE)
+ return 1;
+
+ /* We must wait because the applet is not fully initialized */
+ if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
+ return 0;
+
+ if (co_data(oc)) {
+ ret = appctx->applet->snd_buf(sc, &oc->buf, co_data(oc), 0);
+ if (ret > 0) {
+ did_send = 1;
+ c_rew(oc, ret);
+ c_realign_if_empty(oc);
+
+ if (!co_data(oc)) {
+ /* Always clear both flags once everything has been sent, they're one-shot */
+ sc->flags &= ~(SC_FL_SND_ASAP|SC_FL_SND_EXP_MORE);
+ }
+ /* if some data remain in the buffer, it's only because the
+ * system buffers are full, we will try next time.
+ */
+ }
+ }
+
+ if (did_send)
+ oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
+
+ if (!sco->room_needed || (did_send && (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)))
+ sc_have_room(sco);
+
+ if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
+ oc->flags |= CF_WRITE_EVENT;
+ BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
+ if (sc_ep_test(sc, SE_FL_ERROR))
+ sc->flags |= SC_FL_ERROR;
+ return 1;
+ }
+
+ if (!co_data(oc)) {
+ if (did_send)
+ sc_ep_report_send_activity(sc);
+ }
+ else {
+ sc_ep_report_blocked_send(sc, did_send);
+ }
+
+ return did_send;
+}
+
+void sc_applet_sync_send(struct stconn *sc)
+{
+ struct channel *oc = sc_oc(sc);
+
+ oc->flags &= ~CF_WRITE_EVENT;
+
+ if (sc->flags & SC_FL_SHUT_DONE)
+ return;
+
+ if (!co_data(oc))
+ return;
+
+ if (!sc_state_in(sc->state, SC_SB_EST))
+ return;
+
+ if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
+ return;
+
+ sc_applet_send(sc);
+}
+
/* Callback to be used by applet handlers upon completion. It updates the stream
* (which may or may not take this opportunity to try to forward data), then
* may re-enable the applet's based on the channels and stream connector's final