]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/helper.cc
Major rewrite of proxy authentication to support other schemes than
[thirdparty/squid.git] / src / helper.cc
index cbb4b6aa8deff7de3a26cea605c455469d1e6f00..0201482d158f7e81fc6f612972f0fd44260d0b57 100644 (file)
@@ -1,16 +1,61 @@
+
+/*
+ * $Id: helper.cc,v 1.22 2001/01/07 23:36:39 hno Exp $
+ *
+ * DEBUG: section 29    Helper process maintenance
+ * AUTHOR: Harvest Derived?
+ *
+ * SQUID Internet Object Cache  http://squid.nlanr.net/Squid/
+ * ----------------------------------------------------------
+ *
+ *  Squid is the result of efforts by numerous individuals from the
+ *  Internet community.  Development is led by Duane Wessels of the
+ *  National Laboratory for Applied Network Research and funded by the
+ *  National Science Foundation.  Squid is Copyrighted (C) 1998 by
+ *  the Regents of the University of California.  Please see the
+ *  COPYRIGHT file for full details.  Squid incorporates software
+ *  developed and/or copyrighted by other sources.  Please see the
+ *  CREDITS file for full details.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *  
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *  
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
+ *
+ */
+
 #include "squid.h"
 
 #define HELPER_MAX_ARGS 64
 
 static PF helperHandleRead;
+static PF helperStatefulHandleRead;
 static PF helperServerFree;
+static PF helperStatefulServerFree;
 static void Enqueue(helper * hlp, helper_request *);
 static helper_request *Dequeue(helper * hlp);
+static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
 static helper_server *GetFirstAvailable(helper * hlp);
+static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
 static void helperDispatch(helper_server * srv, helper_request * r);
+static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
 static void helperKickQueue(helper * hlp);
+static void helperStatefulKickQueue(statefulhelper * hlp);
 static void helperRequestFree(helper_request * r);
-
+static void helperStatefulRequestFree(helper_stateful_request * r);
+static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
+static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
+static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
+static void helperStatefulServerKickQueue(helper_stateful_server * srv);
 
 void
 helperOpenServers(helper * hlp)
@@ -58,8 +103,7 @@ helperOpenServers(helper * hlp)
            continue;
        }
        hlp->n_running++;
-       srv = memAllocate(MEM_HELPER_SERVER);
-       cbdataAdd(srv, MEM_HELPER_SERVER);
+       srv = CBDATA_ALLOC(helper_server, NULL);
        srv->flags.alive = 1;
        srv->index = k;
        srv->rfd = rfd;
@@ -89,6 +133,87 @@ helperOpenServers(helper * hlp)
     helperKickQueue(hlp);
 }
 
+void
+helperStatefulOpenServers(statefulhelper * hlp)
+{
+    char *s;
+    char *progname;
+    char *shortname;
+    char *procname;
+    char *args[HELPER_MAX_ARGS];
+    char fd_note_buf[FD_DESC_SZ];
+    helper_stateful_server *srv;
+    int nargs = 0;
+    int k;
+    int x;
+    int rfd;
+    int wfd;
+    wordlist *w;
+    if (hlp->cmdline == NULL)
+       return;
+    progname = hlp->cmdline->key;
+    if ((s = strrchr(progname, '/')))
+       shortname = xstrdup(s + 1);
+    else
+       shortname = xstrdup(progname);
+    debug(29, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n",
+       hlp->n_to_start, shortname);
+    procname = xmalloc(strlen(shortname) + 3);
+    snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
+    args[nargs++] = procname;
+    for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
+       args[nargs++] = w->key;
+    args[nargs++] = NULL;
+    assert(nargs <= HELPER_MAX_ARGS);
+    for (k = 0; k < hlp->n_to_start; k++) {
+       getCurrentTime();
+       rfd = wfd = -1;
+       x = ipcCreate(hlp->ipc_type,
+           progname,
+           args,
+           shortname,
+           &rfd,
+           &wfd);
+       if (x < 0) {
+           debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
+           continue;
+       }
+       hlp->n_running++;
+       srv = CBDATA_ALLOC(helper_stateful_server, NULL);
+       srv->flags.alive = 1;
+       srv->flags.reserved = S_HELPER_FREE;
+       srv->deferred_requests = 0;
+       srv->index = k;
+       srv->rfd = rfd;
+       srv->wfd = wfd;
+       srv->buf = memAllocate(MEM_8K_BUF);
+       srv->buf_sz = 8192;
+       srv->offset = 0;
+       srv->parent = hlp;
+       if (hlp->datapool != NULL)
+           srv->data = memPoolAlloc(hlp->datapool);
+       cbdataLock(hlp);        /* lock because of the parent backlink */
+       dlinkAddTail(srv, &srv->link, &hlp->servers);
+       if (rfd == wfd) {
+           snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
+           fd_note(rfd, fd_note_buf);
+       } else {
+           snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
+           fd_note(rfd, fd_note_buf);
+           snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
+           fd_note(wfd, fd_note_buf);
+       }
+       commSetNonBlocking(rfd);
+       if (wfd != rfd)
+           commSetNonBlocking(wfd);
+       comm_add_close_handler(rfd, helperStatefulServerFree, srv);
+    }
+    safe_free(shortname);
+    safe_free(procname);
+    helperStatefulKickQueue(hlp);
+}
+
+
 void
 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
 {
@@ -107,6 +232,136 @@ helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
        helperDispatch(srv, r);
     else
        Enqueue(hlp, r);
+    debug(29, 9) ("helperSubmit: %s\n", buf);
+}
+
+void
+helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
+{
+    helper_stateful_request *r = memAllocate(MEM_HELPER_STATEFUL_REQUEST);
+    helper_stateful_server *srv;
+    if (hlp == NULL) {
+       debug(29, 3) ("helperStatefulSubmit: hlp == NULL\n");
+       callback(data, 0, NULL);
+       return;
+    }
+    r->callback = callback;
+    r->data = data;
+    if (buf != NULL)
+       r->buf = xstrdup(buf);
+    else
+       r->placeholder = 1;
+    cbdataLock(r->data);
+    if ((buf != NULL) && lastserver) {
+       debug(29, 5) ("StatefulSubmit with lastserver %d\n", lastserver);
+       if (lastserver->flags.reserved != S_HELPER_RESERVED)
+           lastserver->deferred_requests--;
+       if (!(lastserver->request)) {
+           debug(29, 5) ("StatefulSubmit dispatching\n");
+           helperStatefulDispatch(lastserver, r);
+       } else {
+           debug(29, 5) ("StatefulSubmit queuing\n");
+           StatefulServerEnqueue(lastserver, r);
+       }
+    } else {
+       if ((srv = StatefulGetFirstAvailable(hlp))) {
+           helperStatefulDispatch(srv, r);
+       } else
+           StatefulEnqueue(hlp, r);
+    }
+    debug(29, 9) ("helperStatefulSubmit: placeholder: '%d', buf '%s'.\n", r->placeholder, buf);
+}
+
+helper_stateful_server *
+helperStatefulDefer(statefulhelper * hlp)
+/* find and add a deferred request to a server */
+{
+    dlink_node *n;
+    helper_stateful_server *srv = NULL, *rv = NULL;
+    if (hlp == NULL) {
+       debug(29, 3) ("helperStatefulReserve: hlp == NULL\n");
+       return NULL;
+    }
+    debug(29, 5) ("helperStatefulDefer: Running servers %d.\n", hlp->n_running);
+    if (hlp->n_running == 0) {
+       debug(29, 1) ("helperStatefulDefer: No running servers!. \n");
+       return NULL;
+    }
+    srv = StatefulGetFirstAvailable(hlp);
+    /* all currently busy:loop through servers and find server with the shortest queue */
+    rv = srv;
+    if (rv == NULL)
+       for (n = hlp->servers.head; n != NULL; n = n->next) {
+           srv = n->data;
+           if (srv->flags.reserved == S_HELPER_RESERVED)
+               continue;
+           if (!srv->flags.alive)
+               continue;
+           if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
+               !(hlp->IsAvailable(srv->data)))
+               continue;
+           if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
+               continue;
+           rv = srv;
+       }
+    if (rv == NULL) {
+       debug(29, 1) ("helperStatefulDefer: None available.\n");
+       return NULL;
+    }
+    rv->flags.reserved = S_HELPER_DEFERRED;
+    rv->deferred_requests++;
+    return rv;
+}
+
+void
+helperStatefulReset(helper_stateful_server * srv)
+/* puts this helper back in the queue. the calling app is required to 
+ * manage the state in the helper.
+ */
+{
+    statefulhelper *hlp = srv->parent;
+    helper_stateful_request *r;
+    r = srv->request;
+    if (r != NULL) {
+       /* reset attempt DURING an outstaning request */
+       debug(29, 1) ("helperStatefulReset: RESET During request %s \n",
+           hlp->id_name);
+       srv->flags.busy = 0;
+       srv->offset = 0;
+       helperStatefulRequestFree(r);
+       srv->request = NULL;
+    }
+    debug(29, 1) ("helperStatefulReset reset helper %s #%d\n", hlp->id_name, srv->index + 1);
+    srv->flags.busy = 0;
+    if (srv->queue.head) {
+       srv->flags.reserved = S_HELPER_DEFERRED;
+       helperStatefulServerKickQueue(srv);
+    } else {
+       srv->flags.reserved = S_HELPER_FREE;
+       if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
+           srv->parent->OnEmptyQueue(srv->data);
+       helperStatefulKickQueue(hlp);
+    }
+}
+
+void
+helperStatefulReleaseServer(helper_stateful_server * srv)
+/*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
+{
+    if (srv->deferred_requests > 0)
+       srv->deferred_requests--;
+    if (!(srv->deferred_requests) && (srv->flags.reserved == S_HELPER_DEFERRED) && !(srv->queue.head)) {
+       srv->flags.reserved = S_HELPER_FREE;
+       if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
+           srv->parent->OnEmptyQueue(srv->data);
+    }
+}
+
+void *
+helperStatefulServerGetData(helper_stateful_server * srv)
+/* return a pointer to the stateful routines data area */
+{
+    return srv->data;
 }
 
 void
@@ -114,6 +369,7 @@ helperStats(StoreEntry * sentry, helper * hlp)
 {
     helper_server *srv;
     dlink_node *link;
+    double tt;
     storeAppendPrintf(sentry, "number running: %d of %d\n",
        hlp->n_running, hlp->n_to_start);
     storeAppendPrintf(sentry, "requests sent: %d\n",
@@ -125,16 +381,18 @@ helperStats(StoreEntry * sentry, helper * hlp)
     storeAppendPrintf(sentry, "avg service time: %d msec\n",
        hlp->stats.avg_svc_time);
     storeAppendPrintf(sentry, "\n");
-    storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\n",
+    storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
        "#",
        "FD",
        "# Requests",
        "Flags",
        "Time",
-       "Offset");
+       "Offset",
+       "Request");
     for (link = hlp->servers.head; link; link = link->next) {
        srv = link->data;
-       storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\n",
+       tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
+       storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
            srv->index + 1,
            srv->rfd,
            srv->stats.uses,
@@ -142,8 +400,9 @@ helperStats(StoreEntry * sentry, helper * hlp)
            srv->flags.busy ? 'B' : ' ',
            srv->flags.closing ? 'C' : ' ',
            srv->flags.shutdown ? 'S' : ' ',
-           0.001 * tvSubMsec(srv->dispatch_time, current_time),
-           (int) srv->offset);
+           tt < 0.0 ? 0.0 : tt,
+           (int) srv->offset,
+           srv->request ? log_quote(srv->request->buf) : "(none)");
     }
     storeAppendPrintf(sentry, "\nFlags key:\n\n");
     storeAppendPrintf(sentry, "   A = ALIVE\n");
@@ -152,6 +411,57 @@ helperStats(StoreEntry * sentry, helper * hlp)
     storeAppendPrintf(sentry, "   S = SHUTDOWN\n");
 }
 
+void
+helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
+{
+    helper_stateful_server *srv;
+    dlink_node *link;
+    double tt;
+    storeAppendPrintf(sentry, "number running: %d of %d\n",
+       hlp->n_running, hlp->n_to_start);
+    storeAppendPrintf(sentry, "requests sent: %d\n",
+       hlp->stats.requests);
+    storeAppendPrintf(sentry, "replies received: %d\n",
+       hlp->stats.replies);
+    storeAppendPrintf(sentry, "queue length: %d\n",
+       hlp->stats.queue_size);
+    storeAppendPrintf(sentry, "avg service time: %d msec\n",
+       hlp->stats.avg_svc_time);
+    storeAppendPrintf(sentry, "\n");
+    storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
+       "#",
+       "FD",
+       "# Requests",
+       "# Deferred Requests",
+       "Flags",
+       "Time",
+       "Offset",
+       "Request");
+    for (link = hlp->servers.head; link; link = link->next) {
+       srv = link->data;
+       tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
+       storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
+           srv->index + 1,
+           srv->rfd,
+           srv->stats.uses,
+           srv->deferred_requests,
+           srv->flags.alive ? 'A' : ' ',
+           srv->flags.busy ? 'B' : ' ',
+           srv->flags.closing ? 'C' : ' ',
+           srv->flags.reserved != S_HELPER_FREE ? 'R' : ' ',
+           srv->flags.shutdown ? 'S' : ' ',
+           tt < 0.0 ? 0.0 : tt,
+           (int) srv->offset,
+           srv->request ? log_quote(srv->request->buf) : "(none)");
+    }
+    storeAppendPrintf(sentry, "\nFlags key:\n\n");
+    storeAppendPrintf(sentry, "   A = ALIVE\n");
+    storeAppendPrintf(sentry, "   B = BUSY\n");
+    storeAppendPrintf(sentry, "   C = CLOSING\n");
+    storeAppendPrintf(sentry, "   R = RESERVED or DEFERRED\n");
+    storeAppendPrintf(sentry, "   S = SHUTDOWN\n");
+}
+
 void
 helperShutdown(helper * hlp)
 {
@@ -178,18 +488,70 @@ helperShutdown(helper * hlp)
        }
        srv->flags.closing = 1;
        comm_close(srv->wfd);
+       srv->wfd = -1;
+    }
+}
+
+void
+helperStatefulShutdown(statefulhelper * hlp)
+{
+    dlink_node *link = hlp->servers.head;
+    helper_stateful_server *srv;
+    while (link) {
+       srv = link->data;
+       link = link->next;
+       if (!srv->flags.alive) {
+           debug(34, 3) ("helperStatefulShutdown: %s #%d is NOT ALIVE.\n",
+               hlp->id_name, srv->index + 1);
+           continue;
+       }
+       srv->flags.shutdown = 1;        /* request it to shut itself down */
+       if (srv->flags.busy) {
+           debug(34, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n",
+               hlp->id_name, srv->index + 1);
+           continue;
+       }
+       if (srv->flags.closing) {
+           debug(34, 3) ("helperStatefulShutdown: %s #%d is CLOSING.\n",
+               hlp->id_name, srv->index + 1);
+           continue;
+       }
+       if (srv->flags.reserved != S_HELPER_FREE) {
+           debug(34, 3) ("helperStatefulShutdown: %s #%d is RESERVED.\n",
+               hlp->id_name, srv->index + 1);
+           continue;
+       }
+       if (srv->deferred_requests) {
+           debug(34, 3) ("helperStatefulShutdown: %s #%d has DEFERRED requests.\n",
+               hlp->id_name, srv->index + 1);
+           continue;
+       }
+       srv->flags.closing = 1;
+       comm_close(srv->wfd);
+       srv->wfd = -1;
     }
 }
 
+
 helper *
 helperCreate(const char *name)
 {
-    helper *hlp = memAllocate(MEM_HELPER);
-    cbdataAdd(hlp, MEM_HELPER);
+    helper *hlp;
+    hlp = CBDATA_ALLOC(helper, NULL);
     hlp->id_name = name;
     return hlp;
 }
 
+statefulhelper *
+helperStatefulCreate(const char *name)
+{
+    statefulhelper *hlp;
+    hlp = CBDATA_ALLOC(statefulhelper, NULL);
+    hlp->id_name = name;
+    return hlp;
+}
+
+
 void
 helperFree(helper * hlp)
 {
@@ -200,6 +562,17 @@ helperFree(helper * hlp)
     cbdataFree(hlp);
 }
 
+void
+helperStatefulFree(statefulhelper * hlp)
+{
+    /* note, don't free hlp->name, it probably points to static memory */
+    if (hlp->queue.head)
+       debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
+           hlp->id_name, hlp->stats.queue_size);
+    cbdataFree(hlp);
+}
+
+
 /* ====================================================================== */
 /* LOCAL FUNCTIONS */
 /* ====================================================================== */
@@ -209,12 +582,19 @@ helperServerFree(int fd, void *data)
 {
     helper_server *srv = data;
     helper *hlp = srv->parent;
+    helper_request *r;
     assert(srv->rfd == fd);
     if (srv->buf) {
-       memFree(MEM_8K_BUF, srv->buf);
+       memFree(srv->buf, MEM_8K_BUF);
        srv->buf = NULL;
     }
-    if (srv->wfd != srv->rfd)
+    if ((r = srv->request)) {
+       if (cbdataValid(r->data))
+           r->callback(r->data, srv->buf);
+       helperRequestFree(r);
+       srv->request = NULL;
+    }
+    if (srv->wfd != srv->rfd && srv->wfd != -1)
        comm_close(srv->wfd);
     dlinkDelete(&srv->link, &hlp->servers);
     hlp->n_running--;
@@ -222,7 +602,6 @@ helperServerFree(int fd, void *data)
     if (!srv->flags.shutdown) {
        debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
            hlp->id_name, srv->index + 1, fd);
-       assert(hlp->n_running >= hlp->n_to_start / 2);
        if (hlp->n_running < hlp->n_to_start / 2)
            fatalf("Too few %s processes are running", hlp->id_name);
     }
@@ -230,6 +609,41 @@ helperServerFree(int fd, void *data)
     cbdataFree(srv);
 }
 
+static void
+helperStatefulServerFree(int fd, void *data)
+{
+    helper_stateful_server *srv = data;
+    statefulhelper *hlp = srv->parent;
+    helper_stateful_request *r;
+    assert(srv->rfd == fd);
+    if (srv->buf) {
+       memFree(srv->buf, MEM_8K_BUF);
+       srv->buf = NULL;
+    }
+    if ((r = srv->request)) {
+       if (cbdataValid(r->data))
+           r->callback(r->data, srv, srv->buf);
+       helperStatefulRequestFree(r);
+       srv->request = NULL;
+    }
+    if (srv->wfd != srv->rfd && srv->wfd != -1)
+       comm_close(srv->wfd);
+    dlinkDelete(&srv->link, &hlp->servers);
+    hlp->n_running--;
+    assert(hlp->n_running >= 0);
+    if (!srv->flags.shutdown) {
+       debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
+           hlp->id_name, srv->index + 1, fd);
+       if (hlp->n_running < hlp->n_to_start / 2)
+           fatalf("Too few %s processes are running", hlp->id_name);
+    }
+    if (srv->data != NULL)
+       memPoolFree(hlp->datapool, srv->data);
+    cbdataUnlock(srv->parent);
+    cbdataFree(srv);
+}
+
+
 static void
 helperHandleRead(int fd, void *data)
 {
@@ -240,7 +654,7 @@ helperHandleRead(int fd, void *data)
     helper *hlp = srv->parent;
     assert(fd == srv->rfd);
     assert(cbdataValid(data));
-    Counter.syscalls.sock.reads++;
+    statCounter.syscalls.sock.reads++;
     len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
     fd_bytes(fd, len, FD_READ);
     debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
@@ -268,20 +682,117 @@ helperHandleRead(int fd, void *data)
        srv->flags.busy = 0;
        srv->offset = 0;
        helperRequestFree(r);
+       srv->request = NULL;
        hlp->stats.replies++;
        hlp->stats.avg_svc_time =
            intAverage(hlp->stats.avg_svc_time,
            tvSubMsec(srv->dispatch_time, current_time),
            hlp->stats.replies, REDIRECT_AV_FACTOR);
-       if (srv->flags.shutdown)
+       if (srv->flags.shutdown) {
            comm_close(srv->wfd);
-       else
+           srv->wfd = -1;
+       } else
            helperKickQueue(hlp);
     } else {
        commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
     }
 }
 
+static void
+helperStatefulHandleRead(int fd, void *data)
+{
+    int len;
+    char *t = NULL;
+    helper_stateful_server *srv = data;
+    helper_stateful_request *r;
+    statefulhelper *hlp = srv->parent;
+    assert(fd == srv->rfd);
+    assert(cbdataValid(data));
+    statCounter.syscalls.sock.reads++;
+    len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
+    fd_bytes(fd, len, FD_READ);
+    debug(29, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
+       len, hlp->id_name, srv->index + 1);
+    if (len <= 0) {
+       if (len < 0)
+           debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror());
+       comm_close(fd);
+       return;
+    }
+    srv->offset += len;
+    srv->buf[srv->offset] = '\0';
+    r = srv->request;
+    if (r == NULL) {
+       /* someone spoke without being spoken to */
+       debug(29, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
+           hlp->id_name, srv->index + 1, len);
+       srv->offset = 0;
+    } else if ((t = strchr(srv->buf, '\n'))) {
+       /* end of reply found */
+       debug(29, 3) ("helperStatefulHandleRead: end of reply found\n");
+       *t = '\0';
+       if (cbdataValid(r->data)) {
+           switch ((r->callback(r->data, srv, srv->buf))) {    /*if non-zero reserve helper */
+           case S_HELPER_UNKNOWN:
+               fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
+               break;
+           case S_HELPER_RELEASE:      /* helper finished with */
+               if (!srv->queue.head) {
+                   srv->flags.reserved = S_HELPER_FREE;
+                   if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
+                       srv->parent->OnEmptyQueue(srv->data);
+                   debug(29, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp->id_name, srv->index + 1);
+               } else {
+                   srv->flags.reserved = S_HELPER_DEFERRED;
+                   debug(29, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp->id_name, srv->index + 1);
+               }
+               break;
+           case S_HELPER_RESERVE:      /* 'pin' this helper for the caller */
+               if (!srv->queue.head) {
+                   srv->flags.reserved = S_HELPER_RESERVED;
+                   debug(29, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp->id_name, srv->index + 1);
+               } else {
+                   fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
+               }
+               break;
+           case S_HELPER_DEFER:
+               /* the helper is still needed, but can
+                * be used for other requests in the meantime.
+                */
+               srv->flags.reserved = S_HELPER_DEFERRED;
+               srv->deferred_requests++;
+               debug(29, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp->id_name, srv->index + 1);
+               break;
+           default:
+               fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
+           }
+
+       } else {
+           debug(29, 1) ("StatefulHandleRead: no callback data registered\n");
+       }
+       srv->flags.busy = 0;
+       srv->offset = 0;
+       helperStatefulRequestFree(r);
+       srv->request = NULL;
+       hlp->stats.replies++;
+       hlp->stats.avg_svc_time =
+           intAverage(hlp->stats.avg_svc_time,
+           tvSubMsec(srv->dispatch_time, current_time),
+           hlp->stats.replies, REDIRECT_AV_FACTOR);
+       if (srv->flags.shutdown) {
+           comm_close(srv->wfd);
+           srv->wfd = -1;
+       } else {
+           if (srv->queue.head)
+               helperStatefulServerKickQueue(srv);
+           else
+               helperStatefulKickQueue(hlp);
+       }
+    } else {
+       commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0);
+    }
+}
+
 static void
 Enqueue(helper * hlp, helper_request * r)
 {
@@ -302,6 +813,35 @@ Enqueue(helper * hlp, helper_request * r)
     debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
 }
 
+static void
+StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
+{
+    dlink_node *link = memAllocate(MEM_DLINK_NODE);
+    dlinkAddTail(r, link, &hlp->queue);
+    hlp->stats.queue_size++;
+    if (hlp->stats.queue_size < hlp->n_running)
+       return;
+    if (squid_curtime - hlp->last_queue_warn < 600)
+       return;
+    if (shutting_down || reconfiguring)
+       return;
+    hlp->last_queue_warn = squid_curtime;
+    debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
+    debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
+    if (hlp->stats.queue_size > hlp->n_running * 2)
+       fatalf("Too many queued %s requests", hlp->id_name);
+    debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
+}
+
+static void
+StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
+{
+    dlink_node *link = memAllocate(MEM_DLINK_NODE);
+    dlinkAddTail(r, link, &srv->queue);
+    /* XXX No queue length check here? */
+}
+
+
 static helper_request *
 Dequeue(helper * hlp)
 {
@@ -310,7 +850,34 @@ Dequeue(helper * hlp)
     if ((link = hlp->queue.head)) {
        r = link->data;
        dlinkDelete(link, &hlp->queue);
-       memFree(MEM_DLINK_NODE, link);
+       memFree(link, MEM_DLINK_NODE);
+       hlp->stats.queue_size--;
+    }
+    return r;
+}
+
+static helper_stateful_request *
+StatefulServerDequeue(helper_stateful_server * srv)
+{
+    dlink_node *link;
+    helper_stateful_request *r = NULL;
+    if ((link = srv->queue.head)) {
+       r = link->data;
+       dlinkDelete(link, &srv->queue);
+       memFree(link, MEM_DLINK_NODE);
+    }
+    return r;
+}
+
+static helper_stateful_request *
+StatefulDequeue(statefulhelper * hlp)
+{
+    dlink_node *link;
+    helper_stateful_request *r = NULL;
+    if ((link = hlp->queue.head)) {
+       r = link->data;
+       dlinkDelete(link, &hlp->queue);
+       memFree(link, MEM_DLINK_NODE);
        hlp->stats.queue_size--;
     }
     return r;
@@ -334,6 +901,31 @@ GetFirstAvailable(helper * hlp)
     return NULL;
 }
 
+static helper_stateful_server *
+StatefulGetFirstAvailable(statefulhelper * hlp)
+{
+    dlink_node *n;
+    helper_stateful_server *srv = NULL;
+    debug(29, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp->n_running);
+    if (hlp->n_running == 0)
+       return NULL;
+    for (n = hlp->servers.head; n != NULL; n = n->next) {
+       srv = n->data;
+       if (srv->flags.busy)
+           continue;
+       if (srv->flags.reserved == S_HELPER_RESERVED)
+           continue;
+       if (!srv->flags.alive)
+           continue;
+       if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
+           continue;
+       return srv;
+    }
+    debug(29, 5) ("StatefulGetFirstAvailable: None available.\n");
+    return NULL;
+}
+
+
 static void
 helperDispatch(helper_server * srv, helper_request * r)
 {
@@ -363,6 +955,60 @@ helperDispatch(helper_server * srv, helper_request * r)
     hlp->stats.requests++;
 }
 
+static void
+helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
+{
+    statefulhelper *hlp = srv->parent;
+    if (!cbdataValid(r->data)) {
+       debug(29, 1) ("helperStatefulDispatch: invalid callback data\n");
+       helperStatefulRequestFree(r);
+       return;
+    }
+    debug(29, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp->id_name, srv->index + 1);
+    if (r->placeholder == 1) {
+       /* a callback is needed before this request can _use_ a helper. */
+       if (cbdataValid(r->data)) {
+           /* we don't care about releasing/deferring this helper. The request NEVER
+            * gets to the helper. So we throw away the return code */
+           r->callback(r->data, srv, NULL);
+           /* throw away the placeholder */
+           helperStatefulRequestFree(r);
+           /* and push the queue. Note that the callback may have call submit again - 
+            * which is why we test for the request*/
+           if (srv->request == NULL) {
+               if (srv->flags.shutdown) {
+                   comm_close(srv->wfd);
+                   srv->wfd = -1;
+               } else {
+                   if (srv->queue.head)
+                       helperStatefulServerKickQueue(srv);
+                   else
+                       helperStatefulKickQueue(hlp);
+               }
+           }
+       }
+       return;
+    }
+    srv->flags.busy = 1;
+    srv->request = r;
+    srv->dispatch_time = current_time;
+    comm_write(srv->wfd,
+       r->buf,
+       strlen(r->buf),
+       NULL,                   /* Handler */
+       NULL,                   /* Handler-data */
+       NULL);                  /* free */
+    commSetSelect(srv->rfd,
+       COMM_SELECT_READ,
+       helperStatefulHandleRead,
+       srv, 0);
+    debug(29, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
+       hlp->id_name, srv->index + 1, strlen(r->buf));
+    srv->stats.uses++;
+    hlp->stats.requests++;
+}
+
+
 static void
 helperKickQueue(helper * hlp)
 {
@@ -372,10 +1018,35 @@ helperKickQueue(helper * hlp)
        helperDispatch(srv, r);
 }
 
+static void
+helperStatefulKickQueue(statefulhelper * hlp)
+{
+    helper_stateful_request *r;
+    helper_stateful_server *srv;
+    while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
+       helperStatefulDispatch(srv, r);
+}
+
+static void
+helperStatefulServerKickQueue(helper_stateful_server * srv)
+{
+    helper_stateful_request *r;
+    if ((r = StatefulServerDequeue(srv)))
+       helperStatefulDispatch(srv, r);
+}
+
 static void
 helperRequestFree(helper_request * r)
 {
     cbdataUnlock(r->data);
     xfree(r->buf);
-    memFree(MEM_HELPER_REQUEST, r);
+    memFree(r, MEM_HELPER_REQUEST);
+}
+
+static void
+helperStatefulRequestFree(helper_stateful_request * r)
+{
+    cbdataUnlock(r->data);
+    xfree(r->buf);
+    memFree(r, MEM_HELPER_STATEFUL_REQUEST);
 }