* have at least read something.
*/
- if (tick_isset(b->rex) && b->flags & BF_READ_PARTIAL)
+ if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL)
b->rex = tick_add_ifset(now_ms, b->rto);
if (!(b->flags & BF_READ_ACTIVITY))
goto out_skip_wakeup;
out_wakeup:
+ /* the consumer might be waiting for data */
+ if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL))
+ b->cons->chk_snd(b->cons);
+
task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
* written something.
*/
- if (tick_isset(b->wex) && b->flags & BF_WRITE_PARTIAL) {
+ if ((b->flags & (BF_WRITE_PARTIAL|BF_EMPTY|BF_SHUTW)) == BF_WRITE_PARTIAL) {
b->wex = tick_add_ifset(now_ms, b->wto);
if (tick_isset(b->wex) & tick_isset(si->ib->rex)) {
/* FIXME: to prevent the client from expiring read timeouts during writes,
if (!(b->flags & BF_WRITE_ACTIVITY))
goto out_skip_wakeup;
out_wakeup:
+ /* the producer might be waiting for more room to store data */
+ if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL))
+ b->prod->chk_rcv(b->prod);
+
task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
/* Check if we need to close the write side */
if (!(ob->flags & BF_SHUTW)) {
/* Write not closed, update FD status and timeout for writes */
- if ((ob->flags & BF_EMPTY) ||
+ if ((ob->send_max == 0) ||
+ (ob->flags & BF_EMPTY) ||
(ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
/* stop writing */
if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
}
}
+/* This function is used for inter-stream-interface calls. It is called by the
+ * consumer to inform the producer side that it may be interested in checking
+ * for free space in the buffer. Note that it intentionally does not update
+ * timeouts, so that we can still check them later at wake-up.
+ */
+void stream_sock_chk_rcv(struct stream_interface *si)
+{
+ struct buffer *ib = si->ib;
+
+ DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
+ now_ms, __FUNCTION__,
+ fd, fdtab[fd].owner,
+ ib, ob,
+ ib->rex, ob->wex,
+ ib->flags, ob->flags,
+ ib->l, ob->l, si->state);
+
+ if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
+ return;
+
+ if (ib->flags & (BF_FULL|BF_HIJACK)) {
+ /* stop reading */
+ if ((ib->flags & (BF_FULL|BF_HIJACK)) == BF_FULL)
+ si->flags |= SI_FL_WAIT_ROOM;
+ EV_FD_COND_C(si->fd, DIR_RD);
+ }
+ else {
+ /* (re)start reading */
+ si->flags &= ~SI_FL_WAIT_ROOM;
+ EV_FD_COND_S(si->fd, DIR_RD);
+ }
+}
+
+
+/* This function is used for inter-stream-interface calls. It is called by the
+ * producer to inform the consumer side that it may be interested in checking
+ * for data in the buffer. Note that it intentionally does not update timeouts,
+ * so that we can still check them later at wake-up.
+ */
+void stream_sock_chk_snd(struct stream_interface *si)
+{
+ struct buffer *ob = si->ob;
+
+ DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
+ now_ms, __FUNCTION__,
+ fd, fdtab[fd].owner,
+ ib, ob,
+ ib->rex, ob->wex,
+ ib->flags, ob->flags,
+ ib->l, ob->l, si->state);
+
+ if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW)))
+ return;
+
+ if ((ob->send_max == 0) ||
+ (ob->flags & BF_EMPTY) ||
+ (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
+ /* stop writing */
+ if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
+ si->flags |= SI_FL_WAIT_DATA;
+ EV_FD_COND_C(si->fd, DIR_WR);
+ }
+ else {
+ /* (re)start writing. */
+ si->flags &= ~SI_FL_WAIT_DATA;
+ EV_FD_COND_S(si->fd, DIR_WR);
+ }
+}
+
/*
* Local variables: