]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
SourceFormat: enforcement
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 84 Helper process maintenance
5 * AUTHOR: Harvest Derived?
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
35 #include "squid.h"
36 #include "helper.h"
37 #include "SquidTime.h"
38 #include "Store.h"
39 #include "comm.h"
40 #include "MemBuf.h"
41 #include "wordlist.h"
42
43 #define HELPER_MAX_ARGS 64
44
45 /* size of helper read buffer (maximum?). no reason given for this size */
46 /* though it has been seen to be too short for some requests */
47 /* it is dynamic, so increasing should not have side effects */
48 #define BUF_8KB 8192
49
50 static IOCB helperHandleRead;
51 static IOCB helperStatefulHandleRead;
52 static PF helperServerFree;
53 static PF helperStatefulServerFree;
54 static void Enqueue(helper * hlp, helper_request *);
55 static helper_request *Dequeue(helper * hlp);
56 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
57 static helper_server *GetFirstAvailable(helper * hlp);
58 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
59 static void helperDispatch(helper_server * srv, helper_request * r);
60 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
61 static void helperKickQueue(helper * hlp);
62 static void helperStatefulKickQueue(statefulhelper * hlp);
63 static void helperStatefulServerDone(helper_stateful_server * srv);
64 static void helperRequestFree(helper_request * r);
65 static void helperStatefulRequestFree(helper_stateful_request * r);
66 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
67 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
68
69
70 CBDATA_TYPE(helper);
71 CBDATA_TYPE(helper_server);
72 CBDATA_TYPE(statefulhelper);
73 CBDATA_TYPE(helper_stateful_server);
74
75 void
76 helperOpenServers(helper * hlp)
77 {
78 char *s;
79 char *progname;
80 char *shortname;
81 char *procname;
82 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
83 char fd_note_buf[FD_DESC_SZ];
84 helper_server *srv;
85 int nargs = 0;
86 int k;
87 pid_t pid;
88 int rfd;
89 int wfd;
90 void * hIpc;
91 wordlist *w;
92
93 if (hlp->cmdline == NULL)
94 return;
95
96 progname = hlp->cmdline->key;
97
98 if ((s = strrchr(progname, '/')))
99 shortname = xstrdup(s + 1);
100 else
101 shortname = xstrdup(progname);
102
103 /* dont ever start more than hlp->n_to_start processes. */
104 int need_new = hlp->n_to_start - hlp->n_active;
105
106 debugs(84, 1, "helperOpenServers: Starting " << need_new << "/" << hlp->n_to_start << " '" << shortname << "' processes");
107
108 if (need_new < 1) {
109 debugs(84, 1, "helperOpenServers: No '" << shortname << "' processes needed.");
110 }
111
112 procname = (char *)xmalloc(strlen(shortname) + 3);
113
114 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
115
116 args[nargs++] = procname;
117
118 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
119 args[nargs++] = w->key;
120
121 args[nargs++] = NULL;
122
123 assert(nargs <= HELPER_MAX_ARGS);
124
125 for (k = 0; k < need_new; k++) {
126 getCurrentTime();
127 rfd = wfd = -1;
128 pid = ipcCreate(hlp->ipc_type,
129 progname,
130 args,
131 shortname,
132 hlp->addr,
133 &rfd,
134 &wfd,
135 &hIpc);
136
137 if (pid < 0) {
138 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
139 continue;
140 }
141
142 hlp->n_running++;
143 hlp->n_active++;
144 CBDATA_INIT_TYPE(helper_server);
145 srv = cbdataAlloc(helper_server);
146 srv->hIpc = hIpc;
147 srv->pid = pid;
148 srv->index = k;
149 srv->addr = hlp->addr;
150 srv->rfd = rfd;
151 srv->wfd = wfd;
152 srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
153 srv->wqueue = new MemBuf;
154 srv->roffset = 0;
155 srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
156 srv->parent = cbdataReference(hlp);
157 dlinkAddTail(srv, &srv->link, &hlp->servers);
158
159 if (rfd == wfd) {
160 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
161 fd_note(rfd, fd_note_buf);
162 } else {
163 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
164 fd_note(rfd, fd_note_buf);
165 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
166 fd_note(wfd, fd_note_buf);
167 }
168
169 commSetNonBlocking(rfd);
170
171 if (wfd != rfd)
172 commSetNonBlocking(wfd);
173
174 comm_add_close_handler(rfd, helperServerFree, srv);
175
176 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
177 }
178
179 hlp->last_restart = squid_curtime;
180 safe_free(shortname);
181 safe_free(procname);
182 helperKickQueue(hlp);
183 }
184
185 /**
186 * DPW 2007-05-08
187 *
188 * helperStatefulOpenServers: create the stateful child helper processes
189 */
190 void
191 helperStatefulOpenServers(statefulhelper * hlp)
192 {
193 char *shortname;
194 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
195 char fd_note_buf[FD_DESC_SZ];
196 int nargs = 0;
197
198 if (hlp->cmdline == NULL)
199 return;
200
201 char *progname = hlp->cmdline->key;
202
203 char *s;
204 if ((s = strrchr(progname, '/')))
205 shortname = xstrdup(s + 1);
206 else
207 shortname = xstrdup(progname);
208
209 /* dont ever start more than hlp->n_to_start processes. */
210 /* n_active are the helpers which have not been shut down. */
211 int need_new = hlp->n_to_start - hlp->n_active;
212
213 debugs(84, 1, "helperOpenServers: Starting " << need_new << "/" << hlp->n_to_start << " '" << shortname << "' processes");
214
215 if (need_new < 1) {
216 debugs(84, 1, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
217 }
218
219 char *procname = (char *)xmalloc(strlen(shortname) + 3);
220
221 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
222
223 args[nargs++] = procname;
224
225 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
226 args[nargs++] = w->key;
227
228 args[nargs++] = NULL;
229
230 assert(nargs <= HELPER_MAX_ARGS);
231
232 for (int k = 0; k < need_new; k++) {
233 getCurrentTime();
234 int rfd = -1;
235 int wfd = -1;
236 void * hIpc;
237 pid_t pid = ipcCreate(hlp->ipc_type,
238 progname,
239 args,
240 shortname,
241 hlp->addr,
242 &rfd,
243 &wfd,
244 &hIpc);
245
246 if (pid < 0) {
247 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
248 continue;
249 }
250
251 hlp->n_running++;
252 hlp->n_active++;
253 CBDATA_INIT_TYPE(helper_stateful_server);
254 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
255 srv->hIpc = hIpc;
256 srv->pid = pid;
257 srv->flags.reserved = 0;
258 srv->stats.submits = 0;
259 srv->stats.releases = 0;
260 srv->index = k;
261 srv->addr = hlp->addr;
262 srv->rfd = rfd;
263 srv->wfd = wfd;
264 srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
265 srv->roffset = 0;
266 srv->parent = cbdataReference(hlp);
267
268 if (hlp->datapool != NULL)
269 srv->data = hlp->datapool->alloc();
270
271 dlinkAddTail(srv, &srv->link, &hlp->servers);
272
273 if (rfd == wfd) {
274 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
275 fd_note(rfd, fd_note_buf);
276 } else {
277 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
278 fd_note(rfd, fd_note_buf);
279 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
280 fd_note(wfd, fd_note_buf);
281 }
282
283 commSetNonBlocking(rfd);
284
285 if (wfd != rfd)
286 commSetNonBlocking(wfd);
287
288 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
289
290 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
291 }
292
293 hlp->last_restart = squid_curtime;
294 safe_free(shortname);
295 safe_free(procname);
296 helperStatefulKickQueue(hlp);
297 }
298
299
300 void
301 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
302 {
303 if (hlp == NULL) {
304 debugs(84, 3, "helperSubmit: hlp == NULL");
305 callback(data, NULL);
306 return;
307 }
308
309 helper_request *r = new helper_request;
310 helper_server *srv;
311
312 r->callback = callback;
313 r->data = cbdataReference(data);
314 r->buf = xstrdup(buf);
315
316 if ((srv = GetFirstAvailable(hlp)))
317 helperDispatch(srv, r);
318 else
319 Enqueue(hlp, r);
320
321 debugs(84, 9, "helperSubmit: " << buf);
322 }
323
324 /// lastserver = "server last used as part of a reserved request sequence"
325 void
326 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
327 {
328 if (hlp == NULL) {
329 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
330 callback(data, 0, NULL);
331 return;
332 }
333
334 helper_stateful_request *r = new helper_stateful_request;
335
336 r->callback = callback;
337 r->data = cbdataReference(data);
338
339 if (buf != NULL) {
340 r->buf = xstrdup(buf);
341 r->placeholder = 0;
342 } else {
343 r->buf = NULL;
344 r->placeholder = 1;
345 }
346
347 if ((buf != NULL) && lastserver) {
348 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
349 assert(lastserver->flags.reserved);
350 assert(!(lastserver->request));
351
352 debugs(84, 5, "StatefulSubmit dispatching");
353 helperStatefulDispatch(lastserver, r);
354 } else {
355 helper_stateful_server *srv;
356 if ((srv = StatefulGetFirstAvailable(hlp))) {
357 helperStatefulDispatch(srv, r);
358 } else
359 StatefulEnqueue(hlp, r);
360 }
361
362 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
363 }
364
365 /**
366 * DPW 2007-05-08
367 *
368 * helperStatefulReleaseServer tells the helper that whoever was
369 * using it no longer needs its services.
370 */
371 void
372 helperStatefulReleaseServer(helper_stateful_server * srv)
373 {
374 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
375 if (!srv->flags.reserved)
376 return;
377
378 srv->stats.releases++;
379
380 srv->flags.reserved = 0;
381 if (srv->parent->OnEmptyQueue != NULL && srv->data)
382 srv->parent->OnEmptyQueue(srv->data);
383
384 helperStatefulServerDone(srv);
385 }
386
387 /** return a pointer to the stateful routines data area */
388 void *
389 helperStatefulServerGetData(helper_stateful_server * srv)
390 {
391 return srv->data;
392 }
393
394 /**
395 * Dump some stats about the helper states to a StoreEntry
396 */
397 void
398 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
399 {
400 if (!helperStartStats(sentry, hlp, label))
401 return;
402
403 storeAppendPrintf(sentry, "program: %s\n",
404 hlp->cmdline->key);
405 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
406 hlp->n_active, hlp->n_to_start, (hlp->n_running - hlp->n_active) );
407 storeAppendPrintf(sentry, "requests sent: %d\n",
408 hlp->stats.requests);
409 storeAppendPrintf(sentry, "replies received: %d\n",
410 hlp->stats.replies);
411 storeAppendPrintf(sentry, "queue length: %d\n",
412 hlp->stats.queue_size);
413 storeAppendPrintf(sentry, "avg service time: %d msec\n",
414 hlp->stats.avg_svc_time);
415 storeAppendPrintf(sentry, "\n");
416 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
417 "#",
418 "FD",
419 "PID",
420 "# Requests",
421 "Flags",
422 "Time",
423 "Offset",
424 "Request");
425
426 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
427 helper_server *srv = (helper_server*)link->data;
428 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
429 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
430 srv->index + 1,
431 srv->rfd,
432 srv->pid,
433 srv->stats.uses,
434 srv->stats.pending ? 'B' : ' ',
435 srv->flags.writing ? 'W' : ' ',
436 srv->flags.closing ? 'C' : ' ',
437 srv->flags.shutdown ? 'S' : ' ',
438 tt < 0.0 ? 0.0 : tt,
439 (int) srv->roffset,
440 srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
441 }
442
443 storeAppendPrintf(sentry, "\nFlags key:\n\n");
444 storeAppendPrintf(sentry, " B = BUSY\n");
445 storeAppendPrintf(sentry, " W = WRITING\n");
446 storeAppendPrintf(sentry, " C = CLOSING\n");
447 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
448 }
449
450 void
451 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
452 {
453 if (!helperStartStats(sentry, hlp, label))
454 return;
455
456 storeAppendPrintf(sentry, "program: %s\n",
457 hlp->cmdline->key);
458 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
459 hlp->n_active, hlp->n_to_start, (hlp->n_running - hlp->n_active) );
460 storeAppendPrintf(sentry, "requests sent: %d\n",
461 hlp->stats.requests);
462 storeAppendPrintf(sentry, "replies received: %d\n",
463 hlp->stats.replies);
464 storeAppendPrintf(sentry, "queue length: %d\n",
465 hlp->stats.queue_size);
466 storeAppendPrintf(sentry, "avg service time: %d msec\n",
467 hlp->stats.avg_svc_time);
468 storeAppendPrintf(sentry, "\n");
469 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
470 "#",
471 "FD",
472 "PID",
473 "# Requests",
474 "Flags",
475 "Time",
476 "Offset",
477 "Request");
478
479 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
480 helper_stateful_server *srv = (helper_stateful_server *)link->data;
481 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
482 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
483 srv->index + 1,
484 srv->rfd,
485 srv->pid,
486 srv->stats.uses,
487 srv->flags.busy ? 'B' : ' ',
488 srv->flags.closing ? 'C' : ' ',
489 srv->flags.reserved ? 'R' : ' ',
490 srv->flags.shutdown ? 'S' : ' ',
491 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
492 tt < 0.0 ? 0.0 : tt,
493 (int) srv->roffset,
494 srv->request ? log_quote(srv->request->buf) : "(none)");
495 }
496
497 storeAppendPrintf(sentry, "\nFlags key:\n\n");
498 storeAppendPrintf(sentry, " B = BUSY\n");
499 storeAppendPrintf(sentry, " C = CLOSING\n");
500 storeAppendPrintf(sentry, " R = RESERVED\n");
501 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
502 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
503 }
504
505 void
506 helperShutdown(helper * hlp)
507 {
508 dlink_node *link = hlp->servers.head;
509 #ifdef _SQUID_MSWIN_
510
511 HANDLE hIpc;
512 pid_t pid;
513 int no;
514 #endif
515
516 while (link) {
517 helper_server *srv;
518 srv = (helper_server *)link->data;
519 link = link->next;
520
521 if (srv->flags.shutdown) {
522 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
523 continue;
524 }
525
526 hlp->n_active--;
527 assert(hlp->n_active >= 0);
528 srv->flags.shutdown = 1; /* request it to shut itself down */
529
530 if (srv->flags.closing) {
531 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
532 continue;
533 }
534
535 if (srv->stats.pending) {
536 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
537 continue;
538 }
539
540 srv->flags.closing = 1;
541 #ifdef _SQUID_MSWIN_
542
543 hIpc = srv->hIpc;
544 pid = srv->pid;
545 no = srv->index + 1;
546 shutdown(srv->wfd, SD_BOTH);
547 #endif
548
549 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
550 /* the rest of the details is dealt with in the helperServerFree
551 * close handler
552 */
553 comm_close(srv->rfd);
554 #ifdef _SQUID_MSWIN_
555
556 if (hIpc) {
557 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
558 getCurrentTime();
559 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
560 " #" << no << " (" << hlp->cmdline->key << "," <<
561 (long int)pid << ") didn't exit in 5 seconds");
562
563 }
564
565 CloseHandle(hIpc);
566 }
567
568 #endif
569
570 }
571 }
572
573 void
574 helperStatefulShutdown(statefulhelper * hlp)
575 {
576 dlink_node *link = hlp->servers.head;
577 helper_stateful_server *srv;
578 #ifdef _SQUID_MSWIN_
579
580 HANDLE hIpc;
581 pid_t pid;
582 int no;
583 #endif
584
585 while (link) {
586 srv = (helper_stateful_server *)link->data;
587 link = link->next;
588
589 if (srv->flags.shutdown) {
590 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
591 continue;
592 }
593
594 hlp->n_active--;
595 assert(hlp->n_active >= 0);
596 srv->flags.shutdown = 1; /* request it to shut itself down */
597
598 if (srv->flags.busy) {
599 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
600 continue;
601 }
602
603 if (srv->flags.closing) {
604 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
605 continue;
606 }
607
608 if (srv->flags.reserved) {
609 if (shutting_down) {
610 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
611 } else {
612 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
613 continue;
614 }
615 }
616
617 srv->flags.closing = 1;
618 #ifdef _SQUID_MSWIN_
619
620 hIpc = srv->hIpc;
621 pid = srv->pid;
622 no = srv->index + 1;
623 shutdown(srv->wfd, SD_BOTH);
624 #endif
625
626 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
627
628 /* the rest of the details is dealt with in the helperStatefulServerFree
629 * close handler
630 */
631 comm_close(srv->rfd);
632 #ifdef _SQUID_MSWIN_
633
634 if (hIpc) {
635 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
636 getCurrentTime();
637 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
638 " #" << no << " (" << hlp->cmdline->key << "," <<
639 (long int)pid << ") didn't exit in 5 seconds");
640 }
641
642 CloseHandle(hIpc);
643 }
644
645 #endif
646
647 }
648 }
649
650
651 helper *
652 helperCreate(const char *name)
653 {
654 helper *hlp;
655 CBDATA_INIT_TYPE(helper);
656 hlp = cbdataAlloc(helper);
657 hlp->id_name = name;
658 return hlp;
659 }
660
661 statefulhelper *
662 helperStatefulCreate(const char *name)
663 {
664 statefulhelper *hlp;
665 CBDATA_INIT_TYPE(statefulhelper);
666 hlp = cbdataAlloc(statefulhelper);
667 hlp->id_name = name;
668 return hlp;
669 }
670
671
672 void
673 helperFree(helper * hlp)
674 {
675 if (!hlp)
676 return;
677
678 /* note, don't free hlp->name, it probably points to static memory */
679 if (hlp->queue.head)
680 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
681 hlp->stats.queue_size << " requests queued");
682
683 cbdataFree(hlp);
684 }
685
686 void
687 helperStatefulFree(statefulhelper * hlp)
688 {
689 if (!hlp)
690 return;
691
692 /* note, don't free hlp->name, it probably points to static memory */
693 if (hlp->queue.head)
694 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
695 hlp->stats.queue_size << " requests queued");
696
697 cbdataFree(hlp);
698 }
699
700
701 /* ====================================================================== */
702 /* LOCAL FUNCTIONS */
703 /* ====================================================================== */
704
705 static void
706 helperServerFree(int fd, void *data)
707 {
708 helper_server *srv = (helper_server *)data;
709 helper *hlp = srv->parent;
710 helper_request *r;
711 int i, concurrency = hlp->concurrency;
712
713 if (!concurrency)
714 concurrency = 1;
715
716 if (srv->rbuf) {
717 memFreeBuf(srv->rbuf_sz, srv->rbuf);
718 srv->rbuf = NULL;
719 }
720
721 srv->wqueue->clean();
722 delete srv->wqueue;
723
724 if (srv->writebuf) {
725 srv->writebuf->clean();
726 delete srv->writebuf;
727 srv->writebuf = NULL;
728 }
729
730 for (i = 0; i < concurrency; i++) {
731 if ((r = srv->requests[i])) {
732 void *cbdata;
733
734 if (cbdataReferenceValidDone(r->data, &cbdata))
735 r->callback(cbdata, NULL);
736
737 helperRequestFree(r);
738
739 srv->requests[i] = NULL;
740 }
741 }
742
743 safe_free(srv->requests);
744
745 if (srv->wfd != srv->rfd && srv->wfd != -1)
746 comm_close(srv->wfd);
747
748 dlinkDelete(&srv->link, &hlp->servers);
749
750 hlp->n_running--;
751
752 assert(hlp->n_running >= 0);
753
754 if (!srv->flags.shutdown) {
755 hlp->n_active--;
756 assert(hlp->n_active >= 0);
757 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 <<
758 " (FD " << fd << ") exited");
759
760 if (hlp->n_active < hlp->n_to_start / 2) {
761 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
762
763 if (hlp->last_restart > squid_curtime - 30)
764 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
765
766 debugs(80, 0, "Starting new helpers");
767
768 helperOpenServers(hlp);
769 }
770 }
771
772 cbdataReferenceDone(srv->parent);
773 cbdataFree(srv);
774 }
775
776 static void
777 helperStatefulServerFree(int fd, void *data)
778 {
779 helper_stateful_server *srv = (helper_stateful_server *)data;
780 statefulhelper *hlp = srv->parent;
781 helper_stateful_request *r;
782
783 if (srv->rbuf) {
784 memFreeBuf(srv->rbuf_sz, srv->rbuf);
785 srv->rbuf = NULL;
786 }
787
788 #if 0
789 srv->wqueue->clean();
790
791 delete srv->wqueue;
792
793 #endif
794
795 if ((r = srv->request)) {
796 void *cbdata;
797
798 if (cbdataReferenceValidDone(r->data, &cbdata))
799 r->callback(cbdata, srv, NULL);
800
801 helperStatefulRequestFree(r);
802
803 srv->request = NULL;
804 }
805
806 /* TODO: walk the local queue of requests and carry them all out */
807 if (srv->wfd != srv->rfd && srv->wfd != -1)
808 comm_close(srv->wfd);
809
810 dlinkDelete(&srv->link, &hlp->servers);
811
812 hlp->n_running--;
813
814 assert(hlp->n_running >= 0);
815
816 if (!srv->flags.shutdown) {
817 hlp->n_active--;
818 assert( hlp->n_active >= 0);
819 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " (FD " << fd << ") exited");
820
821 if (hlp->n_active <= hlp->n_to_start / 2) {
822 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
823
824 if (hlp->last_restart > squid_curtime - 30)
825 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
826
827 debugs(80, 0, "Starting new helpers");
828
829 helperStatefulOpenServers(hlp);
830 }
831 }
832
833 if (srv->data != NULL)
834 hlp->datapool->free(srv->data);
835
836 cbdataReferenceDone(srv->parent);
837
838 cbdataFree(srv);
839 }
840
841
842 static void
843 helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
844 {
845 char *t = NULL;
846 helper_server *srv = (helper_server *)data;
847 helper *hlp = srv->parent;
848 assert(cbdataReferenceValid(data));
849
850 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
851
852 if (flag == COMM_ERR_CLOSING) {
853 return;
854 }
855
856 assert(fd == srv->rfd);
857
858 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
859
860 if (flag != COMM_OK || len <= 0) {
861 if (len < 0)
862 debugs(84, 1, "helperHandleRead: FD " << fd << " read: " << xstrerror());
863
864 comm_close(fd);
865
866 return;
867 }
868
869 srv->roffset += len;
870 srv->rbuf[srv->roffset] = '\0';
871 debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
872
873 if (!srv->stats.pending) {
874 /* someone spoke without being spoken to */
875 debugs(84, 1, "helperHandleRead: unexpected read from " <<
876 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
877 " bytes '" << srv->rbuf << "'");
878
879 srv->roffset = 0;
880 srv->rbuf[0] = '\0';
881 }
882
883 while ((t = strchr(srv->rbuf, '\n'))) {
884 /* end of reply found */
885 helper_request *r;
886 char *msg = srv->rbuf;
887 int i = 0;
888 debugs(84, 3, "helperHandleRead: end of reply found");
889
890 if (t > srv->rbuf && t[-1] == '\r')
891 t[-1] = '\0';
892
893 *t++ = '\0';
894
895 if (hlp->concurrency) {
896 i = strtol(msg, &msg, 10);
897
898 while (*msg && xisspace(*msg))
899 msg++;
900 }
901
902 r = srv->requests[i];
903
904 if (r) {
905 HLPCB *callback = r->callback;
906 void *cbdata;
907
908 srv->requests[i] = NULL;
909
910 r->callback = NULL;
911
912 if (cbdataReferenceValidDone(r->data, &cbdata))
913 callback(cbdata, msg);
914
915 srv->stats.pending--;
916
917 hlp->stats.replies++;
918
919 srv->answer_time = current_time;
920
921 srv->dispatch_time = r->dispatch_time;
922
923 hlp->stats.avg_svc_time =
924 intAverage(hlp->stats.avg_svc_time,
925 tvSubMsec(r->dispatch_time, current_time),
926 hlp->stats.replies, REDIRECT_AV_FACTOR);
927
928 helperRequestFree(r);
929 } else {
930 debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
931 i << " from " << hlp->id_name << " #" << srv->index + 1 <<
932 " '" << srv->rbuf << "'");
933
934 }
935
936 srv->roffset -= (t - srv->rbuf);
937 memmove(srv->rbuf, t, srv->roffset + 1);
938
939 if (!srv->flags.shutdown) {
940 helperKickQueue(hlp);
941 } else if (!srv->flags.closing && !srv->stats.pending) {
942 int wfd = srv->wfd;
943 srv->wfd = -1;
944 if (srv->rfd == wfd)
945 srv->rfd = -1;
946 srv->flags.closing=1;
947 comm_close(wfd);
948 return;
949 }
950 }
951
952 if (srv->rfd != -1)
953 comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
954 }
955
956 static void
957 helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
958 {
959 char *t = NULL;
960 helper_stateful_server *srv = (helper_stateful_server *)data;
961 helper_stateful_request *r;
962 statefulhelper *hlp = srv->parent;
963 assert(cbdataReferenceValid(data));
964
965 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
966
967 if (flag == COMM_ERR_CLOSING) {
968 return;
969 }
970
971 assert(fd == srv->rfd);
972
973 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
974 hlp->id_name << " #" << srv->index + 1);
975
976
977 if (flag != COMM_OK || len <= 0) {
978 if (len < 0)
979 debugs(84, 1, "helperStatefulHandleRead: FD " << fd << " read: " << xstrerror());
980
981 comm_close(fd);
982
983 return;
984 }
985
986 srv->roffset += len;
987 srv->rbuf[srv->roffset] = '\0';
988 r = srv->request;
989
990 if (r == NULL) {
991 /* someone spoke without being spoken to */
992 debugs(84, 1, "helperStatefulHandleRead: unexpected read from " <<
993 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
994 " bytes '" << srv->rbuf << "'");
995
996 srv->roffset = 0;
997 }
998
999 if ((t = strchr(srv->rbuf, '\n'))) {
1000 /* end of reply found */
1001 int called = 1;
1002 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1003
1004 if (t > srv->rbuf && t[-1] == '\r')
1005 t[-1] = '\0';
1006
1007 *t = '\0';
1008
1009 if (r && cbdataReferenceValid(r->data)) {
1010 r->callback(r->data, srv, srv->rbuf);
1011 } else {
1012 debugs(84, 1, "StatefulHandleRead: no callback data registered");
1013 called = 0;
1014 }
1015
1016 srv->flags.busy = 0;
1017 srv->roffset = 0;
1018 helperStatefulRequestFree(r);
1019 srv->request = NULL;
1020 hlp->stats.replies++;
1021 srv->answer_time = current_time;
1022 hlp->stats.avg_svc_time =
1023 intAverage(hlp->stats.avg_svc_time,
1024 tvSubMsec(srv->dispatch_time, current_time),
1025 hlp->stats.replies, REDIRECT_AV_FACTOR);
1026
1027 if (called)
1028 helperStatefulServerDone(srv);
1029 else
1030 helperStatefulReleaseServer(srv);
1031 }
1032
1033 if (srv->rfd != -1)
1034 comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
1035 helperStatefulHandleRead, srv);
1036 }
1037
1038 static void
1039 Enqueue(helper * hlp, helper_request * r)
1040 {
1041 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1042 dlinkAddTail(r, link, &hlp->queue);
1043 hlp->stats.queue_size++;
1044
1045 if (hlp->stats.queue_size < hlp->n_running)
1046 return;
1047
1048 if (squid_curtime - hlp->last_queue_warn < 600)
1049 return;
1050
1051 if (shutting_down || reconfiguring)
1052 return;
1053
1054 hlp->last_queue_warn = squid_curtime;
1055
1056 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1057 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1058
1059
1060 if (hlp->stats.queue_size > hlp->n_running * 2)
1061 fatalf("Too many queued %s requests", hlp->id_name);
1062
1063 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1064
1065 }
1066
1067 static void
1068 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1069 {
1070 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1071 dlinkAddTail(r, link, &hlp->queue);
1072 hlp->stats.queue_size++;
1073
1074 if (hlp->stats.queue_size < hlp->n_running)
1075 return;
1076
1077 if (hlp->stats.queue_size > hlp->n_running * 2)
1078 fatalf("Too many queued %s requests", hlp->id_name);
1079
1080 if (squid_curtime - hlp->last_queue_warn < 600)
1081 return;
1082
1083 if (shutting_down || reconfiguring)
1084 return;
1085
1086 hlp->last_queue_warn = squid_curtime;
1087
1088 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1089
1090 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1091 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1092
1093 }
1094
1095 static helper_request *
1096 Dequeue(helper * hlp)
1097 {
1098 dlink_node *link;
1099 helper_request *r = NULL;
1100
1101 if ((link = hlp->queue.head)) {
1102 r = (helper_request *)link->data;
1103 dlinkDelete(link, &hlp->queue);
1104 memFree(link, MEM_DLINK_NODE);
1105 hlp->stats.queue_size--;
1106 }
1107
1108 return r;
1109 }
1110
1111 static helper_stateful_request *
1112 StatefulDequeue(statefulhelper * hlp)
1113 {
1114 dlink_node *link;
1115 helper_stateful_request *r = NULL;
1116
1117 if ((link = hlp->queue.head)) {
1118 r = (helper_stateful_request *)link->data;
1119 dlinkDelete(link, &hlp->queue);
1120 memFree(link, MEM_DLINK_NODE);
1121 hlp->stats.queue_size--;
1122 }
1123
1124 return r;
1125 }
1126
1127 static helper_server *
1128 GetFirstAvailable(helper * hlp)
1129 {
1130 dlink_node *n;
1131 helper_server *srv;
1132 helper_server *selected = NULL;
1133
1134 if (hlp->n_running == 0)
1135 return NULL;
1136
1137 /* Find "least" loaded helper (approx) */
1138 for (n = hlp->servers.head; n != NULL; n = n->next) {
1139 srv = (helper_server *)n->data;
1140
1141 if (selected && selected->stats.pending <= srv->stats.pending)
1142 continue;
1143
1144 if (srv->flags.shutdown)
1145 continue;
1146
1147 if (!srv->stats.pending)
1148 return srv;
1149
1150 if (selected) {
1151 selected = srv;
1152 break;
1153 }
1154
1155 selected = srv;
1156 }
1157
1158 /* Check for overload */
1159 if (!selected)
1160 return NULL;
1161
1162 if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
1163 return NULL;
1164
1165 return selected;
1166 }
1167
1168 static helper_stateful_server *
1169 StatefulGetFirstAvailable(statefulhelper * hlp)
1170 {
1171 dlink_node *n;
1172 helper_stateful_server *srv = NULL;
1173 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->n_running);
1174
1175 if (hlp->n_running == 0)
1176 return NULL;
1177
1178 for (n = hlp->servers.head; n != NULL; n = n->next) {
1179 srv = (helper_stateful_server *)n->data;
1180
1181 if (srv->flags.busy)
1182 continue;
1183
1184 if (srv->flags.reserved)
1185 continue;
1186
1187 if (srv->flags.shutdown)
1188 continue;
1189
1190 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1191 continue;
1192
1193 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1194 return srv;
1195 }
1196
1197 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1198 return NULL;
1199 }
1200
1201
1202 static void
1203 helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1204 {
1205 helper_server *srv = (helper_server *)data;
1206
1207 srv->writebuf->clean();
1208 delete srv->writebuf;
1209 srv->writebuf = NULL;
1210 srv->flags.writing = 0;
1211
1212 if (flag != COMM_OK) {
1213 /* Helper server has crashed */
1214 debugs(84, 0, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
1215 return;
1216 }
1217
1218 if (!srv->wqueue->isNull()) {
1219 srv->writebuf = srv->wqueue;
1220 srv->wqueue = new MemBuf;
1221 srv->flags.writing = 1;
1222 comm_write(srv->wfd,
1223 srv->writebuf->content(),
1224 srv->writebuf->contentSize(),
1225 helperDispatchWriteDone, /* Handler */
1226 srv, NULL); /* Handler-data, freefunc */
1227 }
1228 }
1229
1230 static void
1231 helperDispatch(helper_server * srv, helper_request * r)
1232 {
1233 helper *hlp = srv->parent;
1234 helper_request **ptr = NULL;
1235 unsigned int slot;
1236
1237 if (!cbdataReferenceValid(r->data)) {
1238 debugs(84, 1, "helperDispatch: invalid callback data");
1239 helperRequestFree(r);
1240 return;
1241 }
1242
1243 for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
1244 if (!srv->requests[slot]) {
1245 ptr = &srv->requests[slot];
1246 break;
1247 }
1248 }
1249
1250 assert(ptr);
1251 *ptr = r;
1252 srv->stats.pending += 1;
1253 r->dispatch_time = current_time;
1254
1255 if (srv->wqueue->isNull())
1256 srv->wqueue->init();
1257
1258 if (hlp->concurrency)
1259 srv->wqueue->Printf("%d %s", slot, r->buf);
1260 else
1261 srv->wqueue->append(r->buf, strlen(r->buf));
1262
1263 if (!srv->flags.writing) {
1264 assert(NULL == srv->writebuf);
1265 srv->writebuf = srv->wqueue;
1266 srv->wqueue = new MemBuf;
1267 srv->flags.writing = 1;
1268 comm_write(srv->wfd,
1269 srv->writebuf->content(),
1270 srv->writebuf->contentSize(),
1271 helperDispatchWriteDone, /* Handler */
1272 srv, NULL); /* Handler-data, free func */
1273 }
1274
1275 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
1276
1277 srv->stats.uses++;
1278 hlp->stats.requests++;
1279 }
1280
1281 static void
1282 helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
1283 int xerrno, void *data)
1284 {
1285 /* nothing! */
1286 }
1287
1288
1289 static void
1290 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1291 {
1292 statefulhelper *hlp = srv->parent;
1293
1294 if (!cbdataReferenceValid(r->data)) {
1295 debugs(84, 1, "helperStatefulDispatch: invalid callback data");
1296 helperStatefulRequestFree(r);
1297 helperStatefulReleaseServer(srv);
1298 return;
1299 }
1300
1301 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
1302
1303 if (r->placeholder == 1) {
1304 /* a callback is needed before this request can _use_ a helper. */
1305 /* we don't care about releasing this helper. The request NEVER
1306 * gets to the helper. So we throw away the return code */
1307 r->callback(r->data, srv, NULL);
1308 /* throw away the placeholder */
1309 helperStatefulRequestFree(r);
1310 /* and push the queue. Note that the callback may have submitted a new
1311 * request to the helper which is why we test for the request*/
1312
1313 if (srv->request == NULL)
1314 helperStatefulServerDone(srv);
1315
1316 return;
1317 }
1318
1319 srv->flags.busy = 1;
1320 srv->flags.reserved = 1;
1321 srv->request = r;
1322 srv->dispatch_time = current_time;
1323 comm_write(srv->wfd,
1324 r->buf,
1325 strlen(r->buf),
1326 helperStatefulDispatchWriteDone, /* Handler */
1327 hlp, NULL); /* Handler-data, free func */
1328 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1329 hlp->id_name << " #" << srv->index + 1 << ", " <<
1330 (int) strlen(r->buf) << " bytes");
1331
1332 srv->stats.uses++;
1333 hlp->stats.requests++;
1334 }
1335
1336
1337 static void
1338 helperKickQueue(helper * hlp)
1339 {
1340 helper_request *r;
1341 helper_server *srv;
1342
1343 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1344 helperDispatch(srv, r);
1345 }
1346
1347 static void
1348 helperStatefulKickQueue(statefulhelper * hlp)
1349 {
1350 helper_stateful_request *r;
1351 helper_stateful_server *srv;
1352
1353 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1354 helperStatefulDispatch(srv, r);
1355 }
1356
1357 static void
1358 helperStatefulServerDone(helper_stateful_server * srv)
1359 {
1360 if (!srv->flags.shutdown) {
1361 helperStatefulKickQueue(srv->parent);
1362 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
1363 int wfd = srv->wfd;
1364 srv->wfd = -1;
1365 if (srv->rfd == wfd)
1366 srv->rfd = -1;
1367 srv->flags.closing=1;
1368 comm_close(wfd);
1369 return;
1370 }
1371 }
1372
1373 static void
1374 helperRequestFree(helper_request * r)
1375 {
1376 cbdataReferenceDone(r->data);
1377 xfree(r->buf);
1378 delete r;
1379 }
1380
1381 static void
1382 helperStatefulRequestFree(helper_stateful_request * r)
1383 {
1384 if (r) {
1385 cbdataReferenceDone(r->data);
1386 xfree(r->buf);
1387 delete r;
1388 }
1389 }
1390
1391 // TODO: should helper_ and helper_stateful_ have a common parent?
1392 static bool
1393 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1394 {
1395 if (!hlp) {
1396 if (label)
1397 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1398 return false;
1399 }
1400
1401 if (label)
1402 storeAppendPrintf(sentry, "%s:\n", label);
1403
1404 return true;
1405 }