]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
use EBIT_CLR,EBIT_TEST for comm_select fd_mask bits
[thirdparty/squid.git] / src / helper.cc
CommitLineData
74addf6c 1#include "squid.h"
2
3#define HELPER_MAX_ARGS 64
4
5static PF helperHandleRead;
1f5f60dd 6static PF helperServerFree;
74addf6c 7static void Enqueue(helper * hlp, helper_request *);
8static helper_request *Dequeue(helper * hlp);
9static helper_server *GetFirstAvailable(helper * hlp);
10static void helperDispatch(helper_server * srv, helper_request * r);
11static void helperKickQueue(helper * hlp);
12static void helperRequestFree(helper_request * r);
13
14
15void
16helperOpenServers(helper * hlp)
17{
18 char *s;
19 char *progname;
20 char *shortname;
21 char *procname;
22 char *args[HELPER_MAX_ARGS];
23 char fd_note_buf[FD_DESC_SZ];
24 helper_server *srv;
25 int nargs = 0;
26 int k;
27 int x;
28 int rfd;
29 int wfd;
30 wordlist *w;
31 if (hlp->cmdline == NULL)
32 return;
33 progname = hlp->cmdline->key;
34 assert(hlp->servers.head == NULL);
35 assert(hlp->servers.tail == NULL);
36 if ((s = strrchr(progname, '/')))
37 shortname = xstrdup(s + 1);
38 else
39 shortname = xstrdup(progname);
40 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
41 hlp->n_to_start, shortname);
42 procname = xmalloc(strlen(shortname) + 3);
43 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
44 args[nargs++] = procname;
45 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
46 args[nargs++] = w->key;
47 args[nargs++] = NULL;
48 assert(nargs <= HELPER_MAX_ARGS);
49 for (k = 0; k < hlp->n_to_start; k++) {
50 rfd = wfd = -1;
51 x = ipcCreate(hlp->ipc_type,
52 progname,
53 args,
54 shortname,
55 &rfd,
56 &wfd);
57 if (x < 0) {
58 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
59 continue;
60 }
61 hlp->n_running++;
62 srv = xcalloc(1, sizeof(*srv));
63 cbdataAdd(srv, MEM_NONE);
64 srv->flags.alive = 1;
65 srv->index = k;
66 srv->rfd = rfd;
67 srv->wfd = wfd;
68 srv->buf = memAllocate(MEM_8K_BUF);
69 srv->buf_sz = 8192;
70 srv->offset = 0;
71 srv->parent = hlp;
1f5f60dd 72 cbdataLock(hlp); /* lock because of the parent backlink */
74addf6c 73 dlinkAddTail(srv, &srv->link, &hlp->servers);
74 if (rfd == wfd) {
75 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
76 fd_note(rfd, fd_note_buf);
77 } else {
78 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
79 fd_note(rfd, fd_note_buf);
80 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
81 fd_note(wfd, fd_note_buf);
82 }
83 commSetNonBlocking(rfd);
84 if (wfd != rfd)
85 commSetNonBlocking(wfd);
1f5f60dd 86 comm_add_close_handler(rfd, helperServerFree, srv);
74addf6c 87 }
88 safe_free(shortname);
89 safe_free(procname);
90}
91
92void
93helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
94{
95 helper_request *r = xcalloc(1, sizeof(*r));
96 helper_server *srv;
97 r->callback = callback;
98 r->data = data;
99 r->buf = xstrdup(buf);
100 cbdataLock(r->data);
101 if ((srv = GetFirstAvailable(hlp)))
102 helperDispatch(srv, r);
103 else
104 Enqueue(hlp, r);
105}
106
107void
108helperStats(StoreEntry * sentry, helper * hlp)
109{
110 helper_server *srv;
111 dlink_node *link;
112 storeAppendPrintf(sentry, "number running: %d of %d\n",
113 hlp->n_running, hlp->n_to_start);
114 storeAppendPrintf(sentry, "requests sent: %d\n",
115 hlp->stats.requests);
116 storeAppendPrintf(sentry, "replies received: %d\n",
117 hlp->stats.replies);
118 storeAppendPrintf(sentry, "queue length: %d\n",
119 hlp->stats.queue_size);
120 storeAppendPrintf(sentry, "avg service time: %d msec\n",
121 hlp->stats.avg_svc_time);
122 storeAppendPrintf(sentry, "\n");
123 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\n",
124 "#",
125 "FD",
126 "# Requests",
127 "Flags",
128 "Time",
129 "Offset");
130 for (link = hlp->servers.head; link; link = link->next) {
131 srv = link->data;
132 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\n",
133 srv->index + 1,
134 srv->rfd,
135 srv->stats.uses,
136 srv->flags.alive ? 'A' : ' ',
137 srv->flags.busy ? 'B' : ' ',
138 srv->flags.closing ? 'C' : ' ',
139 srv->flags.shutdown ? 'S' : ' ',
140 0.001 * tvSubMsec(srv->dispatch_time, current_time),
141 (int) srv->offset);
142 }
143 storeAppendPrintf(sentry, "\nFlags key:\n\n");
144 storeAppendPrintf(sentry, " A = ALIVE\n");
145 storeAppendPrintf(sentry, " B = BUSY\n");
146 storeAppendPrintf(sentry, " C = CLOSING\n");
147 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
148}
149
150void
151helperShutdown(helper * hlp)
152{
153 dlink_node *link;
154 helper_server *srv;
155 for (link = hlp->servers.head; link; link = link->next) {
156 srv = link->data;
157 if (!srv->flags.alive) {
158 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
159 hlp->id_name, srv->index + 1);
160 continue;
161 }
1f5f60dd 162 srv->flags.shutdown = 1; /* request it to shut itself down */
74addf6c 163 if (srv->flags.busy) {
164 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
165 hlp->id_name, srv->index + 1);
74addf6c 166 continue;
167 }
168 if (srv->flags.closing) {
169 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
170 hlp->id_name, srv->index + 1);
171 continue;
172 }
74addf6c 173 srv->flags.closing = 1;
1f5f60dd 174 comm_close(srv->wfd);
74addf6c 175 }
176}
177
1f5f60dd 178helper *
179helperCreate(const char *name)
180{
181 helper *hlp = xcalloc(1, sizeof(*hlp));
182 cbdataAdd(hlp, MEM_NONE);
183 hlp->id_name = name;
184 return hlp;
185}
186
187void
188helperFree(helper * hlp)
189{
190 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 191 if (hlp->queue.head)
192 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
193 hlp->id_name, hlp->stats.queue_size);
1f5f60dd 194 cbdataFree(hlp);
195}
196
74addf6c 197/* ====================================================================== */
198/* LOCAL FUNCTIONS */
199/* ====================================================================== */
200
201static void
1f5f60dd 202helperServerFree(int fd, void *data)
74addf6c 203{
204 helper_server *srv = data;
205 helper *hlp = srv->parent;
206 assert(srv->rfd == fd);
207 if (srv->buf) {
208 memFree(MEM_8K_BUF, srv->buf);
209 srv->buf = NULL;
210 }
211 if (srv->wfd != srv->rfd)
212 comm_close(srv->wfd);
213 dlinkDelete(&srv->link, &hlp->servers);
74addf6c 214 hlp->n_running--;
215 assert(hlp->n_running >= 0);
1f5f60dd 216 if (!srv->flags.shutdown) {
217 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
14e87a44 218 hlp->id_name, srv->index + 1, fd);
1f5f60dd 219 assert(hlp->n_running >= hlp->n_to_start / 2);
220 if (hlp->n_running < hlp->n_to_start / 2)
14e87a44 221 fatalf("Too few %s processes are running", hlp->id_name);
222 }
1f5f60dd 223 cbdataUnlock(srv->parent);
14e87a44 224 cbdataFree(srv);
74addf6c 225}
226
227static void
228helperHandleRead(int fd, void *data)
229{
230 int len;
231 char *t = NULL;
232 helper_server *srv = data;
233 helper_request *r;
234 helper *hlp = srv->parent;
235 assert(fd == srv->rfd);
236 assert(cbdataValid(data));
237 Counter.syscalls.sock.reads++;
238 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
239 fd_bytes(fd, len, FD_READ);
240 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
241 len, hlp->id_name, srv->index + 1);
242 if (len <= 0) {
243 if (len < 0)
244 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
245 comm_close(fd);
246 return;
247 }
248 srv->offset += len;
249 srv->buf[srv->offset] = '\0';
250 r = srv->request;
251 if (r == NULL) {
252 /* someone spoke without being spoken to */
253 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
254 hlp->id_name, srv->index + 1, len);
255 srv->offset = 0;
256 } else if ((t = strchr(srv->buf, '\n'))) {
257 /* end of reply found */
258 debug(29, 3) ("helperHandleRead: end of reply found\n");
259 *t = '\0';
260 if (cbdataValid(r->data))
261 r->callback(r->data, srv->buf);
262 srv->flags.busy = 0;
263 srv->offset = 0;
264 helperRequestFree(r);
265 hlp->stats.replies++;
266 hlp->stats.avg_svc_time =
267 intAverage(hlp->stats.avg_svc_time,
268 tvSubMsec(srv->dispatch_time, current_time),
269 hlp->stats.replies, REDIRECT_AV_FACTOR);
270 if (srv->flags.shutdown)
271 comm_close(srv->wfd);
272 } else {
273 commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
274 }
275 helperKickQueue(hlp);
276}
277
278static void
279Enqueue(helper * hlp, helper_request * r)
280{
281 dlink_node *link = xcalloc(1, sizeof(*link));
282 dlinkAddTail(r, link, &hlp->queue);
283 hlp->stats.queue_size++;
284 if (hlp->stats.queue_size < hlp->n_running)
285 return;
286 if (squid_curtime - hlp->last_queue_warn < 600)
287 return;
fe73896c 288 if (shutting_down || reconfiguring)
289 return;
74addf6c 290 hlp->last_queue_warn = squid_curtime;
291 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
292 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
293 if (hlp->stats.queue_size > hlp->n_running * 2)
294 fatalf("Too many queued %s requests", hlp->id_name);
295 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
296}
297
298static helper_request *
299Dequeue(helper * hlp)
300{
301 dlink_node *link;
302 helper_request *r = NULL;
303 if ((link = hlp->queue.head)) {
304 r = link->data;
305 dlinkDelete(link, &hlp->queue);
306 safe_free(link);
307 hlp->stats.queue_size--;
308 }
309 return r;
310}
311
312static helper_server *
313GetFirstAvailable(helper * hlp)
314{
315 dlink_node *n;
316 helper_server *srv = NULL;
fe73896c 317 if (hlp->n_running == 0)
318 return NULL;
74addf6c 319 for (n = hlp->servers.head; n != NULL; n = n->next) {
320 srv = n->data;
321 if (srv->flags.busy)
322 continue;
323 if (!srv->flags.alive)
324 continue;
325 return srv;
326 }
327 return NULL;
328}
329
330static void
331helperDispatch(helper_server * srv, helper_request * r)
332{
333 helper *hlp = srv->parent;
334 if (!cbdataValid(r->data)) {
335 debug(29, 1) ("helperDispatch: invalid callback data\n");
336 helperRequestFree(r);
337 return;
338 }
339 assert(!srv->flags.busy);
340 srv->flags.busy = 1;
341 srv->request = r;
342 srv->dispatch_time = current_time;
343 comm_write(srv->wfd,
344 r->buf,
345 strlen(r->buf),
346 NULL, /* Handler */
347 NULL, /* Handler-data */
348 NULL); /* free */
349 commSetSelect(srv->rfd,
350 COMM_SELECT_READ,
351 helperHandleRead,
352 srv, 0);
353 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
354 hlp->id_name, srv->index + 1, strlen(r->buf));
355 srv->stats.uses++;
356 hlp->stats.requests++;
357}
358
359static void
360helperKickQueue(helper * hlp)
361{
362 helper_request *r;
363 helper_server *srv;
364 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
365 helperDispatch(srv, r);
366}
367
368static void
369helperRequestFree(helper_request * r)
370{
371 cbdataUnlock(r->data);
372 xfree(r->buf);
373 xfree(r);
374}