si->flags |= SI_FL_RX_WAIT_EP;
}
+/* The stream interface just got the input buffer it was waiting for */
+static inline void si_rx_buff_rdy(struct stream_interface *si)
+{
+ si->flags &= ~SI_FL_RXBLK_BUFF;
+}
+
+/* The stream interface failed to get an input buffer and is waiting for it.
+ * Since it indicates a willingness to deliver data to the buffer that will
+ * have to be retried, we automatically clear RXBLK_ENDP to be called again
+ * as soon as RXBLK_BUFF is cleared.
+ */
+static inline void si_rx_buff_blk(struct stream_interface *si)
+{
+ si->flags |= SI_FL_RXBLK_BUFF;
+}
+
/* Returns non-zero if the stream interface's Rx path is blocked */
static inline int si_tx_blocked(const struct stream_interface *si)
{
/* Try to allocate a buffer for the stream-int's input channel. It relies on
* channel_alloc_buffer() for this so it abides by its rules. It returns 0 on
* failure, non-zero otherwise. If no buffer is available, the requester,
- * represented by <wait> pointer, will be added in the list of objects waiting
- * for an available buffer, and SI_FL_RXBLK_ROOM will be set on the stream-int.
- * The requester will be responsible for calling this function to try again
- * once woken up.
+ * represented by the <wait> pointer, will be added in the list of objects
+ * waiting for an available buffer, and SI_FL_RXBLK_BUFF will be set on the
+ * stream-int and SI_FL_RX_WAIT_EP cleared. The requester will be responsible
+ * for calling this function to try again once woken up.
*/
static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait *wait)
{
ret = channel_alloc_buffer(si_ic(si), wait);
if (!ret)
- si_cant_put(si);
+ si_rx_buff_blk(si);
return ret;
}
struct stream_interface *si = appctx->owner;
/* allocation requested ? */
- if (!(si->flags & SI_FL_RXBLK_ROOM) || c_size(si_ic(si)) || si_ic(si)->pipe)
+ if (!(si->flags & SI_FL_RXBLK_BUFF))
+ return 0;
+
+ si_rx_buff_rdy(si);
+
+ /* was already allocated another way ? if so, don't take this one */
+ if (c_size(si_ic(si)) || si_ic(si)->pipe)
return 0;
/* allocation possible now ? */
- if (!b_alloc_margin(&si_ic(si)->buf, global.tune.reserved_bufs))
+ if (!b_alloc_margin(&si_ic(si)->buf, global.tune.reserved_bufs)) {
+ si_rx_buff_blk(si);
return 0;
+ }
- si->flags &= ~SI_FL_RXBLK_ROOM;
task_wakeup(appctx->t, TASK_WOKEN_RES);
return 1;
}
__appctx_free(app);
return NULL;
}
- /* Now we'll try to allocate the input buffer. We wake up the
- * applet in all cases. So this is the applet responsibility to
- * check if this buffer was allocated or not. This let a chance
- * for applets to do some other processing if needed. */
- if (!si_alloc_ibuf(si, &app->buffer_wait))
- si_cant_put(si);
/* We always pretend the applet can't get and doesn't want to
* put, it's up to it to change this if needed. This ensures
si_cant_get(si);
si_stop_put(si);
+ /* Now we'll try to allocate the input buffer. We wake up the applet in
+ * all cases. So this is the applet's responsibility to check if this
+ * buffer was allocated or not. This leaves a chance for applets to do
+ * some other processing if needed. The applet doesn't have anything to
+ * do if it needs the buffer, it will be called again upon readiness.
+ */
+ if (!si_alloc_ibuf(si, &app->buffer_wait))
+ si_want_put(si);
+
app->applet->fct(app);
si_applet_wake_cb(si);
channel_release_buffer(si_ic(si), &app->buffer_wait);
{
struct stream *s = arg;
- if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_RXBLK_ROOM) &&
+ if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_RXBLK_BUFF) &&
b_alloc_margin(&s->req.buf, global.tune.reserved_bufs))
- s->si[0].flags &= ~SI_FL_RXBLK_ROOM;
- else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_RXBLK_ROOM) &&
+ si_rx_buff_rdy(&s->si[0]);
+ else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_RXBLK_BUFF) &&
b_alloc_margin(&s->res.buf, 0))
- s->si[1].flags &= ~SI_FL_RXBLK_ROOM;
+ si_rx_buff_rdy(&s->si[1]);
else
return 0;