if (srv->queue.head) {
srv->flags.reserved = S_HELPER_DEFERRED;
- helperStatefulServerKickQueue(srv);
} else {
srv->flags.reserved = S_HELPER_FREE;
if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
srv->parent->OnEmptyQueue(srv->data);
-
- helperStatefulKickQueue(hlp);
}
+
+ helperStatefulServerKickQueue(srv);
}
/*
srv->flags.reserved = S_HELPER_FREE;
if (srv->parent->OnEmptyQueue != NULL && srv->data)
srv->parent->OnEmptyQueue(srv->data);
+
+ helperStatefulServerKickQueue(srv);
}
void *
if (!concurrency)
concurrency = 1;
- assert(srv->rfd == fd);
-
if (srv->rbuf) {
memFreeBuf(srv->rbuf_sz, srv->rbuf);
srv->rbuf = NULL;
helper_stateful_server *srv = (helper_stateful_server *)data;
statefulhelper *hlp = srv->parent;
helper_stateful_request *r;
- assert(srv->rfd == fd);
if (srv->rbuf) {
memFreeBuf(srv->rbuf_sz, srv->rbuf);
char *t = NULL;
helper_server *srv = (helper_server *)data;
helper *hlp = srv->parent;
- assert(fd == srv->rfd);
assert(cbdataReferenceValid(data));
/* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
return;
}
+ assert(fd == srv->rfd);
+
debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
if (flag != COMM_OK || len <= 0) {
srv->roffset -= (t - srv->rbuf);
memmove(srv->rbuf, t, srv->roffset + 1);
- if (srv->flags.shutdown) {
+ if (!srv->flags.shutdown) {
+ helperKickQueue(hlp);
+ } else if (!srv->flags.closing && !srv->stats.pending) {
int wfd = srv->wfd;
srv->wfd = -1;
+ if (srv->rfd == wfd)
+ srv->rfd = -1;
srv->flags.closing=1;
comm_close(wfd);
return;
- } else
- helperKickQueue(hlp);
+ }
}
- comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
+ if (srv->rfd != -1)
+ comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
}
static void
helper_stateful_server *srv = (helper_stateful_server *)data;
helper_stateful_request *r;
statefulhelper *hlp = srv->parent;
- assert(fd == srv->rfd);
assert(cbdataReferenceValid(data));
/* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
return;
}
+ assert(fd == srv->rfd);
+
debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
hlp->id_name << " #" << srv->index + 1);
tvSubMsec(srv->dispatch_time, current_time),
hlp->stats.replies, REDIRECT_AV_FACTOR);
- if (srv->flags.shutdown
- && srv->flags.reserved == S_HELPER_FREE
- && !srv->deferred_requests) {
- int wfd = srv->wfd;
- srv->wfd = -1;
- srv->flags.closing=1;
- comm_close(wfd);
- } else {
- if (srv->queue.head)
- helperStatefulServerKickQueue(srv);
- else
- helperStatefulKickQueue(hlp);
- }
+ helperStatefulServerKickQueue(srv);
}
- comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
+ if (srv->rfd != -1)
+ comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
helperStatefulHandleRead, srv);
}
/* and push the queue. Note that the callback may have submitted a new
* request to the helper which is why we test for the request*/
- if (srv->request == NULL) {
- if (srv->flags.shutdown
- && srv->flags.reserved == S_HELPER_FREE
- && !srv->deferred_requests) {
- int wfd = srv->wfd;
- srv->wfd = -1;
- srv->flags.closing=1;
- comm_close(wfd);
- } else {
- if (srv->queue.head)
- helperStatefulServerKickQueue(srv);
- else
- helperStatefulKickQueue(hlp);
- }
- }
+ if (srv->request == NULL)
+ helperStatefulServerKickQueue(srv);
return;
}
{
helper_stateful_request *r;
- if ((r = StatefulServerDequeue(srv)))
+ if ((r = StatefulServerDequeue(srv))) {
helperStatefulDispatch(srv, r);
+ return;
+ }
+
+ if (!srv->flags.shutdown) {
+ helperStatefulKickQueue(srv->parent);
+ } else if (!srv->flags.closing && srv->flags.reserved == S_HELPER_FREE && !srv->flags.busy) {
+ int wfd = srv->wfd;
+ srv->wfd = -1;
+ if (srv->rfd == wfd)
+ srv->rfd = -1;
+ srv->flags.closing=1;
+ comm_close(wfd);
+ return;
+ }
}
static void