]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
[MAJOR] implemented client-side support for PF_UNIX sockets
authorWilly Tarreau <w@1wt.eu>
Tue, 16 Oct 2007 15:34:28 +0000 (17:34 +0200)
committerWilly Tarreau <w@1wt.eu>
Thu, 18 Oct 2007 12:11:15 +0000 (14:11 +0200)
A new file, proto_uxst.c, implements support of PF_UNIX sockets
of type SOCK_STREAM. It relies on generic stream_sock_read/write
and uses its own accept primitive which also tries to be generic.

Right now it only implements an echo service in sight of a general
support for start dumping via unix socket. The echo code is more
of a proof of concept than useful code.

Makefile
Makefile.bsd
Makefile.osx
include/proto/proto_uxst.h [new file with mode: 0644]
include/types/global.h
include/types/protocols.h
src/proto_uxst.c [new file with mode: 0644]
src/session.c

index f7474d37eafb8ba0e83dee98ce080630073dba5e..7a0a0774ca18907bc6ae02c5881f7f9cad558b04 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -229,7 +229,7 @@ all: haproxy
 OBJS = src/haproxy.o src/sessionhash.o src/base64.o src/protocols.o \
        src/uri_auth.o src/standard.o src/buffers.o src/log.o src/task.o \
        src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \
-       src/checks.o src/queue.o src/client.o src/proxy.o \
+       src/checks.o src/queue.o src/client.o src/proxy.o src/proto_uxst.o \
        src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \
        src/session.o src/hdr_idx.o src/ev_select.o src/acl.o src/memory.o
 
index 31a894d1f28d8773a4cd39e6c0938eb274fa42b0..cf0269432f738bc1d65c4d4f41157fe2f19fa37b 100644 (file)
@@ -101,7 +101,7 @@ LDFLAGS = -g
 OBJS = src/haproxy.o src/sessionhash.o src/base64.o src/protocols.o \
        src/uri_auth.o src/standard.o src/buffers.o src/log.o src/task.o \
        src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \
-       src/checks.o src/queue.o src/client.o src/proxy.o \
+       src/checks.o src/queue.o src/client.o src/proxy.o src/proto_uxst.o \
        src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \
        src/session.o src/hdr_idx.o src/ev_select.o src/ev_poll.o \
        src/ev_kqueue.o src/acl.o src/memory.o
index 5d6d875f6a030ead56ff814215a0e154a2b68621..a1a6c470f3769eec332eaf2ecee10cf7a027631f 100644 (file)
@@ -98,7 +98,7 @@ LDFLAGS = -g -isysroot /Developer/SDKs/MacOSX10.4u.sdk -arch ppc -arch i386
 OBJS = src/haproxy.o src/sessionhash.o src/base64.o src/protocols.o \
        src/uri_auth.o src/standard.o src/buffers.o src/log.o src/task.o \
        src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \
-       src/checks.o src/queue.o src/client.o src/proxy.o \
+       src/checks.o src/queue.o src/client.o src/proxy.o src/proto_uxst.o \
        src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \
        src/session.o src/hdr_idx.o src/ev_select.o src/ev_poll.o src/acl.o \
        src/memory.o
diff --git a/include/proto/proto_uxst.h b/include/proto/proto_uxst.h
new file mode 100644 (file)
index 0000000..642beb8
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+  include/proto/proto_uxst.h
+  This file contains UNIX-stream socket protocol definitions.
+
+  Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
+  
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation, version 2.1
+  exclusively.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+#ifndef _PROTO_PROTO_UXST_H
+#define _PROTO_PROTO_UXST_H
+
+#include <common/config.h>
+#include <types/session.h>
+#include <types/task.h>
+
+int uxst_event_accept(int fd);
+void uxst_add_listener(struct listener *listener);
+void process_uxst_stats(struct task *t, struct timeval *next);
+
+#endif /* _PROTO_PROTO_UXST_H */
+
+/*
+ * Local variables:
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ * End:
+ */
index 340b583d778bee58fae246568b00348b386df59d..bf95ffdc35c565d9175ca2351bf8165cf3f207e6 100644 (file)
@@ -2,7 +2,7 @@
   include/types/global.h
   Global variables.
 
-  Copyright (C) 2000-2006 Willy Tarreau - w@1wt.eu
+  Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
   
   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
@@ -25,6 +25,7 @@
 #include <netinet/in.h>
 
 #include <common/config.h>
+#include <types/protocols.h>
 #include <types/task.h>
 
 /* modes of operation (global.mode) */
@@ -64,6 +65,8 @@ struct global {
        struct {
                int maxpollevents; /* max number of poll events at once */
        } tune;
+       struct listener stats_sock; /* unix socket listener for statistics */
+       struct timeval stats_timeout;
 };
 
 extern struct global global;
index 12636d83aafb1aaf26dfa9ea4ee9df50b0286926..2c68e8866f2c6f340dff9d61061d188aad027761 100644 (file)
 
 /* listener state */
 #define LI_NEW         0       /* not initialized yet */
-#define LI_LISTEN      1       /* started, listening but not enabled */
-#define LI_READY       2       /* started, listening and enabled */
-#define LI_FULL                3       /* reached its connection limit */
+#define LI_INIT                1       /* attached to the protocol, but not listening yet */
+#define LI_LISTEN      2       /* started, listening but not enabled */
+#define LI_READY       3       /* started, listening and enabled */
+#define LI_FULL                4       /* reached its connection limit */
 
 /* The listener will be directly referenced by the fdtab[] which holds its
  * socket. The listener provides the protocol-specific accept() function to
@@ -48,7 +49,7 @@
  */
 struct listener {
        int fd;                         /* the listen socket */
-       int state;                      /* state: NEW, READY, FULL */
+       int state;                      /* state: NEW, INIT, LISTEN, READY, FULL */
        struct sockaddr_storage addr;   /* the address we listen to */
        struct protocol *proto;         /* protocol this listener belongs to */
        int nbconn;                     /* current number of connections on this listener */
diff --git a/src/proto_uxst.c b/src/proto_uxst.c
new file mode 100644 (file)
index 0000000..1920376
--- /dev/null
@@ -0,0 +1,1414 @@
+/*
+ * UNIX SOCK_STREAM protocol layer (uxst)
+ *
+ * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ */
+
+#include <ctype.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <syslog.h>
+#include <time.h>
+
+#include <sys/param.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+
+#include <common/compat.h>
+#include <common/config.h>
+#include <common/debug.h>
+#include <common/memory.h>
+#include <common/mini-clist.h>
+#include <common/standard.h>
+#include <common/time.h>
+#include <common/version.h>
+
+#include <types/acl.h>
+#include <types/capture.h>
+#include <types/client.h>
+#include <types/global.h>
+#include <types/polling.h>
+#include <types/proxy.h>
+#include <types/server.h>
+
+#include <proto/acl.h>
+#include <proto/backend.h>
+#include <proto/buffers.h>
+#include <proto/fd.h>
+#include <proto/log.h>
+#include <proto/protocols.h>
+#include <proto/proto_uxst.h>
+#include <proto/queue.h>
+#include <proto/session.h>
+#include <proto/stream_sock.h>
+#include <proto/task.h>
+
+#ifndef MAXPATHLEN
+#define MAXPATHLEN 128
+#endif
+
+/* This function creates a named PF_UNIX stream socket at address <path>. Note
+ * that the path cannot be NULL nor empty.
+ * It returns the assigned file descriptor, or -1 in the event of an error.
+ */
+static int create_uxst_socket(const char *path)
+{
+       char tempname[MAXPATHLEN];
+       char backname[MAXPATHLEN];
+       struct sockaddr_un addr;
+
+       int ret, sock;
+
+       /* 1. create socket names */
+       if (!path[0]) {
+               Alert("Invalid name for a UNIX socket. Aborting.\n");
+               goto err_return;
+       }
+
+       ret = snprintf(tempname, MAXPATHLEN, "%s.%d.tmp", path, pid);
+       if (ret < 0 || ret >= MAXPATHLEN) {
+               Alert("name too long for UNIX socket. Aborting.\n");
+               goto err_return;
+       }
+
+       ret = snprintf(backname, MAXPATHLEN, "%s.%d.bak", path, pid);
+       if (ret < 0 || ret >= MAXPATHLEN) {
+               Alert("name too long for UNIX socket. Aborting.\n");
+               goto err_return;
+       }
+
+       /* 2. clean existing orphaned entries */
+       if (unlink(tempname) < 0 && errno != ENOENT) {
+               Alert("error when trying to unlink previous UNIX socket. Aborting.\n");
+               goto err_return;
+       }
+
+       if (unlink(backname) < 0 && errno != ENOENT) {
+               Alert("error when trying to unlink previous UNIX socket. Aborting.\n");
+               goto err_return;
+       }
+
+       /* 3. backup existing socket */
+       if (link(path, backname) < 0 && errno != ENOENT) {
+               Alert("error when trying to preserve previous UNIX socket. Aborting.\n");
+               goto err_return;
+       }
+
+       /* 4. prepare new socket */
+       addr.sun_family = AF_UNIX;
+       strncpy(addr.sun_path, tempname, sizeof(addr.sun_path));
+       addr.sun_path[sizeof(addr.sun_path) - 1] = 0;
+
+       sock = socket(PF_UNIX, SOCK_STREAM, 0);
+       if (sock < 0) {
+               Alert("cannot create socket for UNIX listener. Aborting.\n");
+               goto err_unlink_back;
+       }
+
+       if (sock >= global.maxsock) {
+               Alert("socket(): not enough free sockets for UNIX listener. Raise -n argument. Aborting.\n");
+               goto err_unlink_temp;
+       }
+
+       if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1) {
+               Alert("cannot make UNIX socket non-blocking. Aborting.\n");
+               goto err_unlink_temp;
+       }
+
+       if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+               /* note that bind() creates the socket <tempname> on the file system */
+               Alert("cannot bind socket for UNIX listener. Aborting.\n");
+               goto err_unlink_temp;
+       }
+
+       if (listen(sock, 0) < 0) {
+               Alert("cannot listen to socket for UNIX listener. Aborting.\n");
+               goto err_unlink_temp;
+       }
+
+       /* 5. install.
+        * Point of no return: we are ready, we'll switch the sockets. We don't
+        * fear loosing the socket <path> because we have a copy of it in
+        * backname.
+        */
+       if (rename(tempname, path) < 0) {
+               Alert("cannot switch final and temporary sockets for UNIX listener. Aborting.\n");
+               goto err_rename;
+       }
+
+       /* 6. cleanup */
+       unlink(backname); /* no need to keep this one either */
+
+       return sock;
+
+ err_rename:
+       ret = rename(backname, path);
+       if (ret < 0 && errno == ENOENT)
+               unlink(path);
+ err_unlink_temp:
+       unlink(tempname);
+       close(sock);
+ err_unlink_back:
+       unlink(backname);
+ err_return:
+       return -1;
+}
+
+/* Tries to destroy the UNIX stream socket <path>. The socket must not be used
+ * anymore. It practises best effort, and no error is returned.
+ */
+static void destroy_uxst_socket(const char *path)
+{
+       struct sockaddr_un addr;
+       int sock, ret;
+
+       /* We might have been chrooted, so we may not be able to access the
+        * socket. In order to avoid bothering the other end, we connect with a
+        * wrong protocol, namely SOCK_DGRAM. The return code from connect()
+        * is enough to know if the socket is still live or not. If it's live
+        * in mode SOCK_STREAM, we get EPROTOTYPE or anything else but not
+        * ECONNREFUSED. In this case, we do not touch it because it's used
+        * by some other process.
+        */
+       sock = socket(PF_UNIX, SOCK_DGRAM, 0);
+       if (sock < 0)
+               return;
+
+       addr.sun_family = AF_UNIX;
+       strncpy(addr.sun_path, path, sizeof(addr.sun_path));
+       addr.sun_path[sizeof(addr.sun_path)] = 0;
+       ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
+       if (ret < 0 && errno == ECONNREFUSED) {
+               /* Connect failed: the socket still exists but is not used
+                * anymore. Let's remove this socket now.
+                */
+               unlink(path);
+       }
+       close(sock);
+}
+
+
+/* This function creates all UNIX sockets bound to the protocol entry <proto>.
+ * It is intended to be used as the protocol's bind_all() function.
+ * The sockets will be registered but not added to any fd_set, in order not to
+ * loose them across the fork(). A call to uxst_enable_listeners() is needed
+ * to complete initialization.
+ *
+ * The return value is composed from ERR_NONE, ERR_RETRYABLE and ERR_FATAL.
+ */
+static int uxst_bind_listeners(struct protocol *proto)
+{
+       struct listener *listener;
+       int err = ERR_NONE;
+       int fd;
+
+       list_for_each_entry(listener, &proto->listeners, proto_list) {
+               if (listener->state != LI_INIT)
+                       continue; /* already started */
+
+               fd = create_uxst_socket(((struct sockaddr_un *)&listener->addr)->sun_path);
+               if (fd == -1) {
+                       err |= ERR_FATAL;
+                       continue;
+               }
+       
+               /* the socket is listening */
+               listener->fd = fd;
+               listener->state = LI_LISTEN;
+
+               /* the function for the accept() event */
+               fd_insert(fd);
+               fdtab[fd].cb[DIR_RD].f = listener->accept;
+               fdtab[fd].cb[DIR_WR].f = NULL; /* never called */
+               fdtab[fd].cb[DIR_RD].b = fdtab[fd].cb[DIR_WR].b = NULL;
+               fdtab[fd].owner = (struct task *)listener; /* reference the listener instead of a task */
+               fdtab[fd].state = FD_STLISTEN;
+               fdtab[fd].peeraddr = NULL;
+               fdtab[fd].peerlen = 0;
+               fdtab[fd].listener = NULL;
+               fdtab[fd].ev = 0;
+       }
+
+       return err;
+}
+
+/* This function adds the UNIX sockets file descriptors to the polling lists
+ * for all listeners in the LI_LISTEN state. It is intended to be used as the
+ * protocol's enable_all() primitive, after the fork(). It always returns
+ * ERR_NONE.
+ */
+static int uxst_enable_listeners(struct protocol *proto)
+{
+       struct listener *listener;
+
+       list_for_each_entry(listener, &proto->listeners, proto_list) {
+               if (listener->state == LI_LISTEN) {
+                       EV_FD_SET(listener->fd, DIR_RD);
+                       listener->state = LI_READY;
+               }
+       }
+       return ERR_NONE;
+}
+
+/* This function stops all listening UNIX sockets bound to the protocol
+ * <proto>. It does not detaches them from the protocol.
+ * It always returns ERR_NONE.
+ */
+static int uxst_unbind_listeners(struct protocol *proto)
+{
+       struct listener *listener;
+
+       list_for_each_entry(listener, &proto->listeners, proto_list) {
+               if (listener->state != LI_INIT) {
+                       EV_FD_CLR(listener->fd, DIR_RD);
+                       close(listener->fd);
+                       listener->state = LI_INIT;
+                       destroy_uxst_socket(((struct sockaddr_un *)&listener->addr)->sun_path);
+               }
+       }
+       return ERR_NONE;
+}
+
+/*
+ * This function is called on a read event from a listen socket, corresponding
+ * to an accept. It tries to accept as many connections as possible.
+ * It returns 0. Since we use UNIX sockets on the local system for monitoring
+ * purposes and other related things, we do not need to output as many messages
+ * as with TCP which can fall under attack.
+ */
+int uxst_event_accept(int fd) {
+       struct listener *l = (struct listener *)fdtab[fd].owner;
+       struct session *s;
+       struct task *t;
+       int cfd;
+       int max_accept;
+
+       if (global.nbproc > 1)
+               max_accept = 8; /* let other processes catch some connections too */
+       else
+               max_accept = -1;
+
+       while (max_accept--) {
+               struct sockaddr_storage addr;
+               socklen_t laddr = sizeof(addr);
+
+               if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) {
+                       switch (errno) {
+                       case EAGAIN:
+                       case EINTR:
+                       case ECONNABORTED:
+                               return 0;           /* nothing more to accept */
+                       case ENFILE:
+                               /* Process reached system FD limit. Check system tunables. */
+                               return 0;
+                       case EMFILE:
+                               /* Process reached process FD limit. Check 'ulimit-n'. */
+                               return 0;
+                       case ENOBUFS:
+                       case ENOMEM:
+                               /* Process reached system memory limit. Check system tunables. */
+                               return 0;
+                       default:
+                               return 0;
+                       }
+               }
+
+               if (l->nbconn >= l->maxconn) {
+                       /* too many connections, we shoot this one and return.
+                        * FIXME: it would be better to simply switch the listener's
+                        * state to LI_FULL and disable the FD. We could re-enable
+                        * it upon fd_delete(), but this requires all protocols to
+                        * be switched.
+                        */
+                       close(cfd);
+                       return 0;
+               }
+
+               if ((s = pool_alloc2(pool2_session)) == NULL) {
+                       Alert("out of memory in uxst_event_accept().\n");
+                       close(cfd);
+                       return 0;
+               }
+
+               if ((t = pool_alloc2(pool2_task)) == NULL) {
+                       Alert("out of memory in uxst_event_accept().\n");
+                       close(cfd);
+                       pool_free2(pool2_session, s);
+                       return 0;
+               }
+
+               s->cli_addr = addr;
+
+               /* FIXME: should be checked earlier */
+               if (cfd >= global.maxsock) {
+                       Alert("accept(): not enough free sockets. Raise -n argument. Giving up.\n");
+                       close(cfd);
+                       pool_free2(pool2_task, t);
+                       pool_free2(pool2_session, s);
+                       return 0;
+               }
+
+               if (fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) {
+                       Alert("accept(): cannot set the socket in non blocking mode. Giving up\n");
+                       close(cfd);
+                       pool_free2(pool2_task, t);
+                       pool_free2(pool2_session, s);
+                       return 0;
+               }
+
+               t->wq = NULL;
+               t->qlist.p = NULL;
+               t->state = TASK_IDLE;
+               t->process = l->handler;
+               t->context = s;
+
+               s->task = t;
+               s->fe = NULL;
+               s->be = NULL;
+
+               s->cli_state = CL_STDATA;
+               s->srv_state = SV_STIDLE;
+               s->req = s->rep = NULL; /* will be allocated later */
+
+               s->cli_fd = cfd;
+               s->srv_fd = -1;
+               s->srv = NULL;
+               s->pend_pos = NULL;
+
+               memset(&s->logs, 0, sizeof(s->logs));
+               memset(&s->txn, 0, sizeof(s->txn));
+
+               s->data_source = DATA_SRC_NONE;
+               s->uniq_id = totalconn;
+
+               if ((s->req = pool_alloc2(pool2_buffer)) == NULL) { /* no memory */
+                       close(cfd); /* nothing can be done for this fd without memory */
+                       pool_free2(pool2_task, t);
+                       pool_free2(pool2_session, s);
+                       return 0;
+               }
+
+               if ((s->rep = pool_alloc2(pool2_buffer)) == NULL) { /* no memory */
+                       pool_free2(pool2_buffer, s->req);
+                       close(cfd); /* nothing can be done for this fd without memory */
+                       pool_free2(pool2_task, t);
+                       pool_free2(pool2_session, s);
+                       return 0;
+               }
+
+               buffer_init(s->req);
+               buffer_init(s->rep);
+               s->req->rlim += BUFSIZE;
+               s->rep->rlim += BUFSIZE;
+
+               fd_insert(cfd);
+               fdtab[cfd].owner = t;
+               fdtab[cfd].listener = l;
+               fdtab[cfd].state = FD_STREADY;
+               fdtab[cfd].cb[DIR_RD].f = l->proto->read;
+               fdtab[cfd].cb[DIR_RD].b = s->req;
+               fdtab[cfd].cb[DIR_WR].f = l->proto->write;
+               fdtab[cfd].cb[DIR_WR].b = s->rep;
+               fdtab[cfd].peeraddr = (struct sockaddr *)&s->cli_addr;
+               fdtab[cfd].peerlen = sizeof(s->cli_addr);
+               fdtab[cfd].ev = 0;
+
+
+               tv_eternity(&s->req->rex);
+               tv_eternity(&s->req->wex);
+               tv_eternity(&s->req->cex);
+               tv_eternity(&s->rep->rex);
+               tv_eternity(&s->rep->wex);
+
+               tv_eternity(&s->req->wto);
+               tv_eternity(&s->req->cto);
+               tv_eternity(&s->req->rto);
+               tv_eternity(&s->rep->rto);
+               tv_eternity(&s->rep->cto);
+               tv_eternity(&s->rep->wto);
+
+               if (l->timeout)
+                       s->req->rto = *l->timeout;
+
+               if (l->timeout)
+                       s->rep->wto = *l->timeout;
+
+               tv_eternity(&t->expire);
+               if (l->timeout && tv_isset(l->timeout)) {
+                       EV_FD_SET(cfd, DIR_RD);
+                       tv_add(&s->req->rex, &now, &s->req->rto);
+                       tv_add(&s->rep->wex, &now, &s->rep->wto);
+                       t->expire = s->req->rex;
+               }
+
+               task_queue(t);
+               task_wakeup(t);
+
+               l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
+               if (l->nbconn >= l->maxconn) {
+                       EV_FD_CLR(l->fd, DIR_RD);
+                       l->state = LI_FULL;
+               }
+               actconn++;
+               totalconn++;
+
+               //fprintf(stderr, "accepting from %p => %d conn, %d total, task=%p, cfd=%d, maxfd=%d\n", p, actconn, totalconn, t, cfd, maxfd);
+       } /* end of while (p->feconn < p->maxconn) */
+       //fprintf(stderr,"fct %s:%d\n", __FUNCTION__, __LINE__);
+       return 0;
+}
+
+/*
+ * manages the client FSM and its socket. It returns 1 if a state has changed
+ * (and a resync may be needed), otherwise 0.
+ */
+static int process_uxst_cli(struct session *t)
+{
+       int s = t->srv_state;
+       int c = t->cli_state;
+       struct buffer *req = t->req;
+       struct buffer *rep = t->rep;
+       //fprintf(stderr,"fct %s:%d\n", __FUNCTION__, __LINE__);
+       if (c == CL_STDATA) {
+               /* FIXME: this error handling is partly buggy because we always report
+                * a 'DATA' phase while we don't know if the server was in IDLE, CONN
+                * or HEADER phase. BTW, it's not logical to expire the client while
+                * we're waiting for the server to connect.
+                */
+               /* read or write error */
+               if (rep->flags & BF_WRITE_ERROR || req->flags & BF_READ_ERROR) {
+                       buffer_shutr(req);
+                       buffer_shutw(rep);
+                       fd_delete(t->cli_fd);
+                       t->cli_state = CL_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_CLICL;
+                       if (!(t->flags & SN_FINST_MASK)) {
+                               if (t->pend_pos)
+                                       t->flags |= SN_FINST_Q;
+                               else if (s == SV_STCONN)
+                                       t->flags |= SN_FINST_C;
+                               else
+                                       t->flags |= SN_FINST_D;
+                       }
+                       return 1;
+               }
+               /* last read, or end of server write */
+               else if (req->flags & BF_READ_NULL || s == SV_STSHUTW || s == SV_STCLOSE) {
+                       EV_FD_CLR(t->cli_fd, DIR_RD);
+                       buffer_shutr(req);
+                       t->cli_state = CL_STSHUTR;
+                       return 1;
+               }       
+               /* last server read and buffer empty */
+               else if ((s == SV_STSHUTR || s == SV_STCLOSE) && (rep->l == 0)) {
+                       EV_FD_CLR(t->cli_fd, DIR_WR);
+                       buffer_shutw(rep);
+                       shutdown(t->cli_fd, SHUT_WR);
+                       /* We must ensure that the read part is still alive when switching
+                        * to shutw */
+                       EV_FD_SET(t->cli_fd, DIR_RD);
+                       tv_add_ifset(&req->rex, &now, &req->rto);
+                       t->cli_state = CL_STSHUTW;
+                       //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
+                       return 1;
+               }
+               /* read timeout */
+               else if (tv_isle(&req->rex, &now)) {
+                       EV_FD_CLR(t->cli_fd, DIR_RD);
+                       buffer_shutr(req);
+                       t->cli_state = CL_STSHUTR;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_CLITO;
+                       if (!(t->flags & SN_FINST_MASK)) {
+                               if (t->pend_pos)
+                                       t->flags |= SN_FINST_Q;
+                               else if (s == SV_STCONN)
+                                       t->flags |= SN_FINST_C;
+                               else
+                                       t->flags |= SN_FINST_D;
+                       }
+                       return 1;
+               }       
+               /* write timeout */
+               else if (tv_isle(&rep->wex, &now)) {
+                       EV_FD_CLR(t->cli_fd, DIR_WR);
+                       buffer_shutw(rep);
+                       shutdown(t->cli_fd, SHUT_WR);
+                       /* We must ensure that the read part is still alive when switching
+                        * to shutw */
+                       EV_FD_SET(t->cli_fd, DIR_RD);
+                       tv_add_ifset(&req->rex, &now, &req->rto);
+
+                       t->cli_state = CL_STSHUTW;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_CLITO;
+                       if (!(t->flags & SN_FINST_MASK)) {
+                               if (t->pend_pos)
+                                       t->flags |= SN_FINST_Q;
+                               else if (s == SV_STCONN)
+                                       t->flags |= SN_FINST_C;
+                               else
+                                       t->flags |= SN_FINST_D;
+                       }
+                       return 1;
+               }
+
+               if (req->l >= req->rlim - req->data) {
+                       /* no room to read more data */
+                       if (EV_FD_COND_C(t->cli_fd, DIR_RD)) {
+                               /* stop reading until we get some space */
+                               tv_eternity(&req->rex);
+                       }
+               } else {
+                       /* there's still some space in the buffer */
+                       if (EV_FD_COND_S(t->cli_fd, DIR_RD)) {
+                               if (!tv_isset(&req->rto) ||
+                                   (t->srv_state < SV_STDATA && tv_isset(&req->wto)))
+                                       /* If the client has no timeout, or if the server not ready yet, and we
+                                        * know for sure that it can expire, then it's cleaner to disable the
+                                        * timeout on the client side so that too low values cannot make the
+                                        * sessions abort too early.
+                                        */
+                                       tv_eternity(&req->rex);
+                               else
+                                       tv_add(&req->rex, &now, &req->rto);
+                       }
+               }
+
+               if ((rep->l == 0) ||
+                   ((s < SV_STDATA) /* FIXME: this may be optimized && (rep->w == rep->h)*/)) {
+                       if (EV_FD_COND_C(t->cli_fd, DIR_WR)) {
+                               /* stop writing */
+                               tv_eternity(&rep->wex);
+                       }
+               } else {
+                       /* buffer not empty */
+                       if (EV_FD_COND_S(t->cli_fd, DIR_WR)) {
+                               /* restart writing */
+                               if (tv_add_ifset(&rep->wex, &now, &rep->wto)) {
+                                       /* FIXME: to prevent the client from expiring read timeouts during writes,
+                                        * we refresh it. */
+                                       req->rex = rep->wex;
+                               }
+                               else
+                                       tv_eternity(&rep->wex);
+                       }
+               }
+               return 0; /* other cases change nothing */
+       }
+       else if (c == CL_STSHUTR) {
+               if (rep->flags & BF_WRITE_ERROR) {
+                       buffer_shutw(rep);
+                       fd_delete(t->cli_fd);
+                       t->cli_state = CL_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_CLICL;
+                       if (!(t->flags & SN_FINST_MASK)) {
+                               if (t->pend_pos)
+                                       t->flags |= SN_FINST_Q;
+                               else if (s == SV_STCONN)
+                                       t->flags |= SN_FINST_C;
+                               else
+                                       t->flags |= SN_FINST_D;
+                       }
+                       return 1;
+               }
+               else if ((s == SV_STSHUTR || s == SV_STCLOSE) && (rep->l == 0)) {
+                       buffer_shutw(rep);
+                       fd_delete(t->cli_fd);
+                       t->cli_state = CL_STCLOSE;
+                       return 1;
+               }
+               else if (tv_isle(&rep->wex, &now)) {
+                       buffer_shutw(rep);
+                       fd_delete(t->cli_fd);
+                       t->cli_state = CL_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_CLITO;
+                       if (!(t->flags & SN_FINST_MASK)) {
+                               if (t->pend_pos)
+                                       t->flags |= SN_FINST_Q;
+                               else if (s == SV_STCONN)
+                                       t->flags |= SN_FINST_C;
+                               else
+                                       t->flags |= SN_FINST_D;
+                       }
+                       return 1;
+               }
+
+               if (rep->l == 0) {
+                       if (EV_FD_COND_C(t->cli_fd, DIR_WR)) {
+                               /* stop writing */
+                               tv_eternity(&rep->wex);
+                       }
+               } else {
+                       /* buffer not empty */
+                       if (EV_FD_COND_S(t->cli_fd, DIR_WR)) {
+                               /* restart writing */
+                               if (!tv_add_ifset(&rep->wex, &now, &rep->wto))
+                                       tv_eternity(&rep->wex);
+                       }
+               }
+               return 0;
+       }
+       else if (c == CL_STSHUTW) {
+               if (req->flags & BF_READ_ERROR) {
+                       buffer_shutr(req);
+                       fd_delete(t->cli_fd);
+                       t->cli_state = CL_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_CLICL;
+                       if (!(t->flags & SN_FINST_MASK)) {
+                               if (t->pend_pos)
+                                       t->flags |= SN_FINST_Q;
+                               else if (s == SV_STCONN)
+                                       t->flags |= SN_FINST_C;
+                               else
+                                       t->flags |= SN_FINST_D;
+                       }
+                       return 1;
+               }
+               else if (req->flags & BF_READ_NULL || s == SV_STSHUTW || s == SV_STCLOSE) {
+                       buffer_shutr(req);
+                       fd_delete(t->cli_fd);
+                       t->cli_state = CL_STCLOSE;
+                       return 1;
+               }
+               else if (tv_isle(&req->rex, &now)) {
+                       buffer_shutr(req);
+                       fd_delete(t->cli_fd);
+                       t->cli_state = CL_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_CLITO;
+                       if (!(t->flags & SN_FINST_MASK)) {
+                               if (t->pend_pos)
+                                       t->flags |= SN_FINST_Q;
+                               else if (s == SV_STCONN)
+                                       t->flags |= SN_FINST_C;
+                               else
+                                       t->flags |= SN_FINST_D;
+                       }
+                       return 1;
+               }
+               else if (req->l >= req->rlim - req->data) {
+                       /* no room to read more data */
+
+                       /* FIXME-20050705: is it possible for a client to maintain a session
+                        * after the timeout by sending more data after it receives a close ?
+                        */
+
+                       if (EV_FD_COND_C(t->cli_fd, DIR_RD)) {
+                               /* stop reading until we get some space */
+                               tv_eternity(&req->rex);
+                               //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
+                       }
+               } else {
+                       /* there's still some space in the buffer */
+                       if (EV_FD_COND_S(t->cli_fd, DIR_RD)) {
+                               if (!tv_add_ifset(&req->rex, &now, &req->rto))
+                                       tv_eternity(&req->rex);
+                               //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
+                       }
+               }
+               return 0;
+       }
+       else { /* CL_STCLOSE: nothing to do */
+               if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) {
+                       int len;
+                       len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n", t->uniq_id, t->be?t->be->id:"",
+                                     (unsigned short)t->cli_fd, (unsigned short)t->srv_fd);
+                       write(1, trash, len);
+               }
+               return 0;
+       }
+       return 0;
+}
+
+#if 0
+       /* FIXME! This part has not been completely converted yet, and it may
+        * still be very specific to TCPv4 ! Also, it relies on some parameters
+        * such as conn_retries which are not set upon accept().
+        */
+/*
+ * Manages the server FSM and its socket. It returns 1 if a state has changed
+ * (and a resync may be needed), otherwise 0.
+ */
+static int process_uxst_srv(struct session *t)
+{
+       int s = t->srv_state;
+       int c = t->cli_state;
+       struct buffer *req = t->req;
+       struct buffer *rep = t->rep;
+       int conn_err;
+
+       if (s == SV_STIDLE) {
+               if (c == CL_STCLOSE || c == CL_STSHUTW ||
+                        (c == CL_STSHUTR &&
+                         (t->req->l == 0 || t->be->options & PR_O_ABRT_CLOSE))) { /* give up */
+                       tv_eternity(&req->cex);
+                       if (t->pend_pos)
+                               t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
+                       srv_close_with_err(t, SN_ERR_CLICL, t->pend_pos ? SN_FINST_Q : SN_FINST_C);
+                       return 1;
+               }
+               else {
+                       /* FIXME: reimplement the TARPIT check here */
+
+                       /* Right now, we will need to create a connection to the server.
+                        * We might already have tried, and got a connection pending, in
+                        * which case we will not do anything till it's pending. It's up
+                        * to any other session to release it and wake us up again.
+                        */
+                       if (t->pend_pos) {
+                               if (!tv_isle(&req->cex, &now))
+                                       return 0;
+                               else {
+                                       /* we've been waiting too long here */
+                                       tv_eternity(&req->cex);
+                                       t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
+                                       srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_Q);
+                                       if (t->srv)
+                                               t->srv->failed_conns++;
+                                       if (t->fe)
+                                               t->fe->failed_conns++;
+                                       return 1;
+                               }
+                       }
+
+                       do {
+                               /* first, get a connection */
+                               if (srv_redispatch_connect(t))
+                                       return t->srv_state != SV_STIDLE;
+
+                               /* try to (re-)connect to the server, and fail if we expire the
+                                * number of retries.
+                                */
+                               if (srv_retryable_connect(t)) {
+                                       t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
+                                       return t->srv_state != SV_STIDLE;
+                               }
+                       } while (1);
+               }
+       }
+       else if (s == SV_STCONN) { /* connection in progress */
+               if (c == CL_STCLOSE || c == CL_STSHUTW ||
+                   (c == CL_STSHUTR &&
+                    ((t->req->l == 0 && !(req->flags & BF_WRITE_STATUS)) ||
+                     t->be->options & PR_O_ABRT_CLOSE))) { /* give up */
+                       tv_eternity(&req->cex);
+                       fd_delete(t->srv_fd);
+                       if (t->srv)
+                               t->srv->cur_sess--;
+
+                       srv_close_with_err(t, SN_ERR_CLICL, SN_FINST_C);
+                       return 1;
+               }
+               if (!(req->flags & BF_WRITE_STATUS) && !tv_isle(&req->cex, &now)) {
+                       //fprintf(stderr,"1: c=%d, s=%d, now=%d.%06d, exp=%d.%06d\n", c, s, now.tv_sec, now.tv_usec, req->cex.tv_sec, req->cex.tv_usec);
+                       return 0; /* nothing changed */
+               }
+               else if (!(req->flags & BF_WRITE_STATUS) || (req->flags & BF_WRITE_ERROR)) {
+                       /* timeout, asynchronous connect error or first write error */
+                       //fprintf(stderr,"2: c=%d, s=%d\n", c, s);
+
+                       fd_delete(t->srv_fd);
+                       if (t->srv)
+                               t->srv->cur_sess--;
+
+                       if (!(req->flags & BF_WRITE_STATUS))
+                               conn_err = SN_ERR_SRVTO; // it was a connect timeout.
+                       else
+                               conn_err = SN_ERR_SRVCL; // it was an asynchronous connect error.
+
+                       /* ensure that we have enough retries left */
+                       if (srv_count_retry_down(t, conn_err))
+                               return 1;
+
+                       if (t->srv && t->conn_retries == 0 && t->be->options & PR_O_REDISP) {
+                               /* We're on our last chance, and the REDISP option was specified.
+                                * We will ignore cookie and force to balance or use the dispatcher.
+                                */
+                               /* let's try to offer this slot to anybody */
+                               if (may_dequeue_tasks(t->srv, t->be))
+                                       task_wakeup(t->srv->queue_mgt);
+
+                               if (t->srv)
+                                       t->srv->failed_conns++;
+                               t->be->failed_conns++;
+
+                               t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
+                               t->srv = NULL; /* it's left to the dispatcher to choose a server */
+
+                               /* first, get a connection */
+                               if (srv_redispatch_connect(t))
+                                       return t->srv_state != SV_STIDLE;
+                       }
+
+                       do {
+                               /* Now we will try to either reconnect to the same server or
+                                * connect to another server. If the connection gets queued
+                                * because all servers are saturated, then we will go back to
+                                * the SV_STIDLE state.
+                                */
+                               if (srv_retryable_connect(t)) {
+                                       t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
+                                       return t->srv_state != SV_STCONN;
+                               }
+
+                               /* we need to redispatch the connection to another server */
+                               if (srv_redispatch_connect(t))
+                                       return t->srv_state != SV_STCONN;
+                       } while (1);
+               }
+               else { /* no error or write 0 */
+                       t->logs.t_connect = tv_ms_elapsed(&t->logs.tv_accept, &now);
+
+                       //fprintf(stderr,"3: c=%d, s=%d\n", c, s);
+                       if (req->l == 0) /* nothing to write */ {
+                               EV_FD_CLR(t->srv_fd, DIR_WR);
+                               tv_eternity(&req->wex);
+                       } else  /* need the right to write */ {
+                               EV_FD_SET(t->srv_fd, DIR_WR);
+                               if (tv_add_ifset(&req->wex, &now, &req->wto)) {
+                                       /* FIXME: to prevent the server from expiring read timeouts during writes,
+                                        * we refresh it. */
+                                       rep->rex = req->wex;
+                               }
+                               else
+                                       tv_eternity(&req->wex);
+                       }
+
+                       EV_FD_SET(t->srv_fd, DIR_RD);
+                       if (!tv_add_ifset(&rep->rex, &now, &rep->rto))
+                               tv_eternity(&rep->rex);
+               
+                       t->srv_state = SV_STDATA;
+                       if (t->srv)
+                               t->srv->cum_sess++;
+                       rep->rlim = rep->data + BUFSIZE; /* no rewrite needed */
+
+                       /* if the user wants to log as soon as possible, without counting
+                          bytes from the server, then this is the right moment. */
+                       if (t->fe && t->fe->to_log && !(t->logs.logwait & LW_BYTES)) {
+                               t->logs.t_close = t->logs.t_connect; /* to get a valid end date */
+                               //uxst_sess_log(t);
+                       }
+                       tv_eternity(&req->cex);
+                       return 1;
+               }
+       }
+       else if (s == SV_STDATA) {
+               /* read or write error */
+               if (req->flags & BF_WRITE_ERROR || rep->flags & BF_READ_ERROR) {
+                       buffer_shutr(rep);
+                       buffer_shutw(req);
+                       fd_delete(t->srv_fd);
+                       if (t->srv) {
+                               t->srv->cur_sess--;
+                               t->srv->failed_resp++;
+                       }
+                       t->be->failed_resp++;
+                       t->srv_state = SV_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_SRVCL;
+                       if (!(t->flags & SN_FINST_MASK))
+                               t->flags |= SN_FINST_D;
+                       /* We used to have a free connection slot. Since we'll never use it,
+                        * we have to inform the server that it may be used by another session.
+                        */
+                       if (may_dequeue_tasks(t->srv, t->be))
+                               task_wakeup(t->srv->queue_mgt);
+
+                       return 1;
+               }
+               /* last read, or end of client write */
+               else if (rep->flags & BF_READ_NULL || c == CL_STSHUTW || c == CL_STCLOSE) {
+                       EV_FD_CLR(t->srv_fd, DIR_RD);
+                       buffer_shutr(rep);
+                       t->srv_state = SV_STSHUTR;
+                       //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
+                       return 1;
+               }
+               /* end of client read and no more data to send */
+               else if ((c == CL_STSHUTR || c == CL_STCLOSE) && (req->l == 0)) {
+                       EV_FD_CLR(t->srv_fd, DIR_WR);
+                       buffer_shutw(req);
+                       shutdown(t->srv_fd, SHUT_WR);
+                       /* We must ensure that the read part is still alive when switching
+                        * to shutw */
+                       EV_FD_SET(t->srv_fd, DIR_RD);
+                       tv_add_ifset(&rep->rex, &now, &rep->rto);
+
+                       t->srv_state = SV_STSHUTW;
+                       return 1;
+               }
+               /* read timeout */
+               else if (tv_isle(&rep->rex, &now)) {
+                       EV_FD_CLR(t->srv_fd, DIR_RD);
+                       buffer_shutr(rep);
+                       t->srv_state = SV_STSHUTR;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_SRVTO;
+                       if (!(t->flags & SN_FINST_MASK))
+                               t->flags |= SN_FINST_D;
+                       return 1;
+               }       
+               /* write timeout */
+               else if (tv_isle(&req->wex, &now)) {
+                       EV_FD_CLR(t->srv_fd, DIR_WR);
+                       buffer_shutw(req);
+                       shutdown(t->srv_fd, SHUT_WR);
+                       /* We must ensure that the read part is still alive when switching
+                        * to shutw */
+                       EV_FD_SET(t->srv_fd, DIR_RD);
+                       tv_add_ifset(&rep->rex, &now, &rep->rto);
+                       t->srv_state = SV_STSHUTW;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_SRVTO;
+                       if (!(t->flags & SN_FINST_MASK))
+                               t->flags |= SN_FINST_D;
+                       return 1;
+               }
+
+               /* recompute request time-outs */
+               if (req->l == 0) {
+                       if (EV_FD_COND_C(t->srv_fd, DIR_WR)) {
+                               /* stop writing */
+                               tv_eternity(&req->wex);
+                       }
+               }
+               else { /* buffer not empty, there are still data to be transferred */
+                       if (EV_FD_COND_S(t->srv_fd, DIR_WR)) {
+                               /* restart writing */
+                               if (tv_add_ifset(&req->wex, &now, &req->wto)) {
+                                       /* FIXME: to prevent the server from expiring read timeouts during writes,
+                                        * we refresh it. */
+                                       rep->rex = req->wex;
+                               }
+                               else
+                                       tv_eternity(&req->wex);
+                       }
+               }
+
+               /* recompute response time-outs */
+               if (rep->l == BUFSIZE) { /* no room to read more data */
+                       if (EV_FD_COND_C(t->srv_fd, DIR_RD)) {
+                               tv_eternity(&rep->rex);
+                       }
+               }
+               else {
+                       if (EV_FD_COND_S(t->srv_fd, DIR_RD)) {
+                               if (!tv_add_ifset(&rep->rex, &now, &rep->rto))
+                                       tv_eternity(&rep->rex);
+                       }
+               }
+
+               return 0; /* other cases change nothing */
+       }
+       else if (s == SV_STSHUTR) {
+               if (req->flags & BF_WRITE_ERROR) {
+                       //EV_FD_CLR(t->srv_fd, DIR_WR);
+                       buffer_shutw(req);
+                       fd_delete(t->srv_fd);
+                       if (t->srv) {
+                               t->srv->cur_sess--;
+                               t->srv->failed_resp++;
+                       }
+                       t->be->failed_resp++;
+                       //close(t->srv_fd);
+                       t->srv_state = SV_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_SRVCL;
+                       if (!(t->flags & SN_FINST_MASK))
+                               t->flags |= SN_FINST_D;
+                       /* We used to have a free connection slot. Since we'll never use it,
+                        * we have to inform the server that it may be used by another session.
+                        */
+                       if (may_dequeue_tasks(t->srv, t->be))
+                               task_wakeup(t->srv->queue_mgt);
+
+                       return 1;
+               }
+               else if ((c == CL_STSHUTR || c == CL_STCLOSE) && (req->l == 0)) {
+                       //EV_FD_CLR(t->srv_fd, DIR_WR);
+                       buffer_shutw(req);
+                       fd_delete(t->srv_fd);
+                       if (t->srv)
+                               t->srv->cur_sess--;
+                       //close(t->srv_fd);
+                       t->srv_state = SV_STCLOSE;
+                       /* We used to have a free connection slot. Since we'll never use it,
+                        * we have to inform the server that it may be used by another session.
+                        */
+                       if (may_dequeue_tasks(t->srv, t->be))
+                               task_wakeup(t->srv->queue_mgt);
+
+                       return 1;
+               }
+               else if (tv_isle(&req->wex, &now)) {
+                       //EV_FD_CLR(t->srv_fd, DIR_WR);
+                       buffer_shutw(req);
+                       fd_delete(t->srv_fd);
+                       if (t->srv)
+                               t->srv->cur_sess--;
+                       //close(t->srv_fd);
+                       t->srv_state = SV_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_SRVTO;
+                       if (!(t->flags & SN_FINST_MASK))
+                               t->flags |= SN_FINST_D;
+                       /* We used to have a free connection slot. Since we'll never use it,
+                        * we have to inform the server that it may be used by another session.
+                        */
+                       if (may_dequeue_tasks(t->srv, t->be))
+                               task_wakeup(t->srv->queue_mgt);
+
+                       return 1;
+               }
+               else if (req->l == 0) {
+                       if (EV_FD_COND_C(t->srv_fd, DIR_WR)) {
+                               /* stop writing */
+                               tv_eternity(&req->wex);
+                       }
+               }
+               else { /* buffer not empty */
+                       if (EV_FD_COND_S(t->srv_fd, DIR_WR)) {
+                               /* restart writing */
+                               if (!tv_add_ifset(&req->wex, &now, &req->wto))
+                                       tv_eternity(&req->wex);
+                       }
+               }
+               return 0;
+       }
+       else if (s == SV_STSHUTW) {
+               if (rep->flags & BF_READ_ERROR) {
+                       //EV_FD_CLR(t->srv_fd, DIR_RD);
+                       buffer_shutr(rep);
+                       fd_delete(t->srv_fd);
+                       if (t->srv) {
+                               t->srv->cur_sess--;
+                               t->srv->failed_resp++;
+                       }
+                       t->be->failed_resp++;
+                       //close(t->srv_fd);
+                       t->srv_state = SV_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_SRVCL;
+                       if (!(t->flags & SN_FINST_MASK))
+                               t->flags |= SN_FINST_D;
+                       /* We used to have a free connection slot. Since we'll never use it,
+                        * we have to inform the server that it may be used by another session.
+                        */
+                       if (may_dequeue_tasks(t->srv, t->be))
+                               task_wakeup(t->srv->queue_mgt);
+
+                       return 1;
+               }
+               else if (rep->flags & BF_READ_NULL || c == CL_STSHUTW || c == CL_STCLOSE) {
+                       //EV_FD_CLR(t->srv_fd, DIR_RD);
+                       buffer_shutr(rep);
+                       fd_delete(t->srv_fd);
+                       if (t->srv)
+                               t->srv->cur_sess--;
+                       //close(t->srv_fd);
+                       t->srv_state = SV_STCLOSE;
+                       /* We used to have a free connection slot. Since we'll never use it,
+                        * we have to inform the server that it may be used by another session.
+                        */
+                       if (may_dequeue_tasks(t->srv, t->be))
+                               task_wakeup(t->srv->queue_mgt);
+
+                       return 1;
+               }
+               else if (tv_isle(&rep->rex, &now)) {
+                       //EV_FD_CLR(t->srv_fd, DIR_RD);
+                       buffer_shutr(rep);
+                       fd_delete(t->srv_fd);
+                       if (t->srv)
+                               t->srv->cur_sess--;
+                       //close(t->srv_fd);
+                       t->srv_state = SV_STCLOSE;
+                       if (!(t->flags & SN_ERR_MASK))
+                               t->flags |= SN_ERR_SRVTO;
+                       if (!(t->flags & SN_FINST_MASK))
+                               t->flags |= SN_FINST_D;
+                       /* We used to have a free connection slot. Since we'll never use it,
+                        * we have to inform the server that it may be used by another session.
+                        */
+                       if (may_dequeue_tasks(t->srv, t->be))
+                               task_wakeup(t->srv->queue_mgt);
+
+                       return 1;
+               }
+               else if (rep->l == BUFSIZE) { /* no room to read more data */
+                       if (EV_FD_COND_C(t->srv_fd, DIR_RD)) {
+                               tv_eternity(&rep->rex);
+                       }
+               }
+               else {
+                       if (EV_FD_COND_S(t->srv_fd, DIR_RD)) {
+                               if (!tv_add_ifset(&rep->rex, &now, &rep->rto))
+                                       tv_eternity(&rep->rex);
+                       }
+               }
+               return 0;
+       }
+       else { /* SV_STCLOSE : nothing to do */
+               if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) {
+                       int len;
+                       len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n",
+                                     t->uniq_id, t->be->id, (unsigned short)t->cli_fd, (unsigned short)t->srv_fd);
+                       write(1, trash, len);
+               }
+               return 0;
+       }
+       return 0;
+}
+
+/* Processes the client and server jobs of a session task, then
+ * puts it back to the wait queue in a clean state, or
+ * cleans up its resources if it must be deleted. Returns
+ * the time the task accepts to wait, or TIME_ETERNITY for
+ * infinity.
+ */
+void process_uxst_session(struct task *t, struct timeval *next)
+{
+       struct session *s = t->context;
+       int fsm_resync = 0;
+
+       do {
+               fsm_resync = 0;
+               fsm_resync |= process_uxst_cli(s);
+               if (s->srv_state == SV_STIDLE) {
+                       if (s->cli_state == CL_STCLOSE || s->cli_state == CL_STSHUTW) {
+                               s->srv_state = SV_STCLOSE;
+                               fsm_resync |= 1;
+                               continue;
+                       }
+                       if (s->cli_state == CL_STSHUTR ||
+                           (s->req->l >= s->req->rlim - s->req->data)) {
+                               if (s->req->l == 0) {
+                                       s->srv_state = SV_STCLOSE;
+                                       fsm_resync |= 1;
+                                       continue;
+                               }
+                               /* OK we have some remaining data to process */
+                               /* Just as an exercice, we copy the req into the resp,
+                                * and flush the req.
+                                */
+                               memcpy(s->rep->data, s->req->data, sizeof(s->rep->data));
+                               s->rep->l = s->req->l;
+                               s->rep->rlim = s->rep->data + BUFSIZE;
+                               s->rep->w = s->rep->data;
+                               s->rep->lr = s->rep->r = s->rep->data + s->rep->l;
+
+                               s->req->l = 0;
+                               s->srv_state = SV_STCLOSE;
+
+                               fsm_resync |= 1;
+                               continue;
+                       }
+               }
+       } while (fsm_resync);
+
+       if (likely(s->cli_state != CL_STCLOSE || s->srv_state != SV_STCLOSE)) {
+               s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
+               s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
+
+               t->expire = s->req->rex;
+               tv_min(&t->expire, &s->req->rex, &s->req->wex);
+               tv_bound(&t->expire, &s->req->cex);
+               tv_bound(&t->expire, &s->rep->rex);
+               tv_bound(&t->expire, &s->rep->wex);
+
+               /* restore t to its place in the task list */
+               task_queue(t);
+
+               *next = t->expire;
+               return; /* nothing more to do */
+       }
+
+       if (s->fe)
+               s->fe->feconn--;
+       if (s->be && (s->flags & SN_BE_ASSIGNED))
+               s->be->beconn--;
+       actconn--;
+    
+       if (unlikely((global.mode & MODE_DEBUG) &&
+                    (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
+               int len;
+               len = sprintf(trash, "%08x:%s.closed[%04x:%04x]\n",
+                             s->uniq_id, s->be->id,
+                             (unsigned short)s->cli_fd, (unsigned short)s->srv_fd);
+               write(1, trash, len);
+       }
+
+       s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now);
+       if (s->req != NULL)
+               s->logs.bytes_in = s->req->total;
+       if (s->rep != NULL)
+               s->logs.bytes_out = s->rep->total;
+
+       if (s->fe) {
+               s->fe->bytes_in  += s->logs.bytes_in;
+               s->fe->bytes_out += s->logs.bytes_out;
+       }
+       if (s->be && (s->be != s->fe)) {
+               s->be->bytes_in  += s->logs.bytes_in;
+               s->be->bytes_out += s->logs.bytes_out;
+       }
+       if (s->srv) {
+               s->srv->bytes_in  += s->logs.bytes_in;
+               s->srv->bytes_out += s->logs.bytes_out;
+       }
+
+       /* let's do a final log if we need it */
+       if (s->logs.logwait && 
+           !(s->flags & SN_MONITOR) &&
+           (s->req->total || !(s->fe && s->fe->options & PR_O_NULLNOLOG))) {
+               //uxst_sess_log(s);
+       }
+
+       /* the task MUST not be in the run queue anymore */
+       task_delete(t);
+       session_free(s);
+       task_free(t);
+       tv_eternity(next);
+}
+#endif /* not converted */
+
+
+/* Processes data exchanges on the statistics socket. The client processing
+ * is called and the task is put back in the wait queue or it is cleared.
+ * In order to ease the transition, we simply simulate the server status
+ * for now. It only knows states SV_STIDLE and SV_STCLOSE. Returns in <next>
+ * the task's expiration date.
+ */
+void process_uxst_stats(struct task *t, struct timeval *next)
+{
+       struct session *s = t->context;
+       struct listener *listener;
+       int fsm_resync = 0;
+
+       do {
+               //fprintf(stderr,"fct %s:%d\n", __FUNCTION__, __LINE__);
+               fsm_resync = 0;
+               fsm_resync |= process_uxst_cli(s);
+               if (s->srv_state == SV_STIDLE) {
+                       if (s->cli_state == CL_STCLOSE || s->cli_state == CL_STSHUTW) {
+                               s->srv_state = SV_STCLOSE;
+                               fsm_resync |= 1;
+                               continue;
+                       }
+                       else if (s->cli_state == CL_STSHUTR ||
+                                (s->req->l >= s->req->rlim - s->req->data)) {
+                               if (s->req->l == 0) {
+                                       s->srv_state = SV_STCLOSE;
+                                       fsm_resync |= 1;
+                                       continue;
+                               }
+                               /* OK we have some remaining data to process. Just for the
+                                * sake of an exercice, we copy the req into the resp,
+                                * and flush the req. This produces a simple echo function.
+                                */
+                               memcpy(s->rep->data, s->req->data, sizeof(s->rep->data));
+                               s->rep->l = s->req->l;
+                               s->rep->rlim = s->rep->data + BUFSIZE;
+                               s->rep->w = s->rep->data;
+                               s->rep->lr = s->rep->r = s->rep->data + s->rep->l;
+
+                               s->req->l = 0;
+                               s->srv_state = SV_STCLOSE;
+
+                               fsm_resync |= 1;
+                               continue;
+                       }
+               }
+       } while (fsm_resync);
+
+       if (likely(s->cli_state != CL_STCLOSE || s->srv_state != SV_STCLOSE)) {
+               s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
+               s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
+
+               t->expire = s->req->rex;
+               tv_min(&t->expire, &s->req->rex, &s->req->wex);
+               tv_bound(&t->expire, &s->req->cex);
+               tv_bound(&t->expire, &s->rep->rex);
+               tv_bound(&t->expire, &s->rep->wex);
+
+               /* restore t to its place in the task list */
+               task_queue(t);
+
+               *next = t->expire;
+               return; /* nothing more to do */
+       }
+
+       actconn--;
+       listener = fdtab[s->cli_fd].listener;
+       if (listener) {
+               listener->nbconn--;
+               if (listener->state == LI_FULL &&
+                   listener->nbconn < listener->maxconn) {
+                       /* we should reactivate the listener */
+                       EV_FD_SET(listener->fd, DIR_RD);
+                       listener->state = LI_READY;
+               }
+       }
+
+       /* the task MUST not be in the run queue anymore */
+       task_delete(t);
+       session_free(s);
+       task_free(t);
+       tv_eternity(next);
+}
+
+/* Note: must not be declared <const> as its list will be overwritten */
+static struct protocol proto_unix = {
+       .name = "unix_stream",
+       .sock_domain = PF_UNIX,
+       .sock_type = SOCK_STREAM,
+       .sock_prot = 0,
+       .sock_family = AF_UNIX,
+       .read = &stream_sock_read,
+       .write = &stream_sock_write,
+       .bind_all = uxst_bind_listeners,
+       .unbind_all = uxst_unbind_listeners,
+       .enable_all = uxst_enable_listeners,
+       .listeners = LIST_HEAD_INIT(proto_unix.listeners),
+       .nb_listeners = 0,
+};
+
+/* Adds listener to the list of unix stream listeners */
+void uxst_add_listener(struct listener *listener)
+{
+       listener->proto = &proto_unix;
+       LIST_ADDQ(&proto_unix.listeners, &listener->proto_list);
+       proto_unix.nb_listeners++;
+}
+
+__attribute__((constructor))
+static void __uxst_protocol_init(void)
+{
+       protocol_register(&proto_unix);
+       //tv_eternity(&global.unix_fe.clitimeout);
+}
+
+
+/*
+ * Local variables:
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ * End:
+ */
index 26f953ee2e8a6658801efd5bdf614c889e830fe5..86696f4a6d736b31d1544e4d5c1ea36dd12a15a2 100644 (file)
@@ -45,26 +45,27 @@ void session_free(struct session *s)
        if (s->rep)
                pool_free2(pool2_buffer, s->rep);
 
-       if (txn->hdr_idx.v != NULL)
-               pool_free2(fe->hdr_idx_pool, txn->hdr_idx.v);
-
-       if (txn->rsp.cap != NULL) {
-               struct cap_hdr *h;
-               for (h = fe->rsp_cap; h; h = h->next) {
-                       if (txn->rsp.cap[h->index] != NULL)
-                               pool_free2(h->pool, txn->rsp.cap[h->index]);
+       if (fe) {
+               if (txn->hdr_idx.v != NULL)
+                       pool_free2(fe->hdr_idx_pool, txn->hdr_idx.v);
+
+               if (txn->rsp.cap != NULL) {
+                       struct cap_hdr *h;
+                       for (h = fe->rsp_cap; h; h = h->next) {
+                               if (txn->rsp.cap[h->index] != NULL)
+                                       pool_free2(h->pool, txn->rsp.cap[h->index]);
+                       }
+                       pool_free2(fe->rsp_cap_pool, txn->rsp.cap);
                }
-               pool_free2(fe->rsp_cap_pool, txn->rsp.cap);
-       }
-       if (txn->req.cap != NULL) {
-               struct cap_hdr *h;
-               for (h = fe->req_cap; h; h = h->next) {
-                       if (txn->req.cap[h->index] != NULL)
-                               pool_free2(h->pool, txn->req.cap[h->index]);
+               if (txn->req.cap != NULL) {
+                       struct cap_hdr *h;
+                       for (h = fe->req_cap; h; h = h->next) {
+                               if (txn->req.cap[h->index] != NULL)
+                                       pool_free2(h->pool, txn->req.cap[h->index]);
+                       }
+                       pool_free2(fe->req_cap_pool, txn->req.cap);
                }
-               pool_free2(fe->req_cap_pool, txn->req.cap);
        }
-
        if (txn->uri)
                pool_free2(pool2_requri, txn->uri);
        if (txn->cli_cookie)
@@ -75,7 +76,7 @@ void session_free(struct session *s)
        pool_free2(pool2_session, s);
 
        /* We may want to free the maximum amount of pools if the proxy is stopping */
-       if (unlikely(fe->state == PR_STSTOPPED)) {
+       if (fe && unlikely(fe->state == PR_STSTOPPED)) {
                if (pool2_buffer)
                        pool_flush2(pool2_buffer);
                if (fe->hdr_idx_pool)