]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Overlapping helper requests for standard helpers (not stateful)
authorhno <>
Thu, 29 May 2003 21:54:07 +0000 (21:54 +0000)
committerhno <>
Thu, 29 May 2003 21:54:07 +0000 (21:54 +0000)
 - basic auth
 - redirectors
 - external acls

src/auth/basic/auth_basic.cc
src/auth/basic/auth_basic.h
src/cf.data.pre
src/external_acl.cc
src/helper.cc
src/redirect.cc
src/structs.h

index b134397373b89233a3b4370662c3015f46ea22b0..6dc32e28e8f1c51974989f24f7d0ee822ee1c6b4 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * $Id: auth_basic.cc,v 1.24 2003/02/21 22:50:26 robertc Exp $
+ * $Id: auth_basic.cc,v 1.25 2003/05/29 15:54:09 hno Exp $
  *
  * DEBUG: section 29    Authenticator
  * AUTHOR: Duane Wessels
@@ -352,10 +352,12 @@ authBasicCfgDump(StoreEntry * entry, const char *name, authScheme * scheme)
         list = list->next;
     }
 
-    storeAppendPrintf(entry, "\n%s %s realm %s\n%s %s children %d\n%s %s credentialsttl %d seconds\n",
-                      name, "basic", config->basicAuthRealm,
-                      name, "basic", config->authenticateChildren,
-                      name, "basic", (int) config->credentialsTTL);
+    storeAppendPrintf(entry, "\n");
+
+    storeAppendPrintf(entry, "%s basic realm %s\n", name, config->basicAuthRealm);
+    storeAppendPrintf(entry, "%s basic children %d\n", name, config->authenticateChildren);
+    storeAppendPrintf(entry, "%s basic concurrency %d\n", name, config->authenticateConcurrency);
+    storeAppendPrintf(entry, "%s basic credentialsttl %d seconds\n", name, (int) config->credentialsTTL);
 
 }
 
@@ -383,6 +385,8 @@ authBasicParse(authScheme * scheme, int n_configured, char *param_str)
         requirePathnameExists("authparam basic program", basicConfig->authenticate->key);
     } else if (strcasecmp(param_str, "children") == 0) {
         parse_int(&basicConfig->authenticateChildren);
+    } else if (strcasecmp(param_str, "concurrency") == 0) {
+        parse_int(&basicConfig->authenticateConcurrency);
     } else if (strcasecmp(param_str, "realm") == 0) {
         parse_eol(&basicConfig->basicAuthRealm);
     } else if (strcasecmp(param_str, "credentialsttl") == 0) {
@@ -625,6 +629,8 @@ authBasicInit(authScheme * scheme)
 
         basicauthenticators->n_to_start = basicConfig->authenticateChildren;
 
+        basicauthenticators->concurrency = basicConfig->authenticateConcurrency;
+
         basicauthenticators->ipc_type = IPC_STREAM;
 
         helperOpenServers(basicauthenticators);
index 7123a92f0cd6d8019eb98cc3244c35f9a9027879..e895aade5dd9dc541ee3a857a71b8cf209c5c962 100644 (file)
@@ -58,6 +58,7 @@ class auth_basic_config
 
 public:
     int authenticateChildren;
+    int authenticateConcurrency;
     char *basicAuthRealm;
     wordlist *authenticate;
     time_t credentialsTTL;
index e34b5c4c57087639a0aa29a5ac8d24c74fe15aba..ae249d98738d72e81a99b8d8f27f503066d51337 100644 (file)
@@ -1,6 +1,6 @@
 
 #
-# $Id: cf.data.pre,v 1.317 2003/05/21 02:58:11 robertc Exp $
+# $Id: cf.data.pre,v 1.318 2003/05/29 15:54:07 hno Exp $
 #
 #
 # SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -1473,6 +1473,16 @@ DOC_START
        and other system resources.
 DOC_END
 
+NAME: redirect_concurrency
+TYPE: int
+DEFAULT: 0
+LOC: Config.redirectConcurrency
+DOC_START
+       The number of requests each redirector helper can handle in
+       parallell. Defaults to 0 which indicates that the redirector
+       is a old-style singlethreaded redirector.
+DOC_END
+
 NAME: redirect_rewrites_host_header
 TYPE: onoff
 DEFAULT: on
@@ -1550,6 +1560,12 @@ DOC_START
        processes.
        auth_param basic children 5
 
+       "concurrency" concurrency
+       The number of concurrent requests the helper can process.
+       The default of 0 is used for helpers who only supports
+       one request at a time.
+       auth_param basic concurrency 0
+
        "realm" realmstring
        Specifies the realm name which is to be reported to the
        client for the basic proxy authentication scheme (part of
@@ -1732,6 +1748,9 @@ DOC_START
                        as ttl)
          children=n    Number of acl helper processes spawn to service
                        external acl lookups of this type.
+         concurrency=n concurrency level per process. Use 0 for old style
+                       helpers who can only process a single request at a
+                       time.
          cache=n       result cache size, 0 is unbounded (default)
        
        FORMAT specifications
index a1bed3d499371603ebae1ef274d9494859cb1b0c..2ee2007362bb863b44184cddc86632e688a79993 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: external_acl.cc,v 1.41 2003/05/20 12:17:39 robertc Exp $
+ * $Id: external_acl.cc,v 1.42 2003/05/29 15:54:08 hno Exp $
  *
  * DEBUG: section 82    External ACL
  * AUTHOR: Henrik Nordstrom, MARA Systems AB
@@ -95,6 +95,8 @@ public:
 
     int children;
 
+    int concurrency;
+
     helper *theHelper;
 
     hash_table *cache;
@@ -211,8 +213,10 @@ parse_externalAclHelper(external_acl ** list)
             a->ttl = atoi(token + 4);
         } else if (strncmp(token, "negative_ttl=", 13) == 0) {
             a->negative_ttl = atoi(token + 13);
-        } else if (strncmp(token, "children=", 12) == 0) {
-            a->children = atoi(token + 12);
+        } else if (strncmp(token, "children=", 9) == 0) {
+            a->children = atoi(token + 9);
+        } else if (strncmp(token, "concurrency=", 12) == 0) {
+            a->concurrency = atoi(token + 12);
         } else if (strncmp(token, "cache=", 6) == 0) {
             a->cache_size = atoi(token + 6);
         } else {
@@ -359,6 +363,9 @@ dump_externalAclHelper(StoreEntry * sentry, const char *name, const external_acl
         if (node->children != DEFAULT_EXTERNAL_ACL_CHILDREN)
             storeAppendPrintf(sentry, " children=%d", node->children);
 
+        if (node->concurrency)
+            storeAppendPrintf(sentry, " concurrency=%d", node->concurrency);
+
         for (format = node->format; format; format = format->next) {
             switch (format->type) {
 
@@ -831,11 +838,7 @@ free_externalAclState(void *data)
 
 /*
  * The helper program receives queries on stdin, one
- * per line, and must return the result on on stdout as
- *   OK user="Users login name"
- * on success, and
- *   ERR error="Description of the error"
- * on error (the user/error options are optional)
+ * per line, and must return the result on on stdout
  *
  * General result syntax:
  *
@@ -1045,6 +1048,8 @@ externalAclInit(void)
 
         p->theHelper->n_to_start = p->children;
 
+        p->theHelper->concurrency = p->concurrency;
+
         p->theHelper->ipc_type = IPC_TCP_SOCKET;
 
         helperOpenServers(p->theHelper);
index 75f9b7b096353e3b8242390a304ce92680ddc880..3832f7d3c1ad23022ab103183fc788763dddf567 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: helper.cc,v 1.57 2003/02/21 22:50:09 robertc Exp $
+ * $Id: helper.cc,v 1.58 2003/05/29 15:54:08 hno Exp $
  *
  * DEBUG: section 84    Helper process maintenance
  * AUTHOR: Harvest Derived?
@@ -124,9 +124,9 @@ helperOpenServers(helper * hlp)
         srv->index = k;
         srv->rfd = rfd;
         srv->wfd = wfd;
-        srv->buf = (char *)memAllocate(MEM_8K_BUF);
-        srv->buf_sz = 8192;
-        srv->offset = 0;
+        srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
+        srv->roffset = 0;
+        srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
         srv->parent = cbdataReference(hlp);
         dlinkAddTail(srv, &srv->link, &hlp->servers);
 
@@ -146,6 +146,8 @@ helperOpenServers(helper * hlp)
             commSetNonBlocking(wfd);
 
         comm_add_close_handler(rfd, helperServerFree, srv);
+
+        comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
     }
 
     safe_free(shortname);
@@ -224,9 +226,8 @@ helperStatefulOpenServers(statefulhelper * hlp)
         srv->index = k;
         srv->rfd = rfd;
         srv->wfd = wfd;
-        srv->buf = (char *)memAllocate(MEM_8K_BUF);
-        srv->buf_sz = 8192;
-        srv->offset = 0;
+        srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
+        srv->roffset = 0;
         srv->parent = cbdataReference(hlp);
 
         if (hlp->datapool != NULL)
@@ -250,6 +251,9 @@ helperStatefulOpenServers(statefulhelper * hlp)
             commSetNonBlocking(wfd);
 
         comm_add_close_handler(rfd, helperStatefulServerFree, srv);
+
+        comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
+
     }
 
     safe_free(shortname);
@@ -434,7 +438,7 @@ helperStatefulReset(helper_stateful_server * srv)
         debug(84, 1) ("helperStatefulReset: RESET During request %s \n",
                       hlp->id_name);
         srv->flags.busy = 0;
-        srv->offset = 0;
+        srv->roffset = 0;
         helperStatefulRequestFree(r);
         srv->request = NULL;
     }
@@ -487,9 +491,7 @@ helperStatefulServerGetData(helper_stateful_server * srv)
 void
 helperStats(StoreEntry * sentry, helper * hlp)
 {
-    helper_server *srv;
     dlink_node *link;
-    double tt;
     storeAppendPrintf(sentry, "program: %s\n",
                       hlp->cmdline->key);
     storeAppendPrintf(sentry, "number running: %d of %d\n",
@@ -514,26 +516,27 @@ helperStats(StoreEntry * sentry, helper * hlp)
                       "Request");
 
     for (link = hlp->servers.head; link; link = link->next) {
-        srv = (helper_server*)link->data;
-        tt = 0.001 * tvSubMsec(srv->dispatch_time,
-                               srv->flags.busy ? current_time : srv->answer_time);
-        storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
+        helper_server *srv = (helper_server*)link->data;
+        double tt = srv->requests[0] ? 0.001 * tvSubMsec(srv->requests[0]->dispatch_time, current_time) : 0.0;
+        storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
                           srv->index + 1,
                           srv->rfd,
                           srv->pid,
                           srv->stats.uses,
                           srv->flags.alive ? 'A' : ' ',
-                          srv->flags.busy ? 'B' : ' ',
+                          srv->stats.pending ? 'B' : ' ',
+                          srv->flags.writing ? 'W' : ' ',
                           srv->flags.closing ? 'C' : ' ',
                           srv->flags.shutdown ? 'S' : ' ',
                           tt < 0.0 ? 0.0 : tt,
-                          (int) srv->offset,
-                          srv->request ? log_quote(srv->request->buf) : "(none)");
+                          (int) srv->roffset,
+                          srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
     }
 
     storeAppendPrintf(sentry, "\nFlags key:\n\n");
     storeAppendPrintf(sentry, "   A = ALIVE\n");
     storeAppendPrintf(sentry, "   B = BUSY\n");
+    storeAppendPrintf(sentry, "   W = BUSY\n");
     storeAppendPrintf(sentry, "   C = CLOSING\n");
     storeAppendPrintf(sentry, "   S = SHUTDOWN\n");
 }
@@ -582,7 +585,7 @@ helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
                           srv->flags.shutdown ? 'S' : ' ',
                           srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
                                   tt < 0.0 ? 0.0 : tt,
-                                  (int) srv->offset,
+                                  (int) srv->roffset,
                                   srv->request ? log_quote(srv->request->buf) : "(none)");
     }
 
@@ -613,7 +616,7 @@ helperShutdown(helper * hlp)
 
         srv->flags.shutdown = 1;       /* request it to shut itself down */
 
-        if (srv->flags.busy) {
+        if (srv->stats.pending) {
             debug(84, 3) ("helperShutdown: %s #%d is BUSY.\n",
                           hlp->id_name, srv->index + 1);
             continue;
@@ -742,24 +745,36 @@ helperServerFree(int fd, void *data)
     helper_server *srv = (helper_server *)data;
     helper *hlp = srv->parent;
     helper_request *r;
+    int i, concurrency = hlp->concurrency;
+
+    if (!concurrency)
+        concurrency = 1;
+
     assert(srv->rfd == fd);
 
-    if (srv->buf) {
-        memFree(srv->buf, MEM_8K_BUF);
-        srv->buf = NULL;
+    if (srv->rbuf) {
+        memFreeBuf(srv->rbuf_sz, srv->rbuf);
+        srv->rbuf = NULL;
     }
 
-    if ((r = srv->request)) {
-        void *cbdata;
+    if (!memBufIsNull(&srv->wqueue))
+        memBufClean(&srv->wqueue);
 
-        if (cbdataReferenceValidDone(r->data, &cbdata))
-            r->callback(cbdata, srv->buf);
+    for (i = 0; i < concurrency; i++) {
+        if ((r = srv->requests[i])) {
+            void *cbdata;
 
-        helperRequestFree(r);
+            if (cbdataReferenceValidDone(r->data, &cbdata))
+                r->callback(cbdata, NULL);
 
-        srv->request = NULL;
+            helperRequestFree(r);
+
+            srv->requests[i] = NULL;
+        }
     }
 
+    safe_free(srv->requests);
+
     if (srv->wfd != srv->rfd && srv->wfd != -1)
         comm_close(srv->wfd);
 
@@ -789,16 +804,22 @@ helperStatefulServerFree(int fd, void *data)
     helper_stateful_request *r;
     assert(srv->rfd == fd);
 
-    if (srv->buf) {
-        memFree(srv->buf, MEM_8K_BUF);
-        srv->buf = NULL;
+    if (srv->rbuf) {
+        memFreeBuf(srv->rbuf_sz, srv->rbuf);
+        srv->rbuf = NULL;
     }
 
+#if 0
+    if (!memBufIsNull(&srv->wqueue))
+        memBufClean(&srv->wqueue);
+
+#endif
+
     if ((r = srv->request)) {
         void *cbdata;
 
         if (cbdataReferenceValidDone(r->data, &cbdata))
-            r->callback(cbdata, srv, srv->buf);
+            r->callback(cbdata, srv, NULL);
 
         helperStatefulRequestFree(r);
 
@@ -837,7 +858,6 @@ helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, voi
 {
     char *t = NULL;
     helper_server *srv = (helper_server *)data;
-    helper_request *r;
     helper *hlp = srv->parent;
     assert(fd == srv->rfd);
     assert(cbdataReferenceValid(data));
@@ -860,44 +880,63 @@ helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, voi
         return;
     }
 
-    srv->offset += len;
-    srv->buf[srv->offset] = '\0';
-    debug(84, 9) ("helperHandleRead: '%s'\n", srv->buf);
-    r = srv->request;
+    srv->roffset += len;
+    srv->rbuf[srv->roffset] = '\0';
+    debug(84, 9) ("helperHandleRead: '%s'\n", srv->rbuf);
 
-    if (r == NULL) {
+    if (!srv->stats.pending) {
         /* someone spoke without being spoken to */
-        debug(84, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
-                      hlp->id_name, srv->index + 1, (int)len);
-        srv->offset = 0;
-    } else if ((t = strchr(srv->buf, '\n'))) {
+        debug(84, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes '%s'\n",
+                      hlp->id_name, srv->index + 1, (int)len, srv->rbuf);
+        srv->roffset = 0;
+        srv->rbuf[0] = '\0';
+    }
+
+    while ((t = strchr(srv->rbuf, '\n'))) {
         /* end of reply found */
-        HLPCB *callback;
-        void *cbdata;
+        helper_request *r;
+        char *msg = srv->rbuf;
+        int i = 0;
         debug(84, 3) ("helperHandleRead: end of reply found\n");
-        *t = '\0';
-        callback = r->callback;
-        r->callback = NULL;
+        *t++ = '\0';
 
-        if (cbdataReferenceValidDone(r->data, &cbdata))
-            callback(cbdata, srv->buf);
+        if (hlp->concurrency) {
+            i = strtol(msg, &msg, 10);
 
-        srv->flags.busy = 0;
+            while (*msg && isspace(*msg))
+                msg++;
+        }
 
-        srv->offset = 0;
+        r = srv->requests[i];
 
-        helperRequestFree(r);
+        if (r) {
+            HLPCB *callback = r->callback;
+            void *cbdata;
 
-        srv->request = NULL;
+            srv->requests[i] = NULL;
 
-        hlp->stats.replies++;
+            r->callback = NULL;
 
-        srv->answer_time = current_time;
+            if (cbdataReferenceValidDone(r->data, &cbdata))
+                callback(cbdata, msg);
 
-        hlp->stats.avg_svc_time =
-            intAverage(hlp->stats.avg_svc_time,
-                       tvSubUsec(srv->dispatch_time, current_time),
-                       hlp->stats.replies, REDIRECT_AV_FACTOR);
+            srv->stats.pending--;
+
+            hlp->stats.replies++;
+
+            hlp->stats.avg_svc_time =
+                intAverage(hlp->stats.avg_svc_time,
+                           tvSubMsec(r->dispatch_time, current_time),
+                           hlp->stats.replies, REDIRECT_AV_FACTOR);
+
+            helperRequestFree(r);
+        } else {
+            debug(84, 1) ("helperHandleRead: unexpected reply on channel %d from %s #%d '%s'\n",
+                          i, hlp->id_name, srv->index + 1, srv->rbuf);
+        }
+
+        srv->roffset -= (t - srv->rbuf);
+        memmove(srv->rbuf, t, srv->roffset + 1);
 
         if (srv->flags.shutdown) {
             int wfd = srv->wfd;
@@ -905,9 +944,9 @@ helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, voi
             comm_close(wfd);
         } else
             helperKickQueue(hlp);
-    } else {
-        comm_read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, data);
     }
+
+    comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
 }
 
 static void
@@ -938,22 +977,24 @@ helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xer
         return;
     }
 
-    srv->offset += len;
-    srv->buf[srv->offset] = '\0';
+    srv->roffset += len;
+    srv->rbuf[srv->roffset] = '\0';
     r = srv->request;
 
     if (r == NULL) {
         /* someone spoke without being spoken to */
-        debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
-                      hlp->id_name, srv->index + 1, (int)len);
-        srv->offset = 0;
-    } else if ((t = strchr(srv->buf, '\n'))) {
+        debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes '%s'\n",
+                      hlp->id_name, srv->index + 1, (int)len, srv->rbuf);
+        srv->roffset = 0;
+    }
+
+    if ((t = strchr(srv->rbuf, '\n'))) {
         /* end of reply found */
         debug(84, 3) ("helperStatefulHandleRead: end of reply found\n");
         *t = '\0';
 
         if (cbdataReferenceValid(r->data)) {
-            switch ((r->callback(r->data, srv, srv->buf))) {   /*if non-zero reserve helper */
+            switch ((r->callback(r->data, srv, srv->rbuf))) {  /*if non-zero reserve helper */
 
             case S_HELPER_UNKNOWN:
                     fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
@@ -1006,7 +1047,7 @@ helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xer
         }
 
         srv->flags.busy = 0;
-        srv->offset = 0;
+        srv->roffset = 0;
         helperStatefulRequestFree(r);
         srv->request = NULL;
         hlp->stats.replies++;
@@ -1027,10 +1068,10 @@ helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xer
             else
                 helperStatefulKickQueue(hlp);
         }
-    } else {
-        comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset,
-                  helperStatefulHandleRead, srv);
     }
+
+    comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
+              helperStatefulHandleRead, srv);
 }
 
 static void
@@ -1165,24 +1206,41 @@ static helper_server *
 GetFirstAvailable(helper * hlp)
 {
     dlink_node *n;
-    helper_server *srv = NULL;
+    helper_server *srv;
+    helper_server *selected = NULL;
 
     if (hlp->n_running == 0)
         return NULL;
 
+    /* Find "least" loaded helper (approx) */
     for (n = hlp->servers.head; n != NULL; n = n->next) {
         srv = (helper_server *)n->data;
 
-        if (srv->flags.busy)
+        if (selected && selected->stats.pending <= srv->stats.pending)
             continue;
 
         if (!srv->flags.alive)
             continue;
 
-        return srv;
+        if (!srv->stats.pending)
+            return srv;
+
+        if (selected) {
+            selected = srv;
+            break;
+        }
+
+        selected = srv;
     }
 
-    return NULL;
+    /* Check for overload */
+    if (!selected)
+        return NULL;
+
+    if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
+        return NULL;
+
+    return selected;
 }
 
 static helper_stateful_server *
@@ -1221,13 +1279,36 @@ StatefulGetFirstAvailable(statefulhelper * hlp)
 static void
 helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
 {
-    /* nothing! */
+    helper_server *srv = (helper_server *)data;
+
+    memBufClean(&srv->writebuf);
+    srv->flags.writing = 0;
+
+    if (flag != COMM_OK) {
+        /* Helper server has crashed */
+        debug(84, 0) ("helperDispatch: Helper %s #%d has crashed\n",
+                      srv->parent->id_name, srv->index + 1);
+        return;
+    }
+
+    if (!memBufIsNull(&srv->wqueue)) {
+        srv->writebuf = srv->wqueue;
+        srv->wqueue = MemBufNull;
+        srv->flags.writing = 1;
+        comm_write(srv->wfd,
+                   srv->writebuf.buf,
+                   srv->writebuf.size,
+                   helperDispatchWriteDone,    /* Handler */
+                   srv);                       /* Handler-data */
+    }
 }
 
 static void
 helperDispatch(helper_server * srv, helper_request * r)
 {
     helper *hlp = srv->parent;
+    helper_request **ptr = NULL;
+    unsigned int slot;
 
     if (!cbdataReferenceValid(r->data)) {
         debug(84, 1) ("helperDispatch: invalid callback data\n");
@@ -1235,16 +1316,37 @@ helperDispatch(helper_server * srv, helper_request * r)
         return;
     }
 
-    assert(!srv->flags.busy);
-    srv->flags.busy = 1;
-    srv->request = r;
-    srv->dispatch_time = current_time;
-    comm_write(srv->wfd,
-               r->buf,
-               strlen(r->buf),
-               helperDispatchWriteDone,        /* Handler */
-               hlp);                           /* Handler-data */
-    comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, srv);
+    for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
+        if (!srv->requests[slot]) {
+            ptr = &srv->requests[slot];
+            break;
+        }
+    }
+
+    assert(ptr);
+    *ptr = r;
+    srv->stats.pending += 1;
+    r->dispatch_time = current_time;
+
+    if (memBufIsNull(&srv->wqueue))
+        memBufDefInit(&srv->wqueue);
+
+    if (hlp->concurrency)
+        memBufPrintf(&srv->wqueue, "%d %s", slot, r->buf);
+    else
+        memBufAppend(&srv->wqueue, r->buf, strlen(r->buf));
+
+    if (!srv->flags.writing) {
+        srv->writebuf = srv->wqueue;
+        srv->wqueue = MemBufNull;
+        srv->flags.writing = 1;
+        comm_write(srv->wfd,
+                   srv->writebuf.buf,
+                   srv->writebuf.size,
+                   helperDispatchWriteDone,    /* Handler */
+                   srv);                       /* Handler-data */
+    }
+
     debug(84, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
                   hlp->id_name, srv->index + 1, (int) strlen(r->buf));
     srv->stats.uses++;
@@ -1308,8 +1410,6 @@ helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r
                strlen(r->buf),
                helperStatefulDispatchWriteDone,        /* Handler */
                hlp);                           /* Handler-data */
-    comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset,
-              helperStatefulHandleRead, srv);
     debug(84, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
                   hlp->id_name, srv->index + 1, (int) strlen(r->buf));
     srv->stats.uses++;
index 4702de145d856b597f3ba297a31366433d5c9bdf..e7fb14510fc47a191bda257451f8fb31a5776e3e 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: redirect.cc,v 1.97 2003/03/04 01:40:29 robertc Exp $
+ * $Id: redirect.cc,v 1.98 2003/05/29 15:54:08 hno Exp $
  *
  * DEBUG: section 61    Redirector
  * AUTHOR: Duane Wessels
@@ -184,6 +184,8 @@ redirectInit(void)
 
     redirectors->n_to_start = Config.redirectChildren;
 
+    redirectors->concurrency = Config.redirectConcurrency;
+
     redirectors->ipc_type = IPC_STREAM;
 
     helperOpenServers(redirectors);
index 3d4026f920900e1adce52ed744afb34be49ff61f..aace2a32c3239bd0c70f1fe5662a85535620c8bf 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: structs.h,v 1.464 2003/05/20 12:17:39 robertc Exp $
+ * $Id: structs.h,v 1.465 2003/05/29 15:54:08 hno Exp $
  *
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -407,6 +407,7 @@ struct _SquidConfig
 #endif
 
     int redirectChildren;
+    int redirectConcurrency;
     time_t authenticateGCInterval;
     time_t authenticateTTL;
     time_t authenticateIpTTL;
@@ -2125,6 +2126,8 @@ struct _helper_request
     char *buf;
     HLPCB *callback;
     void *data;
+
+    struct timeval dispatch_time;
 };
 
 struct _helper_stateful_request
@@ -2145,6 +2148,7 @@ struct _helper
     int n_to_start;
     int n_running;
     int ipc_type;
+    unsigned int concurrency;
     time_t last_queue_warn;
 
     struct
@@ -2189,24 +2193,23 @@ struct _helper_server
     int pid;
     int rfd;
     int wfd;
-    char *buf;
-    size_t buf_sz;
-    off_t offset;
-
-    struct timeval dispatch_time;
+    MemBuf wqueue;
+    MemBuf writebuf;
+    char *rbuf;
+    size_t rbuf_sz;
+    off_t roffset;
 
-    struct timeval answer_time;
     dlink_node link;
     helper *parent;
-    helper_request *request;
+    helper_request **requests;
 
     struct _helper_flags
     {
 
-unsigned int alive:
+unsigned int writing:
         1;
 
-unsigned int busy:
+unsigned int alive:
         1;
 
 unsigned int closing:
@@ -2221,6 +2224,7 @@ unsigned int shutdown:
     struct
     {
         int uses;
+        unsigned int pending;
     }
 
     stats;
@@ -2233,9 +2237,11 @@ struct _helper_stateful_server
     int pid;
     int rfd;
     int wfd;
-    char *buf;
-    size_t buf_sz;
-    off_t offset;
+    /* MemBuf wqueue; */
+    /* MemBuf writebuf; */
+    char *rbuf;
+    size_t rbuf_sz;
+    off_t roffset;
 
     struct timeval dispatch_time;