if (max == 0) {
b->flags |= BF_FULL;
+ si->flags |= SI_FL_WAIT_ROOM;
break;
}
}
b->flags |= BF_FULL;
+ si->flags |= SI_FL_WAIT_ROOM;
break;
}
out_wakeup:
/* We might have some data the consumer is waiting for */
- if (likely((b->send_max || b->splice_len) &&
- (b->cons->flags & SI_FL_WAIT_DATA)))
+ if ((b->send_max || b->splice_len) && (b->cons->flags & SI_FL_WAIT_DATA)) {
+ int last_len = b->splice_len;
+
b->cons->chk_snd(b->cons);
- /* note that the consumer might have cleared BF_FULL */
- if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL)
- b->rex = tick_add_ifset(now_ms, b->rto);
- else if (b->flags & BF_FULL) {
- si->flags |= SI_FL_WAIT_ROOM;
+ /* check if the consumer has freed some space */
+ if (!(b->flags & BF_FULL) && (!last_len || b->splice_len < last_len))
+ si->flags &= ~SI_FL_WAIT_ROOM;
+ }
+
+ if (si->flags & SI_FL_WAIT_ROOM) {
EV_FD_CLR(fd, DIR_RD);
b->rex = TICK_ETERNITY;
}
+ else if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL)
+ b->rex = tick_add_ifset(now_ms, b->rto);
/* we have to wake up if there is a special event or if we don't have
* any more data to forward.
*/
- if (likely((b->flags & (BF_READ_NULL|BF_READ_ERROR|BF_SHUTR)) ||
- !b->to_forward ||
- si->state != SI_ST_EST ||
- b->cons->state != SI_ST_EST ||
- (si->flags & SI_FL_ERR)))
+ if ((b->flags & (BF_READ_NULL|BF_READ_ERROR|BF_SHUTR)) ||
+ !b->to_forward ||
+ si->state != SI_ST_EST ||
+ b->cons->state != SI_ST_EST ||
+ (si->flags & SI_FL_ERR))
task_wakeup(si->owner, TASK_WOKEN_IO);
-
+
fdtab[fd].ev &= ~FD_POLL_IN;
return retval;
* It returns -1 in case of unrecoverable error, 0 if the caller needs to poll
* before calling it again, otherwise 1.
*/
-int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
+static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
{
int write_poll = MAX_WRITE_POLL_LOOPS;
int retval = 1;
void stream_sock_chk_snd(struct stream_interface *si)
{
struct buffer *ob = si->ob;
+ int retval;
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
now_ms, __FUNCTION__,
if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW)))
return;
- if ((ob->send_max == 0 && ob->splice_len == 0) ||
- (ob->flags & BF_EMPTY) ||
- (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
- /* stop writing */
+ if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
+ (fdtab[si->fd].ev & FD_POLL_OUT) || /* we'll be called anyway */
+ !(ob->send_max || ob->splice_len) || /* called with nothing to send ! */
+ !(ob->flags & (BF_HIJACK|BF_WRITE_ENA))) /* we may not write */
+ return;
+
+ retval = stream_sock_write_loop(si, ob);
+ if (retval < 0) {
+ /* Write error on the file descriptor. We mark the FD as STERROR so
+ * that we don't use it anymore and we notify the task.
+ */
+ fdtab[si->fd].state = FD_STERROR;
+ fdtab[si->fd].ev &= ~FD_POLL_STICKY;
+ si->flags |= SI_FL_ERR;
+ goto out_wakeup;
+ }
+
+ if (retval > 0 || (ob->send_max == 0 && ob->splice_len == 0)) {
+ /* the connection is established but we can't write. Either the
+ * buffer is empty, or we just refrain from sending because the
+ * send_max limit was reached. Maybe we just wrote the last
+ * chunk and need to close.
+ */
+ if (((ob->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) ==
+ (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) &&
+ (si->state == SI_ST_EST)) {
+ stream_sock_shutw(si);
+ goto out_wakeup;
+ }
+
if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
si->flags |= SI_FL_WAIT_DATA;
- EV_FD_COND_C(si->fd, DIR_WR);
+ ob->wex = TICK_ETERNITY;
}
else {
/* (re)start writing. */
si->flags &= ~SI_FL_WAIT_DATA;
EV_FD_COND_S(si->fd, DIR_WR);
}
+
+ /* in case of special condition (error, shutdown, end of write...), we
+ * have to notify the task.
+ */
+ if (likely((ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
+ (!ob->to_forward && !ob->send_max && !ob->splice_len) ||
+ si->state != SI_ST_EST)) {
+ out_wakeup:
+ task_wakeup(si->owner, TASK_WOKEN_IO);
+ }
}