]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
people complain about negative times in stats output:
[thirdparty/squid.git] / src / helper.cc
1 #include "squid.h"
2
3 #define HELPER_MAX_ARGS 64
4
5 static PF helperHandleRead;
6 static PF helperServerFree;
7 static void Enqueue(helper * hlp, helper_request *);
8 static helper_request *Dequeue(helper * hlp);
9 static helper_server *GetFirstAvailable(helper * hlp);
10 static void helperDispatch(helper_server * srv, helper_request * r);
11 static void helperKickQueue(helper * hlp);
12 static void helperRequestFree(helper_request * r);
13
14
15 void
16 helperOpenServers(helper * hlp)
17 {
18 char *s;
19 char *progname;
20 char *shortname;
21 char *procname;
22 char *args[HELPER_MAX_ARGS];
23 char fd_note_buf[FD_DESC_SZ];
24 helper_server *srv;
25 int nargs = 0;
26 int k;
27 int x;
28 int rfd;
29 int wfd;
30 wordlist *w;
31 if (hlp->cmdline == NULL)
32 return;
33 progname = hlp->cmdline->key;
34 if ((s = strrchr(progname, '/')))
35 shortname = xstrdup(s + 1);
36 else
37 shortname = xstrdup(progname);
38 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
39 hlp->n_to_start, shortname);
40 procname = xmalloc(strlen(shortname) + 3);
41 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
42 args[nargs++] = procname;
43 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
44 args[nargs++] = w->key;
45 args[nargs++] = NULL;
46 assert(nargs <= HELPER_MAX_ARGS);
47 for (k = 0; k < hlp->n_to_start; k++) {
48 getCurrentTime();
49 rfd = wfd = -1;
50 x = ipcCreate(hlp->ipc_type,
51 progname,
52 args,
53 shortname,
54 &rfd,
55 &wfd);
56 if (x < 0) {
57 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
58 continue;
59 }
60 hlp->n_running++;
61 srv = memAllocate(MEM_HELPER_SERVER);
62 cbdataAdd(srv, memFree, MEM_HELPER_SERVER);
63 srv->flags.alive = 1;
64 srv->index = k;
65 srv->rfd = rfd;
66 srv->wfd = wfd;
67 srv->buf = memAllocate(MEM_8K_BUF);
68 srv->buf_sz = 8192;
69 srv->offset = 0;
70 srv->parent = hlp;
71 cbdataLock(hlp); /* lock because of the parent backlink */
72 dlinkAddTail(srv, &srv->link, &hlp->servers);
73 if (rfd == wfd) {
74 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
75 fd_note(rfd, fd_note_buf);
76 } else {
77 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
78 fd_note(rfd, fd_note_buf);
79 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
80 fd_note(wfd, fd_note_buf);
81 }
82 commSetNonBlocking(rfd);
83 if (wfd != rfd)
84 commSetNonBlocking(wfd);
85 comm_add_close_handler(rfd, helperServerFree, srv);
86 }
87 safe_free(shortname);
88 safe_free(procname);
89 helperKickQueue(hlp);
90 }
91
92 void
93 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
94 {
95 helper_request *r = memAllocate(MEM_HELPER_REQUEST);
96 helper_server *srv;
97 if (hlp == NULL) {
98 debug(29, 3) ("helperSubmit: hlp == NULL\n");
99 callback(data, NULL);
100 return;
101 }
102 r->callback = callback;
103 r->data = data;
104 r->buf = xstrdup(buf);
105 cbdataLock(r->data);
106 if ((srv = GetFirstAvailable(hlp)))
107 helperDispatch(srv, r);
108 else
109 Enqueue(hlp, r);
110 }
111
112 void
113 helperStats(StoreEntry * sentry, helper * hlp)
114 {
115 helper_server *srv;
116 dlink_node *link;
117 double tt;
118 storeAppendPrintf(sentry, "number running: %d of %d\n",
119 hlp->n_running, hlp->n_to_start);
120 storeAppendPrintf(sentry, "requests sent: %d\n",
121 hlp->stats.requests);
122 storeAppendPrintf(sentry, "replies received: %d\n",
123 hlp->stats.replies);
124 storeAppendPrintf(sentry, "queue length: %d\n",
125 hlp->stats.queue_size);
126 storeAppendPrintf(sentry, "avg service time: %d msec\n",
127 hlp->stats.avg_svc_time);
128 storeAppendPrintf(sentry, "\n");
129 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\n",
130 "#",
131 "FD",
132 "# Requests",
133 "Flags",
134 "Time",
135 "Offset");
136 for (link = hlp->servers.head; link; link = link->next) {
137 srv = link->data;
138 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
139 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\n",
140 srv->index + 1,
141 srv->rfd,
142 srv->stats.uses,
143 srv->flags.alive ? 'A' : ' ',
144 srv->flags.busy ? 'B' : ' ',
145 srv->flags.closing ? 'C' : ' ',
146 srv->flags.shutdown ? 'S' : ' ',
147 tt < 0.0 ? 0.0 : tt,
148 (int) srv->offset);
149 }
150 storeAppendPrintf(sentry, "\nFlags key:\n\n");
151 storeAppendPrintf(sentry, " A = ALIVE\n");
152 storeAppendPrintf(sentry, " B = BUSY\n");
153 storeAppendPrintf(sentry, " C = CLOSING\n");
154 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
155 }
156
157 void
158 helperShutdown(helper * hlp)
159 {
160 dlink_node *link = hlp->servers.head;
161 helper_server *srv;
162 while (link) {
163 srv = link->data;
164 link = link->next;
165 if (!srv->flags.alive) {
166 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
167 hlp->id_name, srv->index + 1);
168 continue;
169 }
170 srv->flags.shutdown = 1; /* request it to shut itself down */
171 if (srv->flags.busy) {
172 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
173 hlp->id_name, srv->index + 1);
174 continue;
175 }
176 if (srv->flags.closing) {
177 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
178 hlp->id_name, srv->index + 1);
179 continue;
180 }
181 srv->flags.closing = 1;
182 comm_close(srv->rfd);
183 }
184 }
185
186 helper *
187 helperCreate(const char *name)
188 {
189 helper *hlp = memAllocate(MEM_HELPER);
190 cbdataAdd(hlp, memFree, MEM_HELPER);
191 hlp->id_name = name;
192 return hlp;
193 }
194
195 void
196 helperFree(helper * hlp)
197 {
198 /* note, don't free hlp->name, it probably points to static memory */
199 if (hlp->queue.head)
200 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
201 hlp->id_name, hlp->stats.queue_size);
202 cbdataFree(hlp);
203 }
204
205 /* ====================================================================== */
206 /* LOCAL FUNCTIONS */
207 /* ====================================================================== */
208
209 static void
210 helperServerFree(int fd, void *data)
211 {
212 helper_server *srv = data;
213 helper *hlp = srv->parent;
214 helper_request *r;
215 assert(srv->rfd == fd);
216 if (srv->buf) {
217 memFree(srv->buf, MEM_8K_BUF);
218 srv->buf = NULL;
219 }
220 if ((r = srv->request)) {
221 if (cbdataValid(r->data))
222 r->callback(r->data, srv->buf);
223 helperRequestFree(r);
224 srv->request = NULL;
225 }
226 if (srv->wfd != srv->rfd)
227 comm_close(srv->wfd);
228 dlinkDelete(&srv->link, &hlp->servers);
229 hlp->n_running--;
230 assert(hlp->n_running >= 0);
231 if (!srv->flags.shutdown) {
232 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
233 hlp->id_name, srv->index + 1, fd);
234 assert(hlp->n_running >= hlp->n_to_start / 2);
235 if (hlp->n_running < hlp->n_to_start / 2)
236 fatalf("Too few %s processes are running", hlp->id_name);
237 }
238 cbdataUnlock(srv->parent);
239 cbdataFree(srv);
240 }
241
242 static void
243 helperHandleRead(int fd, void *data)
244 {
245 int len;
246 char *t = NULL;
247 helper_server *srv = data;
248 helper_request *r;
249 helper *hlp = srv->parent;
250 assert(fd == srv->rfd);
251 assert(cbdataValid(data));
252 Counter.syscalls.sock.reads++;
253 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
254 fd_bytes(fd, len, FD_READ);
255 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
256 len, hlp->id_name, srv->index + 1);
257 if (len <= 0) {
258 if (len < 0)
259 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
260 comm_close(fd);
261 return;
262 }
263 srv->offset += len;
264 srv->buf[srv->offset] = '\0';
265 r = srv->request;
266 if (r == NULL) {
267 /* someone spoke without being spoken to */
268 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
269 hlp->id_name, srv->index + 1, len);
270 srv->offset = 0;
271 } else if ((t = strchr(srv->buf, '\n'))) {
272 /* end of reply found */
273 debug(29, 3) ("helperHandleRead: end of reply found\n");
274 *t = '\0';
275 if (cbdataValid(r->data))
276 r->callback(r->data, srv->buf);
277 srv->flags.busy = 0;
278 srv->offset = 0;
279 helperRequestFree(r);
280 srv->request = NULL;
281 hlp->stats.replies++;
282 hlp->stats.avg_svc_time =
283 intAverage(hlp->stats.avg_svc_time,
284 tvSubMsec(srv->dispatch_time, current_time),
285 hlp->stats.replies, REDIRECT_AV_FACTOR);
286 if (srv->flags.shutdown)
287 comm_close(srv->wfd);
288 else
289 helperKickQueue(hlp);
290 } else {
291 commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
292 }
293 }
294
295 static void
296 Enqueue(helper * hlp, helper_request * r)
297 {
298 dlink_node *link = memAllocate(MEM_DLINK_NODE);
299 dlinkAddTail(r, link, &hlp->queue);
300 hlp->stats.queue_size++;
301 if (hlp->stats.queue_size < hlp->n_running)
302 return;
303 if (squid_curtime - hlp->last_queue_warn < 600)
304 return;
305 if (shutting_down || reconfiguring)
306 return;
307 hlp->last_queue_warn = squid_curtime;
308 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
309 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
310 if (hlp->stats.queue_size > hlp->n_running * 2)
311 fatalf("Too many queued %s requests", hlp->id_name);
312 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
313 }
314
315 static helper_request *
316 Dequeue(helper * hlp)
317 {
318 dlink_node *link;
319 helper_request *r = NULL;
320 if ((link = hlp->queue.head)) {
321 r = link->data;
322 dlinkDelete(link, &hlp->queue);
323 memFree(link, MEM_DLINK_NODE);
324 hlp->stats.queue_size--;
325 }
326 return r;
327 }
328
329 static helper_server *
330 GetFirstAvailable(helper * hlp)
331 {
332 dlink_node *n;
333 helper_server *srv = NULL;
334 if (hlp->n_running == 0)
335 return NULL;
336 for (n = hlp->servers.head; n != NULL; n = n->next) {
337 srv = n->data;
338 if (srv->flags.busy)
339 continue;
340 if (!srv->flags.alive)
341 continue;
342 return srv;
343 }
344 return NULL;
345 }
346
347 static void
348 helperDispatch(helper_server * srv, helper_request * r)
349 {
350 helper *hlp = srv->parent;
351 if (!cbdataValid(r->data)) {
352 debug(29, 1) ("helperDispatch: invalid callback data\n");
353 helperRequestFree(r);
354 return;
355 }
356 assert(!srv->flags.busy);
357 srv->flags.busy = 1;
358 srv->request = r;
359 srv->dispatch_time = current_time;
360 comm_write(srv->wfd,
361 r->buf,
362 strlen(r->buf),
363 NULL, /* Handler */
364 NULL, /* Handler-data */
365 NULL); /* free */
366 commSetSelect(srv->rfd,
367 COMM_SELECT_READ,
368 helperHandleRead,
369 srv, 0);
370 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
371 hlp->id_name, srv->index + 1, strlen(r->buf));
372 srv->stats.uses++;
373 hlp->stats.requests++;
374 }
375
376 static void
377 helperKickQueue(helper * hlp)
378 {
379 helper_request *r;
380 helper_server *srv;
381 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
382 helperDispatch(srv, r);
383 }
384
385 static void
386 helperRequestFree(helper_request * r)
387 {
388 cbdataUnlock(r->data);
389 xfree(r->buf);
390 memFree(r, MEM_HELPER_REQUEST);
391 }