si->flags |= SI_FL_RX_WAIT_EP;
}
+/* Tell a stream interface the input channel is OK with it sending it some data */
+static inline void si_rx_chan_rdy(struct stream_interface *si)
+{
+ si->flags &= ~SI_FL_RXBLK_CHAN;
+}
+
+/* Tell a stream interface the input channel is not OK with it sending it some data */
+static inline void si_rx_chan_blk(struct stream_interface *si)
+{
+ si->flags |= SI_FL_RXBLK_CHAN;
+}
+
/* The stream interface just got the input buffer it was waiting for */
static inline void si_rx_buff_rdy(struct stream_interface *si)
{
}
if ((sio->flags & SI_FL_RXBLK_ROOM) &&
- ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL ||
- channel_is_empty(oc))) {
+ ((oc->flags & CF_WRITE_PARTIAL) || channel_is_empty(oc)))
sio->flags &= ~SI_FL_RXBLK_ROOM;
- }
+
+ if (oc->flags & CF_DONT_READ)
+ si_rx_chan_blk(sio);
+ else
+ si_rx_chan_rdy(sio);
/* Notify the other side when we've injected data into the IC that
* needs to be forwarded. We can do fast-forwarding as soon as there
si->flags &= ~SI_FL_RXBLK_ROOM;
}
+ if (!(ic->flags & CF_DONT_READ))
+ si_rx_chan_rdy(si);
+
si_chk_rcv(si);
si_chk_rcv(sio);
- if (si->flags & SI_FL_RXBLK_ROOM) {
+ if (si_rx_blocked(si)) {
ic->rex = TICK_ETERNITY;
}
- else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL) {
+ else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
/* we must re-enable reading if si_chk_snd() has freed some space */
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
if (!(si->wait_event.wait_reason & SUB_CAN_SEND) && !channel_is_empty(si_oc(si)))
ret = si_cs_send(cs);
- if (!(si->wait_event.wait_reason & SUB_CAN_RECV)) {
+ if (!(si->wait_event.wait_reason & SUB_CAN_RECV))
ret |= si_cs_recv(cs);
- if (!(si_ic(si)->flags & (CF_SHUTR|CF_DONT_READ)))
- si->flags &= ~SI_FL_RX_WAIT_EP;
- }
if (ret != 0)
si_cs_process(cs);
if (!(ic->flags & CF_SHUTR)) {
/* Read not closed, update FD status and timeout for reads */
- if ((ic->flags & CF_DONT_READ) || !channel_is_empty(ic)) {
+ if (ic->flags & CF_DONT_READ)
+ si_rx_chan_blk(si);
+ else
+ si_rx_chan_rdy(si);
+
+ if (!channel_is_empty(ic)) {
/* stop reading, imposed by channel's policy or contents */
si_cant_put(si);
- ic->rex = TICK_ETERNITY;
}
else {
/* (re)start reading and update timeout. Note: we don't recompute the timeout
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_RXBLK_ROOM;
- if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
- ic->rex = tick_add_ifset(now_ms, ic->rto);
}
+ if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
+ ic->rex = TICK_ETERNITY;
+ else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
+ ic->rex = tick_add_ifset(now_ms, ic->rto);
+
si_chk_rcv(si);
}
else
if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
/* we're stopped by the channel's policy */
- si_cant_put(si);
+ si_rx_chan_blk(si);
break;
}
*/
if (ic->flags & CF_STREAMER) {
/* we're stopped by the channel's policy */
- si_cant_put(si);
+ si_rx_chan_blk(si);
break;
}
*/
if (ret >= global.tune.recv_enough) {
/* we're stopped by the channel's policy */
- si_cant_put(si);
+ si_rx_chan_blk(si);
break;
}
}
/* if we are waiting for more space, don't try to read more data
* right now.
*/
- if (si->flags & SI_FL_RXBLK_ROOM)
+ if (si_rx_blocked(si))
break;
} /* while !flags */