]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
3 #define HELPER_MAX_ARGS 64
5 static PF helperHandleRead
;
6 static PF helperServerFree
;
7 static void Enqueue(helper
* hlp
, helper_request
*);
8 static helper_request
*Dequeue(helper
* hlp
);
9 static helper_server
*GetFirstAvailable(helper
* hlp
);
10 static void helperDispatch(helper_server
* srv
, helper_request
* r
);
11 static void helperKickQueue(helper
* hlp
);
12 static void helperRequestFree(helper_request
* r
);
16 helperOpenServers(helper
* hlp
)
22 char *args
[HELPER_MAX_ARGS
];
23 char fd_note_buf
[FD_DESC_SZ
];
31 if (hlp
->cmdline
== NULL
)
33 progname
= hlp
->cmdline
->key
;
34 assert(hlp
->servers
.head
== NULL
);
35 assert(hlp
->servers
.tail
== NULL
);
36 if ((s
= strrchr(progname
, '/')))
37 shortname
= xstrdup(s
+ 1);
39 shortname
= xstrdup(progname
);
40 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
41 hlp
->n_to_start
, shortname
);
42 procname
= xmalloc(strlen(shortname
) + 3);
43 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
44 args
[nargs
++] = procname
;
45 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
)
46 args
[nargs
++] = w
->key
;
48 assert(nargs
<= HELPER_MAX_ARGS
);
49 for (k
= 0; k
< hlp
->n_to_start
; k
++) {
51 x
= ipcCreate(hlp
->ipc_type
,
58 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname
);
62 srv
= xcalloc(1, sizeof(*srv
));
63 cbdataAdd(srv
, MEM_NONE
);
68 srv
->buf
= memAllocate(MEM_8K_BUF
);
72 cbdataLock(hlp
); /* lock because of the parent backlink */
73 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
75 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
76 fd_note(rfd
, fd_note_buf
);
78 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
79 fd_note(rfd
, fd_note_buf
);
80 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
81 fd_note(wfd
, fd_note_buf
);
83 commSetNonBlocking(rfd
);
85 commSetNonBlocking(wfd
);
86 comm_add_close_handler(rfd
, helperServerFree
, srv
);
93 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
95 helper_request
*r
= xcalloc(1, sizeof(*r
));
98 debug(29,3)("helperSubmit: hlp == NULL\n");
102 r
->callback
= callback
;
104 r
->buf
= xstrdup(buf
);
106 if ((srv
= GetFirstAvailable(hlp
)))
107 helperDispatch(srv
, r
);
113 helperStats(StoreEntry
* sentry
, helper
* hlp
)
117 storeAppendPrintf(sentry
, "number running: %d of %d\n",
118 hlp
->n_running
, hlp
->n_to_start
);
119 storeAppendPrintf(sentry
, "requests sent: %d\n",
120 hlp
->stats
.requests
);
121 storeAppendPrintf(sentry
, "replies received: %d\n",
123 storeAppendPrintf(sentry
, "queue length: %d\n",
124 hlp
->stats
.queue_size
);
125 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
126 hlp
->stats
.avg_svc_time
);
127 storeAppendPrintf(sentry
, "\n");
128 storeAppendPrintf(sentry
, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\n",
135 for (link
= hlp
->servers
.head
; link
; link
= link
->next
) {
137 storeAppendPrintf(sentry
, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\n",
141 srv
->flags
.alive
? 'A' : ' ',
142 srv
->flags
.busy
? 'B' : ' ',
143 srv
->flags
.closing
? 'C' : ' ',
144 srv
->flags
.shutdown
? 'S' : ' ',
145 0.001 * tvSubMsec(srv
->dispatch_time
, current_time
),
148 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
149 storeAppendPrintf(sentry
, " A = ALIVE\n");
150 storeAppendPrintf(sentry
, " B = BUSY\n");
151 storeAppendPrintf(sentry
, " C = CLOSING\n");
152 storeAppendPrintf(sentry
, " S = SHUTDOWN\n");
156 helperShutdown(helper
* hlp
)
160 for (link
= hlp
->servers
.head
; link
; link
= link
->next
) {
162 if (!srv
->flags
.alive
) {
163 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
164 hlp
->id_name
, srv
->index
+ 1);
167 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
168 if (srv
->flags
.busy
) {
169 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
170 hlp
->id_name
, srv
->index
+ 1);
173 if (srv
->flags
.closing
) {
174 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
175 hlp
->id_name
, srv
->index
+ 1);
178 srv
->flags
.closing
= 1;
179 comm_close(srv
->wfd
);
184 helperCreate(const char *name
)
186 helper
*hlp
= xcalloc(1, sizeof(*hlp
));
187 cbdataAdd(hlp
, MEM_NONE
);
193 helperFree(helper
* hlp
)
195 /* note, don't free hlp->name, it probably points to static memory */
197 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
198 hlp
->id_name
, hlp
->stats
.queue_size
);
202 /* ====================================================================== */
203 /* LOCAL FUNCTIONS */
204 /* ====================================================================== */
207 helperServerFree(int fd
, void *data
)
209 helper_server
*srv
= data
;
210 helper
*hlp
= srv
->parent
;
211 assert(srv
->rfd
== fd
);
213 memFree(MEM_8K_BUF
, srv
->buf
);
216 if (srv
->wfd
!= srv
->rfd
)
217 comm_close(srv
->wfd
);
218 dlinkDelete(&srv
->link
, &hlp
->servers
);
220 assert(hlp
->n_running
>= 0);
221 if (!srv
->flags
.shutdown
) {
222 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
223 hlp
->id_name
, srv
->index
+ 1, fd
);
224 assert(hlp
->n_running
>= hlp
->n_to_start
/ 2);
225 if (hlp
->n_running
< hlp
->n_to_start
/ 2)
226 fatalf("Too few %s processes are running", hlp
->id_name
);
228 cbdataUnlock(srv
->parent
);
233 helperHandleRead(int fd
, void *data
)
237 helper_server
*srv
= data
;
239 helper
*hlp
= srv
->parent
;
240 assert(fd
== srv
->rfd
);
241 assert(cbdataValid(data
));
242 Counter
.syscalls
.sock
.reads
++;
243 len
= read(fd
, srv
->buf
+ srv
->offset
, srv
->buf_sz
- srv
->offset
);
244 fd_bytes(fd
, len
, FD_READ
);
245 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
246 len
, hlp
->id_name
, srv
->index
+ 1);
249 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd
, xstrerror());
254 srv
->buf
[srv
->offset
] = '\0';
257 /* someone spoke without being spoken to */
258 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
259 hlp
->id_name
, srv
->index
+ 1, len
);
261 } else if ((t
= strchr(srv
->buf
, '\n'))) {
262 /* end of reply found */
263 debug(29, 3) ("helperHandleRead: end of reply found\n");
265 if (cbdataValid(r
->data
))
266 r
->callback(r
->data
, srv
->buf
);
269 helperRequestFree(r
);
270 hlp
->stats
.replies
++;
271 hlp
->stats
.avg_svc_time
=
272 intAverage(hlp
->stats
.avg_svc_time
,
273 tvSubMsec(srv
->dispatch_time
, current_time
),
274 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
275 if (srv
->flags
.shutdown
)
276 comm_close(srv
->wfd
);
278 commSetSelect(srv
->rfd
, COMM_SELECT_READ
, helperHandleRead
, srv
, 0);
280 helperKickQueue(hlp
);
284 Enqueue(helper
* hlp
, helper_request
* r
)
286 dlink_node
*link
= xcalloc(1, sizeof(*link
));
287 dlinkAddTail(r
, link
, &hlp
->queue
);
288 hlp
->stats
.queue_size
++;
289 if (hlp
->stats
.queue_size
< hlp
->n_running
)
291 if (squid_curtime
- hlp
->last_queue_warn
< 600)
293 if (shutting_down
|| reconfiguring
)
295 hlp
->last_queue_warn
= squid_curtime
;
296 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp
->id_name
);
297 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp
->stats
.queue_size
);
298 if (hlp
->stats
.queue_size
> hlp
->n_running
* 2)
299 fatalf("Too many queued %s requests", hlp
->id_name
);
300 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp
->id_name
);
303 static helper_request
*
304 Dequeue(helper
* hlp
)
307 helper_request
*r
= NULL
;
308 if ((link
= hlp
->queue
.head
)) {
310 dlinkDelete(link
, &hlp
->queue
);
312 hlp
->stats
.queue_size
--;
317 static helper_server
*
318 GetFirstAvailable(helper
* hlp
)
321 helper_server
*srv
= NULL
;
322 if (hlp
->n_running
== 0)
324 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
328 if (!srv
->flags
.alive
)
336 helperDispatch(helper_server
* srv
, helper_request
* r
)
338 helper
*hlp
= srv
->parent
;
339 if (!cbdataValid(r
->data
)) {
340 debug(29, 1) ("helperDispatch: invalid callback data\n");
341 helperRequestFree(r
);
344 assert(!srv
->flags
.busy
);
347 srv
->dispatch_time
= current_time
;
352 NULL
, /* Handler-data */
354 commSetSelect(srv
->rfd
,
358 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
359 hlp
->id_name
, srv
->index
+ 1, strlen(r
->buf
));
361 hlp
->stats
.requests
++;
365 helperKickQueue(helper
* hlp
)
369 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
370 helperDispatch(srv
, r
);
374 helperRequestFree(helper_request
* r
)
376 cbdataUnlock(r
->data
);