/*
- * $Id: helper.cc,v 1.57 2003/02/21 22:50:09 robertc Exp $
+ * $Id: helper.cc,v 1.58 2003/05/29 15:54:08 hno Exp $
*
* DEBUG: section 84 Helper process maintenance
* AUTHOR: Harvest Derived?
srv->index = k;
srv->rfd = rfd;
srv->wfd = wfd;
- srv->buf = (char *)memAllocate(MEM_8K_BUF);
- srv->buf_sz = 8192;
- srv->offset = 0;
+ srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
+ srv->roffset = 0;
+ srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
srv->parent = cbdataReference(hlp);
dlinkAddTail(srv, &srv->link, &hlp->servers);
commSetNonBlocking(wfd);
comm_add_close_handler(rfd, helperServerFree, srv);
+
+ comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
}
safe_free(shortname);
srv->index = k;
srv->rfd = rfd;
srv->wfd = wfd;
- srv->buf = (char *)memAllocate(MEM_8K_BUF);
- srv->buf_sz = 8192;
- srv->offset = 0;
+ srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
+ srv->roffset = 0;
srv->parent = cbdataReference(hlp);
if (hlp->datapool != NULL)
commSetNonBlocking(wfd);
comm_add_close_handler(rfd, helperStatefulServerFree, srv);
+
+ comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
+
}
safe_free(shortname);
debug(84, 1) ("helperStatefulReset: RESET During request %s \n",
hlp->id_name);
srv->flags.busy = 0;
- srv->offset = 0;
+ srv->roffset = 0;
helperStatefulRequestFree(r);
srv->request = NULL;
}
void
helperStats(StoreEntry * sentry, helper * hlp)
{
- helper_server *srv;
dlink_node *link;
- double tt;
storeAppendPrintf(sentry, "program: %s\n",
hlp->cmdline->key);
storeAppendPrintf(sentry, "number running: %d of %d\n",
"Request");
for (link = hlp->servers.head; link; link = link->next) {
- srv = (helper_server*)link->data;
- tt = 0.001 * tvSubMsec(srv->dispatch_time,
- srv->flags.busy ? current_time : srv->answer_time);
- storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
+ helper_server *srv = (helper_server*)link->data;
+ double tt = srv->requests[0] ? 0.001 * tvSubMsec(srv->requests[0]->dispatch_time, current_time) : 0.0;
+ storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
srv->index + 1,
srv->rfd,
srv->pid,
srv->stats.uses,
srv->flags.alive ? 'A' : ' ',
- srv->flags.busy ? 'B' : ' ',
+ srv->stats.pending ? 'B' : ' ',
+ srv->flags.writing ? 'W' : ' ',
srv->flags.closing ? 'C' : ' ',
srv->flags.shutdown ? 'S' : ' ',
tt < 0.0 ? 0.0 : tt,
- (int) srv->offset,
- srv->request ? log_quote(srv->request->buf) : "(none)");
+ (int) srv->roffset,
+ srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
}
storeAppendPrintf(sentry, "\nFlags key:\n\n");
storeAppendPrintf(sentry, " A = ALIVE\n");
storeAppendPrintf(sentry, " B = BUSY\n");
+ storeAppendPrintf(sentry, " W = BUSY\n");
storeAppendPrintf(sentry, " C = CLOSING\n");
storeAppendPrintf(sentry, " S = SHUTDOWN\n");
}
srv->flags.shutdown ? 'S' : ' ',
srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
tt < 0.0 ? 0.0 : tt,
- (int) srv->offset,
+ (int) srv->roffset,
srv->request ? log_quote(srv->request->buf) : "(none)");
}
srv->flags.shutdown = 1; /* request it to shut itself down */
- if (srv->flags.busy) {
+ if (srv->stats.pending) {
debug(84, 3) ("helperShutdown: %s #%d is BUSY.\n",
hlp->id_name, srv->index + 1);
continue;
helper_server *srv = (helper_server *)data;
helper *hlp = srv->parent;
helper_request *r;
+ int i, concurrency = hlp->concurrency;
+
+ if (!concurrency)
+ concurrency = 1;
+
assert(srv->rfd == fd);
- if (srv->buf) {
- memFree(srv->buf, MEM_8K_BUF);
- srv->buf = NULL;
+ if (srv->rbuf) {
+ memFreeBuf(srv->rbuf_sz, srv->rbuf);
+ srv->rbuf = NULL;
}
- if ((r = srv->request)) {
- void *cbdata;
+ if (!memBufIsNull(&srv->wqueue))
+ memBufClean(&srv->wqueue);
- if (cbdataReferenceValidDone(r->data, &cbdata))
- r->callback(cbdata, srv->buf);
+ for (i = 0; i < concurrency; i++) {
+ if ((r = srv->requests[i])) {
+ void *cbdata;
- helperRequestFree(r);
+ if (cbdataReferenceValidDone(r->data, &cbdata))
+ r->callback(cbdata, NULL);
- srv->request = NULL;
+ helperRequestFree(r);
+
+ srv->requests[i] = NULL;
+ }
}
+ safe_free(srv->requests);
+
if (srv->wfd != srv->rfd && srv->wfd != -1)
comm_close(srv->wfd);
helper_stateful_request *r;
assert(srv->rfd == fd);
- if (srv->buf) {
- memFree(srv->buf, MEM_8K_BUF);
- srv->buf = NULL;
+ if (srv->rbuf) {
+ memFreeBuf(srv->rbuf_sz, srv->rbuf);
+ srv->rbuf = NULL;
}
+#if 0
+ if (!memBufIsNull(&srv->wqueue))
+ memBufClean(&srv->wqueue);
+
+#endif
+
if ((r = srv->request)) {
void *cbdata;
if (cbdataReferenceValidDone(r->data, &cbdata))
- r->callback(cbdata, srv, srv->buf);
+ r->callback(cbdata, srv, NULL);
helperStatefulRequestFree(r);
{
char *t = NULL;
helper_server *srv = (helper_server *)data;
- helper_request *r;
helper *hlp = srv->parent;
assert(fd == srv->rfd);
assert(cbdataReferenceValid(data));
return;
}
- srv->offset += len;
- srv->buf[srv->offset] = '\0';
- debug(84, 9) ("helperHandleRead: '%s'\n", srv->buf);
- r = srv->request;
+ srv->roffset += len;
+ srv->rbuf[srv->roffset] = '\0';
+ debug(84, 9) ("helperHandleRead: '%s'\n", srv->rbuf);
- if (r == NULL) {
+ if (!srv->stats.pending) {
/* someone spoke without being spoken to */
- debug(84, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
- hlp->id_name, srv->index + 1, (int)len);
- srv->offset = 0;
- } else if ((t = strchr(srv->buf, '\n'))) {
+ debug(84, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes '%s'\n",
+ hlp->id_name, srv->index + 1, (int)len, srv->rbuf);
+ srv->roffset = 0;
+ srv->rbuf[0] = '\0';
+ }
+
+ while ((t = strchr(srv->rbuf, '\n'))) {
/* end of reply found */
- HLPCB *callback;
- void *cbdata;
+ helper_request *r;
+ char *msg = srv->rbuf;
+ int i = 0;
debug(84, 3) ("helperHandleRead: end of reply found\n");
- *t = '\0';
- callback = r->callback;
- r->callback = NULL;
+ *t++ = '\0';
- if (cbdataReferenceValidDone(r->data, &cbdata))
- callback(cbdata, srv->buf);
+ if (hlp->concurrency) {
+ i = strtol(msg, &msg, 10);
- srv->flags.busy = 0;
+ while (*msg && isspace(*msg))
+ msg++;
+ }
- srv->offset = 0;
+ r = srv->requests[i];
- helperRequestFree(r);
+ if (r) {
+ HLPCB *callback = r->callback;
+ void *cbdata;
- srv->request = NULL;
+ srv->requests[i] = NULL;
- hlp->stats.replies++;
+ r->callback = NULL;
- srv->answer_time = current_time;
+ if (cbdataReferenceValidDone(r->data, &cbdata))
+ callback(cbdata, msg);
- hlp->stats.avg_svc_time =
- intAverage(hlp->stats.avg_svc_time,
- tvSubUsec(srv->dispatch_time, current_time),
- hlp->stats.replies, REDIRECT_AV_FACTOR);
+ srv->stats.pending--;
+
+ hlp->stats.replies++;
+
+ hlp->stats.avg_svc_time =
+ intAverage(hlp->stats.avg_svc_time,
+ tvSubMsec(r->dispatch_time, current_time),
+ hlp->stats.replies, REDIRECT_AV_FACTOR);
+
+ helperRequestFree(r);
+ } else {
+ debug(84, 1) ("helperHandleRead: unexpected reply on channel %d from %s #%d '%s'\n",
+ i, hlp->id_name, srv->index + 1, srv->rbuf);
+ }
+
+ srv->roffset -= (t - srv->rbuf);
+ memmove(srv->rbuf, t, srv->roffset + 1);
if (srv->flags.shutdown) {
int wfd = srv->wfd;
comm_close(wfd);
} else
helperKickQueue(hlp);
- } else {
- comm_read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, data);
}
+
+ comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
}
static void
return;
}
- srv->offset += len;
- srv->buf[srv->offset] = '\0';
+ srv->roffset += len;
+ srv->rbuf[srv->roffset] = '\0';
r = srv->request;
if (r == NULL) {
/* someone spoke without being spoken to */
- debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
- hlp->id_name, srv->index + 1, (int)len);
- srv->offset = 0;
- } else if ((t = strchr(srv->buf, '\n'))) {
+ debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes '%s'\n",
+ hlp->id_name, srv->index + 1, (int)len, srv->rbuf);
+ srv->roffset = 0;
+ }
+
+ if ((t = strchr(srv->rbuf, '\n'))) {
/* end of reply found */
debug(84, 3) ("helperStatefulHandleRead: end of reply found\n");
*t = '\0';
if (cbdataReferenceValid(r->data)) {
- switch ((r->callback(r->data, srv, srv->buf))) { /*if non-zero reserve helper */
+ switch ((r->callback(r->data, srv, srv->rbuf))) { /*if non-zero reserve helper */
case S_HELPER_UNKNOWN:
fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
}
srv->flags.busy = 0;
- srv->offset = 0;
+ srv->roffset = 0;
helperStatefulRequestFree(r);
srv->request = NULL;
hlp->stats.replies++;
else
helperStatefulKickQueue(hlp);
}
- } else {
- comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset,
- helperStatefulHandleRead, srv);
}
+
+ comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
+ helperStatefulHandleRead, srv);
}
static void
GetFirstAvailable(helper * hlp)
{
dlink_node *n;
- helper_server *srv = NULL;
+ helper_server *srv;
+ helper_server *selected = NULL;
if (hlp->n_running == 0)
return NULL;
+ /* Find "least" loaded helper (approx) */
for (n = hlp->servers.head; n != NULL; n = n->next) {
srv = (helper_server *)n->data;
- if (srv->flags.busy)
+ if (selected && selected->stats.pending <= srv->stats.pending)
continue;
if (!srv->flags.alive)
continue;
- return srv;
+ if (!srv->stats.pending)
+ return srv;
+
+ if (selected) {
+ selected = srv;
+ break;
+ }
+
+ selected = srv;
}
- return NULL;
+ /* Check for overload */
+ if (!selected)
+ return NULL;
+
+ if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
+ return NULL;
+
+ return selected;
}
static helper_stateful_server *
static void
helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
- /* nothing! */
+ helper_server *srv = (helper_server *)data;
+
+ memBufClean(&srv->writebuf);
+ srv->flags.writing = 0;
+
+ if (flag != COMM_OK) {
+ /* Helper server has crashed */
+ debug(84, 0) ("helperDispatch: Helper %s #%d has crashed\n",
+ srv->parent->id_name, srv->index + 1);
+ return;
+ }
+
+ if (!memBufIsNull(&srv->wqueue)) {
+ srv->writebuf = srv->wqueue;
+ srv->wqueue = MemBufNull;
+ srv->flags.writing = 1;
+ comm_write(srv->wfd,
+ srv->writebuf.buf,
+ srv->writebuf.size,
+ helperDispatchWriteDone, /* Handler */
+ srv); /* Handler-data */
+ }
}
static void
helperDispatch(helper_server * srv, helper_request * r)
{
helper *hlp = srv->parent;
+ helper_request **ptr = NULL;
+ unsigned int slot;
if (!cbdataReferenceValid(r->data)) {
debug(84, 1) ("helperDispatch: invalid callback data\n");
return;
}
- assert(!srv->flags.busy);
- srv->flags.busy = 1;
- srv->request = r;
- srv->dispatch_time = current_time;
- comm_write(srv->wfd,
- r->buf,
- strlen(r->buf),
- helperDispatchWriteDone, /* Handler */
- hlp); /* Handler-data */
- comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, srv);
+ for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
+ if (!srv->requests[slot]) {
+ ptr = &srv->requests[slot];
+ break;
+ }
+ }
+
+ assert(ptr);
+ *ptr = r;
+ srv->stats.pending += 1;
+ r->dispatch_time = current_time;
+
+ if (memBufIsNull(&srv->wqueue))
+ memBufDefInit(&srv->wqueue);
+
+ if (hlp->concurrency)
+ memBufPrintf(&srv->wqueue, "%d %s", slot, r->buf);
+ else
+ memBufAppend(&srv->wqueue, r->buf, strlen(r->buf));
+
+ if (!srv->flags.writing) {
+ srv->writebuf = srv->wqueue;
+ srv->wqueue = MemBufNull;
+ srv->flags.writing = 1;
+ comm_write(srv->wfd,
+ srv->writebuf.buf,
+ srv->writebuf.size,
+ helperDispatchWriteDone, /* Handler */
+ srv); /* Handler-data */
+ }
+
debug(84, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
hlp->id_name, srv->index + 1, (int) strlen(r->buf));
srv->stats.uses++;
strlen(r->buf),
helperStatefulDispatchWriteDone, /* Handler */
hlp); /* Handler-data */
- comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset,
- helperStatefulHandleRead, srv);
debug(84, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
hlp->id_name, srv->index + 1, (int) strlen(r->buf));
srv->stats.uses++;