]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
3 #define HELPER_MAX_ARGS 64
5 static PF helperHandleRead
;
6 static PF helperServerFree
;
7 static void Enqueue(helper
* hlp
, helper_request
*);
8 static helper_request
*Dequeue(helper
* hlp
);
9 static helper_server
*GetFirstAvailable(helper
* hlp
);
10 static void helperDispatch(helper_server
* srv
, helper_request
* r
);
11 static void helperKickQueue(helper
* hlp
);
12 static void helperRequestFree(helper_request
* r
);
16 helperOpenServers(helper
* hlp
)
22 char *args
[HELPER_MAX_ARGS
];
23 char fd_note_buf
[FD_DESC_SZ
];
31 if (hlp
->cmdline
== NULL
)
33 progname
= hlp
->cmdline
->key
;
34 if ((s
= strrchr(progname
, '/')))
35 shortname
= xstrdup(s
+ 1);
37 shortname
= xstrdup(progname
);
38 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
39 hlp
->n_to_start
, shortname
);
40 procname
= xmalloc(strlen(shortname
) + 3);
41 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
42 args
[nargs
++] = procname
;
43 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
)
44 args
[nargs
++] = w
->key
;
46 assert(nargs
<= HELPER_MAX_ARGS
);
47 for (k
= 0; k
< hlp
->n_to_start
; k
++) {
49 x
= ipcCreate(hlp
->ipc_type
,
56 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname
);
60 srv
= xcalloc(1, sizeof(*srv
));
61 cbdataAdd(srv
, MEM_NONE
);
66 srv
->buf
= memAllocate(MEM_8K_BUF
);
70 cbdataLock(hlp
); /* lock because of the parent backlink */
71 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
73 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
74 fd_note(rfd
, fd_note_buf
);
76 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
77 fd_note(rfd
, fd_note_buf
);
78 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
79 fd_note(wfd
, fd_note_buf
);
81 commSetNonBlocking(rfd
);
83 commSetNonBlocking(wfd
);
84 comm_add_close_handler(rfd
, helperServerFree
, srv
);
92 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
94 helper_request
*r
= xcalloc(1, sizeof(*r
));
97 debug(29,3)("helperSubmit: hlp == NULL\n");
101 r
->callback
= callback
;
103 r
->buf
= xstrdup(buf
);
105 if ((srv
= GetFirstAvailable(hlp
)))
106 helperDispatch(srv
, r
);
112 helperStats(StoreEntry
* sentry
, helper
* hlp
)
116 storeAppendPrintf(sentry
, "number running: %d of %d\n",
117 hlp
->n_running
, hlp
->n_to_start
);
118 storeAppendPrintf(sentry
, "requests sent: %d\n",
119 hlp
->stats
.requests
);
120 storeAppendPrintf(sentry
, "replies received: %d\n",
122 storeAppendPrintf(sentry
, "queue length: %d\n",
123 hlp
->stats
.queue_size
);
124 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
125 hlp
->stats
.avg_svc_time
);
126 storeAppendPrintf(sentry
, "\n");
127 storeAppendPrintf(sentry
, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\n",
134 for (link
= hlp
->servers
.head
; link
; link
= link
->next
) {
136 storeAppendPrintf(sentry
, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\n",
140 srv
->flags
.alive
? 'A' : ' ',
141 srv
->flags
.busy
? 'B' : ' ',
142 srv
->flags
.closing
? 'C' : ' ',
143 srv
->flags
.shutdown
? 'S' : ' ',
144 0.001 * tvSubMsec(srv
->dispatch_time
, current_time
),
147 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
148 storeAppendPrintf(sentry
, " A = ALIVE\n");
149 storeAppendPrintf(sentry
, " B = BUSY\n");
150 storeAppendPrintf(sentry
, " C = CLOSING\n");
151 storeAppendPrintf(sentry
, " S = SHUTDOWN\n");
155 helperShutdown(helper
* hlp
)
159 for (link
= hlp
->servers
.head
; link
; link
= link
->next
) {
161 if (!srv
->flags
.alive
) {
162 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
163 hlp
->id_name
, srv
->index
+ 1);
166 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
167 if (srv
->flags
.busy
) {
168 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
169 hlp
->id_name
, srv
->index
+ 1);
172 if (srv
->flags
.closing
) {
173 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
174 hlp
->id_name
, srv
->index
+ 1);
177 srv
->flags
.closing
= 1;
178 comm_close(srv
->wfd
);
183 helperCreate(const char *name
)
185 helper
*hlp
= xcalloc(1, sizeof(*hlp
));
186 cbdataAdd(hlp
, MEM_NONE
);
192 helperFree(helper
* hlp
)
194 /* note, don't free hlp->name, it probably points to static memory */
196 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
197 hlp
->id_name
, hlp
->stats
.queue_size
);
201 /* ====================================================================== */
202 /* LOCAL FUNCTIONS */
203 /* ====================================================================== */
206 helperServerFree(int fd
, void *data
)
208 helper_server
*srv
= data
;
209 helper
*hlp
= srv
->parent
;
210 assert(srv
->rfd
== fd
);
212 memFree(MEM_8K_BUF
, srv
->buf
);
215 if (srv
->wfd
!= srv
->rfd
)
216 comm_close(srv
->wfd
);
217 dlinkDelete(&srv
->link
, &hlp
->servers
);
219 assert(hlp
->n_running
>= 0);
220 if (!srv
->flags
.shutdown
) {
221 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
222 hlp
->id_name
, srv
->index
+ 1, fd
);
223 assert(hlp
->n_running
>= hlp
->n_to_start
/ 2);
224 if (hlp
->n_running
< hlp
->n_to_start
/ 2)
225 fatalf("Too few %s processes are running", hlp
->id_name
);
227 cbdataUnlock(srv
->parent
);
232 helperHandleRead(int fd
, void *data
)
236 helper_server
*srv
= data
;
238 helper
*hlp
= srv
->parent
;
239 assert(fd
== srv
->rfd
);
240 assert(cbdataValid(data
));
241 Counter
.syscalls
.sock
.reads
++;
242 len
= read(fd
, srv
->buf
+ srv
->offset
, srv
->buf_sz
- srv
->offset
);
243 fd_bytes(fd
, len
, FD_READ
);
244 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
245 len
, hlp
->id_name
, srv
->index
+ 1);
248 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd
, xstrerror());
253 srv
->buf
[srv
->offset
] = '\0';
256 /* someone spoke without being spoken to */
257 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
258 hlp
->id_name
, srv
->index
+ 1, len
);
260 } else if ((t
= strchr(srv
->buf
, '\n'))) {
261 /* end of reply found */
262 debug(29, 3) ("helperHandleRead: end of reply found\n");
264 if (cbdataValid(r
->data
))
265 r
->callback(r
->data
, srv
->buf
);
268 helperRequestFree(r
);
269 hlp
->stats
.replies
++;
270 hlp
->stats
.avg_svc_time
=
271 intAverage(hlp
->stats
.avg_svc_time
,
272 tvSubMsec(srv
->dispatch_time
, current_time
),
273 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
274 if (srv
->flags
.shutdown
)
275 comm_close(srv
->wfd
);
277 commSetSelect(srv
->rfd
, COMM_SELECT_READ
, helperHandleRead
, srv
, 0);
279 helperKickQueue(hlp
);
283 Enqueue(helper
* hlp
, helper_request
* r
)
285 dlink_node
*link
= xcalloc(1, sizeof(*link
));
286 dlinkAddTail(r
, link
, &hlp
->queue
);
287 hlp
->stats
.queue_size
++;
288 if (hlp
->stats
.queue_size
< hlp
->n_running
)
290 if (squid_curtime
- hlp
->last_queue_warn
< 600)
292 if (shutting_down
|| reconfiguring
)
294 hlp
->last_queue_warn
= squid_curtime
;
295 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp
->id_name
);
296 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp
->stats
.queue_size
);
297 if (hlp
->stats
.queue_size
> hlp
->n_running
* 2)
298 fatalf("Too many queued %s requests", hlp
->id_name
);
299 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp
->id_name
);
302 static helper_request
*
303 Dequeue(helper
* hlp
)
306 helper_request
*r
= NULL
;
307 if ((link
= hlp
->queue
.head
)) {
309 dlinkDelete(link
, &hlp
->queue
);
311 hlp
->stats
.queue_size
--;
316 static helper_server
*
317 GetFirstAvailable(helper
* hlp
)
320 helper_server
*srv
= NULL
;
321 if (hlp
->n_running
== 0)
323 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
327 if (!srv
->flags
.alive
)
335 helperDispatch(helper_server
* srv
, helper_request
* r
)
337 helper
*hlp
= srv
->parent
;
338 if (!cbdataValid(r
->data
)) {
339 debug(29, 1) ("helperDispatch: invalid callback data\n");
340 helperRequestFree(r
);
343 assert(!srv
->flags
.busy
);
346 srv
->dispatch_time
= current_time
;
351 NULL
, /* Handler-data */
353 commSetSelect(srv
->rfd
,
357 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
358 hlp
->id_name
, srv
->index
+ 1, strlen(r
->buf
));
360 hlp
->stats
.requests
++;
364 helperKickQueue(helper
* hlp
)
368 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
369 helperDispatch(srv
, r
);
373 helperRequestFree(helper_request
* r
)
375 cbdataUnlock(r
->data
);