/* compat/simple-ipc/ipc-unix-socket.c */
#include "git-compat-util.h"
#include "gettext.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "thread-utils.h"
#include "trace2.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

#ifdef __CYGWIN__
	/*
	 * Cygwin emulates Unix sockets by writing specially-crafted files
	 * whose `system` bit is set.
	 *
	 * If we are too fast, Cygwin might still be in the process of marking
	 * the underlying file as a system file. Until then, we will not see a
	 * Unix socket here, but a plain file instead. Just in case that this
	 * is happening, wait a little and try again.
	 */
	{
		static const int delay[] = { 1, 10, 20, 40, -1 };
		int i;

		for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
			sleep_millisec(delay[i]);
			if (lstat(path, &st) == -1)
				return IPC_STATE__INVALID_PATH;
		}
	}
#endif

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has an S_IFSOCK type inode
	 * at `path` doesn't mean that there is a server listening.
	 * Ping it to be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}
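
/*
 * Illustrative sketch: how a caller might probe for a live server
 * before deciding whether to spawn one. The socket path here is
 * hypothetical; the states are the ones returned above.
 */
#if 0
static void example_probe(void)
{
	const char *path = "/tmp/example-ipc-socket"; /* hypothetical */

	switch (ipc_get_active_state(path)) {
	case IPC_STATE__LISTENING:
		/* a server answered our ping; safe to send commands */
		break;
	case IPC_STATE__NOT_LISTENING:
	case IPC_STATE__PATH_NOT_FOUND:
		/* no server is listening; a caller might start one here */
		break;
	default:
		/* invalid path or other error */
		break;
	}
}
#endif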

/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so short that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server. If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int k;

	*pfd = -1;

	for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		if (errno == ENOENT) {
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;

			goto sleep_and_try_again;
		}

		if (errno == ETIMEDOUT) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		if (errno == ECONNREFUSED) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		return IPC_STATE__OTHER_ERROR;

	sleep_and_try_again:
		sleep_millisec(WAIT_STEP_MS);
	}

	return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests. Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)
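
/*
 * Taken together: connect_to_server() makes at most
 * MY_CONNECTION_TIMEOUT_MS / WAIT_STEP_MS (20) connection attempts,
 * sleeping WAIT_STEP_MS between retryable failures, so a caller waits
 * roughly one second in the worst case before giving up.
 */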

enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}

void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}

int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, size_t message_len,
	struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, message_len,
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, size_t message_len,
			    struct strbuf *answer)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection,
						    message, message_len,
						    answer);

	ipc_client_close_connection(connection);

	return ret;
}
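
/*
 * Illustrative sketch of a one-shot client round trip; the socket path
 * and message are hypothetical. ipc_client_send_command() connects,
 * sends a single request, collects the entire response in `answer`,
 * and closes the connection.
 */
#if 0
static int example_send_ping(const char *path)
{
	struct ipc_client_connect_options options =
		IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct strbuf answer = STRBUF_INIT;
	int ret;

	options.wait_if_busy = 1;      /* retry while the server is busy */
	options.wait_if_not_found = 0; /* fail fast if there is no socket */

	ret = ipc_client_send_command(path, &options, "ping", 4, &answer);
	if (!ret)
		printf("server said: %s\n", answer.buf);

	strbuf_release(&answer);
	return ret;
}
#endif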

static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);

	if (flags < 0)
		return -1;

	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
	enum magic magic;
	struct ipc_worker_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
};

struct ipc_accept_thread_data {
	enum magic magic;
	struct ipc_server_data *server_data;

	struct unix_ss_socket *server_socket;

	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread. The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb` and then wait for another connection from the
 * server thread. This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list;

	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO. The queue is empty when the
	 * positions are equal.
	 */
	int *fifo_fds;
	int queue_size;
	int back_pos;
	int front_pos;

	int shutdown_requested;
	int is_stopped;
};

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int fd;

	if (server_data->back_pos == server_data->front_pos)
		return -1;

	fd = server_data->fifo_fds[server_data->front_pos];
	server_data->fifo_fds[server_data->front_pos] = -1;

	server_data->front_pos++;
	if (server_data->front_pos == server_data->queue_size)
		server_data->front_pos = 0;

	return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int next_back_pos;

	next_back_pos = server_data->back_pos + 1;
	if (next_back_pos == server_data->queue_size)
		next_back_pos = 0;

	if (next_back_pos == server_data->front_pos) {
		/* Queue is full. Just drop it. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[server_data->back_pos] = fd;
	server_data->back_pos = next_back_pos;

	return fd;
}
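
/*
 * Note on capacity: because "full" is detected when advancing back_pos
 * would collide with front_pos, the ring holds at most queue_size - 1
 * file descriptors at once; one slot is sacrificed to distinguish a
 * full queue from an empty one.
 */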

/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *server_data = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&server_data->work_available_mutex);
	for (;;) {
		if (server_data->shutdown_requested)
			break;

		fd = fifo_dequeue(server_data);
		if (fd >= 0)
			break;

		pthread_cond_wait(&server_data->work_available_cond,
				  &server_data->work_available_mutex);
	}
	pthread_mutex_unlock(&server_data->work_available_mutex);

	return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call). The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client through us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}

/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd. Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	struct pollfd pollfd[1];
	int result;

	for (;;) {
		pollfd[0].fd = fd;
		pollfd[0].events = POLLIN;

		result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			goto cleanup;
		}

		if (result == 0) {
			/* a timeout */

			int in_shutdown;

			pthread_mutex_lock(&server_data->work_available_mutex);
			in_shutdown = server_data->shutdown_requested;
			pthread_mutex_unlock(&server_data->work_available_mutex);

			/*
			 * If a shutdown is already in progress and this
			 * client has not started talking yet, just drop it.
			 */
			if (in_shutdown)
				goto cleanup;
			continue;
		}

		if (pollfd[0].revents & POLLHUP)
			goto cleanup;

		if (pollfd[0].revents & POLLIN)
			return 0;

		goto cleanup;
	}

cleanup:
	close(fd);
	return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback. The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;

	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, buf.len, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	}
	else {
		/*
		 * The client probably disconnected or shut down before it
		 * could send a well-formed message. Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t new_set;

	sigemptyset(&new_set);
	sigaddset(&new_set, SIGPIPE);

	sigemptyset(old_set);
	pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}
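
/*
 * (Aside: a thread that wanted to undo this could restore the saved
 * mask with pthread_sigmask(SIG_SETMASK, old_set, NULL). The threads
 * below never do; they keep SIGPIPE blocked for their whole lifetime.)
 */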

/*
 * Thread proc for an IPC worker thread. It handles a series of
 * connections from clients. It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	sigset_t old_set;
	int fd, io;
	int ret;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		fd = worker_thread__wait_for_connection(worker_thread_data);
		if (fd == -1)
			break; /* in shutdown */

		io = worker_thread__wait_for_io_start(worker_thread_data, fd);
		if (io == -1)
			continue; /* client hung up without sending anything */

		ret = worker_thread__do_io(worker_thread_data, fd);

		if (ret == SIMPLE_IPC_QUIT) {
			trace2_data_string("ipc-worker", NULL, "queue_stop_async",
					   "application_quit");
			/*
			 * The application layer is telling the ipc-server
			 * layer to shut down.
			 *
			 * We DO NOT have a response to send to the client.
			 *
			 * Queue an async stop (to stop the other threads) and
			 * allow this worker thread to exit now (no sense waiting
			 * for the thread-pool shutdown signal).
			 *
			 * Other non-idle worker threads are allowed to finish
			 * responding to their current clients.
			 */
			ipc_server_stop_async(server_data);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket. This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	struct pollfd pollfd[2];
	int result;

	for (;;) {
		pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
		pollfd[0].events = POLLIN;

		pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
		pollfd[1].events = POLLIN;

		result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			return result;
		}

		if (result == 0) {
			/* a timeout */

			/*
			 * If someone deletes or force-creates a new unix
			 * domain socket at our path, all future clients
			 * will be routed elsewhere and we silently starve.
			 * If that happens, just queue a shutdown.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		if (pollfd[0].revents & POLLIN) {
			/* shutdown message queued to socketpair */
			return -1;
		}

		if (pollfd[1].revents & POLLIN) {
			/* a connection is available on server_socket */

			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);
			if (client_fd >= 0)
				return client_fd;

			/*
			 * An error here is unlikely -- it probably
			 * indicates that the connecting process has
			 * already dropped the connection.
			 */
			continue;
		}

		BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, pollfd[0].revents, pollfd[1].revents);
	}
}

/*
 * Thread proc for the IPC server "accept thread". This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread. This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
	struct ipc_server_data *server_data = accept_thread_data->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(
			accept_thread_data);

		pthread_mutex_lock(&server_data->work_available_mutex);
		if (server_data->shutdown_requested) {
			pthread_mutex_unlock(&server_data->work_available_mutex);
			if (client_fd >= 0)
				close(client_fd);
			break;
		}

		if (client_fd < 0) {
			/* ignore transient accept() errors */
		}
		else {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}
		pthread_mutex_unlock(&server_data->work_available_mutex);
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate; therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not time out unnecessarily if we can avoid it. (The assumption is
 * that this will be used for FSMonitor and a few-second wait on a
 * connection is better than having the client time out and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`. This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)

static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}

static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int ret, saved_errno;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	ret = create_listener_socket(path, ipc_opts, new_server_socket);

	saved_errno = errno;
	trace2_region_leave("ipc-server", "create-listener_socket", NULL);
	errno = saved_errno;

	return ret;
}

/*
 * Start IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking. This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
			   accept_thread_proc, server_data->accept_thread))
		die_errno(_("could not start accept_thread '%s'"), path);

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 */
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}
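
/*
 * Illustrative sketch of running a server with this API. The echo
 * callback below is hypothetical; its signature is inferred from the
 * application_cb call site in worker_thread__do_io() above (the
 * authoritative typedef lives in simple-ipc.h), and the opts fields
 * shown are the ones this file reads.
 */
#if 0
static int example_echo_cb(void *data, const char *request, size_t request_len,
			   ipc_server_reply_cb *reply_cb,
			   struct ipc_server_reply_data *reply_data)
{
	/* reply with the request itself; reply_cb may be called in chunks */
	return reply_cb(reply_data, request, request_len);
}

static int example_run_server(const char *path)
{
	struct ipc_server_opts opts = { .nr_threads = 4 };
	struct ipc_server_data *server_data = NULL;

	if (ipc_server_run_async(&server_data, path, &opts,
				 example_echo_cb, NULL))
		return -1;

	ipc_server_await(server_data); /* blocks until a shutdown is requested */
	ipc_server_free(server_data);
	return 0;
}
#endif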

/*
 * Gently tell the IPC server threads to shut down.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell worker threads to stop processing new connections
	 * and exit. (This does not abort in-progress conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}

void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *accept_thread_data;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	accept_thread_data = server_data->accept_thread;
	if (accept_thread_data) {
		unix_ss_free(accept_thread_data->server_socket);

		if (accept_thread_data->fd_send_shutdown != -1)
			close(accept_thread_data->fd_send_shutdown);
		if (accept_thread_data->fd_wait_shutdown != -1)
			close(accept_thread_data->fd_wait_shutdown);

		free(server_data->accept_thread);
	}

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}