]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
changed helper usage so that during a reconfigure we keep the helper*
[thirdparty/squid.git] / src / helper.cc
1 #include "squid.h"
2
3 #define HELPER_MAX_ARGS 64
4
5 static PF helperHandleRead;
6 static PF helperServerFree;
7 static void Enqueue(helper * hlp, helper_request *);
8 static helper_request *Dequeue(helper * hlp);
9 static helper_server *GetFirstAvailable(helper * hlp);
10 static void helperDispatch(helper_server * srv, helper_request * r);
11 static void helperKickQueue(helper * hlp);
12 static void helperRequestFree(helper_request * r);
13
14
15 void
16 helperOpenServers(helper * hlp)
17 {
18 char *s;
19 char *progname;
20 char *shortname;
21 char *procname;
22 char *args[HELPER_MAX_ARGS];
23 char fd_note_buf[FD_DESC_SZ];
24 helper_server *srv;
25 int nargs = 0;
26 int k;
27 int x;
28 int rfd;
29 int wfd;
30 wordlist *w;
31 if (hlp->cmdline == NULL)
32 return;
33 progname = hlp->cmdline->key;
34 if ((s = strrchr(progname, '/')))
35 shortname = xstrdup(s + 1);
36 else
37 shortname = xstrdup(progname);
38 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
39 hlp->n_to_start, shortname);
40 procname = xmalloc(strlen(shortname) + 3);
41 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
42 args[nargs++] = procname;
43 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
44 args[nargs++] = w->key;
45 args[nargs++] = NULL;
46 assert(nargs <= HELPER_MAX_ARGS);
47 for (k = 0; k < hlp->n_to_start; k++) {
48 rfd = wfd = -1;
49 x = ipcCreate(hlp->ipc_type,
50 progname,
51 args,
52 shortname,
53 &rfd,
54 &wfd);
55 if (x < 0) {
56 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
57 continue;
58 }
59 hlp->n_running++;
60 srv = xcalloc(1, sizeof(*srv));
61 cbdataAdd(srv, MEM_NONE);
62 srv->flags.alive = 1;
63 srv->index = k;
64 srv->rfd = rfd;
65 srv->wfd = wfd;
66 srv->buf = memAllocate(MEM_8K_BUF);
67 srv->buf_sz = 8192;
68 srv->offset = 0;
69 srv->parent = hlp;
70 cbdataLock(hlp); /* lock because of the parent backlink */
71 dlinkAddTail(srv, &srv->link, &hlp->servers);
72 if (rfd == wfd) {
73 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
74 fd_note(rfd, fd_note_buf);
75 } else {
76 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
77 fd_note(rfd, fd_note_buf);
78 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
79 fd_note(wfd, fd_note_buf);
80 }
81 commSetNonBlocking(rfd);
82 if (wfd != rfd)
83 commSetNonBlocking(wfd);
84 comm_add_close_handler(rfd, helperServerFree, srv);
85 }
86 safe_free(shortname);
87 safe_free(procname);
88 helperKickQueue(hlp);
89 }
90
91 void
92 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
93 {
94 helper_request *r = xcalloc(1, sizeof(*r));
95 helper_server *srv;
96 if (hlp == NULL) {
97 debug(29,3)("helperSubmit: hlp == NULL\n");
98 callback(data, NULL);
99 return;
100 }
101 r->callback = callback;
102 r->data = data;
103 r->buf = xstrdup(buf);
104 cbdataLock(r->data);
105 if ((srv = GetFirstAvailable(hlp)))
106 helperDispatch(srv, r);
107 else
108 Enqueue(hlp, r);
109 }
110
111 void
112 helperStats(StoreEntry * sentry, helper * hlp)
113 {
114 helper_server *srv;
115 dlink_node *link;
116 storeAppendPrintf(sentry, "number running: %d of %d\n",
117 hlp->n_running, hlp->n_to_start);
118 storeAppendPrintf(sentry, "requests sent: %d\n",
119 hlp->stats.requests);
120 storeAppendPrintf(sentry, "replies received: %d\n",
121 hlp->stats.replies);
122 storeAppendPrintf(sentry, "queue length: %d\n",
123 hlp->stats.queue_size);
124 storeAppendPrintf(sentry, "avg service time: %d msec\n",
125 hlp->stats.avg_svc_time);
126 storeAppendPrintf(sentry, "\n");
127 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\n",
128 "#",
129 "FD",
130 "# Requests",
131 "Flags",
132 "Time",
133 "Offset");
134 for (link = hlp->servers.head; link; link = link->next) {
135 srv = link->data;
136 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\n",
137 srv->index + 1,
138 srv->rfd,
139 srv->stats.uses,
140 srv->flags.alive ? 'A' : ' ',
141 srv->flags.busy ? 'B' : ' ',
142 srv->flags.closing ? 'C' : ' ',
143 srv->flags.shutdown ? 'S' : ' ',
144 0.001 * tvSubMsec(srv->dispatch_time, current_time),
145 (int) srv->offset);
146 }
147 storeAppendPrintf(sentry, "\nFlags key:\n\n");
148 storeAppendPrintf(sentry, " A = ALIVE\n");
149 storeAppendPrintf(sentry, " B = BUSY\n");
150 storeAppendPrintf(sentry, " C = CLOSING\n");
151 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
152 }
153
154 void
155 helperShutdown(helper * hlp)
156 {
157 dlink_node *link;
158 helper_server *srv;
159 for (link = hlp->servers.head; link; link = link->next) {
160 srv = link->data;
161 if (!srv->flags.alive) {
162 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
163 hlp->id_name, srv->index + 1);
164 continue;
165 }
166 srv->flags.shutdown = 1; /* request it to shut itself down */
167 if (srv->flags.busy) {
168 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
169 hlp->id_name, srv->index + 1);
170 continue;
171 }
172 if (srv->flags.closing) {
173 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
174 hlp->id_name, srv->index + 1);
175 continue;
176 }
177 srv->flags.closing = 1;
178 comm_close(srv->wfd);
179 }
180 }
181
182 helper *
183 helperCreate(const char *name)
184 {
185 helper *hlp = xcalloc(1, sizeof(*hlp));
186 cbdataAdd(hlp, MEM_NONE);
187 hlp->id_name = name;
188 return hlp;
189 }
190
191 void
192 helperFree(helper * hlp)
193 {
194 /* note, don't free hlp->name, it probably points to static memory */
195 if (hlp->queue.head)
196 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
197 hlp->id_name, hlp->stats.queue_size);
198 cbdataFree(hlp);
199 }
200
201 /* ====================================================================== */
202 /* LOCAL FUNCTIONS */
203 /* ====================================================================== */
204
205 static void
206 helperServerFree(int fd, void *data)
207 {
208 helper_server *srv = data;
209 helper *hlp = srv->parent;
210 assert(srv->rfd == fd);
211 if (srv->buf) {
212 memFree(MEM_8K_BUF, srv->buf);
213 srv->buf = NULL;
214 }
215 if (srv->wfd != srv->rfd)
216 comm_close(srv->wfd);
217 dlinkDelete(&srv->link, &hlp->servers);
218 hlp->n_running--;
219 assert(hlp->n_running >= 0);
220 if (!srv->flags.shutdown) {
221 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
222 hlp->id_name, srv->index + 1, fd);
223 assert(hlp->n_running >= hlp->n_to_start / 2);
224 if (hlp->n_running < hlp->n_to_start / 2)
225 fatalf("Too few %s processes are running", hlp->id_name);
226 }
227 cbdataUnlock(srv->parent);
228 cbdataFree(srv);
229 }
230
231 static void
232 helperHandleRead(int fd, void *data)
233 {
234 int len;
235 char *t = NULL;
236 helper_server *srv = data;
237 helper_request *r;
238 helper *hlp = srv->parent;
239 assert(fd == srv->rfd);
240 assert(cbdataValid(data));
241 Counter.syscalls.sock.reads++;
242 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
243 fd_bytes(fd, len, FD_READ);
244 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
245 len, hlp->id_name, srv->index + 1);
246 if (len <= 0) {
247 if (len < 0)
248 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
249 comm_close(fd);
250 return;
251 }
252 srv->offset += len;
253 srv->buf[srv->offset] = '\0';
254 r = srv->request;
255 if (r == NULL) {
256 /* someone spoke without being spoken to */
257 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
258 hlp->id_name, srv->index + 1, len);
259 srv->offset = 0;
260 } else if ((t = strchr(srv->buf, '\n'))) {
261 /* end of reply found */
262 debug(29, 3) ("helperHandleRead: end of reply found\n");
263 *t = '\0';
264 if (cbdataValid(r->data))
265 r->callback(r->data, srv->buf);
266 srv->flags.busy = 0;
267 srv->offset = 0;
268 helperRequestFree(r);
269 hlp->stats.replies++;
270 hlp->stats.avg_svc_time =
271 intAverage(hlp->stats.avg_svc_time,
272 tvSubMsec(srv->dispatch_time, current_time),
273 hlp->stats.replies, REDIRECT_AV_FACTOR);
274 if (srv->flags.shutdown)
275 comm_close(srv->wfd);
276 } else {
277 commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
278 }
279 helperKickQueue(hlp);
280 }
281
282 static void
283 Enqueue(helper * hlp, helper_request * r)
284 {
285 dlink_node *link = xcalloc(1, sizeof(*link));
286 dlinkAddTail(r, link, &hlp->queue);
287 hlp->stats.queue_size++;
288 if (hlp->stats.queue_size < hlp->n_running)
289 return;
290 if (squid_curtime - hlp->last_queue_warn < 600)
291 return;
292 if (shutting_down || reconfiguring)
293 return;
294 hlp->last_queue_warn = squid_curtime;
295 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
296 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
297 if (hlp->stats.queue_size > hlp->n_running * 2)
298 fatalf("Too many queued %s requests", hlp->id_name);
299 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
300 }
301
302 static helper_request *
303 Dequeue(helper * hlp)
304 {
305 dlink_node *link;
306 helper_request *r = NULL;
307 if ((link = hlp->queue.head)) {
308 r = link->data;
309 dlinkDelete(link, &hlp->queue);
310 safe_free(link);
311 hlp->stats.queue_size--;
312 }
313 return r;
314 }
315
316 static helper_server *
317 GetFirstAvailable(helper * hlp)
318 {
319 dlink_node *n;
320 helper_server *srv = NULL;
321 if (hlp->n_running == 0)
322 return NULL;
323 for (n = hlp->servers.head; n != NULL; n = n->next) {
324 srv = n->data;
325 if (srv->flags.busy)
326 continue;
327 if (!srv->flags.alive)
328 continue;
329 return srv;
330 }
331 return NULL;
332 }
333
334 static void
335 helperDispatch(helper_server * srv, helper_request * r)
336 {
337 helper *hlp = srv->parent;
338 if (!cbdataValid(r->data)) {
339 debug(29, 1) ("helperDispatch: invalid callback data\n");
340 helperRequestFree(r);
341 return;
342 }
343 assert(!srv->flags.busy);
344 srv->flags.busy = 1;
345 srv->request = r;
346 srv->dispatch_time = current_time;
347 comm_write(srv->wfd,
348 r->buf,
349 strlen(r->buf),
350 NULL, /* Handler */
351 NULL, /* Handler-data */
352 NULL); /* free */
353 commSetSelect(srv->rfd,
354 COMM_SELECT_READ,
355 helperHandleRead,
356 srv, 0);
357 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
358 hlp->id_name, srv->index + 1, strlen(r->buf));
359 srv->stats.uses++;
360 hlp->stats.requests++;
361 }
362
363 static void
364 helperKickQueue(helper * hlp)
365 {
366 helper_request *r;
367 helper_server *srv;
368 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
369 helperDispatch(srv, r);
370 }
371
372 static void
373 helperRequestFree(helper_request * r)
374 {
375 cbdataUnlock(r->data);
376 xfree(r->buf);
377 xfree(r);
378 }