]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
[MEDIUM] enable inter-stream_interface wakeup calls
authorWilly Tarreau <w@1wt.eu>
Sun, 14 Dec 2008 13:42:35 +0000 (14:42 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 28 Dec 2008 10:09:02 +0000 (11:09 +0100)
By letting the producer tell the consumer there is data to check,
and the consumer tell the producer there is some space left again,
we can cut in half the number of session wakeups.

This is also an important starting point for future splicing support.

include/proto/stream_sock.h
include/types/stream_interface.h
src/client.c
src/proto_uxst.c
src/stream_sock.c

index a3fd992e9ce5cee88090058f486e9b7925344cec..929cb082f0422a8a8b1466714733e10e314daa8e 100644 (file)
@@ -3,7 +3,7 @@
   This file contains client-side definitions.
 
   Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
-  
+
   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation, version 2.1
@@ -36,6 +36,8 @@ int stream_sock_write(int fd);
 void stream_sock_data_finish(struct stream_interface *si);
 void stream_sock_shutr(struct stream_interface *si);
 void stream_sock_shutw(struct stream_interface *si);
+void stream_sock_chk_rcv(struct stream_interface *si);
+void stream_sock_chk_snd(struct stream_interface *si);
 
 
 /* This either returns the sockname or the original destination address. Code
index d34cfa4a819300701e553096aba23393063b593b..bb4a9e3b5f2f20867d0a1edd0f8053af55bc267c 100644 (file)
@@ -78,6 +78,8 @@ struct stream_interface {
        unsigned int exp;       /* wake up time for connect, queue, turn-around, ... */
        void (*shutr)(struct stream_interface *);  /* shutr function */
        void (*shutw)(struct stream_interface *);  /* shutw function */
+       void (*chk_rcv)(struct stream_interface *);/* chk_rcv function */
+       void (*chk_snd)(struct stream_interface *);/* chk_snd function */
        struct buffer *ib, *ob; /* input and output buffers */
        unsigned int err_type;  /* first error detected, one of SI_ET_* */
        void *err_loc;          /* commonly the server, NULL when SI_ET_NONE */
index 4e8004e7666b887e36a8f964e9d51b7b5ccc197f..5885f06f4f81cbb2ed28b0289d3d67e41a892646 100644 (file)
@@ -182,6 +182,8 @@ int event_accept(int fd) {
                s->si[0].owner = t;
                s->si[0].shutr = stream_sock_shutr;
                s->si[0].shutw = stream_sock_shutw;
+               s->si[0].chk_rcv = stream_sock_chk_rcv;
+               s->si[0].chk_snd = stream_sock_chk_snd;
                s->si[0].fd = cfd;
                s->si[0].flags = SI_FL_NONE;
                s->si[0].exp = TICK_ETERNITY;
@@ -192,6 +194,8 @@ int event_accept(int fd) {
                s->si[1].owner = t;
                s->si[1].shutr = stream_sock_shutr;
                s->si[1].shutw = stream_sock_shutw;
+               s->si[1].chk_rcv = stream_sock_chk_rcv;
+               s->si[1].chk_snd = stream_sock_chk_snd;
                s->si[1].exp = TICK_ETERNITY;
                s->si[1].fd = -1; /* just to help with debugging */
                s->si[1].flags = SI_FL_NONE;
index 3da304999da5fef67565f8df4cc54e0229fea1bb..6c1367936e9788e056d6623e07e7f9baf13e15cd 100644 (file)
@@ -452,6 +452,8 @@ int uxst_event_accept(int fd) {
                s->si[0].owner = t;
                s->si[0].shutr = stream_sock_shutr;
                s->si[0].shutw = stream_sock_shutw;
+               s->si[0].chk_rcv = stream_sock_chk_rcv;
+               s->si[0].chk_snd = stream_sock_chk_snd;
                s->si[0].fd = cfd;
                s->si[0].flags = SI_FL_NONE;
                s->si[0].exp = TICK_ETERNITY;
@@ -462,6 +464,8 @@ int uxst_event_accept(int fd) {
                s->si[1].owner = t;
                s->si[1].shutr = stream_sock_shutr;
                s->si[1].shutw = stream_sock_shutw;
+               s->si[1].chk_rcv = stream_sock_chk_rcv;
+               s->si[1].chk_snd = stream_sock_chk_snd;
                s->si[1].exp = TICK_ETERNITY;
                s->si[1].fd = -1; /* just to help with debugging */
                s->si[1].flags = SI_FL_NONE;
index 72a42c70df116635ae78fa440c9fafd0d1ff3b8b..fdd0dbdf3a9b46968f1c4741d4e8c3581f5a7ce4 100644 (file)
@@ -244,12 +244,16 @@ int stream_sock_read(int fd) {
         * have at least read something.
         */
 
-       if (tick_isset(b->rex) && b->flags & BF_READ_PARTIAL)
+       if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL)
                b->rex = tick_add_ifset(now_ms, b->rto);
 
        if (!(b->flags & BF_READ_ACTIVITY))
                goto out_skip_wakeup;
  out_wakeup:
+       /* the consumer might be waiting for data */
+       if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL))
+               b->cons->chk_snd(b->cons);
+
        task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
@@ -433,7 +437,7 @@ int stream_sock_write(int fd) {
         * written something.
         */
 
-       if (tick_isset(b->wex) && b->flags & BF_WRITE_PARTIAL) {
+       if ((b->flags & (BF_WRITE_PARTIAL|BF_EMPTY|BF_SHUTW)) == BF_WRITE_PARTIAL) {
                b->wex = tick_add_ifset(now_ms, b->wto);
                if (tick_isset(b->wex) & tick_isset(si->ib->rex)) {
                        /* FIXME: to prevent the client from expiring read timeouts during writes,
@@ -448,6 +452,10 @@ int stream_sock_write(int fd) {
        if (!(b->flags & BF_WRITE_ACTIVITY))
                goto out_skip_wakeup;
  out_wakeup:
+       /* the producer might be waiting for more room to store data */
+       if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL))
+               b->prod->chk_rcv(b->prod);
+
        task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
@@ -579,7 +587,8 @@ void stream_sock_data_finish(struct stream_interface *si)
        /* Check if we need to close the write side */
        if (!(ob->flags & BF_SHUTW)) {
                /* Write not closed, update FD status and timeout for writes */
-               if ((ob->flags & BF_EMPTY) ||
+               if ((ob->send_max == 0) ||
+                   (ob->flags & BF_EMPTY) ||
                    (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
                        /* stop writing */
                        if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
@@ -609,6 +618,75 @@ void stream_sock_data_finish(struct stream_interface *si)
        }
 }
 
+/* This function is used for inter-stream-interface calls. It is called by the
+ * consumer to inform the producer side that it may be interested in checking
+ * for free space in the buffer. Note that it intentionally does not update
+ * timeouts, so that we can still check them later at wake-up.
+ */
+void stream_sock_chk_rcv(struct stream_interface *si)
+{
+       struct buffer *ib = si->ib;
+
+       DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
+               now_ms, __FUNCTION__,
+               fd, fdtab[fd].owner,
+               ib, ob,
+               ib->rex, ob->wex,
+               ib->flags, ob->flags,
+               ib->l, ob->l, si->state);
+
+       if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
+               return;
+
+       if (ib->flags & (BF_FULL|BF_HIJACK)) {
+               /* stop reading */
+               if ((ib->flags & (BF_FULL|BF_HIJACK)) == BF_FULL)
+                       si->flags |= SI_FL_WAIT_ROOM;
+               EV_FD_COND_C(si->fd, DIR_RD);
+       }
+       else {
+               /* (re)start reading */
+               si->flags &= ~SI_FL_WAIT_ROOM;
+               EV_FD_COND_S(si->fd, DIR_RD);
+       }
+}
+
+
+/* This function is used for inter-stream-interface calls. It is called by the
+ * producer to inform the consumer side that it may be interested in checking
+ * for data in the buffer. Note that it intentionally does not update timeouts,
+ * so that we can still check them later at wake-up.
+ */
+void stream_sock_chk_snd(struct stream_interface *si)
+{
+       struct buffer *ob = si->ob;
+
+       DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
+               now_ms, __FUNCTION__,
+               fd, fdtab[fd].owner,
+               ib, ob,
+               ib->rex, ob->wex,
+               ib->flags, ob->flags,
+               ib->l, ob->l, si->state);
+
+       if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW)))
+               return;
+
+       if ((ob->send_max == 0) ||
+           (ob->flags & BF_EMPTY) ||
+           (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
+               /* stop writing */
+               if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
+                       si->flags |= SI_FL_WAIT_DATA;
+               EV_FD_COND_C(si->fd, DIR_WR);
+       }
+       else {
+               /* (re)start writing. */
+               si->flags &= ~SI_FL_WAIT_DATA;
+               EV_FD_COND_S(si->fd, DIR_WR);
+       }
+}
+
 
 /*
  * Local variables: