#include "git-compat-util.h"
#include "gettext.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "trace2.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

#ifdef __CYGWIN__
	/*
	 * Cygwin emulates Unix sockets by writing specially-crafted files
	 * whose `system` bit is set.
	 *
	 * If we are too fast, Cygwin might still be in the process of marking
	 * the underlying file as a system file. Until then, we will not see a
	 * Unix socket here, but a plain file instead. Just in case this is
	 * happening, wait a little and try again.
	 */
	{
		static const int delay[] = { 1, 10, 20, 40, -1 };
		int i;

		for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
			sleep_millisec(delay[i]);
			if (lstat(path, &st) == -1)
				return IPC_STATE__INVALID_PATH;
		}
	}
#endif

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has an S_IFSOCK type inode
	 * at `path` doesn't mean that there is a server listening.
	 * Ping it to be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}
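
/*
 * Illustrative note (not part of the original file): a caller would
 * typically use this probe before deciding whether to launch a server,
 * e.g.:
 *
 *	if (ipc_get_active_state(path) != IPC_STATE__LISTENING)
 *		... start the server process ...
 */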

/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so short that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server. If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int k;

	*pfd = -1;

	for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		if (errno == ENOENT) {
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;

			goto sleep_and_try_again;
		}

		if (errno == ETIMEDOUT) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		if (errno == ECONNREFUSED) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		return IPC_STATE__OTHER_ERROR;

	sleep_and_try_again:
		sleep_millisec(WAIT_STEP_MS);
	}

	return IPC_STATE__NOT_LISTENING;
}
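
/*
 * Editorial note: with WAIT_STEP_MS == 50 and the caller's default
 * timeout of MY_CONNECTION_TIMEOUT_MS (1000, defined below), the loop
 * above makes at most 1000 / 50 = 20 connection attempts, so a client
 * gives up on an unresponsive server after roughly one second.
 */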

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests. Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)

enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}

void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}

int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, size_t message_len,
	struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, message_len,
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, size_t message_len,
			    struct strbuf *answer)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection,
						    message, message_len,
						    answer);

	ipc_client_close_connection(connection);

	return ret;
}
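
/*
 * Illustrative sketch (not part of the original file): a minimal
 * one-shot client round-trip using the helper above; `path` and the
 * "ping" payload are placeholders, not a protocol this file defines.
 *
 *	struct ipc_client_connect_options options =
 *		IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *	struct strbuf answer = STRBUF_INIT;
 *
 *	options.wait_if_busy = 1;
 *	if (!ipc_client_send_command(path, &options,
 *				     "ping", strlen("ping"), &answer))
 *		printf("server said: %s\n", answer.buf);
 *	strbuf_release(&answer);
 */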

static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);

	if (flags < 0)
		return -1;

	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
	enum magic magic;
	struct ipc_worker_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
};

struct ipc_accept_thread_data {
	enum magic magic;
	struct ipc_server_data *server_data;

	struct unix_ss_socket *server_socket;

	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread. The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb` and then wait for another connection from the
 * server thread. This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list;

	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO. The queue is empty when the
	 * positions are equal.
	 */
	int *fifo_fds;
	int queue_size;
	int back_pos;
	int front_pos;

	int shutdown_requested;
	int is_stopped;
};

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int fd;

	if (server_data->back_pos == server_data->front_pos)
		return -1;

	fd = server_data->fifo_fds[server_data->front_pos];
	server_data->fifo_fds[server_data->front_pos] = -1;

	server_data->front_pos++;
	if (server_data->front_pos == server_data->queue_size)
		server_data->front_pos = 0;

	return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int next_back_pos;

	next_back_pos = server_data->back_pos + 1;
	if (next_back_pos == server_data->queue_size)
		next_back_pos = 0;

	if (next_back_pos == server_data->front_pos) {
		/* Queue is full. Just drop it. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[server_data->back_pos] = fd;
	server_data->back_pos = next_back_pos;

	return fd;
}
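
/*
 * Editorial worked example: with queue_size == 4, the positions wrap
 * modulo 4. Starting empty (back_pos == front_pos == 0), three
 * enqueues advance back_pos to 3; a fourth enqueue is dropped because
 * advancing back_pos would make it equal front_pos. The queue
 * therefore holds at most queue_size - 1 connections at a time.
 */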

/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *server_data = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&server_data->work_available_mutex);
	for (;;) {
		if (server_data->shutdown_requested)
			break;

		fd = fifo_dequeue(server_data);
		if (fd >= 0)
			break;

		pthread_cond_wait(&server_data->work_available_cond,
				  &server_data->work_available_mutex);
	}
	pthread_mutex_unlock(&server_data->work_available_mutex);

	return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call). The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay the application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client through us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}

/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd. Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connects and hangs up without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	struct pollfd pollfd[1];
	int result;

	for (;;) {
		pollfd[0].fd = fd;
		pollfd[0].events = POLLIN;

		result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			goto cleanup;
		}

		if (result == 0) {
			/* a timeout */

			int in_shutdown;

			pthread_mutex_lock(&server_data->work_available_mutex);
			in_shutdown = server_data->shutdown_requested;
			pthread_mutex_unlock(&server_data->work_available_mutex);

			/*
			 * If a shutdown is already in progress and this
			 * client has not started talking yet, just drop it.
			 */
			if (in_shutdown)
				goto cleanup;
			continue;
		}

		if (pollfd[0].revents & POLLHUP)
			goto cleanup;

		if (pollfd[0].revents & POLLIN)
			return 0;

		goto cleanup;
	}

cleanup:
	close(fd);
	return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback. The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;

	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, buf.len, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	} else {
		/*
		 * The client probably disconnected/shut down before it
		 * could send a well-formed message. Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t new_set;

	sigemptyset(&new_set);
	sigaddset(&new_set, SIGPIPE);

	sigemptyset(old_set);
	pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}

/*
 * Thread proc for an IPC worker thread. It handles a series of
 * connections from clients. It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	sigset_t old_set;
	int fd, io;
	int ret;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		fd = worker_thread__wait_for_connection(worker_thread_data);
		if (fd == -1)
			break; /* in shutdown */

		io = worker_thread__wait_for_io_start(worker_thread_data, fd);
		if (io == -1)
			continue; /* client hung up without sending anything */

		ret = worker_thread__do_io(worker_thread_data, fd);

		if (ret == SIMPLE_IPC_QUIT) {
			trace2_data_string("ipc-worker", NULL, "queue_stop_async",
					   "application_quit");
			/*
			 * The application layer is telling the ipc-server
			 * layer to shut down.
			 *
			 * We DO NOT have a response to send to the client.
			 *
			 * Queue an async stop (to stop the other threads) and
			 * allow this worker thread to exit now (no sense waiting
			 * for the thread-pool shutdown signal).
			 *
			 * Other non-idle worker threads are allowed to finish
			 * responding to their current clients.
			 */
			ipc_server_stop_async(server_data);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket. This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	struct pollfd pollfd[2];
	int result;

	for (;;) {
		pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
		pollfd[0].events = POLLIN;

		pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
		pollfd[1].events = POLLIN;

		result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			return result;
		}

		if (result == 0) {
			/* a timeout */

			/*
			 * If someone deletes or force-creates a new unix
			 * domain socket at our path, all future clients
			 * will be routed elsewhere and we silently starve.
			 * If that happens, just queue a shutdown.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		if (pollfd[0].revents & POLLIN) {
			/* shutdown message queued to socketpair */
			return -1;
		}

		if (pollfd[1].revents & POLLIN) {
			/* a connection is available on server_socket */

			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);
			if (client_fd >= 0)
				return client_fd;

			/*
			 * An error here is unlikely -- it probably
			 * indicates that the connecting process has
			 * already dropped the connection.
			 */
			continue;
		}

		BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, pollfd[0].revents, pollfd[1].revents);
	}
}

/*
 * Thread proc for the IPC server "accept thread". This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread. This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
	struct ipc_server_data *server_data = accept_thread_data->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(
			accept_thread_data);

		pthread_mutex_lock(&server_data->work_available_mutex);
		if (server_data->shutdown_requested) {
			pthread_mutex_unlock(&server_data->work_available_mutex);
			if (client_fd >= 0)
				close(client_fd);
			break;
		}

		if (client_fd < 0) {
			/* ignore transient accept() errors */
		} else {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}
		pthread_mutex_unlock(&server_data->work_available_mutex);
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not unnecessarily time out if we can avoid it. (The assumption is
 * that this will be used for FSMonitor and a few seconds' wait on a
 * connection is better than having the client time out and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`. This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)

static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}

static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int ret, saved_errno;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	ret = create_listener_socket(path, ipc_opts, new_server_socket);

	saved_errno = errno;
	trace2_region_leave("ipc-server", "create-listener_socket", NULL);
	errno = saved_errno;

	return ret;
}

/*
 * Start the IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking. This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
			   accept_thread_proc, server_data->accept_thread))
		die_errno(_("could not start accept_thread '%s'"), path);

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 */
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}
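
/*
 * Illustrative sketch (not part of the original file): the expected
 * lifecycle from the application's point of view; `my_app_cb` and
 * `my_app_data` are hypothetical placeholders for the caller's
 * request handler and its instance data.
 *
 *	struct ipc_server_data *server_data = NULL;
 *	struct ipc_server_opts opts = { .nr_threads = 4 };
 *
 *	if (ipc_server_run_async(&server_data, path, &opts,
 *				 my_app_cb, my_app_data))
 *		die_errno("could not start ipc-server");
 *	...
 *	ipc_server_stop_async(server_data);
 *	ipc_server_await(server_data);
 *	ipc_server_free(server_data);
 */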

/*
 * Gently tell the IPC server threads to shut down.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell worker threads to stop processing new connections
	 * and exit. (This does not abort in-progress conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}

void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *accept_thread_data;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	accept_thread_data = server_data->accept_thread;
	if (accept_thread_data) {
		unix_ss_free(accept_thread_data->server_socket);

		if (accept_thread_data->fd_send_shutdown != -1)
			close(accept_thread_data->fd_send_shutdown);
		if (accept_thread_data->fd_wait_shutdown != -1)
			close(accept_thread_data->fd_wait_shutdown);

		free(server_data->accept_thread);
	}

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}