2 #include "simple-ipc.h"
5 #include "thread-utils.h"
6 #include "unix-socket.h"
7 #include "unix-stream-server.h"
9 #ifndef SUPPORTS_SIMPLE_IPC
11 * This source file should only be compiled when Simple IPC is supported.
12 * See the top-level Makefile.
14 #error SUPPORTS_SIMPLE_IPC not defined
17 enum ipc_active_state
ipc_get_active_state(const char *path
)
19 enum ipc_active_state state
= IPC_STATE__OTHER_ERROR
;
20 struct ipc_client_connect_options options
21 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
23 struct ipc_client_connection
*connection_test
= NULL
;
25 options
.wait_if_busy
= 0;
26 options
.wait_if_not_found
= 0;
28 if (lstat(path
, &st
) == -1) {
32 return IPC_STATE__NOT_LISTENING
;
34 return IPC_STATE__INVALID_PATH
;
40 * Cygwin emulates Unix sockets by writing special-crafted files whose
41 * `system` bit is set.
43 * If we are too fast, Cygwin might still be in the process of marking
44 * the underlying file as a system file. Until then, we will not see a
45 * Unix socket here, but a plain file instead. Just in case that this
46 * is happening, wait a little and try again.
49 static const int delay
[] = { 1, 10, 20, 40, -1 };
52 for (i
= 0; S_ISREG(st
.st_mode
) && delay
[i
] > 0; i
++) {
53 sleep_millisec(delay
[i
]);
54 if (lstat(path
, &st
) == -1)
55 return IPC_STATE__INVALID_PATH
;
60 /* also complain if a plain file is in the way */
61 if ((st
.st_mode
& S_IFMT
) != S_IFSOCK
)
62 return IPC_STATE__INVALID_PATH
;
65 * Just because the filesystem has a S_IFSOCK type inode
66 * at `path`, doesn't mean it that there is a server listening.
69 state
= ipc_client_try_connect(path
, &options
, &connection_test
);
70 ipc_client_close_connection(connection_test
);
76 * Retry frequency when trying to connect to a server.
78 * This value should be short enough that we don't seriously delay our
79 * caller, but not so fast that our spinning puts pressure on the system.
82 #define WAIT_STEP_MS (50)
85 * Try to connect to the server. If the server is just starting up or
86 * is very busy, we may not get a connection the first time.
88 static enum ipc_active_state
connect_to_server(
91 const struct ipc_client_connect_options
*options
,
98 for (k
= 0; k
< timeout_ms
; k
+= WAIT_STEP_MS
) {
99 int fd
= unix_stream_connect(path
, options
->uds_disallow_chdir
);
103 return IPC_STATE__LISTENING
;
106 if (errno
== ENOENT
) {
107 if (!options
->wait_if_not_found
)
108 return IPC_STATE__PATH_NOT_FOUND
;
110 goto sleep_and_try_again
;
113 if (errno
== ETIMEDOUT
) {
114 if (!options
->wait_if_busy
)
115 return IPC_STATE__NOT_LISTENING
;
117 goto sleep_and_try_again
;
120 if (errno
== ECONNREFUSED
) {
121 if (!options
->wait_if_busy
)
122 return IPC_STATE__NOT_LISTENING
;
124 goto sleep_and_try_again
;
127 return IPC_STATE__OTHER_ERROR
;
130 sleep_millisec(WAIT_STEP_MS
);
133 return IPC_STATE__NOT_LISTENING
;
137 * The total amount of time that we are willing to wait when trying to
138 * connect to a server.
140 * When the server is first started, it might take a little while for
141 * it to become ready to service requests. Likewise, the server may
142 * be very (temporarily) busy and not respond to our connections.
144 * We should gracefully and silently handle those conditions and try
145 * again for a reasonable time period.
147 * The value chosen here should be long enough for the server
148 * to reliably heal from the above conditions.
150 #define MY_CONNECTION_TIMEOUT_MS (1000)
152 enum ipc_active_state
ipc_client_try_connect(
154 const struct ipc_client_connect_options
*options
,
155 struct ipc_client_connection
**p_connection
)
157 enum ipc_active_state state
= IPC_STATE__OTHER_ERROR
;
160 *p_connection
= NULL
;
162 trace2_region_enter("ipc-client", "try-connect", NULL
);
163 trace2_data_string("ipc-client", NULL
, "try-connect/path", path
);
165 state
= connect_to_server(path
, MY_CONNECTION_TIMEOUT_MS
,
168 trace2_data_intmax("ipc-client", NULL
, "try-connect/state",
170 trace2_region_leave("ipc-client", "try-connect", NULL
);
172 if (state
== IPC_STATE__LISTENING
) {
173 (*p_connection
) = xcalloc(1, sizeof(struct ipc_client_connection
));
174 (*p_connection
)->fd
= fd
;
180 void ipc_client_close_connection(struct ipc_client_connection
*connection
)
185 if (connection
->fd
!= -1)
186 close(connection
->fd
);
191 int ipc_client_send_command_to_connection(
192 struct ipc_client_connection
*connection
,
193 const char *message
, size_t message_len
,
194 struct strbuf
*answer
)
198 strbuf_setlen(answer
, 0);
200 trace2_region_enter("ipc-client", "send-command", NULL
);
202 if (write_packetized_from_buf_no_flush(message
, message_len
,
203 connection
->fd
) < 0 ||
204 packet_flush_gently(connection
->fd
) < 0) {
205 ret
= error(_("could not send IPC command"));
209 if (read_packetized_to_strbuf(
210 connection
->fd
, answer
,
211 PACKET_READ_GENTLE_ON_EOF
| PACKET_READ_GENTLE_ON_READ_ERROR
) < 0) {
212 ret
= error(_("could not read IPC response"));
217 trace2_region_leave("ipc-client", "send-command", NULL
);
221 int ipc_client_send_command(const char *path
,
222 const struct ipc_client_connect_options
*options
,
223 const char *message
, size_t message_len
,
224 struct strbuf
*answer
)
227 enum ipc_active_state state
;
228 struct ipc_client_connection
*connection
= NULL
;
230 state
= ipc_client_try_connect(path
, options
, &connection
);
232 if (state
!= IPC_STATE__LISTENING
)
235 ret
= ipc_client_send_command_to_connection(connection
,
236 message
, message_len
,
239 ipc_client_close_connection(connection
);
/*
 * Set or clear O_NONBLOCK on `fd`.
 *
 * `make_nonblocking` nonzero sets the flag; zero clears it.
 * Returns the result of fcntl(F_SETFL) (0 on success, -1 on error),
 * or -1 if the current flags could not be read.
 */
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);

	if (flags < 0)
		return -1;

	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags);
}
/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};
274 struct ipc_server_reply_data
{
277 struct ipc_worker_thread_data
*worker_thread_data
;
280 struct ipc_worker_thread_data
{
282 struct ipc_worker_thread_data
*next_thread
;
283 struct ipc_server_data
*server_data
;
284 pthread_t pthread_id
;
287 struct ipc_accept_thread_data
{
289 struct ipc_server_data
*server_data
;
291 struct unix_ss_socket
*server_socket
;
293 int fd_send_shutdown
;
294 int fd_wait_shutdown
;
295 pthread_t pthread_id
;
299 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
300 * controller "accept-thread" thread and a pool of "worker-thread" threads.
301 * The former does the usual `accept()` loop and dispatches connections
302 * to an idle worker thread. The worker threads wait in an idle loop for
303 * a new connection, communicate with the client and relay data to/from
304 * the `application_cb` and then wait for another connection from the
305 * server thread. This avoids the overhead of constantly creating and
306 * destroying threads.
308 struct ipc_server_data
{
310 ipc_server_application_cb
*application_cb
;
311 void *application_data
;
312 struct strbuf buf_path
;
314 struct ipc_accept_thread_data
*accept_thread
;
315 struct ipc_worker_thread_data
*worker_thread_list
;
317 pthread_mutex_t work_available_mutex
;
318 pthread_cond_t work_available_cond
;
321 * Accepted but not yet processed client connections are kept
322 * in a circular buffer FIFO. The queue is empty when the
323 * positions are equal.
330 int shutdown_requested
;
335 * Remove and return the oldest queued connection.
337 * Returns -1 if empty.
339 static int fifo_dequeue(struct ipc_server_data
*server_data
)
341 /* ASSERT holding mutex */
345 if (server_data
->back_pos
== server_data
->front_pos
)
348 fd
= server_data
->fifo_fds
[server_data
->front_pos
];
349 server_data
->fifo_fds
[server_data
->front_pos
] = -1;
351 server_data
->front_pos
++;
352 if (server_data
->front_pos
== server_data
->queue_size
)
353 server_data
->front_pos
= 0;
359 * Push a new fd onto the back of the queue.
361 * Drop it and return -1 if queue is already full.
363 static int fifo_enqueue(struct ipc_server_data
*server_data
, int fd
)
365 /* ASSERT holding mutex */
369 next_back_pos
= server_data
->back_pos
+ 1;
370 if (next_back_pos
== server_data
->queue_size
)
373 if (next_back_pos
== server_data
->front_pos
) {
374 /* Queue is full. Just drop it. */
379 server_data
->fifo_fds
[server_data
->back_pos
] = fd
;
380 server_data
->back_pos
= next_back_pos
;
386 * Wait for a connection to be queued to the FIFO and return it.
388 * Returns -1 if someone has already requested a shutdown.
390 static int worker_thread__wait_for_connection(
391 struct ipc_worker_thread_data
*worker_thread_data
)
393 /* ASSERT NOT holding mutex */
395 struct ipc_server_data
*server_data
= worker_thread_data
->server_data
;
398 pthread_mutex_lock(&server_data
->work_available_mutex
);
400 if (server_data
->shutdown_requested
)
403 fd
= fifo_dequeue(server_data
);
407 pthread_cond_wait(&server_data
->work_available_cond
,
408 &server_data
->work_available_mutex
);
410 pthread_mutex_unlock(&server_data
->work_available_mutex
);
416 * Forward declare our reply callback function so that any compiler
417 * errors are reported when we actually define the function (in addition
418 * to any errors reported when we try to pass this callback function as
419 * a parameter in a function call). The former are easier to understand.
421 static ipc_server_reply_cb do_io_reply_callback
;
424 * Relay application's response message to the client process.
425 * (We do not flush at this point because we allow the caller
426 * to chunk data to the client thru us.)
428 static int do_io_reply_callback(struct ipc_server_reply_data
*reply_data
,
429 const char *response
, size_t response_len
)
431 if (reply_data
->magic
!= MAGIC_SERVER_REPLY_DATA
)
432 BUG("reply_cb called with wrong instance data");
434 return write_packetized_from_buf_no_flush(response
, response_len
,
438 /* A randomly chosen value. */
439 #define MY_WAIT_POLL_TIMEOUT_MS (10)
442 * If the client hangs up without sending any data on the wire, just
443 * quietly close the socket and ignore this client.
445 * This worker thread is committed to reading the IPC request data
446 * from the client at the other end of this fd. Wait here for the
447 * client to actually put something on the wire -- because if the
448 * client just does a ping (connect and hangup without sending any
449 * data), our use of the pkt-line read routines will spew an error
452 * Return -1 if the client hung up.
453 * Return 0 if data (possibly incomplete) is ready.
455 static int worker_thread__wait_for_io_start(
456 struct ipc_worker_thread_data
*worker_thread_data
,
459 struct ipc_server_data
*server_data
= worker_thread_data
->server_data
;
460 struct pollfd pollfd
[1];
465 pollfd
[0].events
= POLLIN
;
467 result
= poll(pollfd
, 1, MY_WAIT_POLL_TIMEOUT_MS
);
479 pthread_mutex_lock(&server_data
->work_available_mutex
);
480 in_shutdown
= server_data
->shutdown_requested
;
481 pthread_mutex_unlock(&server_data
->work_available_mutex
);
484 * If a shutdown is already in progress and this
485 * client has not started talking yet, just drop it.
492 if (pollfd
[0].revents
& POLLHUP
)
495 if (pollfd
[0].revents
& POLLIN
)
507 * Receive the request/command from the client and pass it to the
508 * registered request-callback. The request-callback will compose
509 * a response and call our reply-callback to send it to the client.
511 static int worker_thread__do_io(
512 struct ipc_worker_thread_data
*worker_thread_data
,
515 /* ASSERT NOT holding lock */
517 struct strbuf buf
= STRBUF_INIT
;
518 struct ipc_server_reply_data reply_data
;
521 reply_data
.magic
= MAGIC_SERVER_REPLY_DATA
;
522 reply_data
.worker_thread_data
= worker_thread_data
;
526 ret
= read_packetized_to_strbuf(
528 PACKET_READ_GENTLE_ON_EOF
| PACKET_READ_GENTLE_ON_READ_ERROR
);
530 ret
= worker_thread_data
->server_data
->application_cb(
531 worker_thread_data
->server_data
->application_data
,
532 buf
.buf
, buf
.len
, do_io_reply_callback
, &reply_data
);
534 packet_flush_gently(reply_data
.fd
);
538 * The client probably disconnected/shutdown before it
539 * could send a well-formed message. Ignore it.
543 strbuf_release(&buf
);
544 close(reply_data
.fd
);
550 * Block SIGPIPE on the current thread (so that we get EPIPE from
551 * write() rather than an actual signal).
553 * Note that using sigchain_push() and _pop() to control SIGPIPE
554 * around our IO calls is not thread safe:
555 * [] It uses a global stack of handler frames.
556 * [] It uses ALLOC_GROW() to resize it.
557 * [] Finally, according to the `signal(2)` man-page:
558 * "The effects of `signal()` in a multithreaded process are unspecified."
560 static void thread_block_sigpipe(sigset_t
*old_set
)
564 sigemptyset(&new_set
);
565 sigaddset(&new_set
, SIGPIPE
);
567 sigemptyset(old_set
);
568 pthread_sigmask(SIG_BLOCK
, &new_set
, old_set
);
572 * Thread proc for an IPC worker thread. It handles a series of
573 * connections from clients. It pulls the next fd from the queue
574 * processes it, and then waits for the next client.
576 * Block SIGPIPE in this worker thread for the life of the thread.
577 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
578 * by client errors and/or when we are under extremely heavy IO load.
580 * This means that the application callback will have SIGPIPE blocked.
581 * The callback should not change it.
583 static void *worker_thread_proc(void *_worker_thread_data
)
585 struct ipc_worker_thread_data
*worker_thread_data
= _worker_thread_data
;
586 struct ipc_server_data
*server_data
= worker_thread_data
->server_data
;
591 trace2_thread_start("ipc-worker");
593 thread_block_sigpipe(&old_set
);
596 fd
= worker_thread__wait_for_connection(worker_thread_data
);
598 break; /* in shutdown */
600 io
= worker_thread__wait_for_io_start(worker_thread_data
, fd
);
602 continue; /* client hung up without sending anything */
604 ret
= worker_thread__do_io(worker_thread_data
, fd
);
606 if (ret
== SIMPLE_IPC_QUIT
) {
607 trace2_data_string("ipc-worker", NULL
, "queue_stop_async",
610 * The application layer is telling the ipc-server
613 * We DO NOT have a response to send to the client.
615 * Queue an async stop (to stop the other threads) and
616 * allow this worker thread to exit now (no sense waiting
617 * for the thread-pool shutdown signal).
619 * Other non-idle worker threads are allowed to finish
620 * responding to their current clients.
622 ipc_server_stop_async(server_data
);
627 trace2_thread_exit();
631 /* A randomly chosen value. */
632 #define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
635 * Accept a new client connection on our socket. This uses non-blocking
636 * IO so that we can also wait for shutdown requests on our socket-pair
637 * without actually spinning on a fast timeout.
639 static int accept_thread__wait_for_connection(
640 struct ipc_accept_thread_data
*accept_thread_data
)
642 struct pollfd pollfd
[2];
646 pollfd
[0].fd
= accept_thread_data
->fd_wait_shutdown
;
647 pollfd
[0].events
= POLLIN
;
649 pollfd
[1].fd
= accept_thread_data
->server_socket
->fd_socket
;
650 pollfd
[1].events
= POLLIN
;
652 result
= poll(pollfd
, 2, MY_ACCEPT_POLL_TIMEOUT_MS
);
663 * If someone deletes or force-creates a new unix
664 * domain socket at our path, all future clients
665 * will be routed elsewhere and we silently starve.
666 * If that happens, just queue a shutdown.
668 if (unix_ss_was_stolen(
669 accept_thread_data
->server_socket
)) {
670 trace2_data_string("ipc-accept", NULL
,
673 ipc_server_stop_async(
674 accept_thread_data
->server_data
);
679 if (pollfd
[0].revents
& POLLIN
) {
680 /* shutdown message queued to socketpair */
684 if (pollfd
[1].revents
& POLLIN
) {
685 /* a connection is available on server_socket */
688 accept(accept_thread_data
->server_socket
->fd_socket
,
694 * An error here is unlikely -- it probably
695 * indicates that the connecting process has
696 * already dropped the connection.
701 BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
702 errno
, pollfd
[0].revents
, pollfd
[1].revents
);
707 * Thread proc for the IPC server "accept thread". This waits for
708 * an incoming socket connection, appends it to the queue of available
709 * connections, and notifies a worker thread to process it.
711 * Block SIGPIPE in this thread for the life of the thread. This
712 * avoids any stray SIGPIPE signals when closing pipe fds under
713 * extremely heavy loads (such as when the fifo queue is full and we
714 * drop incomming connections).
716 static void *accept_thread_proc(void *_accept_thread_data
)
718 struct ipc_accept_thread_data
*accept_thread_data
= _accept_thread_data
;
719 struct ipc_server_data
*server_data
= accept_thread_data
->server_data
;
722 trace2_thread_start("ipc-accept");
724 thread_block_sigpipe(&old_set
);
727 int client_fd
= accept_thread__wait_for_connection(
730 pthread_mutex_lock(&server_data
->work_available_mutex
);
731 if (server_data
->shutdown_requested
) {
732 pthread_mutex_unlock(&server_data
->work_available_mutex
);
739 /* ignore transient accept() errors */
742 fifo_enqueue(server_data
, client_fd
);
743 pthread_cond_broadcast(&server_data
->work_available_cond
);
745 pthread_mutex_unlock(&server_data
->work_available_mutex
);
748 trace2_thread_exit();
753 * We can't predict the connection arrival rate relative to the worker
754 * processing rate, therefore we allow the "accept-thread" to queue up
755 * a generous number of connections, since we'd rather have the client
756 * not unnecessarily timeout if we can avoid it. (The assumption is
757 * that this will be used for FSMonitor and a few second wait on a
758 * connection is better than having the client timeout and do the full
759 * computation itself.)
761 * The FIFO queue size is set to a multiple of the worker pool size.
762 * This value chosen at random.
764 #define FIFO_SCALE (100)
767 * The backlog value for `listen(2)`. This doesn't need to be huge,
768 * rather just large enough for our "accept-thread" to wake up and
769 * queue incoming connections onto the FIFO without the kernel
772 * This value chosen at random.
774 #define LISTEN_BACKLOG (50)
776 static int create_listener_socket(
778 const struct ipc_server_opts
*ipc_opts
,
779 struct unix_ss_socket
**new_server_socket
)
781 struct unix_ss_socket
*server_socket
= NULL
;
782 struct unix_stream_listen_opts uslg_opts
= UNIX_STREAM_LISTEN_OPTS_INIT
;
785 uslg_opts
.listen_backlog_size
= LISTEN_BACKLOG
;
786 uslg_opts
.disallow_chdir
= ipc_opts
->uds_disallow_chdir
;
788 ret
= unix_ss_create(path
, &uslg_opts
, -1, &server_socket
);
792 if (set_socket_blocking_flag(server_socket
->fd_socket
, 1)) {
793 int saved_errno
= errno
;
794 unix_ss_free(server_socket
);
799 *new_server_socket
= server_socket
;
801 trace2_data_string("ipc-server", NULL
, "listen-with-lock", path
);
805 static int setup_listener_socket(
807 const struct ipc_server_opts
*ipc_opts
,
808 struct unix_ss_socket
**new_server_socket
)
810 int ret
, saved_errno
;
812 trace2_region_enter("ipc-server", "create-listener_socket", NULL
);
814 ret
= create_listener_socket(path
, ipc_opts
, new_server_socket
);
817 trace2_region_leave("ipc-server", "create-listener_socket", NULL
);
824 * Start IPC server in a pool of background threads.
826 int ipc_server_run_async(struct ipc_server_data
**returned_server_data
,
827 const char *path
, const struct ipc_server_opts
*opts
,
828 ipc_server_application_cb
*application_cb
,
829 void *application_data
)
831 struct unix_ss_socket
*server_socket
= NULL
;
832 struct ipc_server_data
*server_data
;
836 int nr_threads
= opts
->nr_threads
;
838 *returned_server_data
= NULL
;
841 * Create a socketpair and set sv[1] to non-blocking. This
842 * will used to send a shutdown message to the accept-thread
843 * and allows the accept-thread to wait on EITHER a client
844 * connection or a shutdown request without spinning.
846 if (socketpair(AF_UNIX
, SOCK_STREAM
, 0, sv
) < 0)
849 if (set_socket_blocking_flag(sv
[1], 1)) {
850 int saved_errno
= errno
;
857 ret
= setup_listener_socket(path
, opts
, &server_socket
);
859 int saved_errno
= errno
;
866 server_data
= xcalloc(1, sizeof(*server_data
));
867 server_data
->magic
= MAGIC_SERVER_DATA
;
868 server_data
->application_cb
= application_cb
;
869 server_data
->application_data
= application_data
;
870 strbuf_init(&server_data
->buf_path
, 0);
871 strbuf_addstr(&server_data
->buf_path
, path
);
876 pthread_mutex_init(&server_data
->work_available_mutex
, NULL
);
877 pthread_cond_init(&server_data
->work_available_cond
, NULL
);
879 server_data
->queue_size
= nr_threads
* FIFO_SCALE
;
880 CALLOC_ARRAY(server_data
->fifo_fds
, server_data
->queue_size
);
882 server_data
->accept_thread
=
883 xcalloc(1, sizeof(*server_data
->accept_thread
));
884 server_data
->accept_thread
->magic
= MAGIC_ACCEPT_THREAD_DATA
;
885 server_data
->accept_thread
->server_data
= server_data
;
886 server_data
->accept_thread
->server_socket
= server_socket
;
887 server_data
->accept_thread
->fd_send_shutdown
= sv
[0];
888 server_data
->accept_thread
->fd_wait_shutdown
= sv
[1];
890 if (pthread_create(&server_data
->accept_thread
->pthread_id
, NULL
,
891 accept_thread_proc
, server_data
->accept_thread
))
892 die_errno(_("could not start accept_thread '%s'"), path
);
894 for (k
= 0; k
< nr_threads
; k
++) {
895 struct ipc_worker_thread_data
*wtd
;
897 wtd
= xcalloc(1, sizeof(*wtd
));
898 wtd
->magic
= MAGIC_WORKER_THREAD_DATA
;
899 wtd
->server_data
= server_data
;
901 if (pthread_create(&wtd
->pthread_id
, NULL
, worker_thread_proc
,
904 die(_("could not start worker[0] for '%s'"),
907 * Limp along with the thread pool that we have.
912 wtd
->next_thread
= server_data
->worker_thread_list
;
913 server_data
->worker_thread_list
= wtd
;
916 *returned_server_data
= server_data
;
921 * Gently tell the IPC server treads to shutdown.
922 * Can be run on any thread.
924 int ipc_server_stop_async(struct ipc_server_data
*server_data
)
926 /* ASSERT NOT holding mutex */
933 trace2_region_enter("ipc-server", "server-stop-async", NULL
);
935 pthread_mutex_lock(&server_data
->work_available_mutex
);
937 server_data
->shutdown_requested
= 1;
940 * Write a byte to the shutdown socket pair to wake up the
943 if (write(server_data
->accept_thread
->fd_send_shutdown
, "Q", 1) < 0)
944 error_errno("could not write to fd_send_shutdown");
947 * Drain the queue of existing connections.
949 while ((fd
= fifo_dequeue(server_data
)) != -1)
953 * Gently tell worker threads to stop processing new connections
954 * and exit. (This does not abort in-process conversations.)
956 pthread_cond_broadcast(&server_data
->work_available_cond
);
958 pthread_mutex_unlock(&server_data
->work_available_mutex
);
960 trace2_region_leave("ipc-server", "server-stop-async", NULL
);
966 * Wait for all IPC server threads to stop.
968 int ipc_server_await(struct ipc_server_data
*server_data
)
970 pthread_join(server_data
->accept_thread
->pthread_id
, NULL
);
972 if (!server_data
->shutdown_requested
)
973 BUG("ipc-server: accept-thread stopped for '%s'",
974 server_data
->buf_path
.buf
);
976 while (server_data
->worker_thread_list
) {
977 struct ipc_worker_thread_data
*wtd
=
978 server_data
->worker_thread_list
;
980 pthread_join(wtd
->pthread_id
, NULL
);
982 server_data
->worker_thread_list
= wtd
->next_thread
;
986 server_data
->is_stopped
= 1;
991 void ipc_server_free(struct ipc_server_data
*server_data
)
993 struct ipc_accept_thread_data
* accept_thread_data
;
998 if (!server_data
->is_stopped
)
999 BUG("cannot free ipc-server while running for '%s'",
1000 server_data
->buf_path
.buf
);
1002 accept_thread_data
= server_data
->accept_thread
;
1003 if (accept_thread_data
) {
1004 unix_ss_free(accept_thread_data
->server_socket
);
1006 if (accept_thread_data
->fd_send_shutdown
!= -1)
1007 close(accept_thread_data
->fd_send_shutdown
);
1008 if (accept_thread_data
->fd_wait_shutdown
!= -1)
1009 close(accept_thread_data
->fd_wait_shutdown
);
1011 free(server_data
->accept_thread
);
1014 while (server_data
->worker_thread_list
) {
1015 struct ipc_worker_thread_data
*wtd
=
1016 server_data
->worker_thread_list
;
1018 server_data
->worker_thread_list
= wtd
->next_thread
;
1022 pthread_cond_destroy(&server_data
->work_available_cond
);
1023 pthread_mutex_destroy(&server_data
->work_available_mutex
);
1025 strbuf_release(&server_data
->buf_path
);
1027 free(server_data
->fifo_fds
);