#include "git-compat-util.h"
#include "gettext.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "trace2.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif
enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

#ifdef __CYGWIN__
	/*
	 * Cygwin emulates Unix sockets with specially crafted files whose
	 * `system` bit is set.
	 *
	 * If we are too fast, Cygwin might still be in the process of marking
	 * the underlying file as a system file. Until then, we will not see a
	 * Unix socket here, but a plain file instead. Just in case that
	 * happens, wait a little and try again.
	 */
	{
		static const int delay[] = { 1, 10, 20, 40, -1 };
		int i;

		for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
			sleep_millisec(delay[i]);
			if (lstat(path, &st) == -1)
				return IPC_STATE__INVALID_PATH;
		}
	}
#endif

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has an S_IFSOCK inode at `path`
	 * doesn't mean that there is a server listening.
	 * Ping it to be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}
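
/*
 * Illustrative sketch (not part of the upstream file): a caller can
 * probe for a live server before deciding whether to start one. The
 * socket path here is hypothetical.
 *
 *	if (ipc_get_active_state("/path/to/socket") == IPC_STATE__LISTENING)
 *		die("server is already running");
 */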

/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so short that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server. If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int k;

	*pfd = -1;

	for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		if (errno == ENOENT) {
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;

			goto sleep_and_try_again;
		}

		if (errno == ETIMEDOUT) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		if (errno == ECONNREFUSED) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		return IPC_STATE__OTHER_ERROR;

	sleep_and_try_again:
		sleep_millisec(WAIT_STEP_MS);
	}

	return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests. Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)

enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}

void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}

int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, size_t message_len,
	struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, message_len,
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, size_t message_len,
			    struct strbuf *answer)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection,
						    message, message_len,
						    answer);

	ipc_client_close_connection(connection);

	return ret;
}
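
/*
 * Illustrative sketch (not part of the upstream file): a one-shot
 * client round-trip using the helper above. The socket path and the
 * "status" message are hypothetical; a real caller would use the path
 * and request format of its particular server.
 *
 *	struct ipc_client_connect_options options =
 *		IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *	struct strbuf answer = STRBUF_INIT;
 *
 *	options.wait_if_busy = 1;
 *	options.wait_if_not_found = 0;
 *
 *	if (!ipc_client_send_command("/path/to/socket", &options,
 *				     "status", strlen("status"), &answer))
 *		printf("server said: %s\n", answer.buf);
 *	strbuf_release(&answer);
 */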

static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);

	if (flags < 0)
		return -1;

	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
	enum magic magic;
	struct ipc_worker_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
};

struct ipc_accept_thread_data {
	enum magic magic;
	struct ipc_server_data *server_data;

	struct unix_ss_socket *server_socket;

	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread. The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb` and then wait for another connection from the
 * accept-thread. This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list;

	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO. The queue is empty when the
	 * positions are equal.
	 */
	int *fifo_fds;
	int queue_size;
	int back_pos;
	int front_pos;

	int shutdown_requested;
	int is_stopped;
};
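
/*
 * Illustrative sketch (not part of the upstream file): the typical
 * lifecycle of this server as seen by an application. The callback
 * name, socket path, and reply text are hypothetical; the callback
 * signature is inferred from the call site in worker_thread__do_io().
 *
 *	static int my_app_cb(void *data, const char *request, size_t len,
 *			     ipc_server_reply_cb *reply_cb,
 *			     struct ipc_server_reply_data *reply_data)
 *	{
 *		return reply_cb(reply_data, "ok", 2);
 *	}
 *
 *	struct ipc_server_data *server = NULL;
 *	struct ipc_server_opts opts = { .nr_threads = 4 };
 *
 *	if (!ipc_server_run_async(&server, "/path/to/socket", &opts,
 *				  my_app_cb, NULL)) {
 *		... do other work while the thread pool serves clients ...
 *		ipc_server_stop_async(server);
 *		ipc_server_await(server);
 *		ipc_server_free(server);
 *	}
 */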

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int fd;

	if (server_data->back_pos == server_data->front_pos)
		return -1;

	fd = server_data->fifo_fds[server_data->front_pos];
	server_data->fifo_fds[server_data->front_pos] = -1;

	server_data->front_pos++;
	if (server_data->front_pos == server_data->queue_size)
		server_data->front_pos = 0;

	return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if the queue is already full. (One slot is
 * intentionally left unused so that a full queue can be distinguished
 * from an empty one: the queue is full when advancing `back_pos` would
 * make it equal to `front_pos`.)
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int next_back_pos;

	next_back_pos = server_data->back_pos + 1;
	if (next_back_pos == server_data->queue_size)
		next_back_pos = 0;

	if (next_back_pos == server_data->front_pos) {
		/* Queue is full. Just drop it. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[server_data->back_pos] = fd;
	server_data->back_pos = next_back_pos;

	return fd;
}
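
/*
 * For example (illustrative only): with queue_size = 4 the FIFO can
 * hold at most 3 connections. Starting from front_pos = back_pos = 0,
 * three enqueues advance back_pos to 3; a fourth enqueue would wrap
 * back_pos around onto front_pos, so that fd is dropped instead.
 */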

/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *server_data = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&server_data->work_available_mutex);
	for (;;) {
		if (server_data->shutdown_requested)
			break;

		fd = fifo_dequeue(server_data);
		if (fd >= 0)
			break;

		pthread_cond_wait(&server_data->work_available_cond,
				  &server_data->work_available_mutex);
	}
	pthread_mutex_unlock(&server_data->work_available_mutex);

	return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call). The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay the application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client through us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}
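
/*
 * Illustrative sketch (not part of the upstream file): because the
 * reply callback does not flush, an application callback may stream
 * its answer in several chunks; the worker thread sends the final
 * flush packet after the callback returns. The callback name and
 * chunk contents here are hypothetical.
 *
 *	static int my_chunking_cb(void *data, const char *request,
 *				  size_t len, ipc_server_reply_cb *reply_cb,
 *				  struct ipc_server_reply_data *reply_data)
 *	{
 *		if (reply_cb(reply_data, "part-1", 6) < 0 ||
 *		    reply_cb(reply_data, "part-2", 6) < 0)
 *			return -1;
 *		return 0;
 *	}
 */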

/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd. Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	struct pollfd pollfd[1];
	int result;

	for (;;) {
		pollfd[0].fd = fd;
		pollfd[0].events = POLLIN;

		result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			goto cleanup;
		}

		if (result == 0) {
			/* a timeout */

			int in_shutdown;

			pthread_mutex_lock(&server_data->work_available_mutex);
			in_shutdown = server_data->shutdown_requested;
			pthread_mutex_unlock(&server_data->work_available_mutex);

			/*
			 * If a shutdown is already in progress and this
			 * client has not started talking yet, just drop it.
			 */
			if (in_shutdown)
				goto cleanup;
			continue;
		}

		if (pollfd[0].revents & POLLHUP)
			goto cleanup;

		if (pollfd[0].revents & POLLIN)
			return 0;

		goto cleanup;
	}

cleanup:
	close(fd);
	return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback. The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;

	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, buf.len, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	} else {
		/*
		 * The client probably disconnected/shutdown before it
		 * could send a well-formed message. Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t new_set;

	sigemptyset(&new_set);
	sigaddset(&new_set, SIGPIPE);

	sigemptyset(old_set);
	pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}

/*
 * Thread proc for an IPC worker thread. It handles a series of
 * connections from clients. It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	sigset_t old_set;
	int fd, io;
	int ret;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		fd = worker_thread__wait_for_connection(worker_thread_data);
		if (fd == -1)
			break; /* in shutdown */

		io = worker_thread__wait_for_io_start(worker_thread_data, fd);
		if (io == -1)
			continue; /* client hung up without sending anything */

		ret = worker_thread__do_io(worker_thread_data, fd);

		if (ret == SIMPLE_IPC_QUIT) {
			trace2_data_string("ipc-worker", NULL, "queue_stop_async",
					   "application_quit");
			/*
			 * The application layer is telling the ipc-server
			 * layer to shutdown.
			 *
			 * We DO NOT have a response to send to the client.
			 *
			 * Queue an async stop (to stop the other threads) and
			 * allow this worker thread to exit now (no sense waiting
			 * for the thread-pool shutdown signal).
			 *
			 * Other non-idle worker threads are allowed to finish
			 * responding to their current clients.
			 */
			ipc_server_stop_async(server_data);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket. This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	struct pollfd pollfd[2];
	int result;

	for (;;) {
		pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
		pollfd[0].events = POLLIN;

		pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
		pollfd[1].events = POLLIN;

		result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			return result;
		}

		if (result == 0) {
			/* a timeout */

			/*
			 * If someone deletes or force-creates a new unix
			 * domain socket at our path, all future clients
			 * will be routed elsewhere and we silently starve.
			 * If that happens, just queue a shutdown.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		if (pollfd[0].revents & POLLIN) {
			/* shutdown message queued to socketpair */
			return -1;
		}

		if (pollfd[1].revents & POLLIN) {
			/* a connection is available on server_socket */

			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);
			if (client_fd >= 0)
				return client_fd;

			/*
			 * An error here is unlikely -- it probably
			 * indicates that the connecting process has
			 * already dropped the connection.
			 */
			continue;
		}

		BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, pollfd[0].revents, pollfd[1].revents);
	}
}

/*
 * Thread proc for the IPC server "accept thread". This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread. This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
	struct ipc_server_data *server_data = accept_thread_data->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(
			accept_thread_data);

		pthread_mutex_lock(&server_data->work_available_mutex);
		if (server_data->shutdown_requested) {
			pthread_mutex_unlock(&server_data->work_available_mutex);
			if (client_fd >= 0)
				close(client_fd);
			break;
		}

		if (client_fd < 0) {
			/* ignore transient accept() errors */
		} else {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}
		pthread_mutex_unlock(&server_data->work_available_mutex);
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not time out unnecessarily if we can avoid it. (The assumption is
 * that this will be used for FSMonitor and a few-second wait on a
 * connection is better than having the client time out and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`. This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)

static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}

static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int ret, saved_errno;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	ret = create_listener_socket(path, ipc_opts, new_server_socket);

	saved_errno = errno;
	trace2_region_leave("ipc-server", "create-listener_socket", NULL);
	errno = saved_errno;

	return ret;
}

/*
 * Start the IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking. This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
			   accept_thread_proc, server_data->accept_thread))
		die_errno(_("could not start accept_thread '%s'"), path);

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 */
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}

/*
 * Gently tell the IPC server threads to shutdown.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell worker threads to stop processing new connections
	 * and exit. (This does not abort in-process conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}

void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *accept_thread_data;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	accept_thread_data = server_data->accept_thread;
	if (accept_thread_data) {
		unix_ss_free(accept_thread_data->server_socket);

		if (accept_thread_data->fd_send_shutdown != -1)
			close(accept_thread_data->fd_send_shutdown);
		if (accept_thread_data->fd_wait_shutdown != -1)
			close(accept_thread_data->fd_wait_shutdown);

		free(server_data->accept_thread);
	}

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}