]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
3 #define HELPER_MAX_ARGS 64
5 static PF helperHandleRead
;
6 static PF helperServerFree
;
7 static void Enqueue(helper
* hlp
, helper_request
*);
8 static helper_request
*Dequeue(helper
* hlp
);
9 static helper_server
*GetFirstAvailable(helper
* hlp
);
10 static void helperDispatch(helper_server
* srv
, helper_request
* r
);
11 static void helperKickQueue(helper
* hlp
);
12 static void helperRequestFree(helper_request
* r
);
16 helperOpenServers(helper
* hlp
)
22 char *args
[HELPER_MAX_ARGS
];
23 char fd_note_buf
[FD_DESC_SZ
];
31 if (hlp
->cmdline
== NULL
)
33 progname
= hlp
->cmdline
->key
;
34 if ((s
= strrchr(progname
, '/')))
35 shortname
= xstrdup(s
+ 1);
37 shortname
= xstrdup(progname
);
38 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
39 hlp
->n_to_start
, shortname
);
40 procname
= xmalloc(strlen(shortname
) + 3);
41 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
42 args
[nargs
++] = procname
;
43 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
)
44 args
[nargs
++] = w
->key
;
46 assert(nargs
<= HELPER_MAX_ARGS
);
47 for (k
= 0; k
< hlp
->n_to_start
; k
++) {
50 x
= ipcCreate(hlp
->ipc_type
,
57 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname
);
61 srv
= memAllocate(MEM_HELPER_SERVER
);
62 cbdataAdd(srv
, memFree
, MEM_HELPER_SERVER
);
67 srv
->buf
= memAllocate(MEM_8K_BUF
);
71 cbdataLock(hlp
); /* lock because of the parent backlink */
72 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
74 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
75 fd_note(rfd
, fd_note_buf
);
77 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
78 fd_note(rfd
, fd_note_buf
);
79 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
80 fd_note(wfd
, fd_note_buf
);
82 commSetNonBlocking(rfd
);
84 commSetNonBlocking(wfd
);
85 comm_add_close_handler(rfd
, helperServerFree
, srv
);
93 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
95 helper_request
*r
= memAllocate(MEM_HELPER_REQUEST
);
98 debug(29, 3) ("helperSubmit: hlp == NULL\n");
102 r
->callback
= callback
;
104 r
->buf
= xstrdup(buf
);
106 if ((srv
= GetFirstAvailable(hlp
)))
107 helperDispatch(srv
, r
);
113 helperStats(StoreEntry
* sentry
, helper
* hlp
)
118 storeAppendPrintf(sentry
, "number running: %d of %d\n",
119 hlp
->n_running
, hlp
->n_to_start
);
120 storeAppendPrintf(sentry
, "requests sent: %d\n",
121 hlp
->stats
.requests
);
122 storeAppendPrintf(sentry
, "replies received: %d\n",
124 storeAppendPrintf(sentry
, "queue length: %d\n",
125 hlp
->stats
.queue_size
);
126 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
127 hlp
->stats
.avg_svc_time
);
128 storeAppendPrintf(sentry
, "\n");
129 storeAppendPrintf(sentry
, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\n",
136 for (link
= hlp
->servers
.head
; link
; link
= link
->next
) {
138 tt
= 0.001 * tvSubMsec(srv
->dispatch_time
, current_time
);
139 storeAppendPrintf(sentry
, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\n",
143 srv
->flags
.alive
? 'A' : ' ',
144 srv
->flags
.busy
? 'B' : ' ',
145 srv
->flags
.closing
? 'C' : ' ',
146 srv
->flags
.shutdown
? 'S' : ' ',
150 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
151 storeAppendPrintf(sentry
, " A = ALIVE\n");
152 storeAppendPrintf(sentry
, " B = BUSY\n");
153 storeAppendPrintf(sentry
, " C = CLOSING\n");
154 storeAppendPrintf(sentry
, " S = SHUTDOWN\n");
158 helperShutdown(helper
* hlp
)
160 dlink_node
*link
= hlp
->servers
.head
;
165 if (!srv
->flags
.alive
) {
166 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
167 hlp
->id_name
, srv
->index
+ 1);
170 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
171 if (srv
->flags
.busy
) {
172 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
173 hlp
->id_name
, srv
->index
+ 1);
176 if (srv
->flags
.closing
) {
177 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
178 hlp
->id_name
, srv
->index
+ 1);
181 srv
->flags
.closing
= 1;
182 comm_close(srv
->rfd
);
187 helperCreate(const char *name
)
189 helper
*hlp
= memAllocate(MEM_HELPER
);
190 cbdataAdd(hlp
, memFree
, MEM_HELPER
);
196 helperFree(helper
* hlp
)
198 /* note, don't free hlp->name, it probably points to static memory */
200 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
201 hlp
->id_name
, hlp
->stats
.queue_size
);
205 /* ====================================================================== */
206 /* LOCAL FUNCTIONS */
207 /* ====================================================================== */
210 helperServerFree(int fd
, void *data
)
212 helper_server
*srv
= data
;
213 helper
*hlp
= srv
->parent
;
215 assert(srv
->rfd
== fd
);
217 memFree(srv
->buf
, MEM_8K_BUF
);
220 if ((r
= srv
->request
)) {
221 if (cbdataValid(r
->data
))
222 r
->callback(r
->data
, srv
->buf
);
223 helperRequestFree(r
);
226 if (srv
->wfd
!= srv
->rfd
)
227 comm_close(srv
->wfd
);
228 dlinkDelete(&srv
->link
, &hlp
->servers
);
230 assert(hlp
->n_running
>= 0);
231 if (!srv
->flags
.shutdown
) {
232 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
233 hlp
->id_name
, srv
->index
+ 1, fd
);
234 assert(hlp
->n_running
>= hlp
->n_to_start
/ 2);
235 if (hlp
->n_running
< hlp
->n_to_start
/ 2)
236 fatalf("Too few %s processes are running", hlp
->id_name
);
238 cbdataUnlock(srv
->parent
);
243 helperHandleRead(int fd
, void *data
)
247 helper_server
*srv
= data
;
249 helper
*hlp
= srv
->parent
;
250 assert(fd
== srv
->rfd
);
251 assert(cbdataValid(data
));
252 Counter
.syscalls
.sock
.reads
++;
253 len
= read(fd
, srv
->buf
+ srv
->offset
, srv
->buf_sz
- srv
->offset
);
254 fd_bytes(fd
, len
, FD_READ
);
255 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
256 len
, hlp
->id_name
, srv
->index
+ 1);
259 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd
, xstrerror());
264 srv
->buf
[srv
->offset
] = '\0';
267 /* someone spoke without being spoken to */
268 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
269 hlp
->id_name
, srv
->index
+ 1, len
);
271 } else if ((t
= strchr(srv
->buf
, '\n'))) {
272 /* end of reply found */
273 debug(29, 3) ("helperHandleRead: end of reply found\n");
275 if (cbdataValid(r
->data
))
276 r
->callback(r
->data
, srv
->buf
);
279 helperRequestFree(r
);
281 hlp
->stats
.replies
++;
282 hlp
->stats
.avg_svc_time
=
283 intAverage(hlp
->stats
.avg_svc_time
,
284 tvSubMsec(srv
->dispatch_time
, current_time
),
285 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
286 if (srv
->flags
.shutdown
)
287 comm_close(srv
->wfd
);
289 helperKickQueue(hlp
);
291 commSetSelect(srv
->rfd
, COMM_SELECT_READ
, helperHandleRead
, srv
, 0);
296 Enqueue(helper
* hlp
, helper_request
* r
)
298 dlink_node
*link
= memAllocate(MEM_DLINK_NODE
);
299 dlinkAddTail(r
, link
, &hlp
->queue
);
300 hlp
->stats
.queue_size
++;
301 if (hlp
->stats
.queue_size
< hlp
->n_running
)
303 if (squid_curtime
- hlp
->last_queue_warn
< 600)
305 if (shutting_down
|| reconfiguring
)
307 hlp
->last_queue_warn
= squid_curtime
;
308 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp
->id_name
);
309 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp
->stats
.queue_size
);
310 if (hlp
->stats
.queue_size
> hlp
->n_running
* 2)
311 fatalf("Too many queued %s requests", hlp
->id_name
);
312 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp
->id_name
);
315 static helper_request
*
316 Dequeue(helper
* hlp
)
319 helper_request
*r
= NULL
;
320 if ((link
= hlp
->queue
.head
)) {
322 dlinkDelete(link
, &hlp
->queue
);
323 memFree(link
, MEM_DLINK_NODE
);
324 hlp
->stats
.queue_size
--;
329 static helper_server
*
330 GetFirstAvailable(helper
* hlp
)
333 helper_server
*srv
= NULL
;
334 if (hlp
->n_running
== 0)
336 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
340 if (!srv
->flags
.alive
)
348 helperDispatch(helper_server
* srv
, helper_request
* r
)
350 helper
*hlp
= srv
->parent
;
351 if (!cbdataValid(r
->data
)) {
352 debug(29, 1) ("helperDispatch: invalid callback data\n");
353 helperRequestFree(r
);
356 assert(!srv
->flags
.busy
);
359 srv
->dispatch_time
= current_time
;
364 NULL
, /* Handler-data */
366 commSetSelect(srv
->rfd
,
370 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
371 hlp
->id_name
, srv
->index
+ 1, strlen(r
->buf
));
373 hlp
->stats
.requests
++;
377 helperKickQueue(helper
* hlp
)
381 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
382 helperDispatch(srv
, r
);
386 helperRequestFree(helper_request
* r
)
388 cbdataUnlock(r
->data
);
390 memFree(r
, MEM_HELPER_REQUEST
);