]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc_win32.cc
Merged from trunk rev.13627
[thirdparty/squid.git] / src / ipc_win32.cc
1 /*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 54 Windows Interprocess Communication */
10
11 #include "squid.h"
12 #include "cache_cf.h"
13 #include "comm.h"
14 #include "comm/Connection.h"
15 #include "fd.h"
16 #include "fde.h"
17 #include "globals.h"
18 #include "ip/Address.h"
19 #include "rfc1738.h"
20 #include "SquidConfig.h"
21 #include "SquidIpc.h"
22 #include "SquidTime.h"
23 #include "tools.h"
24
25 #include <cerrno>
26 #if HAVE_MSWSOCK_H
27 #include <mswsock.h>
28 #endif
29 #include <process.h>
30
31 struct ipc_params {
32 int type;
33 int crfd;
34 int cwfd;
35 Ip::Address local_addr;
36 struct addrinfo PS;
37 const char *prog;
38 char **args;
39 };
40
/*
 * Arguments handed from ipc_thread_1() to ipc_thread_2().
 */
struct thread_params {
    int type;           /* IPC_TCP_SOCKET or IPC_UDP_SOCKET */
    int rfd;            /* descriptor ipc_thread_2() reads helper output from */
    int send_fd;        /* descriptor ipc_thread_2() forwards that output to */
    const char *prog;   /* helper name, used in debug messages */
    pid_t pid;          /* helper process id, used in debug messages */
};
48
49 static unsigned int __stdcall ipc_thread_1(void *params);
50 static unsigned int __stdcall ipc_thread_2(void *params);
51
/* Fixed strings exchanged during the start-up handshake and at shutdown. */
static const char *ok_string = "OK\n";
static const char *err_string = "ERR\n";
static const char *shutdown_string = "$shutdown\n";

/* Greeting sent by ipc_thread_1() and verified by ipcCreate(). */
static const char *hello_string = "hi there\n";

/* Scratch buffer ipcCreate() receives handshake replies into. */
#define HELLO_BUF_SZ 32
static char hello_buf[HELLO_BUF_SZ];
59
60 static int
61 ipcCloseAllFD(int prfd, int pwfd, int crfd, int cwfd)
62 {
63 if (prfd >= 0)
64 comm_close(prfd);
65
66 if (prfd != pwfd)
67 if (pwfd >= 0)
68 comm_close(pwfd);
69
70 if (crfd >= 0)
71 comm_close(crfd);
72
73 if (crfd != cwfd)
74 if (cwfd >= 0)
75 comm_close(cwfd);
76
77 return -1;
78 }
79
/*
 * Exports the current debug options as SQUID_DEBUG=<options> into the
 * process environment. Does nothing when putenv() is unavailable.
 */
static void
PutEnvironment()
{
#if HAVE_PUTENV
    /* option string plus 32 bytes for the "SQUID_DEBUG=" prefix and terminator */
    const int envLen = strlen(Debug::debugOptions) + 32;
    char *const envEntry = (char *)xcalloc(envLen, 1);

    snprintf(envEntry, envLen, "SQUID_DEBUG=%s", Debug::debugOptions);

    /* envEntry is not freed here; putenv() is handed the buffer itself */
    putenv(envEntry);
#endif
}
91
92 pid_t
93 ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
94 {
95 unsigned long thread;
96
97 struct ipc_params params;
98 int opt;
99 int optlen = sizeof(opt);
100 DWORD ecode = 0;
101 pid_t pid;
102
103 Ip::Address tmp_addr;
104 struct addrinfo *aiCS = NULL;
105 struct addrinfo *aiPS = NULL;
106
107 int crfd = -1;
108 int prfd = -1;
109 int cwfd = -1;
110 int pwfd = -1;
111 int x;
112
113 requirePathnameExists(name, prog);
114
115 if (rfd)
116 *rfd = -1;
117
118 if (wfd)
119 *wfd = -1;
120
121 if (hIpc)
122 *hIpc = NULL;
123
124 if (WIN32_OS_version != _WIN_OS_WINNT) {
125 getsockopt(INVALID_SOCKET, SOL_SOCKET, SO_OPENTYPE, (char *) &opt, &optlen);
126 opt = opt & ~(SO_SYNCHRONOUS_NONALERT | SO_SYNCHRONOUS_ALERT);
127 setsockopt(INVALID_SOCKET, SOL_SOCKET, SO_OPENTYPE, (char *) &opt, sizeof(opt));
128 }
129
130 if (type == IPC_TCP_SOCKET) {
131 crfd = cwfd = comm_open(SOCK_STREAM,
132 IPPROTO_TCP,
133 local_addr,
134 COMM_NOCLOEXEC,
135 name);
136 prfd = pwfd = comm_open(SOCK_STREAM,
137 IPPROTO_TCP, /* protocol */
138 local_addr,
139 0, /* blocking */
140 name);
141 } else if (type == IPC_UDP_SOCKET) {
142 crfd = cwfd = comm_open(SOCK_DGRAM,
143 IPPROTO_UDP,
144 local_addr,
145 COMM_NOCLOEXEC,
146 name);
147 prfd = pwfd = comm_open(SOCK_DGRAM,
148 IPPROTO_UDP,
149 local_addr,
150 0,
151 name);
152 } else if (type == IPC_FIFO) {
153 debugs(54, DBG_CRITICAL, "ipcCreate: " << prog << ": use IPC_TCP_SOCKET instead of IP_FIFO on Windows");
154 assert(0);
155 } else {
156 assert(IPC_NONE);
157 }
158
159 debugs(54, 3, "ipcCreate: prfd FD " << prfd);
160 debugs(54, 3, "ipcCreate: pwfd FD " << pwfd);
161 debugs(54, 3, "ipcCreate: crfd FD " << crfd);
162 debugs(54, 3, "ipcCreate: cwfd FD " << cwfd);
163
164 if (WIN32_OS_version != _WIN_OS_WINNT) {
165 getsockopt(INVALID_SOCKET, SOL_SOCKET, SO_OPENTYPE, (char *) &opt, &optlen);
166 opt = opt | SO_SYNCHRONOUS_NONALERT;
167 setsockopt(INVALID_SOCKET, SOL_SOCKET, SO_OPENTYPE, (char *) &opt, optlen);
168 }
169
170 if (crfd < 0) {
171 debugs(54, DBG_CRITICAL, "ipcCreate: Failed to create child FD.");
172 return ipcCloseAllFD(prfd, pwfd, crfd, cwfd);
173 }
174
175 if (pwfd < 0) {
176 debugs(54, DBG_CRITICAL, "ipcCreate: Failed to create server FD.");
177 return ipcCloseAllFD(prfd, pwfd, crfd, cwfd);
178 }
179
180 // AYJ: these flags should be neutral, but if not IPv6 version needs adding
181 if (type == IPC_TCP_SOCKET || type == IPC_UDP_SOCKET) {
182
183 Ip::Address::InitAddr(aiPS);
184
185 if (getsockname(pwfd, aiPS->ai_addr, &(aiPS->ai_addrlen) ) < 0) {
186 debugs(54, DBG_CRITICAL, "ipcCreate: getsockname: " << xstrerror());
187 Ip::Address::FreeAddr(aiPS);
188 return ipcCloseAllFD(prfd, pwfd, crfd, cwfd);
189 }
190
191 tmp_addr = *aiPS;
192 Ip::Address::FreeAddr(aiPS);
193
194 debugs(54, 3, "ipcCreate: FD " << pwfd << " sockaddr " << tmp_addr );
195
196 Ip::Address::InitAddr(aiCS);
197
198 if (getsockname(crfd, aiCS->ai_addr, &(aiCS->ai_addrlen) ) < 0) {
199 debugs(54, DBG_CRITICAL, "ipcCreate: getsockname: " << xstrerror());
200 Ip::Address::FreeAddr(aiCS);
201 return ipcCloseAllFD(prfd, pwfd, crfd, cwfd);
202 }
203
204 tmp_addr.setEmpty();
205 tmp_addr = *aiCS;
206 Ip::Address::FreeAddr(aiCS);
207
208 debugs(54, 3, "ipcCreate: FD " << crfd << " sockaddr " << tmp_addr );
209 }
210
211 if (type == IPC_TCP_SOCKET) {
212 if (listen(crfd, 1) < 0) {
213 debugs(54, DBG_IMPORTANT, "ipcCreate: listen FD " << crfd << ": " << xstrerror());
214 return ipcCloseAllFD(prfd, pwfd, crfd, cwfd);
215 }
216
217 debugs(54, 3, "ipcCreate: FD " << crfd << " listening...");
218 }
219
220 /* flush or else we get dup data if unbuffered_logs is set */
221 logsFlush();
222
223 params.type = type;
224
225 params.crfd = crfd;
226
227 params.cwfd = cwfd;
228
229 params.PS = *aiPS;
230
231 params.local_addr = local_addr;
232
233 params.prog = prog;
234
235 params.args = (char **) args;
236
237 thread = _beginthreadex(NULL, 0, ipc_thread_1, &params, 0, NULL);
238
239 if (thread == 0) {
240 debugs(54, DBG_IMPORTANT, "ipcCreate: _beginthread: " << xstrerror());
241 return ipcCloseAllFD(prfd, pwfd, crfd, cwfd);
242 }
243
244 /* NP: tmp_addr was left with eiether empty or aiCS in Ip::Address format */
245 if (comm_connect_addr(pwfd, tmp_addr) == Comm::COMM_ERROR) {
246 CloseHandle((HANDLE) thread);
247 return ipcCloseAllFD(prfd, pwfd, -1, -1);
248 }
249
250 memset(hello_buf, '\0', HELLO_BUF_SZ);
251 x = recv(prfd, (void *)hello_buf, HELLO_BUF_SZ - 1, 0);
252
253 if (x < 0) {
254 debugs(54, DBG_CRITICAL, "ipcCreate: PARENT: hello read test failed");
255 debugs(54, DBG_CRITICAL, "--> read: " << xstrerror());
256 CloseHandle((HANDLE) thread);
257 return ipcCloseAllFD(prfd, pwfd, -1, -1);
258 } else if (strcmp(hello_buf, hello_string)) {
259 debugs(54, DBG_CRITICAL, "ipcCreate: PARENT: hello read test failed");
260 debugs(54, DBG_CRITICAL, "--> read returned " << x);
261 debugs(54, DBG_CRITICAL, "--> got '" << rfc1738_escape(hello_buf) << "'");
262 CloseHandle((HANDLE) thread);
263 return ipcCloseAllFD(prfd, pwfd, -1, -1);
264 }
265
266 x = send(pwfd, (const void *)ok_string, strlen(ok_string), 0);
267
268 if (x < 0) {
269 debugs(54, DBG_CRITICAL, "ipcCreate: PARENT: OK write test failed");
270 debugs(54, DBG_CRITICAL, "--> read: " << xstrerror());
271 CloseHandle((HANDLE) thread);
272 return ipcCloseAllFD(prfd, pwfd, -1, -1);
273 }
274
275 memset(hello_buf, '\0', HELLO_BUF_SZ);
276 x = recv(prfd, (void *)hello_buf, HELLO_BUF_SZ - 1, 0);
277
278 if (x < 0) {
279 debugs(54, DBG_CRITICAL, "ipcCreate: PARENT: OK read test failed");
280 debugs(54, DBG_CRITICAL, "--> read: " << xstrerror());
281 CloseHandle((HANDLE) thread);
282 return ipcCloseAllFD(prfd, pwfd, -1, -1);
283 } else if (!strcmp(hello_buf, err_string)) {
284 debugs(54, DBG_CRITICAL, "ipcCreate: PARENT: OK read test failed");
285 debugs(54, DBG_CRITICAL, "--> read returned " << x);
286 debugs(54, DBG_CRITICAL, "--> got '" << rfc1738_escape(hello_buf) << "'");
287 CloseHandle((HANDLE) thread);
288 return ipcCloseAllFD(prfd, pwfd, -1, -1);
289 }
290
291 hello_buf[x] = '\0';
292 pid = atol(hello_buf);
293 commUnsetFdTimeout(prfd);
294 commSetNonBlocking(prfd);
295 commSetNonBlocking(pwfd);
296 commSetCloseOnExec(prfd);
297 commSetCloseOnExec(pwfd);
298
299 if (rfd)
300 *rfd = prfd;
301
302 if (wfd)
303 *wfd = pwfd;
304
305 fd_table[prfd].flags.ipc = true;
306 fd_table[pwfd].flags.ipc = true;
307 fd_table[crfd].flags.ipc = true;
308 fd_table[cwfd].flags.ipc = true;
309
310 if (Config.sleep_after_fork) {
311 /* XXX emulation of usleep() */
312 DWORD sl;
313 sl = Config.sleep_after_fork / 1000;
314
315 if (sl == 0)
316 sl = 1;
317
318 Sleep(sl);
319 }
320
321 if (GetExitCodeThread((HANDLE) thread, &ecode) && ecode == STILL_ACTIVE) {
322 if (hIpc)
323 *hIpc = (HANDLE) thread;
324
325 return pid;
326 } else {
327 CloseHandle((HANDLE) thread);
328 return ipcCloseAllFD(prfd, pwfd, -1, -1);
329 }
330 }
331
332 static int
333 ipcSend(int cwfd, const char *buf, int len)
334 {
335 int x;
336
337 x = send(cwfd, (const void *)buf, len, 0);
338
339 if (x < 0) {
340 debugs(54, DBG_CRITICAL, "sendto FD " << cwfd << ": " << xstrerror());
341 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: hello write test failed");
342 }
343
344 return x;
345 }
346
347 static unsigned int __stdcall
348 ipc_thread_1(void *in_params)
349 {
350 int t1, t2, t3, retval = -1;
351 int p2c[2] = {-1, -1};
352 int c2p[2] = {-1, -1};
353 HANDLE hProcess = NULL, thread = NULL;
354 pid_t pid = -1;
355
356 struct thread_params thread_params;
357 ssize_t x;
358 int fd = -1;
359 char *str;
360 STARTUPINFO si;
361 PROCESS_INFORMATION pi;
362 long F;
363 int prfd_ipc = -1, pwfd_ipc = -1, crfd_ipc = -1, cwfd_ipc = -1;
364 char *prog = NULL, *buf1 = NULL;
365
366 Ip::Address PS_ipc;
367 Ip::Address CS_ipc;
368 struct addrinfo *aiPS_ipc = NULL;
369 struct addrinfo *aiCS_ipc = NULL;
370
371 struct ipc_params *params = (struct ipc_params *) in_params;
372 int type = params->type;
373 int crfd = params->crfd;
374 int cwfd = params->cwfd;
375 char **args = params->args;
376
377 Ip::Address PS = params->PS;
378 Ip::Address local_addr = params->local_addr;
379
380 const size_t bufSz = 8192;
381 buf1 = (char *)xcalloc(1, bufSz);
382 strcpy(buf1, params->prog);
383 prog = strtok(buf1, w_space);
384
385 if ((str = strrchr(prog, '/')))
386 prog = ++str;
387
388 if ((str = strrchr(prog, '\\')))
389 prog = ++str;
390
391 prog = xstrdup(prog);
392
393 if (type == IPC_TCP_SOCKET) {
394 debugs(54, 3, "ipcCreate: calling accept on FD " << crfd);
395
396 if ((fd = accept(crfd, NULL, NULL)) < 0) {
397 debugs(54, DBG_CRITICAL, "ipcCreate: FD " << crfd << " accept: " << xstrerror());
398 goto cleanup;
399 }
400
401 debugs(54, 3, "ipcCreate: CHILD accepted new FD " << fd);
402 comm_close(crfd);
403 snprintf(buf1, bufSz-1, "%s CHILD socket", prog);
404 fd_open(fd, FD_SOCKET, buf1);
405 fd_table[fd].flags.ipc = 1;
406 cwfd = crfd = fd;
407 } else if (type == IPC_UDP_SOCKET) {
408 if (comm_connect_addr(crfd, params->PS) == Comm::COMM_ERROR)
409 goto cleanup;
410 }
411
412 x = send(cwfd, (const void *)hello_string, strlen(hello_string) + 1, 0);
413
414 if (x < 0) {
415 debugs(54, DBG_CRITICAL, "sendto FD " << cwfd << ": " << xstrerror());
416 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: hello write test failed");
417 goto cleanup;
418 }
419
420 PutEnvironment();
421 memset(buf1, '\0', bufSz);
422 x = recv(crfd, (void *)buf1, bufSz-1, 0);
423
424 if (x < 0) {
425 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: OK read test failed");
426 debugs(54, DBG_CRITICAL, "--> read: " << xstrerror());
427 goto cleanup;
428 } else if (strcmp(buf1, ok_string)) {
429 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: OK read test failed");
430 debugs(54, DBG_CRITICAL, "--> read returned " << x);
431 debugs(54, DBG_CRITICAL, "--> got '" << rfc1738_escape(hello_buf) << "'");
432 goto cleanup;
433 }
434
435 /* assign file descriptors to child process */
436 if (_pipe(p2c, 1024, _O_BINARY | _O_NOINHERIT) < 0) {
437 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: pipe: " << xstrerror());
438 ipcSend(cwfd, err_string, strlen(err_string));
439 goto cleanup;
440 }
441
442 if (_pipe(c2p, 1024, _O_BINARY | _O_NOINHERIT) < 0) {
443 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: pipe: " << xstrerror());
444 ipcSend(cwfd, err_string, strlen(err_string));
445 goto cleanup;
446 }
447
448 if (type == IPC_UDP_SOCKET) {
449 snprintf(buf1, bufSz, "%s(%ld) <-> ipc CHILD socket", prog, -1L);
450 crfd_ipc = cwfd_ipc = comm_open(SOCK_DGRAM, IPPROTO_UDP, local_addr, 0, buf1);
451
452 if (crfd_ipc < 0) {
453 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: Failed to create child FD for " << prog << ".");
454 ipcSend(cwfd, err_string, strlen(err_string));
455 goto cleanup;
456 }
457
458 snprintf(buf1, bufSz, "%s(%ld) <-> ipc PARENT socket", prog, -1L);
459 prfd_ipc = pwfd_ipc = comm_open(SOCK_DGRAM, IPPROTO_UDP, local_addr, 0, buf1);
460
461 if (pwfd_ipc < 0) {
462 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: Failed to create server FD for " << prog << ".");
463 ipcSend(cwfd, err_string, strlen(err_string));
464 goto cleanup;
465 }
466
467 Ip::Address::InitAddr(aiPS_ipc);
468
469 if (getsockname(pwfd_ipc, aiPS_ipc->ai_addr, &(aiPS_ipc->ai_addrlen)) < 0) {
470 debugs(54, DBG_CRITICAL, "ipcCreate: getsockname: " << xstrerror());
471 ipcSend(cwfd, err_string, strlen(err_string));
472 Ip::Address::FreeAddr(aiPS_ipc);
473 goto cleanup;
474 }
475
476 PS_ipc = *aiPS_ipc;
477 Ip::Address::FreeAddr(aiPS_ipc);
478
479 debugs(54, 3, "ipcCreate: FD " << pwfd_ipc << " sockaddr " << PS_ipc);
480
481 Ip::Address::InitAddr(aiCS_ipc);
482
483 if (getsockname(crfd_ipc, aiCS_ipc->ai_addr, &(aiCS_ipc->ai_addrlen)) < 0) {
484 debugs(54, DBG_CRITICAL, "ipcCreate: getsockname: " << xstrerror());
485 ipcSend(cwfd, err_string, strlen(err_string));
486 Ip::Address::FreeAddr(aiCS_ipc);
487 goto cleanup;
488 }
489
490 CS_ipc = *aiCS_ipc;
491 Ip::Address::FreeAddr(aiCS_ipc);
492
493 debugs(54, 3, "ipcCreate: FD " << crfd_ipc << " sockaddr " << CS_ipc);
494
495 if (comm_connect_addr(pwfd_ipc, CS_ipc) == Comm::COMM_ERROR) {
496 ipcSend(cwfd, err_string, strlen(err_string));
497 goto cleanup;
498 }
499
500 fd = crfd;
501
502 if (comm_connect_addr(crfd_ipc, PS_ipc) == Comm::COMM_ERROR) {
503 ipcSend(cwfd, err_string, strlen(err_string));
504 goto cleanup;
505 }
506 } /* IPC_UDP_SOCKET */
507
508 t1 = dup(0);
509
510 t2 = dup(1);
511
512 t3 = dup(2);
513
514 dup2(c2p[0], 0);
515
516 dup2(p2c[1], 1);
517
518 dup2(fileno(debug_log), 2);
519
520 close(c2p[0]);
521
522 close(p2c[1]);
523
524 commUnsetNonBlocking(fd);
525
526 memset(&si, 0, sizeof(STARTUPINFO));
527
528 si.cb = sizeof(STARTUPINFO);
529
530 si.hStdInput = (HANDLE) _get_osfhandle(0);
531
532 si.hStdOutput = (HANDLE) _get_osfhandle(1);
533
534 si.hStdError = (HANDLE) _get_osfhandle(2);
535
536 si.dwFlags = STARTF_USESTDHANDLES;
537
538 /* Make sure all other valid handles are not inerithable */
539 for (x = 3; x < Squid_MaxFD; ++x) {
540 if ((F = _get_osfhandle(x)) == -1)
541 continue;
542
543 SetHandleInformation((HANDLE) F, HANDLE_FLAG_INHERIT, 0);
544 }
545
546 *buf1 = '\0';
547 strcpy(buf1 + 4096, params->prog);
548 str = strtok(buf1 + 4096, w_space);
549
550 do {
551 strcat(buf1, str);
552 strcat(buf1, " ");
553 } while ((str = strtok(NULL, w_space)));
554
555 x = 1;
556
557 while (args[x]) {
558 strcat(buf1, args[x]);
559 ++x;
560 strcat(buf1, " ");
561 }
562
563 if (CreateProcess(buf1 + 4096, buf1, NULL, NULL, TRUE, CREATE_NO_WINDOW,
564 NULL, NULL, &si, &pi)) {
565 pid = pi.dwProcessId;
566 hProcess = pi.hProcess;
567 } else {
568 pid = -1;
569 x = GetLastError();
570 }
571
572 dup2(t1, 0);
573 dup2(t2, 1);
574 dup2(t3, 2);
575 close(t1);
576 close(t2);
577 close(t3);
578
579 if (pid == -1) {
580 errno = x;
581 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: " << params->prog << ": " << xstrerror());
582
583 ipcSend(cwfd, err_string, strlen(err_string));
584 goto cleanup;
585 }
586
587 if (type == IPC_UDP_SOCKET) {
588 WSAPROTOCOL_INFO wpi;
589
590 memset(&wpi, 0, sizeof(wpi));
591
592 if (SOCKET_ERROR == WSADuplicateSocket(crfd_ipc, pid, &wpi)) {
593 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: WSADuplicateSocket: " << xstrerror());
594
595 ipcSend(cwfd, err_string, strlen(err_string));
596 goto cleanup;
597 }
598
599 x = write(c2p[1], (const char *) &wpi, sizeof(wpi));
600
601 if (x < (ssize_t)sizeof(wpi)) {
602 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: write FD " << c2p[1] << ": " << xstrerror());
603 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: " << prog << ": socket exchange failed");
604
605 ipcSend(cwfd, err_string, strlen(err_string));
606 goto cleanup;
607 }
608
609 x = read(p2c[0], buf1, bufSz-1);
610
611 if (x < 0) {
612 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: read FD " << p2c[0] << ": " << xstrerror());
613 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: " << prog << ": socket exchange failed");
614
615 ipcSend(cwfd, err_string, strlen(err_string));
616 goto cleanup;
617 } else if (strncmp(buf1, ok_string, strlen(ok_string))) {
618 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: " << prog << ": socket exchange failed");
619 debugs(54, DBG_CRITICAL, "--> read returned " << x);
620 buf1[x] = '\0';
621 debugs(54, DBG_CRITICAL, "--> got '" << rfc1738_escape(buf1) << "'");
622 ipcSend(cwfd, err_string, strlen(err_string));
623 goto cleanup;
624 }
625
626 x = write(c2p[1], (const char *) &PS_ipc, sizeof(PS_ipc));
627
628 if (x < (ssize_t)sizeof(PS_ipc)) {
629 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: write FD " << c2p[1] << ": " << xstrerror());
630 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: " << prog << ": socket exchange failed");
631
632 ipcSend(cwfd, err_string, strlen(err_string));
633 goto cleanup;
634 }
635
636 x = read(p2c[0], buf1, bufSz-1);
637
638 if (x < 0) {
639 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: read FD " << p2c[0] << ": " << xstrerror());
640 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: " << prog << ": socket exchange failed");
641
642 ipcSend(cwfd, err_string, strlen(err_string));
643 goto cleanup;
644 } else if (strncmp(buf1, ok_string, strlen(ok_string))) {
645 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: " << prog << ": socket exchange failed");
646 debugs(54, DBG_CRITICAL, "--> read returned " << x);
647 buf1[x] = '\0';
648 debugs(54, DBG_CRITICAL, "--> got '" << rfc1738_escape(buf1) << "'");
649 ipcSend(cwfd, err_string, strlen(err_string));
650 goto cleanup;
651 }
652
653 x = send(pwfd_ipc, (const void *)ok_string, strlen(ok_string), 0);
654 x = recv(prfd_ipc, (void *)(buf1 + 200), bufSz -1 - 200, 0);
655 assert((size_t) x == strlen(ok_string)
656 && !strncmp(ok_string, buf1 + 200, strlen(ok_string)));
657 } /* IPC_UDP_SOCKET */
658
659 snprintf(buf1, bufSz-1, "%s(%ld) CHILD socket", prog, (long int) pid);
660
661 fd_note(fd, buf1);
662
663 if (prfd_ipc != -1) {
664 snprintf(buf1, bufSz-1, "%s(%ld) <-> ipc CHILD socket", prog, (long int) pid);
665 fd_note(crfd_ipc, buf1);
666 snprintf(buf1, bufSz-1, "%s(%ld) <-> ipc PARENT socket", prog, (long int) pid);
667 fd_note(prfd_ipc, buf1);
668 }
669
670 /* else { IPC_TCP_SOCKET */
671 /* commSetNoLinger(fd); */
672 /* } */
673 thread_params.prog = prog;
674
675 thread_params.send_fd = cwfd;
676
677 thread_params.pid = pid;
678
679 if ((thread_params.type = type) == IPC_TCP_SOCKET)
680 thread_params.rfd = p2c[0];
681 else
682 thread_params.rfd = prfd_ipc;
683
684 thread = (HANDLE)_beginthreadex(NULL, 0, ipc_thread_2, &thread_params, 0, NULL);
685
686 if (!thread) {
687 debugs(54, DBG_CRITICAL, "ipcCreate: CHILD: _beginthreadex: " << xstrerror());
688 ipcSend(cwfd, err_string, strlen(err_string));
689 goto cleanup;
690 }
691
692 snprintf(buf1, bufSz-1, "%ld\n", (long int) pid);
693
694 if (-1 == ipcSend(cwfd, buf1, strlen(buf1)))
695 goto cleanup;
696
697 debugs(54, 2, "ipc(" << prog << "," << pid << "): started successfully");
698
699 /* cycle */
700 for (;;) {
701 x = recv(crfd, (void *)buf1, bufSz-1, 0);
702
703 if (x <= 0) {
704 debugs(54, 3, "ipc(" << prog << "," << pid << "): " << x << " bytes received from parent. Exiting...");
705 break;
706 }
707
708 buf1[x] = '\0';
709
710 if (type == IPC_UDP_SOCKET && !strcmp(buf1, shutdown_string)) {
711 debugs(54, 3, "ipc(" << prog << "," << pid << "): request for shutdown received from parent. Exiting...");
712
713 TerminateProcess(hProcess, 0);
714 break;
715 }
716
717 debugs(54, 5, "ipc(" << prog << "," << pid << "): received from parent: " << rfc1738_escape_unescaped(buf1));
718
719 if (type == IPC_TCP_SOCKET)
720 x = write(c2p[1], buf1, x);
721 else
722 x = send(pwfd_ipc, (const void *)buf1, x, 0);
723
724 if (x <= 0) {
725 debugs(54, 3, "ipc(" << prog << "," << pid << "): " << x << " bytes written to " << prog << ". Exiting...");
726
727 break;
728 }
729 }
730
731 retval = 0;
732
733 cleanup:
734
735 if (c2p[1] != -1)
736 close(c2p[1]);
737
738 if (fd_table[crfd].flags.open)
739 ipcCloseAllFD(-1, -1, crfd, cwfd);
740
741 if (prfd_ipc != -1) {
742 send(crfd_ipc, (const void *)shutdown_string, strlen(shutdown_string), 0);
743 shutdown(crfd_ipc, SD_BOTH);
744 shutdown(prfd_ipc, SD_BOTH);
745 }
746
747 ipcCloseAllFD(prfd_ipc, pwfd_ipc, crfd_ipc, cwfd_ipc);
748
749 if (hProcess && WAIT_OBJECT_0 !=
750 WaitForSingleObject(hProcess, type == IPC_UDP_SOCKET ? 12000 : 5000)) {
751
752 getCurrentTime();
753 debugs(54, DBG_CRITICAL, "ipc(" << prog << "," << pid << "): WARNING: " << prog <<
754 " didn't exit in " << (type == IPC_UDP_SOCKET ? 12 : 5) << " seconds.");
755
756 }
757
758 if (thread && WAIT_OBJECT_0 != WaitForSingleObject(thread, 3000)) {
759 getCurrentTime();
760 debugs(54, DBG_CRITICAL, "ipc(" << prog << "," << pid << "): WARNING: ipc_thread_2 didn't exit in 3 seconds.");
761
762 }
763
764 getCurrentTime();
765
766 if (!retval)
767 debugs(54, 2, "ipc(" << prog << "," << pid << "): normal exit");
768
769 xfree(buf1);
770 xfree(prog);
771
772 if (thread)
773 CloseHandle(thread);
774
775 if (hProcess)
776 CloseHandle(hProcess);
777
778 if (p2c[0] != -1)
779 close(p2c[0]);
780
781 return retval;
782 }
783
784 static unsigned int __stdcall
785 ipc_thread_2(void *in_params)
786 {
787 int x;
788
789 struct thread_params *params = (struct thread_params *) in_params;
790 int type = params->type;
791 int rfd = params->rfd;
792 int send_fd = params->send_fd;
793 char *prog = xstrdup(params->prog);
794 pid_t pid = params->pid;
795 const size_t bufSz = 8192;
796 char *buf2 = (char *)xcalloc(1, bufSz);
797
798 for (;;) {
799 if (type == IPC_TCP_SOCKET)
800 x = read(rfd, buf2, bufSz-1);
801 else
802 x = recv(rfd, (void *)buf2, bufSz-1, 0);
803
804 if ((x <= 0 && type == IPC_TCP_SOCKET) ||
805 (x < 0 && type == IPC_UDP_SOCKET)) {
806 debugs(54, 3, "ipc(" << prog << "," << pid << "): " << x << " bytes read from " << prog << ". Exiting...");
807
808 break;
809 }
810
811 buf2[x] = '\0';
812
813 if (type == IPC_UDP_SOCKET && !strcmp(buf2, shutdown_string)) {
814 debugs(54, 3, "ipc(" << prog << "," << pid << "): request for shutdown received. Exiting...");
815 break;
816 }
817
818 if (x >= 2) {
819 if ((buf2[x - 1] == '\n') && (buf2[x - 2] == '\r')) {
820 buf2[x - 2] = '\n';
821 buf2[x - 1] = '\0';
822 --x;
823 }
824 }
825
826 debugs(54, 5, "ipc(" << prog << "," << pid << "): received from child : " << rfc1738_escape_unescaped(buf2));
827
828 x = send(send_fd, (const void *)buf2, x, 0);
829
830 if ((x <= 0 && type == IPC_TCP_SOCKET) ||
831 (x < 0 && type == IPC_UDP_SOCKET)) {
832 debugs(54, 3, "ipc(" << prog << "," << pid << "): " << x << " bytes sent to parent. Exiting...");
833
834 break;
835 }
836 }
837
838 xfree(prog);
839 xfree(buf2);
840 return 0;
841 }