si->ops->chk_rcv(si);
}
+/* This tries to perform a synchronous receive on the stream interface to
+ * try to collect last arrived data. In practice it's only implemented on
+ * conn_streams. Returns 0 if nothing was done, non-zero if new data or a
+ * shutdown were collected. This may result on some delayed receive calls
+ * to be programmed and performed later, though it doesn't provide any
+ * such guarantee.
+ */
+static inline int si_sync_recv(struct stream_interface *si)
+{
+ struct conn_stream *cs;
+
+ if (si->state != SI_ST_EST)
+ return 0;
+
+ cs = objt_cs(si->end);
+ if (!cs)
+ return 0; // only conn_streams are supported
+
+ if (si->wait_event.wait_reason & SUB_CAN_RECV)
+ return 0; // already subscribed
+
+ if (si->flags & SI_FL_WAIT_ROOM && c_size(si_ic(si)))
+ return 0; // already failed
+
+ return si_cs_recv(cs);
+}
+
/* Calls chk_snd on the connection using the data layer */
static inline void si_chk_snd(struct stream_interface *si)
{
unsigned int req_ana_back;
struct channel *req, *res;
struct stream_interface *si_f, *si_b;
- struct conn_stream *cs;
activity[tid].stream++;
si_b = &s->si[1];
/* First, attempt to receive pending data from I/O layers */
- cs = objt_cs(si_f->end);
- if (cs && !(si_f->wait_event.wait_reason & SUB_CAN_RECV) &&
- (!(si_f->flags & SI_FL_WAIT_ROOM) || !c_size(req)))
- si_cs_recv(cs);
-
- cs = objt_cs(si_b->end);
- if (cs && !(si_b->wait_event.wait_reason & SUB_CAN_RECV) &&
- (!(si_b->flags & SI_FL_WAIT_ROOM) || !c_size(res)))
- si_cs_recv(cs);
+ si_sync_recv(si_f);
+ si_sync_recv(si_b);
+
redo:
//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,