#include "cache.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has an S_IFSOCK type inode
	 * at `path`, doesn't mean that there is a server listening.
	 * Ping it to be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}
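
/*
 * Example (illustrative sketch, not part of the original file): a
 * hypothetical caller might use ipc_get_active_state() to decide
 * whether a server needs to be (re)started before sending a command.
 * The socket path and `spawn_my_daemon()` helper are made up:
 *
 *     switch (ipc_get_active_state("/path/to/daemon.sock")) {
 *     case IPC_STATE__LISTENING:
 *             break;                  // server is up; just connect
 *     case IPC_STATE__NOT_LISTENING:
 *     case IPC_STATE__PATH_NOT_FOUND:
 *             spawn_my_daemon();      // hypothetical helper
 *             break;
 *     default:
 *             die("cannot talk to daemon");
 *     }
 */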

/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so short that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server. If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int k;

	*pfd = -1;

	for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		if (errno == ENOENT) {
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;

			goto sleep_and_try_again;
		}

		if (errno == ETIMEDOUT) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		if (errno == ECONNREFUSED) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		return IPC_STATE__OTHER_ERROR;

	sleep_and_try_again:
		sleep_millisec(WAIT_STEP_MS);
	}

	return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests. Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)
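
/*
 * For example: with MY_CONNECTION_TIMEOUT_MS == 1000 and
 * WAIT_STEP_MS == 50, connect_to_server() makes up to 20 connection
 * attempts before giving up.
 */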

enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}
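
/*
 * Example (illustrative sketch): a hypothetical caller that is willing
 * to wait for a server that is still starting up or is busy:
 *
 *     struct ipc_client_connect_options options =
 *             IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *     struct ipc_client_connection *connection = NULL;
 *
 *     options.wait_if_busy = 1;
 *     options.wait_if_not_found = 1;
 *
 *     if (ipc_client_try_connect(path, &options, &connection) ==
 *         IPC_STATE__LISTENING) {
 *             ...talk to the server...
 *             ipc_client_close_connection(connection);
 *     }
 */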

void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}

int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, strlen(message),
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, struct strbuf *answer)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection, message, answer);

	ipc_client_close_connection(connection);

	return ret;
}
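
/*
 * Example (illustrative sketch): the one-shot helper above combines
 * connect + send + close for callers that only need a single
 * round-trip. The "status" request is made up; it stands in for
 * whatever the application-layer protocol defines:
 *
 *     struct strbuf answer = STRBUF_INIT;
 *     struct ipc_client_connect_options options =
 *             IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *
 *     if (!ipc_client_send_command(path, &options, "status", &answer))
 *             printf("server said: %s\n", answer.buf);
 *     strbuf_release(&answer);
 */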

static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);

	if (flags < 0)
		return -1;

	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
	enum magic magic;
	struct ipc_worker_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
};

struct ipc_accept_thread_data {
	enum magic magic;
	struct ipc_server_data *server_data;

	struct unix_ss_socket *server_socket;

	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread. The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb` and then wait for another connection from the
 * server thread. This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list;

	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO. The queue is empty when the
	 * positions are equal.
	 */
	int *fifo_fds;
	int queue_size;
	int back_pos;
	int front_pos;

	int shutdown_requested;
	int is_stopped;
};

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int fd;

	if (server_data->back_pos == server_data->front_pos)
		return -1;

	fd = server_data->fifo_fds[server_data->front_pos];
	server_data->fifo_fds[server_data->front_pos] = -1;

	server_data->front_pos++;
	if (server_data->front_pos == server_data->queue_size)
		server_data->front_pos = 0;

	return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if the queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int next_back_pos;

	next_back_pos = server_data->back_pos + 1;
	if (next_back_pos == server_data->queue_size)
		next_back_pos = 0;

	if (next_back_pos == server_data->front_pos) {
		/* Queue is full. Just drop it. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[server_data->back_pos] = fd;
	server_data->back_pos = next_back_pos;

	return fd;
}
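
/*
 * Worked example of the ring-buffer invariants used above: one slot is
 * always left empty so that "full" and "empty" can be distinguished
 * without a separate counter. With queue_size == 4, the FIFO holds at
 * most 3 fds:
 *
 *     back_pos == front_pos                      -> empty
 *     (back_pos + 1) % queue_size == front_pos   -> full
 */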

/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *server_data = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&server_data->work_available_mutex);
	for (;;) {
		if (server_data->shutdown_requested)
			break;

		fd = fifo_dequeue(server_data);
		if (fd >= 0)
			break;

		pthread_cond_wait(&server_data->work_available_cond,
				  &server_data->work_available_mutex);
	}
	pthread_mutex_unlock(&server_data->work_available_mutex);

	return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call). The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay the application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client through us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}
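
/*
 * Example (illustrative sketch): an application callback may invoke
 * the reply callback several times to stream a large response; the
 * final flush is sent by worker_thread__do_io() after the callback
 * returns. `my_app_cb` and the `part1`/`part2` buffers are made up:
 *
 *     static int my_app_cb(void *data, const char *command,
 *                          ipc_server_reply_cb *reply_cb,
 *                          struct ipc_server_reply_data *reply_data)
 *     {
 *             reply_cb(reply_data, part1, strlen(part1));
 *             reply_cb(reply_data, part2, strlen(part2));
 *             return 0;
 *     }
 */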

/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd. Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	struct pollfd pollfd[1];
	int result;

	for (;;) {
		pollfd[0].fd = fd;
		pollfd[0].events = POLLIN;

		result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			goto cleanup;
		}

		if (result == 0) {
			/* a timeout */

			int in_shutdown;

			pthread_mutex_lock(&server_data->work_available_mutex);
			in_shutdown = server_data->shutdown_requested;
			pthread_mutex_unlock(&server_data->work_available_mutex);

			/*
			 * If a shutdown is already in progress and this
			 * client has not started talking yet, just drop it.
			 */
			if (in_shutdown)
				goto cleanup;
			continue;
		}

		if (pollfd[0].revents & POLLHUP)
			goto cleanup;

		if (pollfd[0].revents & POLLIN)
			return 0;

		goto cleanup;
	}

cleanup:
	close(fd);
	return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback. The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;

	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	}
	else {
		/*
		 * The client probably disconnected/shutdown before it
		 * could send a well-formed message. Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t new_set;

	sigemptyset(&new_set);
	sigaddset(&new_set, SIGPIPE);

	sigemptyset(old_set);
	pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}

/*
 * Thread proc for an IPC worker thread. It handles a series of
 * connections from clients. It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	sigset_t old_set;
	int fd, io;
	int ret;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		fd = worker_thread__wait_for_connection(worker_thread_data);
		if (fd == -1)
			break; /* in shutdown */

		io = worker_thread__wait_for_io_start(worker_thread_data, fd);
		if (io == -1)
			continue; /* client hung up without sending anything */

		ret = worker_thread__do_io(worker_thread_data, fd);

		if (ret == SIMPLE_IPC_QUIT) {
			trace2_data_string("ipc-worker", NULL, "queue_stop_async",
					   "application_quit");
			/*
			 * The application layer is telling the ipc-server
			 * layer to shut down.
			 *
			 * We DO NOT have a response to send to the client.
			 *
			 * Queue an async stop (to stop the other threads) and
			 * allow this worker thread to exit now (no sense waiting
			 * for the thread-pool shutdown signal).
			 *
			 * Other non-idle worker threads are allowed to finish
			 * responding to their current clients.
			 */
			ipc_server_stop_async(server_data);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket. This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	struct pollfd pollfd[2];
	int result;

	for (;;) {
		pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
		pollfd[0].events = POLLIN;

		pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
		pollfd[1].events = POLLIN;

		result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			return result;
		}

		if (result == 0) {
			/* a timeout */

			/*
			 * If someone deletes or force-creates a new unix
			 * domain socket at our path, all future clients
			 * will be routed elsewhere and we silently starve.
			 * If that happens, just queue a shutdown.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		if (pollfd[0].revents & POLLIN) {
			/* shutdown message queued to socketpair */
			return -1;
		}

		if (pollfd[1].revents & POLLIN) {
			/* a connection is available on server_socket */

			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);
			if (client_fd >= 0)
				return client_fd;

			/*
			 * An error here is unlikely -- it probably
			 * indicates that the connecting process has
			 * already dropped the connection.
			 */
			continue;
		}

		BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, pollfd[0].revents, pollfd[1].revents);
	}
}

/*
 * Thread proc for the IPC server "accept thread". This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread. This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
	struct ipc_server_data *server_data = accept_thread_data->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(
			accept_thread_data);

		pthread_mutex_lock(&server_data->work_available_mutex);
		if (server_data->shutdown_requested) {
			pthread_mutex_unlock(&server_data->work_available_mutex);
			if (client_fd >= 0)
				close(client_fd);
			break;
		}

		if (client_fd < 0) {
			/* ignore transient accept() errors */
		}
		else {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}
		pthread_mutex_unlock(&server_data->work_available_mutex);
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not unnecessarily time out if we can avoid it. (The assumption is
 * that this will be used for FSMonitor and a few-second wait on a
 * connection is better than having the client time out and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)
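
/*
 * For example: with opts->nr_threads == 5, the FIFO is sized
 * 5 * FIFO_SCALE == 500 slots, and (because one slot is always kept
 * empty, see fifo_enqueue()) up to 499 accepted-but-unserviced
 * connections can be queued before new ones are dropped.
 */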

/*
 * The backlog value for `listen(2)`. This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)

static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}

static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int ret, saved_errno;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	ret = create_listener_socket(path, ipc_opts, new_server_socket);

	saved_errno = errno;
	trace2_region_leave("ipc-server", "create-listener_socket", NULL);
	errno = saved_errno;

	return ret;
}

/*
 * Start the IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking. This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
			   accept_thread_proc, server_data->accept_thread))
		die_errno(_("could not start accept_thread '%s'"), path);

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 */
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}
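
/*
 * Example (illustrative sketch): a typical server lifecycle ties the
 * entry points together. `my_app_cb` and `my_state` are hypothetical:
 *
 *     struct ipc_server_data *server = NULL;
 *     struct ipc_server_opts opts = { .nr_threads = 4 };
 *
 *     if (ipc_server_run_async(&server, path, &opts,
 *                              my_app_cb, &my_state))
 *             die_errno("could not start ipc server");
 *     ...
 *     ipc_server_stop_async(server);  // or my_app_cb returns SIMPLE_IPC_QUIT
 *     ipc_server_await(server);
 *     ipc_server_free(server);
 */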

/*
 * Gently tell the IPC server threads to shut down.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell worker threads to stop processing new connections
	 * and exit. (This does not abort in-progress conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}

void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *accept_thread_data;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	accept_thread_data = server_data->accept_thread;
	if (accept_thread_data) {
		unix_ss_free(accept_thread_data->server_socket);

		if (accept_thread_data->fd_send_shutdown != -1)
			close(accept_thread_data->fd_send_shutdown);
		if (accept_thread_data->fd_wait_shutdown != -1)
			close(accept_thread_data->fd_wait_shutdown);

		free(server_data->accept_thread);
	}

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}