#include "cache.h"
#include "gettext.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

enum ipc_active_state ipc_get_active_state(const char *path)
{
        enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
        struct ipc_client_connect_options options
                = IPC_CLIENT_CONNECT_OPTIONS_INIT;
        struct stat st;
        struct ipc_client_connection *connection_test = NULL;

        options.wait_if_busy = 0;
        options.wait_if_not_found = 0;

        if (lstat(path, &st) == -1) {
                switch (errno) {
                case ENOENT:
                case ENOTDIR:
                        return IPC_STATE__NOT_LISTENING;
                default:
                        return IPC_STATE__INVALID_PATH;
                }
        }

#ifdef __CYGWIN__
        /*
         * Cygwin emulates Unix sockets by writing specially crafted files
         * whose `system` bit is set.
         *
         * If we are too fast, Cygwin might still be in the process of marking
         * the underlying file as a system file. Until then, we will not see a
         * Unix socket here, but a plain file instead. Just in case this
         * is happening, wait a little and try again.
         */
        {
                static const int delay[] = { 1, 10, 20, 40, -1 };
                int i;

                for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
                        sleep_millisec(delay[i]);
                        if (lstat(path, &st) == -1)
                                return IPC_STATE__INVALID_PATH;
                }
        }
#endif

        /* also complain if a plain file is in the way */
        if ((st.st_mode & S_IFMT) != S_IFSOCK)
                return IPC_STATE__INVALID_PATH;

        /*
         * Just because the filesystem has a S_IFSOCK type inode
         * at `path`, doesn't mean that there is a server listening.
         * Ping it to be sure.
         */
        state = ipc_client_try_connect(path, &options, &connection_test);
        ipc_client_close_connection(connection_test);

        return state;
}

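/*
 * Illustrative sketch (not part of this file): a caller that wants a
 * server running could probe with ipc_get_active_state() before
 * deciding to spawn one. The `spawn_my_daemon()` helper named below is
 * hypothetical and only shows the intended calling pattern:
 *
 *	if (ipc_get_active_state(path) != IPC_STATE__LISTENING)
 *		spawn_my_daemon(path);
 */
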
/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so fast that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server.  If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
        const char *path,
        int timeout_ms,
        const struct ipc_client_connect_options *options,
        int *pfd)
{
        int k;

        *pfd = -1;

        for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
                int fd = unix_stream_connect(path, options->uds_disallow_chdir);

                if (fd != -1) {
                        *pfd = fd;
                        return IPC_STATE__LISTENING;
                }

                if (errno == ENOENT) {
                        if (!options->wait_if_not_found)
                                return IPC_STATE__PATH_NOT_FOUND;

                        goto sleep_and_try_again;
                }

                if (errno == ETIMEDOUT) {
                        if (!options->wait_if_busy)
                                return IPC_STATE__NOT_LISTENING;

                        goto sleep_and_try_again;
                }

                if (errno == ECONNREFUSED) {
                        if (!options->wait_if_busy)
                                return IPC_STATE__NOT_LISTENING;

                        goto sleep_and_try_again;
                }

                return IPC_STATE__OTHER_ERROR;

        sleep_and_try_again:
                sleep_millisec(WAIT_STEP_MS);
        }

        return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests.  Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)

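/*
 * Worked example of the retry budget implied by the two constants
 * above: with MY_CONNECTION_TIMEOUT_MS = 1000 and WAIT_STEP_MS = 50,
 * connect_to_server() calls unix_stream_connect() at most
 * 1000 / 50 = 20 times, sleeping 50ms between retriable failures, so a
 * busy or still-starting server gets roughly one second to come up
 * before we give up.
 */
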
enum ipc_active_state ipc_client_try_connect(
        const char *path,
        const struct ipc_client_connect_options *options,
        struct ipc_client_connection **p_connection)
{
        enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
        int fd = -1;

        *p_connection = NULL;

        trace2_region_enter("ipc-client", "try-connect", NULL);
        trace2_data_string("ipc-client", NULL, "try-connect/path", path);

        state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
                                  options, &fd);

        trace2_data_intmax("ipc-client", NULL, "try-connect/state",
                           (intmax_t)state);
        trace2_region_leave("ipc-client", "try-connect", NULL);

        if (state == IPC_STATE__LISTENING) {
                (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
                (*p_connection)->fd = fd;
        }

        return state;
}

void ipc_client_close_connection(struct ipc_client_connection *connection)
{
        if (!connection)
                return;

        if (connection->fd != -1)
                close(connection->fd);

        free(connection);
}

int ipc_client_send_command_to_connection(
        struct ipc_client_connection *connection,
        const char *message, size_t message_len,
        struct strbuf *answer)
{
        int ret = 0;

        strbuf_setlen(answer, 0);

        trace2_region_enter("ipc-client", "send-command", NULL);

        if (write_packetized_from_buf_no_flush(message, message_len,
                                               connection->fd) < 0 ||
            packet_flush_gently(connection->fd) < 0) {
                ret = error(_("could not send IPC command"));
                goto done;
        }

        if (read_packetized_to_strbuf(
                    connection->fd, answer,
                    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
                ret = error(_("could not read IPC response"));
                goto done;
        }

done:
        trace2_region_leave("ipc-client", "send-command", NULL);
        return ret;
}

int ipc_client_send_command(const char *path,
                            const struct ipc_client_connect_options *options,
                            const char *message, size_t message_len,
                            struct strbuf *answer)
{
        int ret = -1;
        enum ipc_active_state state;
        struct ipc_client_connection *connection = NULL;

        state = ipc_client_try_connect(path, options, &connection);

        if (state != IPC_STATE__LISTENING)
                return ret;

        ret = ipc_client_send_command_to_connection(connection,
                                                    message, message_len,
                                                    answer);

        ipc_client_close_connection(connection);

        return ret;
}

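/*
 * Illustrative sketch (not part of this file): a one-shot client
 * request using ipc_client_send_command().  The option fields shown
 * are the ones this file already consults; the message "status" and
 * the surrounding caller are hypothetical:
 *
 *	struct ipc_client_connect_options opts =
 *		IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *	struct strbuf answer = STRBUF_INIT;
 *
 *	opts.wait_if_busy = 1;
 *	opts.wait_if_not_found = 0;
 *
 *	if (!ipc_client_send_command(path, &opts, "status", 6, &answer))
 *		printf("%s\n", answer.buf);
 *	strbuf_release(&answer);
 */
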
/* Set or clear O_NONBLOCK on `fd`; returns -1 if fcntl() fails. */
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
        int flags;

        flags = fcntl(fd, F_GETFL, NULL);

        if (flags < 0)
                return -1;

        if (make_nonblocking)
                flags |= O_NONBLOCK;
        else
                flags &= ~O_NONBLOCK;

        return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
        MAGIC_SERVER_REPLY_DATA,
        MAGIC_WORKER_THREAD_DATA,
        MAGIC_ACCEPT_THREAD_DATA,
        MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
        enum magic magic;
        int fd;
        struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
        enum magic magic;
        struct ipc_worker_thread_data *next_thread;
        struct ipc_server_data *server_data;
        pthread_t pthread_id;
};

struct ipc_accept_thread_data {
        enum magic magic;
        struct ipc_server_data *server_data;

        struct unix_ss_socket *server_socket;

        int fd_send_shutdown;
        int fd_wait_shutdown;
        pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread.  The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb`, and then wait for another connection from the
 * accept-thread.  This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
        enum magic magic;
        ipc_server_application_cb *application_cb;
        void *application_data;
        struct strbuf buf_path;

        struct ipc_accept_thread_data *accept_thread;
        struct ipc_worker_thread_data *worker_thread_list;

        pthread_mutex_t work_available_mutex;
        pthread_cond_t work_available_cond;

        /*
         * Accepted but not yet processed client connections are kept
         * in a circular buffer FIFO.  The queue is empty when the
         * positions are equal.
         */
        int *fifo_fds;
        int queue_size;
        int back_pos;
        int front_pos;

        int shutdown_requested;
        int is_stopped;
};

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
        /* ASSERT holding mutex */

        int fd;

        if (server_data->back_pos == server_data->front_pos)
                return -1;

        fd = server_data->fifo_fds[server_data->front_pos];
        server_data->fifo_fds[server_data->front_pos] = -1;

        server_data->front_pos++;
        if (server_data->front_pos == server_data->queue_size)
                server_data->front_pos = 0;

        return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
        /* ASSERT holding mutex */

        int next_back_pos;

        next_back_pos = server_data->back_pos + 1;
        if (next_back_pos == server_data->queue_size)
                next_back_pos = 0;

        if (next_back_pos == server_data->front_pos) {
                /* Queue is full. Just drop it. */
                close(fd);
                return -1;
        }

        server_data->fifo_fds[server_data->back_pos] = fd;
        server_data->back_pos = next_back_pos;

        return fd;
}

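/*
 * Note on capacity: because "full" is detected when advancing back_pos
 * would collide with front_pos, the ring buffer above holds at most
 * (queue_size - 1) pending connections at any one time.
 */
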
/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
        struct ipc_worker_thread_data *worker_thread_data)
{
        /* ASSERT NOT holding mutex */

        struct ipc_server_data *server_data = worker_thread_data->server_data;
        int fd = -1;

        pthread_mutex_lock(&server_data->work_available_mutex);
        for (;;) {
                if (server_data->shutdown_requested)
                        break;

                fd = fifo_dequeue(server_data);
                if (fd >= 0)
                        break;

                pthread_cond_wait(&server_data->work_available_cond,
                                  &server_data->work_available_mutex);
        }
        pthread_mutex_unlock(&server_data->work_available_mutex);

        return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call).  The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client thru us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
                                const char *response, size_t response_len)
{
        if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
                BUG("reply_cb called with wrong instance data");

        return write_packetized_from_buf_no_flush(response, response_len,
                                                  reply_data->fd);
}

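/*
 * Illustrative sketch (not part of this file): because replies are not
 * flushed here, an application callback may stream its answer in
 * several chunks through the reply callback; the ipc-server layer
 * flushes once the callback returns.  The callback below is
 * hypothetical and its exact prototype is the one declared in
 * simple-ipc.h:
 *
 *	static int my_app_cb(void *application_data,
 *			     const char *request, size_t request_len,
 *			     ipc_server_reply_cb *reply_cb,
 *			     struct ipc_server_reply_data *reply_data)
 *	{
 *		reply_cb(reply_data, "part one ", 9);
 *		reply_cb(reply_data, "part two", 8);
 *		return 0;
 *	}
 */
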
/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd.  Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
        struct ipc_worker_thread_data *worker_thread_data,
        int fd)
{
        struct ipc_server_data *server_data = worker_thread_data->server_data;
        struct pollfd pollfd[1];
        int result;

        for (;;) {
                pollfd[0].fd = fd;
                pollfd[0].events = POLLIN;

                result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
                if (result < 0) {
                        if (errno == EINTR)
                                continue;
                        goto cleanup;
                }

                if (result == 0) {
                        /* a timeout */

                        int in_shutdown;

                        pthread_mutex_lock(&server_data->work_available_mutex);
                        in_shutdown = server_data->shutdown_requested;
                        pthread_mutex_unlock(&server_data->work_available_mutex);

                        /*
                         * If a shutdown is already in progress and this
                         * client has not started talking yet, just drop it.
                         */
                        if (in_shutdown)
                                goto cleanup;
                        continue;
                }

                if (pollfd[0].revents & POLLHUP)
                        goto cleanup;

                if (pollfd[0].revents & POLLIN)
                        return 0;

                goto cleanup;
        }

cleanup:
        close(fd);
        return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback.  The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
        struct ipc_worker_thread_data *worker_thread_data,
        int fd)
{
        /* ASSERT NOT holding lock */

        struct strbuf buf = STRBUF_INIT;
        struct ipc_server_reply_data reply_data;
        int ret = 0;

        reply_data.magic = MAGIC_SERVER_REPLY_DATA;
        reply_data.worker_thread_data = worker_thread_data;

        reply_data.fd = fd;

        ret = read_packetized_to_strbuf(
                reply_data.fd, &buf,
                PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
        if (ret >= 0) {
                ret = worker_thread_data->server_data->application_cb(
                        worker_thread_data->server_data->application_data,
                        buf.buf, buf.len, do_io_reply_callback, &reply_data);

                packet_flush_gently(reply_data.fd);
        }
        else {
                /*
                 * The client probably disconnected/shutdown before it
                 * could send a well-formed message.  Ignore it.
                 */
        }

        strbuf_release(&buf);
        close(reply_data.fd);

        return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
        sigset_t new_set;

        sigemptyset(&new_set);
        sigaddset(&new_set, SIGPIPE);

        sigemptyset(old_set);
        pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}

/*
 * Thread proc for an IPC worker thread.  It handles a series of
 * connections from clients.  It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
        struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
        struct ipc_server_data *server_data = worker_thread_data->server_data;
        sigset_t old_set;
        int fd, io;
        int ret;

        trace2_thread_start("ipc-worker");

        thread_block_sigpipe(&old_set);

        for (;;) {
                fd = worker_thread__wait_for_connection(worker_thread_data);
                if (fd == -1)
                        break; /* in shutdown */

                io = worker_thread__wait_for_io_start(worker_thread_data, fd);
                if (io == -1)
                        continue; /* client hung up without sending anything */

                ret = worker_thread__do_io(worker_thread_data, fd);

                if (ret == SIMPLE_IPC_QUIT) {
                        trace2_data_string("ipc-worker", NULL, "queue_stop_async",
                                           "application_quit");
                        /*
                         * The application layer is telling the ipc-server
                         * layer to shutdown.
                         *
                         * We DO NOT have a response to send to the client.
                         *
                         * Queue an async stop (to stop the other threads) and
                         * allow this worker thread to exit now (no sense waiting
                         * for the thread-pool shutdown signal).
                         *
                         * Other non-idle worker threads are allowed to finish
                         * responding to their current clients.
                         */
                        ipc_server_stop_async(server_data);
                        break;
                }
        }

        trace2_thread_exit();
        return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket.  This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
        struct ipc_accept_thread_data *accept_thread_data)
{
        struct pollfd pollfd[2];
        int result;

        for (;;) {
                pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
                pollfd[0].events = POLLIN;

                pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
                pollfd[1].events = POLLIN;

                result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
                if (result < 0) {
                        if (errno == EINTR)
                                continue;
                        return result;
                }

                if (result == 0) {
                        /* a timeout */

                        /*
                         * If someone deletes or force-creates a new unix
                         * domain socket at our path, all future clients
                         * will be routed elsewhere and we silently starve.
                         * If that happens, just queue a shutdown.
                         */
                        if (unix_ss_was_stolen(
                                    accept_thread_data->server_socket)) {
                                trace2_data_string("ipc-accept", NULL,
                                                   "queue_stop_async",
                                                   "socket_stolen");
                                ipc_server_stop_async(
                                        accept_thread_data->server_data);
                        }
                        continue;
                }

                if (pollfd[0].revents & POLLIN) {
                        /* shutdown message queued to socketpair */
                        return -1;
                }

                if (pollfd[1].revents & POLLIN) {
                        /* a connection is available on server_socket */

                        int client_fd =
                                accept(accept_thread_data->server_socket->fd_socket,
                                       NULL, NULL);
                        if (client_fd >= 0)
                                return client_fd;

                        /*
                         * An error here is unlikely -- it probably
                         * indicates that the connecting process has
                         * already dropped the connection.
                         */
                        continue;
                }

                BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
                    errno, pollfd[0].revents, pollfd[1].revents);
        }
}

/*
 * Thread proc for the IPC server "accept thread".  This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread.  This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
        struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
        struct ipc_server_data *server_data = accept_thread_data->server_data;
        sigset_t old_set;

        trace2_thread_start("ipc-accept");

        thread_block_sigpipe(&old_set);

        for (;;) {
                int client_fd = accept_thread__wait_for_connection(
                        accept_thread_data);

                pthread_mutex_lock(&server_data->work_available_mutex);
                if (server_data->shutdown_requested) {
                        pthread_mutex_unlock(&server_data->work_available_mutex);
                        if (client_fd >= 0)
                                close(client_fd);
                        break;
                }

                if (client_fd < 0) {
                        /* ignore transient accept() errors */
                }
                else {
                        fifo_enqueue(server_data, client_fd);
                        pthread_cond_broadcast(&server_data->work_available_cond);
                }
                pthread_mutex_unlock(&server_data->work_available_mutex);
        }

        trace2_thread_exit();
        return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not time out unnecessarily if we can avoid it.  (The assumption is
 * that this will be used for FSMonitor and a few-second wait on a
 * connection is better than having the client time out and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`.  This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)

static int create_listener_socket(
        const char *path,
        const struct ipc_server_opts *ipc_opts,
        struct unix_ss_socket **new_server_socket)
{
        struct unix_ss_socket *server_socket = NULL;
        struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
        int ret;

        uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
        uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

        ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
        if (ret)
                return ret;

        if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
                int saved_errno = errno;
                unix_ss_free(server_socket);
                errno = saved_errno;
                return -1;
        }

        *new_server_socket = server_socket;

        trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
        return 0;
}

static int setup_listener_socket(
        const char *path,
        const struct ipc_server_opts *ipc_opts,
        struct unix_ss_socket **new_server_socket)
{
        int ret, saved_errno;

        trace2_region_enter("ipc-server", "create-listener_socket", NULL);

        ret = create_listener_socket(path, ipc_opts, new_server_socket);

        saved_errno = errno;
        trace2_region_leave("ipc-server", "create-listener_socket", NULL);
        errno = saved_errno;

        return ret;
}

/*
 * Start IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
                         const char *path, const struct ipc_server_opts *opts,
                         ipc_server_application_cb *application_cb,
                         void *application_data)
{
        struct unix_ss_socket *server_socket = NULL;
        struct ipc_server_data *server_data;
        int sv[2];
        int k;
        int ret;
        int nr_threads = opts->nr_threads;

        *returned_server_data = NULL;

        /*
         * Create a socketpair and set sv[1] to non-blocking.  This
         * will be used to send a shutdown message to the accept-thread
         * and allows the accept-thread to wait on EITHER a client
         * connection or a shutdown request without spinning.
         */
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
                return -1;

        if (set_socket_blocking_flag(sv[1], 1)) {
                int saved_errno = errno;
                close(sv[0]);
                close(sv[1]);
                errno = saved_errno;
                return -1;
        }

        ret = setup_listener_socket(path, opts, &server_socket);
        if (ret) {
                int saved_errno = errno;
                close(sv[0]);
                close(sv[1]);
                errno = saved_errno;
                return ret;
        }

        server_data = xcalloc(1, sizeof(*server_data));
        server_data->magic = MAGIC_SERVER_DATA;
        server_data->application_cb = application_cb;
        server_data->application_data = application_data;
        strbuf_init(&server_data->buf_path, 0);
        strbuf_addstr(&server_data->buf_path, path);

        if (nr_threads < 1)
                nr_threads = 1;

        pthread_mutex_init(&server_data->work_available_mutex, NULL);
        pthread_cond_init(&server_data->work_available_cond, NULL);

        server_data->queue_size = nr_threads * FIFO_SCALE;
        CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

        server_data->accept_thread =
                xcalloc(1, sizeof(*server_data->accept_thread));
        server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
        server_data->accept_thread->server_data = server_data;
        server_data->accept_thread->server_socket = server_socket;
        server_data->accept_thread->fd_send_shutdown = sv[0];
        server_data->accept_thread->fd_wait_shutdown = sv[1];

        if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
                           accept_thread_proc, server_data->accept_thread))
                die_errno(_("could not start accept_thread '%s'"), path);

        for (k = 0; k < nr_threads; k++) {
                struct ipc_worker_thread_data *wtd;

                wtd = xcalloc(1, sizeof(*wtd));
                wtd->magic = MAGIC_WORKER_THREAD_DATA;
                wtd->server_data = server_data;

                if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
                                   wtd)) {
                        if (k == 0)
                                die(_("could not start worker[0] for '%s'"),
                                    path);
                        /*
                         * Limp along with the thread pool that we have.
                         */
                        break;
                }

                wtd->next_thread = server_data->worker_thread_list;
                server_data->worker_thread_list = wtd;
        }

        *returned_server_data = server_data;
        return 0;
}

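/*
 * Illustrative sketch (not part of this file): the expected lifecycle
 * of an async server, assuming a `my_app_cb` like the one sketched
 * near do_io_reply_callback() and otherwise hypothetical caller code:
 *
 *	struct ipc_server_data *server = NULL;
 *	struct ipc_server_opts opts = { .nr_threads = 4 };
 *
 *	if (ipc_server_run_async(&server, path, &opts, my_app_cb, NULL))
 *		die_errno("could not start ipc server");
 *	...
 *	ipc_server_stop_async(server);  (or let my_app_cb return SIMPLE_IPC_QUIT)
 *	ipc_server_await(server);
 *	ipc_server_free(server);
 */
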
/*
 * Gently tell the IPC server threads to shut down.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
        /* ASSERT NOT holding mutex */

        int fd;

        if (!server_data)
                return 0;

        trace2_region_enter("ipc-server", "server-stop-async", NULL);

        pthread_mutex_lock(&server_data->work_available_mutex);

        server_data->shutdown_requested = 1;

        /*
         * Write a byte to the shutdown socket pair to wake up the
         * accept-thread.
         */
        if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
                error_errno("could not write to fd_send_shutdown");

        /*
         * Drain the queue of existing connections.
         */
        while ((fd = fifo_dequeue(server_data)) != -1)
                close(fd);

        /*
         * Gently tell worker threads to stop processing new connections
         * and exit.  (This does not abort in-process conversations.)
         */
        pthread_cond_broadcast(&server_data->work_available_cond);

        pthread_mutex_unlock(&server_data->work_available_mutex);

        trace2_region_leave("ipc-server", "server-stop-async", NULL);

        return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
        pthread_join(server_data->accept_thread->pthread_id, NULL);

        if (!server_data->shutdown_requested)
                BUG("ipc-server: accept-thread stopped for '%s'",
                    server_data->buf_path.buf);

        while (server_data->worker_thread_list) {
                struct ipc_worker_thread_data *wtd =
                        server_data->worker_thread_list;

                pthread_join(wtd->pthread_id, NULL);

                server_data->worker_thread_list = wtd->next_thread;
                free(wtd);
        }

        server_data->is_stopped = 1;

        return 0;
}

void ipc_server_free(struct ipc_server_data *server_data)
{
        struct ipc_accept_thread_data *accept_thread_data;

        if (!server_data)
                return;

        if (!server_data->is_stopped)
                BUG("cannot free ipc-server while running for '%s'",
                    server_data->buf_path.buf);

        accept_thread_data = server_data->accept_thread;
        if (accept_thread_data) {
                unix_ss_free(accept_thread_data->server_socket);

                if (accept_thread_data->fd_send_shutdown != -1)
                        close(accept_thread_data->fd_send_shutdown);
                if (accept_thread_data->fd_wait_shutdown != -1)
                        close(accept_thread_data->fd_wait_shutdown);

                free(server_data->accept_thread);
        }

        while (server_data->worker_thread_list) {
                struct ipc_worker_thread_data *wtd =
                        server_data->worker_thread_list;

                server_data->worker_thread_list = wtd->next_thread;
                free(wtd);
        }

        pthread_cond_destroy(&server_data->work_available_cond);
        pthread_mutex_destroy(&server_data->work_available_mutex);

        strbuf_release(&server_data->buf_path);

        free(server_data->fifo_fds);
        free(server_data);
}