]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
[MAJOR] make stream sockets aware of the stream interface
authorWilly Tarreau <w@1wt.eu>
Sat, 30 Aug 2008 01:17:31 +0000 (03:17 +0200)
committerWilly Tarreau <w@1wt.eu>
Sun, 2 Nov 2008 09:19:08 +0000 (10:19 +0100)
As of now, a stream socket does not directly wake up the task
but it does contact the stream interface which itself knows the
task. This allows us to perform a few cleanups upon errors and
shutdowns, which reduces the number of calls to data_update()
from 8 per session to 2 per session, and make all the functions
called in the process_session() loop completely swappable.

Some improvements are required. We need to provide a shutw()
function on stream interfaces so that one side which closes
its read part on an empty buffer can propagate the close to
the remote side.

include/proto/stream_sock.h
include/types/stream_interface.h
src/backend.c
src/client.c
src/proto_http.c
src/stream_sock.c

index fe89d988a5d0ccc2a127142d47b7c0c7a7a7e608..e104054f3d31a9347d7e360757aa74283d95ef5a 100644 (file)
@@ -33,7 +33,7 @@
 /* main event functions used to move data between sockets and buffers */
 int stream_sock_read(int fd);
 int stream_sock_write(int fd);
-int stream_sock_data_check_errors(int fd);
+int stream_sock_data_check_timeouts(int fd);
 int stream_sock_data_update(int fd);
 int stream_sock_data_finish(int fd);
 
index edfb7583b2179baf97fc1223818834d777276e83..2525f1237bd41b62638818675929c2b61d7563ce 100644 (file)
@@ -42,24 +42,26 @@ enum {
 
 /* error types reported on the streams interface for more accurate reporting */
 enum {
-       SI_ET_NONE = 0,         /* no error yet, leave it to zero */
-       SI_ET_QUEUE_TO,         /* queue timeout */
-       SI_ET_QUEUE_ERR,        /* queue error (eg: full) */
-       SI_ET_QUEUE_ABRT,       /* aborted in queue by external cause */
-       SI_ET_CONN_TO,          /* connection timeout */
-       SI_ET_CONN_ERR,         /* connection error (eg: no server available) */
-       SI_ET_CONN_ABRT,        /* connection aborted by external cause (eg: abort) */
-       SI_ET_CONN_OTHER,       /* connection aborted for other reason (eg: 500) */
-       SI_ET_DATA_TO,          /* timeout during data phase */
-       SI_ET_DATA_ERR,         /* error during data phase */
-       SI_ET_DATA_ABRT,        /* data phase aborted by external cause */
+       SI_ET_NONE       = 0x0000,  /* no error yet, leave it to zero */
+       SI_ET_QUEUE_TO   = 0x0001,  /* queue timeout */
+       SI_ET_QUEUE_ERR  = 0x0002,  /* queue error (eg: full) */
+       SI_ET_QUEUE_ABRT = 0x0004,  /* aborted in queue by external cause */
+       SI_ET_CONN_TO    = 0x0008,  /* connection timeout */
+       SI_ET_CONN_ERR   = 0x0010,  /* connection error (eg: no server available) */
+       SI_ET_CONN_ABRT  = 0x0020,  /* connection aborted by external cause (eg: abort) */
+       SI_ET_CONN_OTHER = 0x0040,  /* connection aborted for other reason (eg: 500) */
+       SI_ET_DATA_TO    = 0x0080,  /* timeout during data phase */
+       SI_ET_DATA_ERR   = 0x0100,  /* error during data phase */
+       SI_ET_DATA_ABRT  = 0x0200,  /* data phase aborted by external cause */
 };
 
 struct stream_interface {
        unsigned int state;     /* SI_ST* */
-       int err_type;           /* first error detected, one of SI_ET_* */
-       void *err_loc;          /* commonly the server, NULL when SI_ET_NONE */
+       unsigned int prev_state;/* SI_ST*, copy of previous state */
+       void *owner;            /* generally a (struct task*) */
        int fd;                 /* file descriptor for a stream driver when known */
+       unsigned int err_type;  /* first error detected, one of SI_ET_* */
+       void *err_loc;          /* commonly the server, NULL when SI_ET_NONE */
 };
 
 
index f51ac8fc38ee3bdef8fc4cf99fb852418de63e4b..16b2cc9dfabe25dc64513fb23076901bd6101a0c 100644 (file)
@@ -1805,7 +1805,7 @@ int connect_server(struct session *s)
                }
        }
 
-       fdtab[fd].owner = s->task;
+       fdtab[fd].owner = s->req->cons;
        fdtab[fd].state = FD_STCONN; /* connection in progress */
        fdtab[fd].cb[DIR_RD].f = &stream_sock_read;
        fdtab[fd].cb[DIR_RD].b = s->rep;
index 1f577d1be8ca248dbf030efc112a5bd274f0ebcb..ef1ee098b7d85651f91ad3f7cf6b582e5808b0a2 100644 (file)
@@ -173,12 +173,14 @@ int event_accept(int fd) {
                s->si[0].state = SI_ST_EST;
                s->si[0].err_type = SI_ET_NONE;
                s->si[0].err_loc = NULL;
+               s->si[0].owner = t;
                s->si[0].fd = cfd;
                s->cli_fd = cfd;
 
                s->si[1].state = SI_ST_INI;
                s->si[1].err_type = SI_ET_NONE;
                s->si[1].err_loc = NULL;
+               s->si[1].owner = t;
                s->si[1].fd = -1; /* just to help with debugging */
 
                s->srv = s->prev_srv = s->srv_conn = NULL;
@@ -373,7 +375,7 @@ int event_accept(int fd) {
                t->expire = TICK_ETERNITY;
 
                fd_insert(cfd);
-               fdtab[cfd].owner = t;
+               fdtab[cfd].owner = &s->si[0];
                fdtab[cfd].listener = l;
                fdtab[cfd].state = FD_STREADY;
                fdtab[cfd].cb[DIR_RD].f = l->proto->read;
index bc8d3fbf23832f4bb4f503dd0142a647661d6e2b..b3db20dd8484b878c4e071581d371d1d7fe3c5f6 100644 (file)
@@ -660,40 +660,151 @@ void process_session(struct task *t, int *next)
        unsigned int rqf_srv, rpf_srv;
        unsigned int rqf_req, rpf_rep;
 
-       /* check server-side errors during data phase */
-       if (s->req->cons->state == SI_ST_EST) {
-               stream_sock_data_check_errors(s->req->cons->fd);
-               /* When a server-side connection is released, we have to
-                * count it and check for pending connections on this server.
-                */
-               if (unlikely(s->req->cons->state == SI_ST_CLO)) {
-                       /* Count server-side errors (but not timeouts). */
-                       if (s->req->flags & BF_WRITE_ERROR) {
-                               s->be->failed_resp++;
-                               if (s->srv)
-                                       s->srv->failed_resp++;
-                       }
+       /* Check timeouts only during data phase for now */
+       if (unlikely(t->state & TASK_WOKEN_TIMER)) {
+               if (s->rep->cons->state == SI_ST_EST)
+                       stream_sock_data_check_timeouts(s->rep->cons->fd);
 
-                       if (s->srv) {
-                               s->srv->cur_sess--;
-                               sess_change_server(s, NULL);
-                               if (may_dequeue_tasks(s->srv, s->be))
-                                       process_srv_queue(s->srv);
-                       }
+               if (s->req->cons->state == SI_ST_EST)
+                       stream_sock_data_check_timeouts(s->req->cons->fd);
+       }
+
+       /* When a server-side connection is released, we have to
+        * count it and check for pending connections on this server.
+        */
+       if (unlikely(s->req->cons->state == SI_ST_CLO &&
+                    s->req->cons->prev_state == SI_ST_EST)) {
+               /* Count server-side errors (but not timeouts). */
+               if (s->req->flags & BF_WRITE_ERROR) {
+                       s->be->failed_resp++;
+                       if (s->srv)
+                               s->srv->failed_resp++;
+               }
+
+               if (s->srv) {
+                       s->srv->cur_sess--;
+                       sess_change_server(s, NULL);
+                       if (may_dequeue_tasks(s->srv, s->be))
+                               process_srv_queue(s->srv);
+               }
+
+               if (unlikely((s->req->cons->state == SI_ST_CLO) &&
+                            (global.mode & MODE_DEBUG) &&
+                            (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
+                       int len;
+                       len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n",
+                                     s->uniq_id, s->be->id, (unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd);
+                       write(1, trash, len);
+               }
+       }
+
+       if (unlikely(s->rep->cons->state == SI_ST_CLO &&
+                    s->rep->cons->prev_state == SI_ST_EST)) {
+               if (unlikely((s->rep->cons->state == SI_ST_CLO) &&
+                            (global.mode & MODE_DEBUG) &&
+                            (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
+                       int len;
+                       len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n",
+                                     s->uniq_id, s->be->id, (unsigned short)s->rep->prod->fd, (unsigned short)s->req->cons->fd);
+                       write(1, trash, len);
                }
        }
 
-       /* check client-side errors during data phase */
-       if (s->rep->cons->state == SI_ST_EST)
-               stream_sock_data_check_errors(s->rep->cons->fd);
 
-       /* force one first pass everywhere */
+       /* Check if we need to close the write side. This can only happen
+        * when either SHUTR or EMPTY appears, because WRITE_ENA cannot appear
+        * from low level, and neither HIJACK nor SHUTW can disappear from low
+        * level. Later, this should move to stream_sock_{read,write}.
+        */
+       if ((s->req->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) {
+               buffer_shutw(s->req);
+               if (s->rep->flags & BF_SHUTR) {
+                       fd_delete(s->req->cons->fd);
+                       s->req->cons->state = SI_ST_CLO;
+               }
+               else {
+                       EV_FD_CLR(s->req->cons->fd, DIR_WR);
+                       shutdown(s->req->cons->fd, SHUT_WR);
+               }
+       }
+
+       /* Check if we need to close the write side */
+       if ((s->rep->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) {
+               buffer_shutw(s->rep);
+               if (s->req->flags & BF_SHUTR) {
+                       fd_delete(s->rep->cons->fd);
+                       s->rep->cons->state = SI_ST_CLO;
+               }
+               else {
+                       EV_FD_CLR(s->rep->cons->fd, DIR_WR);
+                       shutdown(s->rep->cons->fd, SHUT_WR);
+               }
+       }
+
+
+
+       /* Dirty trick: force one first pass everywhere */
        rqf_cli = rqf_srv = rqf_req = ~s->req->flags;
        rpf_cli = rpf_srv = rpf_rep = ~s->rep->flags;
 
+       /* well, the ST_CONN state is already handled properly */
+       if (s->req->prod->state == SI_ST_EST) {
+               rqf_cli = s->req->flags;
+               rpf_cli = s->rep->flags;
+       }
+
+       if (s->req->cons->state == SI_ST_EST) {
+               rqf_srv = s->req->flags;
+               rpf_srv = s->rep->flags;
+       }
+
        do {
+               DPRINTF(stderr,"[%u] %s: task=%p rq=%p, rp=%p, exp(r,w)=%u,%u rqf=%08x rpf=%08x rql=%d rpl=%d cs=%d ss=%d\n",
+                       now_ms, __FUNCTION__,
+                       t,
+                       s->req, s->rep,
+                       s->req->rex, s->rep->wex,
+                       s->req->flags, s->rep->flags,
+                       s->req->l, s->rep->l, s->rep->cons->state, s->req->cons->state);
+
                resync = 0;
 
+               /* Analyse request */
+               if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) {
+                       if (s->req->prod->state >= SI_ST_EST) {
+                               resync = 1;
+                               /* it's up to the analysers to reset write_ena */
+                               buffer_write_ena(s->req);
+                               if (s->req->analysers)
+                                       process_request(s);
+                               rqf_req = s->req->flags;
+                       }
+
+               }
+
+               /* Analyse response */
+               if (unlikely(s->rep->flags & BF_HIJACK)) {
+                       /* In inject mode, we wake up everytime something has
+                        * happened on the write side of the buffer.
+                        */
+                       if ((s->rep->flags & (BF_WRITE_PARTIAL|BF_WRITE_ERROR|BF_SHUTW)) &&
+                           !(s->rep->flags & BF_FULL)) {
+                               if (produce_content(s) != 0)
+                                       resync = 1; /* completed, better re-check flags */
+                       }
+               }
+               else if (s->rep->prod->state >= SI_ST_EST) {
+                       if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) {
+                               resync = 1;
+                               /* it's up to the analysers to reset write_ena */
+                               buffer_write_ena(s->rep);
+                               if (s->rep->analysers)
+                                       process_response(s);
+                               rpf_rep = s->rep->flags;
+                       }
+               }
+
+               /* Maybe resync client FD state */
                if (s->rep->cons->state != SI_ST_CLO) {
                        if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) ||
                            ((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) {
@@ -713,7 +824,7 @@ void process_session(struct task *t, int *next)
                        }
                }
 
-
+               /* Maybe resync server FD state */
                if (s->req->cons->state != SI_ST_CLO) {
                        if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) ||
                            ((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) {
@@ -761,38 +872,6 @@ void process_session(struct task *t, int *next)
                        }
                }
 
-               if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) {
-                       /* the analysers must block it themselves */
-                       if (s->req->prod->state >= SI_ST_EST) {
-                               resync = 1;
-                               buffer_write_ena(s->req);
-                               if (s->req->analysers)
-                                       process_request(s);
-                       }
-                       rqf_req = s->req->flags;
-               }
-
-               if (unlikely(s->rep->flags & BF_HIJACK)) {
-                       /* In inject mode, we wake up everytime something has
-                        * happened on the write side of the buffer.
-                        */
-                       if ((s->rep->flags & (BF_WRITE_PARTIAL|BF_WRITE_ERROR|BF_SHUTW)) &&
-                           !(s->rep->flags & BF_FULL)) {
-                               if (produce_content(s) != 0)
-                                       resync = 1; /* completed, better re-check flags */
-                       }
-               }
-               else if (s->rep->prod->state >= SI_ST_EST) {
-                       if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) {
-                               /* the analysers must block it themselves */
-                               resync = 1;
-                               buffer_write_ena(s->rep);
-                               if (s->rep->analysers)
-                                       process_response(s);
-                               rpf_rep = s->rep->flags;
-                       }
-               }
-
        } while (resync);
 
        if (likely((s->rep->cons->state != SI_ST_CLO) ||
@@ -809,6 +888,8 @@ void process_session(struct task *t, int *next)
 
                s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
                s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
+               s->si[0].prev_state = s->si[0].state;
+               s->si[1].prev_state = s->si[1].state;
 
                /* Trick: if a request is being waiting for the server to respond,
                 * and if we know the server can timeout, we don't want the timeout
@@ -1766,7 +1847,7 @@ int process_request(struct session *t)
                 * - if one rule returns KO, then return KO
                 */
 
-               if (req->flags & (BF_READ_NULL | BF_SHUTR) || tick_is_expired(req->analyse_exp, now_ms))
+               if (req->flags & BF_SHUTR || tick_is_expired(req->analyse_exp, now_ms))
                        partial = 0;
                else
                        partial = ACL_PARTIAL;
@@ -1921,7 +2002,7 @@ int process_request(struct session *t)
                        }
 
                        /* 4: have we encountered a close ? */
-                       else if (req->flags & (BF_READ_NULL | BF_SHUTR)) {
+                       else if (req->flags & BF_SHUTR) {
                                txn->status = 400;
                                client_retnclose(t, error_message(t, HTTP_ERR_400));
                                msg->msg_state = HTTP_MSG_ERROR;
@@ -2607,7 +2688,7 @@ int process_request(struct session *t)
                 * timeout. We just have to check that the client is still
                 * there and that the timeout has not expired.
                 */
-               if ((req->flags & (BF_READ_NULL|BF_READ_ERROR)) == 0 &&
+               if ((req->flags & (BF_SHUTR|BF_READ_ERROR)) == 0 &&
                    !tick_is_expired(req->analyse_exp, now_ms))
                        return 0;
 
@@ -2690,7 +2771,7 @@ int process_request(struct session *t)
                 * buffer closed).
                 */
                if (req->l - body >= limit ||             /* enough bytes! */
-                   req->flags & (BF_FULL | BF_READ_ERROR | BF_SHUTR | BF_READ_NULL | BF_READ_TIMEOUT) ||
+                   req->flags & (BF_FULL | BF_READ_ERROR | BF_SHUTR | BF_READ_TIMEOUT) ||
                    tick_is_expired(req->analyse_exp, now_ms)) {
                        /* The situation will not evolve, so let's give up on the analysis. */
                        t->logs.tv_request = now;  /* update the request timer to reflect full request */
@@ -2887,7 +2968,7 @@ int process_response(struct session *t)
                                return 0;
                        }
                        /* write error to client, or close from server */
-                       else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR|BF_READ_NULL)) {
+                       else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR)) {
                                buffer_shutr_now(rep);
                                buffer_shutw_now(req);
                                //fd_delete(req->cons->fd);
index 52860eaee5a825c66662383f8e983f09869053e2..14a8df272076ecf1deddd4d729f8525b5c011126 100644 (file)
@@ -42,6 +42,7 @@
 int stream_sock_read(int fd) {
        __label__ out_wakeup, out_shutdown_r, out_error;
        struct buffer *b = fdtab[fd].cb[DIR_RD].b;
+       struct stream_interface *si = fdtab[fd].owner;
        int ret, max, retval, cur_read;
        int read_poll = MAX_READ_POLL_LOOPS;
 
@@ -239,16 +240,21 @@ int stream_sock_read(int fd) {
        if (!(b->flags & BF_READ_ACTIVITY))
                goto out_skip_wakeup;
  out_wakeup:
-       task_wakeup(fdtab[fd].owner, TASK_WOKEN_IO);
+       task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
        fdtab[fd].ev &= ~FD_POLL_IN;
        return retval;
 
  out_shutdown_r:
+       /* we received a shutdown */
        fdtab[fd].ev &= ~FD_POLL_HUP;
        b->flags |= BF_READ_NULL;
-       b->rex = TICK_ETERNITY;
+       buffer_shutr(b);
+       /* Maybe we have to completely close the socket */
+       if (fdtab[fd].cb[DIR_WR].b->flags & BF_SHUTW)
+               goto do_close_and_return;
+       EV_FD_CLR(fd, DIR_RD);
        goto out_wakeup;
 
  out_error:
@@ -258,7 +264,27 @@ int stream_sock_read(int fd) {
        fdtab[fd].state = FD_STERROR;
        fdtab[fd].ev &= ~FD_POLL_STICKY;
        b->rex = TICK_ETERNITY;
-       goto out_wakeup;
+
+       /* Read error on the file descriptor. We close the FD and set
+        * the error on both buffers.
+        * Note: right now we only support connected sockets.
+        */
+       if (si->state != SI_ST_EST)
+               goto out_wakeup;
+
+       if (!si->err_type)
+               si->err_type = SI_ET_DATA_ERR;
+
+       buffer_shutr(fdtab[fd].cb[DIR_RD].b);
+       fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR;
+       buffer_shutw(fdtab[fd].cb[DIR_WR].b);
+       fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR;
+
+ do_close_and_return:
+       fd_delete(fd);
+       si->state = SI_ST_CLO;
+       task_wakeup(si->owner, TASK_WOKEN_IO);
+       return 1;
 }
 
 
@@ -271,6 +297,7 @@ int stream_sock_read(int fd) {
 int stream_sock_write(int fd) {
        __label__ out_wakeup, out_error;
        struct buffer *b = fdtab[fd].cb[DIR_WR].b;
+       struct stream_interface *si = fdtab[fd].owner;
        int ret, max, retval;
        int write_poll = MAX_WRITE_POLL_LOOPS;
 
@@ -411,7 +438,7 @@ int stream_sock_write(int fd) {
        if (!(b->flags & BF_WRITE_ACTIVITY))
                goto out_skip_wakeup;
  out_wakeup:
-       task_wakeup(fdtab[fd].owner, TASK_WOKEN_IO);
+       task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
        fdtab[fd].ev &= ~FD_POLL_OUT;
@@ -424,7 +451,25 @@ int stream_sock_write(int fd) {
        fdtab[fd].state = FD_STERROR;
        fdtab[fd].ev &= ~FD_POLL_STICKY;
        b->wex = TICK_ETERNITY;
-       goto out_wakeup;
+       /* Read error on the file descriptor. We close the FD and set
+        * the error on both buffers.
+        * Note: right now we only support connected sockets.
+        */
+       if (si->state != SI_ST_EST)
+               goto out_wakeup;
+
+       if (!si->err_type)
+               si->err_type = SI_ET_DATA_ERR;
+
+       buffer_shutr(fdtab[fd].cb[DIR_RD].b);
+       fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR;
+       buffer_shutw(fdtab[fd].cb[DIR_WR].b);
+       fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR;
+
+       fd_delete(fd);
+       si->state = SI_ST_CLO;
+       task_wakeup(si->owner, TASK_WOKEN_IO);
+       return 1;
 }
 
 
@@ -433,7 +478,7 @@ int stream_sock_write(int fd) {
  * phase. It controls the file descriptor's status, as well as read and write
  * timeouts.
  */
-int stream_sock_data_check_errors(int fd)
+int stream_sock_data_check_timeouts(int fd)
 {
        struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
        struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
@@ -446,24 +491,6 @@ int stream_sock_data_check_errors(int fd)
                ib->flags, ob->flags,
                ib->l, ob->l);
 
-       /* Read or write error on the file descriptor */
-       if (unlikely(fdtab[fd].state == FD_STERROR)) {
-               //trace_term(t, TT_HTTP_SRV_6);
-               if (!ob->cons->err_type) {
-                       //ob->cons->err_loc = t->srv;
-                       ob->cons->err_type = SI_ET_DATA_ERR;
-               }
-               buffer_shutw(ob);
-               ob->flags |= BF_WRITE_ERROR;
-               buffer_shutr(ib);
-               ib->flags |= BF_READ_ERROR;
-
-       do_close_and_return:
-               fd_delete(fd);
-               ob->cons->state = SI_ST_CLO;
-               return 0;
-       }
-
        /* Read timeout */
        if (unlikely(!(ib->flags & (BF_SHUTR|BF_READ_TIMEOUT)) && tick_is_expired(ib->rex, now_ms))) {
                //trace_term(t, TT_HTTP_SRV_12);
@@ -473,8 +500,13 @@ int stream_sock_data_check_errors(int fd)
                        ob->cons->err_type = SI_ET_DATA_TO;
                }
                buffer_shutr(ib);
-               if (ob->flags & BF_SHUTW)
-                       goto do_close_and_return;
+               if (ob->flags & BF_SHUTW) {
+               do_close_and_return:
+                       fd_delete(fd);
+                       ob->cons->state = SI_ST_CLO;
+                       return 0;
+               }
+
                EV_FD_CLR(fd, DIR_RD);
        }
 
@@ -506,18 +538,18 @@ int stream_sock_data_update(int fd)
        struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
        struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
 
-       DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
+       DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
                now_ms, __FUNCTION__,
                fd, fdtab[fd].owner,
                ib, ob,
                ib->rex, ob->wex,
                ib->flags, ob->flags,
-               ib->l, ob->l);
+               ib->l, ob->l, ob->cons->state);
 
        /* Check if we need to close the read side */
        if (!(ib->flags & BF_SHUTR)) {
                /* Last read, forced read-shutdown, or other end closed */
-               if (ib->flags & (BF_READ_NULL|BF_SHUTR_NOW|BF_SHUTW)) {
+               if (ib->flags & (BF_SHUTR_NOW|BF_SHUTW)) {
                        //trace_term(t, TT_HTTP_SRV_10);
                        buffer_shutr(ib);
                        if (ob->flags & BF_SHUTW) {
@@ -560,13 +592,13 @@ int stream_sock_data_finish(int fd)
        struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
        struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
 
-       DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
+       DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
                now_ms, __FUNCTION__,
                fd, fdtab[fd].owner,
                ib, ob,
                ib->rex, ob->wex,
                ib->flags, ob->flags,
-               ib->l, ob->l);
+               ib->l, ob->l, ob->cons->state);
 
        /* Check if we need to close the read side */
        if (!(ib->flags & BF_SHUTR)) {