#include "cache.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

enum ipc_active_state ipc_get_active_state(const char *path)
{
        enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
        struct ipc_client_connect_options options
                = IPC_CLIENT_CONNECT_OPTIONS_INIT;
        struct stat st;
        struct ipc_client_connection *connection_test = NULL;

        options.wait_if_busy = 0;
        options.wait_if_not_found = 0;

        if (lstat(path, &st) == -1) {
                switch (errno) {
                case ENOENT:
                case ENOTDIR:
                        return IPC_STATE__NOT_LISTENING;
                default:
                        return IPC_STATE__INVALID_PATH;
                }
        }

#ifdef __CYGWIN__
        /*
         * Cygwin emulates Unix sockets by writing specially crafted files
         * whose `system` bit is set.
         *
         * If we are too fast, Cygwin might still be in the process of marking
         * the underlying file as a system file. Until then, we will not see a
         * Unix socket here, but a plain file instead. Just in case that this
         * is happening, wait a little and try again.
         */
        {
                static const int delay[] = { 1, 10, 20, 40, -1 };
                int i;

                for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
                        sleep_millisec(delay[i]);
                        if (lstat(path, &st) == -1)
                                return IPC_STATE__INVALID_PATH;
                }
        }
#endif

        /* also complain if a plain file is in the way */
        if ((st.st_mode & S_IFMT) != S_IFSOCK)
                return IPC_STATE__INVALID_PATH;

        /*
         * Just because the filesystem has an S_IFSOCK type inode
         * at `path`, doesn't mean that there is a server listening.
         * Ping it to be sure.
         */
        state = ipc_client_try_connect(path, &options, &connection_test);
        ipc_client_close_connection(connection_test);

        return state;
}
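
/*
 * Usage note (illustrative sketch, not part of the original file): a
 * hypothetical caller could use ipc_get_active_state() to probe for a
 * live server before deciding whether to spawn one. The names
 * `my_socket_path` and `start_my_server()` below are assumptions made
 * for the example only.
 *
 *      if (ipc_get_active_state(my_socket_path) != IPC_STATE__LISTENING)
 *              start_my_server(my_socket_path);
 */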

/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but long enough that our spinning does not put pressure on
 * the system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server. If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
        const char *path,
        int timeout_ms,
        const struct ipc_client_connect_options *options,
        int *pfd)
{
        int k;

        *pfd = -1;

        for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
                int fd = unix_stream_connect(path, options->uds_disallow_chdir);

                if (fd != -1) {
                        *pfd = fd;
                        return IPC_STATE__LISTENING;
                }

                if (errno == ENOENT) {
                        if (!options->wait_if_not_found)
                                return IPC_STATE__PATH_NOT_FOUND;

                        goto sleep_and_try_again;
                }

                if (errno == ETIMEDOUT) {
                        if (!options->wait_if_busy)
                                return IPC_STATE__NOT_LISTENING;

                        goto sleep_and_try_again;
                }

                if (errno == ECONNREFUSED) {
                        if (!options->wait_if_busy)
                                return IPC_STATE__NOT_LISTENING;

                        goto sleep_and_try_again;
                }

                return IPC_STATE__OTHER_ERROR;

        sleep_and_try_again:
                sleep_millisec(WAIT_STEP_MS);
        }

        return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests. Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)

enum ipc_active_state ipc_client_try_connect(
        const char *path,
        const struct ipc_client_connect_options *options,
        struct ipc_client_connection **p_connection)
{
        enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
        int fd = -1;

        *p_connection = NULL;

        trace2_region_enter("ipc-client", "try-connect", NULL);
        trace2_data_string("ipc-client", NULL, "try-connect/path", path);

        state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
                                  options, &fd);

        trace2_data_intmax("ipc-client", NULL, "try-connect/state",
                           (intmax_t)state);
        trace2_region_leave("ipc-client", "try-connect", NULL);

        if (state == IPC_STATE__LISTENING) {
                (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
                (*p_connection)->fd = fd;
        }

        return state;
}

void ipc_client_close_connection(struct ipc_client_connection *connection)
{
        if (!connection)
                return;

        if (connection->fd != -1)
                close(connection->fd);

        free(connection);
}

int ipc_client_send_command_to_connection(
        struct ipc_client_connection *connection,
        const char *message, size_t message_len,
        struct strbuf *answer)
{
        int ret = 0;

        strbuf_setlen(answer, 0);

        trace2_region_enter("ipc-client", "send-command", NULL);

        if (write_packetized_from_buf_no_flush(message, message_len,
                                               connection->fd) < 0 ||
            packet_flush_gently(connection->fd) < 0) {
                ret = error(_("could not send IPC command"));
                goto done;
        }

        if (read_packetized_to_strbuf(
                    connection->fd, answer,
                    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
                ret = error(_("could not read IPC response"));
                goto done;
        }

done:
        trace2_region_leave("ipc-client", "send-command", NULL);
        return ret;
}

int ipc_client_send_command(const char *path,
                            const struct ipc_client_connect_options *options,
                            const char *message, size_t message_len,
                            struct strbuf *answer)
{
        int ret = -1;
        enum ipc_active_state state;
        struct ipc_client_connection *connection = NULL;

        state = ipc_client_try_connect(path, options, &connection);

        if (state != IPC_STATE__LISTENING)
                return ret;

        ret = ipc_client_send_command_to_connection(connection,
                                                    message, message_len,
                                                    answer);

        ipc_client_close_connection(connection);

        return ret;
}
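
/*
 * Usage sketch (illustrative only, not part of the original file): a
 * typical client fills in the connect options and makes one round-trip
 * with ipc_client_send_command(), which returns 0 on success. The
 * socket path and message text below are assumed placeholders.
 *
 *      struct ipc_client_connect_options options =
 *              IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *      struct strbuf answer = STRBUF_INIT;
 *
 *      options.wait_if_busy = 1;
 *      options.wait_if_not_found = 0;
 *
 *      if (!ipc_client_send_command("<socket-path>", &options,
 *                                   "<message>", strlen("<message>"),
 *                                   &answer))
 *              printf("reply: %s\n", answer.buf);
 *      strbuf_release(&answer);
 */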

static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
        int flags;

        flags = fcntl(fd, F_GETFL, NULL);

        if (flags < 0)
                return -1;

        if (make_nonblocking)
                flags |= O_NONBLOCK;
        else
                flags &= ~O_NONBLOCK;

        return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
        MAGIC_SERVER_REPLY_DATA,
        MAGIC_WORKER_THREAD_DATA,
        MAGIC_ACCEPT_THREAD_DATA,
        MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
        enum magic magic;
        int fd;
        struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
        enum magic magic;
        struct ipc_worker_thread_data *next_thread;
        struct ipc_server_data *server_data;
        pthread_t pthread_id;
};

struct ipc_accept_thread_data {
        enum magic magic;
        struct ipc_server_data *server_data;

        struct unix_ss_socket *server_socket;

        int fd_send_shutdown;
        int fd_wait_shutdown;
        pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread. The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb` and then wait for another connection from the
 * accept-thread. This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
        enum magic magic;
        ipc_server_application_cb *application_cb;
        void *application_data;
        struct strbuf buf_path;

        struct ipc_accept_thread_data *accept_thread;
        struct ipc_worker_thread_data *worker_thread_list;

        pthread_mutex_t work_available_mutex;
        pthread_cond_t work_available_cond;

        /*
         * Accepted but not yet processed client connections are kept
         * in a circular buffer FIFO. The queue is empty when the
         * positions are equal; one slot is always left unused, so the
         * usable capacity is `queue_size - 1`.
         */
        int *fifo_fds;
        int queue_size;
        int back_pos;
        int front_pos;

        int shutdown_requested;
        int is_stopped;
};
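
/*
 * Lifecycle sketch (illustrative only, not part of the original file):
 * an application starts the thread pool with ipc_server_run_async(),
 * blocks in ipc_server_await() until a shutdown has been requested
 * (typically by the application callback returning SIMPLE_IPC_QUIT or by
 * another thread calling ipc_server_stop_async()), and then releases
 * everything with ipc_server_free(). The callback, data, and option
 * values below are assumptions made for the example.
 *
 *      struct ipc_server_opts opts = { 0 };
 *      struct ipc_server_data *server = NULL;
 *
 *      opts.nr_threads = 4;
 *
 *      if (!ipc_server_run_async(&server, "<socket-path>", &opts,
 *                                my_application_cb, my_application_data)) {
 *              ipc_server_await(server);
 *              ipc_server_free(server);
 *      }
 */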

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
        /* ASSERT holding mutex */

        int fd;

        if (server_data->back_pos == server_data->front_pos)
                return -1;

        fd = server_data->fifo_fds[server_data->front_pos];
        server_data->fifo_fds[server_data->front_pos] = -1;

        server_data->front_pos++;
        if (server_data->front_pos == server_data->queue_size)
                server_data->front_pos = 0;

        return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if the queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
        /* ASSERT holding mutex */

        int next_back_pos;

        next_back_pos = server_data->back_pos + 1;
        if (next_back_pos == server_data->queue_size)
                next_back_pos = 0;

        if (next_back_pos == server_data->front_pos) {
                /* Queue is full. Just drop it. */
                close(fd);
                return -1;
        }

        server_data->fifo_fds[server_data->back_pos] = fd;
        server_data->back_pos = next_back_pos;

        return fd;
}

/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
        struct ipc_worker_thread_data *worker_thread_data)
{
        /* ASSERT NOT holding mutex */

        struct ipc_server_data *server_data = worker_thread_data->server_data;
        int fd = -1;

        pthread_mutex_lock(&server_data->work_available_mutex);
        for (;;) {
                if (server_data->shutdown_requested)
                        break;

                fd = fifo_dequeue(server_data);
                if (fd >= 0)
                        break;

                pthread_cond_wait(&server_data->work_available_cond,
                                  &server_data->work_available_mutex);
        }
        pthread_mutex_unlock(&server_data->work_available_mutex);

        return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call). The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client thru us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
                                const char *response, size_t response_len)
{
        if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
                BUG("reply_cb called with wrong instance data");

        return write_packetized_from_buf_no_flush(response, response_len,
                                                  reply_data->fd);
}

/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd. Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
        struct ipc_worker_thread_data *worker_thread_data,
        int fd)
{
        struct ipc_server_data *server_data = worker_thread_data->server_data;
        struct pollfd pollfd[1];
        int result;

        for (;;) {
                pollfd[0].fd = fd;
                pollfd[0].events = POLLIN;

                result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
                if (result < 0) {
                        if (errno == EINTR)
                                continue;
                        goto cleanup;
                }

                if (result == 0) {
                        /* a timeout */

                        int in_shutdown;

                        pthread_mutex_lock(&server_data->work_available_mutex);
                        in_shutdown = server_data->shutdown_requested;
                        pthread_mutex_unlock(&server_data->work_available_mutex);

                        /*
                         * If a shutdown is already in progress and this
                         * client has not started talking yet, just drop it.
                         */
                        if (in_shutdown)
                                goto cleanup;
                        continue;
                }

                if (pollfd[0].revents & POLLHUP)
                        goto cleanup;

                if (pollfd[0].revents & POLLIN)
                        return 0;

                goto cleanup;
        }

cleanup:
        close(fd);
        return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback. The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
        struct ipc_worker_thread_data *worker_thread_data,
        int fd)
{
        /* ASSERT NOT holding lock */

        struct strbuf buf = STRBUF_INIT;
        struct ipc_server_reply_data reply_data;
        int ret = 0;

        reply_data.magic = MAGIC_SERVER_REPLY_DATA;
        reply_data.worker_thread_data = worker_thread_data;

        reply_data.fd = fd;

        ret = read_packetized_to_strbuf(
                reply_data.fd, &buf,
                PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
        if (ret >= 0) {
                ret = worker_thread_data->server_data->application_cb(
                        worker_thread_data->server_data->application_data,
                        buf.buf, buf.len, do_io_reply_callback, &reply_data);

                packet_flush_gently(reply_data.fd);
        }
        else {
                /*
                 * The client probably disconnected/shutdown before it
                 * could send a well-formed message. Ignore it.
                 */
        }

        strbuf_release(&buf);
        close(reply_data.fd);

        return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
        sigset_t new_set;

        sigemptyset(&new_set);
        sigaddset(&new_set, SIGPIPE);

        sigemptyset(old_set);
        pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}

/*
 * Thread proc for an IPC worker thread. It handles a series of
 * connections from clients. It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
        struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
        struct ipc_server_data *server_data = worker_thread_data->server_data;
        sigset_t old_set;
        int fd, io;
        int ret;

        trace2_thread_start("ipc-worker");

        thread_block_sigpipe(&old_set);

        for (;;) {
                fd = worker_thread__wait_for_connection(worker_thread_data);
                if (fd == -1)
                        break; /* in shutdown */

                io = worker_thread__wait_for_io_start(worker_thread_data, fd);
                if (io == -1)
                        continue; /* client hung up without sending anything */

                ret = worker_thread__do_io(worker_thread_data, fd);

                if (ret == SIMPLE_IPC_QUIT) {
                        trace2_data_string("ipc-worker", NULL, "queue_stop_async",
                                           "application_quit");
                        /*
                         * The application layer is telling the ipc-server
                         * layer to shutdown.
                         *
                         * We DO NOT have a response to send to the client.
                         *
                         * Queue an async stop (to stop the other threads) and
                         * allow this worker thread to exit now (no sense waiting
                         * for the thread-pool shutdown signal).
                         *
                         * Other non-idle worker threads are allowed to finish
                         * responding to their current clients.
                         */
                        ipc_server_stop_async(server_data);
                        break;
                }
        }

        trace2_thread_exit();
        return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket. This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
        struct ipc_accept_thread_data *accept_thread_data)
{
        struct pollfd pollfd[2];
        int result;

        for (;;) {
                pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
                pollfd[0].events = POLLIN;

                pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
                pollfd[1].events = POLLIN;

                result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
                if (result < 0) {
                        if (errno == EINTR)
                                continue;
                        return result;
                }

                if (result == 0) {
                        /* a timeout */

                        /*
                         * If someone deletes or force-creates a new unix
                         * domain socket at our path, all future clients
                         * will be routed elsewhere and we silently starve.
                         * If that happens, just queue a shutdown.
                         */
                        if (unix_ss_was_stolen(
                                    accept_thread_data->server_socket)) {
                                trace2_data_string("ipc-accept", NULL,
                                                   "queue_stop_async",
                                                   "socket_stolen");
                                ipc_server_stop_async(
                                        accept_thread_data->server_data);
                        }
                        continue;
                }

                if (pollfd[0].revents & POLLIN) {
                        /* shutdown message queued to socketpair */
                        return -1;
                }

                if (pollfd[1].revents & POLLIN) {
                        /* a connection is available on server_socket */

                        int client_fd =
                                accept(accept_thread_data->server_socket->fd_socket,
                                       NULL, NULL);
                        if (client_fd >= 0)
                                return client_fd;

                        /*
                         * An error here is unlikely -- it probably
                         * indicates that the connecting process has
                         * already dropped the connection.
                         */
                        continue;
                }

                BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
                    errno, pollfd[0].revents, pollfd[1].revents);
        }
}

/*
 * Thread proc for the IPC server "accept thread". This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread. This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
        struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
        struct ipc_server_data *server_data = accept_thread_data->server_data;
        sigset_t old_set;

        trace2_thread_start("ipc-accept");

        thread_block_sigpipe(&old_set);

        for (;;) {
                int client_fd = accept_thread__wait_for_connection(
                        accept_thread_data);

                pthread_mutex_lock(&server_data->work_available_mutex);
                if (server_data->shutdown_requested) {
                        pthread_mutex_unlock(&server_data->work_available_mutex);
                        if (client_fd >= 0)
                                close(client_fd);
                        break;
                }

                if (client_fd < 0) {
                        /* ignore transient accept() errors */
                }
                else {
                        fifo_enqueue(server_data, client_fd);
                        pthread_cond_broadcast(&server_data->work_available_cond);
                }
                pthread_mutex_unlock(&server_data->work_available_mutex);
        }

        trace2_thread_exit();
        return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not unnecessarily timeout if we can avoid it. (The assumption is
 * that this will be used for FSMonitor and a few-second wait on a
 * connection is better than having the client timeout and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`. This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)

static int create_listener_socket(
        const char *path,
        const struct ipc_server_opts *ipc_opts,
        struct unix_ss_socket **new_server_socket)
{
        struct unix_ss_socket *server_socket = NULL;
        struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
        int ret;

        uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
        uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

        ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
        if (ret)
                return ret;

        if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
                int saved_errno = errno;
                unix_ss_free(server_socket);
                errno = saved_errno;
                return -1;
        }

        *new_server_socket = server_socket;

        trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
        return 0;
}

static int setup_listener_socket(
        const char *path,
        const struct ipc_server_opts *ipc_opts,
        struct unix_ss_socket **new_server_socket)
{
        int ret, saved_errno;

        trace2_region_enter("ipc-server", "create-listener_socket", NULL);

        ret = create_listener_socket(path, ipc_opts, new_server_socket);

        saved_errno = errno;
        trace2_region_leave("ipc-server", "create-listener_socket", NULL);
        errno = saved_errno;

        return ret;
}

/*
 * Start the IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
                         const char *path, const struct ipc_server_opts *opts,
                         ipc_server_application_cb *application_cb,
                         void *application_data)
{
        struct unix_ss_socket *server_socket = NULL;
        struct ipc_server_data *server_data;
        int sv[2];
        int k;
        int ret;
        int nr_threads = opts->nr_threads;

        *returned_server_data = NULL;

        /*
         * Create a socketpair and set sv[1] to non-blocking. This
         * will be used to send a shutdown message to the accept-thread
         * and allows the accept-thread to wait on EITHER a client
         * connection or a shutdown request without spinning.
         */
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
                return -1;

        if (set_socket_blocking_flag(sv[1], 1)) {
                int saved_errno = errno;
                close(sv[0]);
                close(sv[1]);
                errno = saved_errno;
                return -1;
        }

        ret = setup_listener_socket(path, opts, &server_socket);
        if (ret) {
                int saved_errno = errno;
                close(sv[0]);
                close(sv[1]);
                errno = saved_errno;
                return ret;
        }

        server_data = xcalloc(1, sizeof(*server_data));
        server_data->magic = MAGIC_SERVER_DATA;
        server_data->application_cb = application_cb;
        server_data->application_data = application_data;
        strbuf_init(&server_data->buf_path, 0);
        strbuf_addstr(&server_data->buf_path, path);

        if (nr_threads < 1)
                nr_threads = 1;

        pthread_mutex_init(&server_data->work_available_mutex, NULL);
        pthread_cond_init(&server_data->work_available_cond, NULL);

        server_data->queue_size = nr_threads * FIFO_SCALE;
        CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

        server_data->accept_thread =
                xcalloc(1, sizeof(*server_data->accept_thread));
        server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
        server_data->accept_thread->server_data = server_data;
        server_data->accept_thread->server_socket = server_socket;
        server_data->accept_thread->fd_send_shutdown = sv[0];
        server_data->accept_thread->fd_wait_shutdown = sv[1];

        if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
                           accept_thread_proc, server_data->accept_thread))
                die_errno(_("could not start accept_thread '%s'"), path);

        for (k = 0; k < nr_threads; k++) {
                struct ipc_worker_thread_data *wtd;

                wtd = xcalloc(1, sizeof(*wtd));
                wtd->magic = MAGIC_WORKER_THREAD_DATA;
                wtd->server_data = server_data;

                if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
                                   wtd)) {
                        if (k == 0)
                                die(_("could not start worker[0] for '%s'"),
                                    path);
                        /*
                         * Limp along with the thread pool that we have.
                         */
                        break;
                }

                wtd->next_thread = server_data->worker_thread_list;
                server_data->worker_thread_list = wtd;
        }

        *returned_server_data = server_data;
        return 0;
}

/*
 * Gently tell the IPC server threads to shutdown.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
        /* ASSERT NOT holding mutex */

        int fd;

        if (!server_data)
                return 0;

        trace2_region_enter("ipc-server", "server-stop-async", NULL);

        pthread_mutex_lock(&server_data->work_available_mutex);

        server_data->shutdown_requested = 1;

        /*
         * Write a byte to the shutdown socket pair to wake up the
         * accept-thread.
         */
        if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
                error_errno("could not write to fd_send_shutdown");

        /*
         * Drain the queue of existing connections.
         */
        while ((fd = fifo_dequeue(server_data)) != -1)
                close(fd);

        /*
         * Gently tell worker threads to stop processing new connections
         * and exit. (This does not abort in-process conversations.)
         */
        pthread_cond_broadcast(&server_data->work_available_cond);

        pthread_mutex_unlock(&server_data->work_available_mutex);

        trace2_region_leave("ipc-server", "server-stop-async", NULL);

        return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
        pthread_join(server_data->accept_thread->pthread_id, NULL);

        if (!server_data->shutdown_requested)
                BUG("ipc-server: accept-thread stopped for '%s'",
                    server_data->buf_path.buf);

        while (server_data->worker_thread_list) {
                struct ipc_worker_thread_data *wtd =
                        server_data->worker_thread_list;

                pthread_join(wtd->pthread_id, NULL);

                server_data->worker_thread_list = wtd->next_thread;
                free(wtd);
        }

        server_data->is_stopped = 1;

        return 0;
}

void ipc_server_free(struct ipc_server_data *server_data)
{
        struct ipc_accept_thread_data *accept_thread_data;

        if (!server_data)
                return;

        if (!server_data->is_stopped)
                BUG("cannot free ipc-server while running for '%s'",
                    server_data->buf_path.buf);

        accept_thread_data = server_data->accept_thread;
        if (accept_thread_data) {
                unix_ss_free(accept_thread_data->server_socket);

                if (accept_thread_data->fd_send_shutdown != -1)
                        close(accept_thread_data->fd_send_shutdown);
                if (accept_thread_data->fd_wait_shutdown != -1)
                        close(accept_thread_data->fd_wait_shutdown);

                free(server_data->accept_thread);
        }

        while (server_data->worker_thread_list) {
                struct ipc_worker_thread_data *wtd =
                        server_data->worker_thread_list;

                server_data->worker_thread_list = wtd->next_thread;
                free(wtd);
        }

        pthread_cond_destroy(&server_data->work_available_cond);
        pthread_mutex_destroy(&server_data->work_available_mutex);

        strbuf_release(&server_data->buf_path);

        free(server_data->fifo_fds);
        free(server_data);
}