1 #include "git-compat-util.h"
3 #include "simple-ipc.h"
6 #include "thread-utils.h"
8 #include "unix-socket.h"
9 #include "unix-stream-server.h"
11 #ifndef SUPPORTS_SIMPLE_IPC
13 * This source file should only be compiled when Simple IPC is supported.
14 * See the top-level Makefile.
16 #error SUPPORTS_SIMPLE_IPC not defined
19 enum ipc_active_state
ipc_get_active_state(const char *path
)
21 enum ipc_active_state state
= IPC_STATE__OTHER_ERROR
;
22 struct ipc_client_connect_options options
23 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
25 struct ipc_client_connection
*connection_test
= NULL
;
27 options
.wait_if_busy
= 0;
28 options
.wait_if_not_found
= 0;
30 if (lstat(path
, &st
) == -1) {
34 return IPC_STATE__NOT_LISTENING
;
36 return IPC_STATE__INVALID_PATH
;
42 * Cygwin emulates Unix sockets by writing special-crafted files whose
43 * `system` bit is set.
45 * If we are too fast, Cygwin might still be in the process of marking
46 * the underlying file as a system file. Until then, we will not see a
47 * Unix socket here, but a plain file instead. Just in case that this
48 * is happening, wait a little and try again.
51 static const int delay
[] = { 1, 10, 20, 40, -1 };
54 for (i
= 0; S_ISREG(st
.st_mode
) && delay
[i
] > 0; i
++) {
55 sleep_millisec(delay
[i
]);
56 if (lstat(path
, &st
) == -1)
57 return IPC_STATE__INVALID_PATH
;
62 /* also complain if a plain file is in the way */
63 if ((st
.st_mode
& S_IFMT
) != S_IFSOCK
)
64 return IPC_STATE__INVALID_PATH
;
67 * Just because the filesystem has a S_IFSOCK type inode
68 * at `path`, doesn't mean it that there is a server listening.
71 state
= ipc_client_try_connect(path
, &options
, &connection_test
);
72 ipc_client_close_connection(connection_test
);
/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not fast enough that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)
87 * Try to connect to the server. If the server is just starting up or
88 * is very busy, we may not get a connection the first time.
90 static enum ipc_active_state
connect_to_server(
93 const struct ipc_client_connect_options
*options
,
100 for (k
= 0; k
< timeout_ms
; k
+= WAIT_STEP_MS
) {
101 int fd
= unix_stream_connect(path
, options
->uds_disallow_chdir
);
105 return IPC_STATE__LISTENING
;
108 if (errno
== ENOENT
) {
109 if (!options
->wait_if_not_found
)
110 return IPC_STATE__PATH_NOT_FOUND
;
112 goto sleep_and_try_again
;
115 if (errno
== ETIMEDOUT
) {
116 if (!options
->wait_if_busy
)
117 return IPC_STATE__NOT_LISTENING
;
119 goto sleep_and_try_again
;
122 if (errno
== ECONNREFUSED
) {
123 if (!options
->wait_if_busy
)
124 return IPC_STATE__NOT_LISTENING
;
126 goto sleep_and_try_again
;
129 return IPC_STATE__OTHER_ERROR
;
132 sleep_millisec(WAIT_STEP_MS
);
135 return IPC_STATE__NOT_LISTENING
;
/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests.  Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)
154 enum ipc_active_state
ipc_client_try_connect(
156 const struct ipc_client_connect_options
*options
,
157 struct ipc_client_connection
**p_connection
)
159 enum ipc_active_state state
= IPC_STATE__OTHER_ERROR
;
162 *p_connection
= NULL
;
164 trace2_region_enter("ipc-client", "try-connect", NULL
);
165 trace2_data_string("ipc-client", NULL
, "try-connect/path", path
);
167 state
= connect_to_server(path
, MY_CONNECTION_TIMEOUT_MS
,
170 trace2_data_intmax("ipc-client", NULL
, "try-connect/state",
172 trace2_region_leave("ipc-client", "try-connect", NULL
);
174 if (state
== IPC_STATE__LISTENING
) {
175 (*p_connection
) = xcalloc(1, sizeof(struct ipc_client_connection
));
176 (*p_connection
)->fd
= fd
;
182 void ipc_client_close_connection(struct ipc_client_connection
*connection
)
187 if (connection
->fd
!= -1)
188 close(connection
->fd
);
193 int ipc_client_send_command_to_connection(
194 struct ipc_client_connection
*connection
,
195 const char *message
, size_t message_len
,
196 struct strbuf
*answer
)
200 strbuf_setlen(answer
, 0);
202 trace2_region_enter("ipc-client", "send-command", NULL
);
204 if (write_packetized_from_buf_no_flush(message
, message_len
,
205 connection
->fd
) < 0 ||
206 packet_flush_gently(connection
->fd
) < 0) {
207 ret
= error(_("could not send IPC command"));
211 if (read_packetized_to_strbuf(
212 connection
->fd
, answer
,
213 PACKET_READ_GENTLE_ON_EOF
| PACKET_READ_GENTLE_ON_READ_ERROR
) < 0) {
214 ret
= error(_("could not read IPC response"));
219 trace2_region_leave("ipc-client", "send-command", NULL
);
223 int ipc_client_send_command(const char *path
,
224 const struct ipc_client_connect_options
*options
,
225 const char *message
, size_t message_len
,
226 struct strbuf
*answer
)
229 enum ipc_active_state state
;
230 struct ipc_client_connection
*connection
= NULL
;
232 state
= ipc_client_try_connect(path
, options
, &connection
);
234 if (state
!= IPC_STATE__LISTENING
)
237 ret
= ipc_client_send_command_to_connection(connection
,
238 message
, message_len
,
241 ipc_client_close_connection(connection
);
246 static int set_socket_blocking_flag(int fd
, int make_nonblocking
)
250 flags
= fcntl(fd
, F_GETFL
, NULL
);
255 if (make_nonblocking
)
258 flags
&= ~O_NONBLOCK
;
260 return fcntl(fd
, F_SETFL
, flags
);
/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};
276 struct ipc_server_reply_data
{
279 struct ipc_worker_thread_data
*worker_thread_data
;
282 struct ipc_worker_thread_data
{
284 struct ipc_worker_thread_data
*next_thread
;
285 struct ipc_server_data
*server_data
;
286 pthread_t pthread_id
;
289 struct ipc_accept_thread_data
{
291 struct ipc_server_data
*server_data
;
293 struct unix_ss_socket
*server_socket
;
295 int fd_send_shutdown
;
296 int fd_wait_shutdown
;
297 pthread_t pthread_id
;
301 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
302 * controller "accept-thread" thread and a pool of "worker-thread" threads.
303 * The former does the usual `accept()` loop and dispatches connections
304 * to an idle worker thread. The worker threads wait in an idle loop for
305 * a new connection, communicate with the client and relay data to/from
306 * the `application_cb` and then wait for another connection from the
307 * server thread. This avoids the overhead of constantly creating and
308 * destroying threads.
310 struct ipc_server_data
{
312 ipc_server_application_cb
*application_cb
;
313 void *application_data
;
314 struct strbuf buf_path
;
316 struct ipc_accept_thread_data
*accept_thread
;
317 struct ipc_worker_thread_data
*worker_thread_list
;
319 pthread_mutex_t work_available_mutex
;
320 pthread_cond_t work_available_cond
;
323 * Accepted but not yet processed client connections are kept
324 * in a circular buffer FIFO. The queue is empty when the
325 * positions are equal.
332 int shutdown_requested
;
337 * Remove and return the oldest queued connection.
339 * Returns -1 if empty.
341 static int fifo_dequeue(struct ipc_server_data
*server_data
)
343 /* ASSERT holding mutex */
347 if (server_data
->back_pos
== server_data
->front_pos
)
350 fd
= server_data
->fifo_fds
[server_data
->front_pos
];
351 server_data
->fifo_fds
[server_data
->front_pos
] = -1;
353 server_data
->front_pos
++;
354 if (server_data
->front_pos
== server_data
->queue_size
)
355 server_data
->front_pos
= 0;
361 * Push a new fd onto the back of the queue.
363 * Drop it and return -1 if queue is already full.
365 static int fifo_enqueue(struct ipc_server_data
*server_data
, int fd
)
367 /* ASSERT holding mutex */
371 next_back_pos
= server_data
->back_pos
+ 1;
372 if (next_back_pos
== server_data
->queue_size
)
375 if (next_back_pos
== server_data
->front_pos
) {
376 /* Queue is full. Just drop it. */
381 server_data
->fifo_fds
[server_data
->back_pos
] = fd
;
382 server_data
->back_pos
= next_back_pos
;
388 * Wait for a connection to be queued to the FIFO and return it.
390 * Returns -1 if someone has already requested a shutdown.
392 static int worker_thread__wait_for_connection(
393 struct ipc_worker_thread_data
*worker_thread_data
)
395 /* ASSERT NOT holding mutex */
397 struct ipc_server_data
*server_data
= worker_thread_data
->server_data
;
400 pthread_mutex_lock(&server_data
->work_available_mutex
);
402 if (server_data
->shutdown_requested
)
405 fd
= fifo_dequeue(server_data
);
409 pthread_cond_wait(&server_data
->work_available_cond
,
410 &server_data
->work_available_mutex
);
412 pthread_mutex_unlock(&server_data
->work_available_mutex
);
418 * Forward declare our reply callback function so that any compiler
419 * errors are reported when we actually define the function (in addition
420 * to any errors reported when we try to pass this callback function as
421 * a parameter in a function call). The former are easier to understand.
423 static ipc_server_reply_cb do_io_reply_callback
;
426 * Relay application's response message to the client process.
427 * (We do not flush at this point because we allow the caller
428 * to chunk data to the client thru us.)
430 static int do_io_reply_callback(struct ipc_server_reply_data
*reply_data
,
431 const char *response
, size_t response_len
)
433 if (reply_data
->magic
!= MAGIC_SERVER_REPLY_DATA
)
434 BUG("reply_cb called with wrong instance data");
436 return write_packetized_from_buf_no_flush(response
, response_len
,
440 /* A randomly chosen value. */
441 #define MY_WAIT_POLL_TIMEOUT_MS (10)
444 * If the client hangs up without sending any data on the wire, just
445 * quietly close the socket and ignore this client.
447 * This worker thread is committed to reading the IPC request data
448 * from the client at the other end of this fd. Wait here for the
449 * client to actually put something on the wire -- because if the
450 * client just does a ping (connect and hangup without sending any
451 * data), our use of the pkt-line read routines will spew an error
454 * Return -1 if the client hung up.
455 * Return 0 if data (possibly incomplete) is ready.
457 static int worker_thread__wait_for_io_start(
458 struct ipc_worker_thread_data
*worker_thread_data
,
461 struct ipc_server_data
*server_data
= worker_thread_data
->server_data
;
462 struct pollfd pollfd
[1];
467 pollfd
[0].events
= POLLIN
;
469 result
= poll(pollfd
, 1, MY_WAIT_POLL_TIMEOUT_MS
);
481 pthread_mutex_lock(&server_data
->work_available_mutex
);
482 in_shutdown
= server_data
->shutdown_requested
;
483 pthread_mutex_unlock(&server_data
->work_available_mutex
);
486 * If a shutdown is already in progress and this
487 * client has not started talking yet, just drop it.
494 if (pollfd
[0].revents
& POLLHUP
)
497 if (pollfd
[0].revents
& POLLIN
)
509 * Receive the request/command from the client and pass it to the
510 * registered request-callback. The request-callback will compose
511 * a response and call our reply-callback to send it to the client.
513 static int worker_thread__do_io(
514 struct ipc_worker_thread_data
*worker_thread_data
,
517 /* ASSERT NOT holding lock */
519 struct strbuf buf
= STRBUF_INIT
;
520 struct ipc_server_reply_data reply_data
;
523 reply_data
.magic
= MAGIC_SERVER_REPLY_DATA
;
524 reply_data
.worker_thread_data
= worker_thread_data
;
528 ret
= read_packetized_to_strbuf(
530 PACKET_READ_GENTLE_ON_EOF
| PACKET_READ_GENTLE_ON_READ_ERROR
);
532 ret
= worker_thread_data
->server_data
->application_cb(
533 worker_thread_data
->server_data
->application_data
,
534 buf
.buf
, buf
.len
, do_io_reply_callback
, &reply_data
);
536 packet_flush_gently(reply_data
.fd
);
540 * The client probably disconnected/shutdown before it
541 * could send a well-formed message. Ignore it.
545 strbuf_release(&buf
);
546 close(reply_data
.fd
);
552 * Block SIGPIPE on the current thread (so that we get EPIPE from
553 * write() rather than an actual signal).
555 * Note that using sigchain_push() and _pop() to control SIGPIPE
556 * around our IO calls is not thread safe:
557 * [] It uses a global stack of handler frames.
558 * [] It uses ALLOC_GROW() to resize it.
559 * [] Finally, according to the `signal(2)` man-page:
560 * "The effects of `signal()` in a multithreaded process are unspecified."
562 static void thread_block_sigpipe(sigset_t
*old_set
)
566 sigemptyset(&new_set
);
567 sigaddset(&new_set
, SIGPIPE
);
569 sigemptyset(old_set
);
570 pthread_sigmask(SIG_BLOCK
, &new_set
, old_set
);
574 * Thread proc for an IPC worker thread. It handles a series of
575 * connections from clients. It pulls the next fd from the queue
576 * processes it, and then waits for the next client.
578 * Block SIGPIPE in this worker thread for the life of the thread.
579 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
580 * by client errors and/or when we are under extremely heavy IO load.
582 * This means that the application callback will have SIGPIPE blocked.
583 * The callback should not change it.
585 static void *worker_thread_proc(void *_worker_thread_data
)
587 struct ipc_worker_thread_data
*worker_thread_data
= _worker_thread_data
;
588 struct ipc_server_data
*server_data
= worker_thread_data
->server_data
;
593 trace2_thread_start("ipc-worker");
595 thread_block_sigpipe(&old_set
);
598 fd
= worker_thread__wait_for_connection(worker_thread_data
);
600 break; /* in shutdown */
602 io
= worker_thread__wait_for_io_start(worker_thread_data
, fd
);
604 continue; /* client hung up without sending anything */
606 ret
= worker_thread__do_io(worker_thread_data
, fd
);
608 if (ret
== SIMPLE_IPC_QUIT
) {
609 trace2_data_string("ipc-worker", NULL
, "queue_stop_async",
612 * The application layer is telling the ipc-server
615 * We DO NOT have a response to send to the client.
617 * Queue an async stop (to stop the other threads) and
618 * allow this worker thread to exit now (no sense waiting
619 * for the thread-pool shutdown signal).
621 * Other non-idle worker threads are allowed to finish
622 * responding to their current clients.
624 ipc_server_stop_async(server_data
);
629 trace2_thread_exit();
633 /* A randomly chosen value. */
634 #define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
637 * Accept a new client connection on our socket. This uses non-blocking
638 * IO so that we can also wait for shutdown requests on our socket-pair
639 * without actually spinning on a fast timeout.
641 static int accept_thread__wait_for_connection(
642 struct ipc_accept_thread_data
*accept_thread_data
)
644 struct pollfd pollfd
[2];
648 pollfd
[0].fd
= accept_thread_data
->fd_wait_shutdown
;
649 pollfd
[0].events
= POLLIN
;
651 pollfd
[1].fd
= accept_thread_data
->server_socket
->fd_socket
;
652 pollfd
[1].events
= POLLIN
;
654 result
= poll(pollfd
, 2, MY_ACCEPT_POLL_TIMEOUT_MS
);
665 * If someone deletes or force-creates a new unix
666 * domain socket at our path, all future clients
667 * will be routed elsewhere and we silently starve.
668 * If that happens, just queue a shutdown.
670 if (unix_ss_was_stolen(
671 accept_thread_data
->server_socket
)) {
672 trace2_data_string("ipc-accept", NULL
,
675 ipc_server_stop_async(
676 accept_thread_data
->server_data
);
681 if (pollfd
[0].revents
& POLLIN
) {
682 /* shutdown message queued to socketpair */
686 if (pollfd
[1].revents
& POLLIN
) {
687 /* a connection is available on server_socket */
690 accept(accept_thread_data
->server_socket
->fd_socket
,
696 * An error here is unlikely -- it probably
697 * indicates that the connecting process has
698 * already dropped the connection.
703 BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
704 errno
, pollfd
[0].revents
, pollfd
[1].revents
);
709 * Thread proc for the IPC server "accept thread". This waits for
710 * an incoming socket connection, appends it to the queue of available
711 * connections, and notifies a worker thread to process it.
713 * Block SIGPIPE in this thread for the life of the thread. This
714 * avoids any stray SIGPIPE signals when closing pipe fds under
715 * extremely heavy loads (such as when the fifo queue is full and we
716 * drop incomming connections).
718 static void *accept_thread_proc(void *_accept_thread_data
)
720 struct ipc_accept_thread_data
*accept_thread_data
= _accept_thread_data
;
721 struct ipc_server_data
*server_data
= accept_thread_data
->server_data
;
724 trace2_thread_start("ipc-accept");
726 thread_block_sigpipe(&old_set
);
729 int client_fd
= accept_thread__wait_for_connection(
732 pthread_mutex_lock(&server_data
->work_available_mutex
);
733 if (server_data
->shutdown_requested
) {
734 pthread_mutex_unlock(&server_data
->work_available_mutex
);
741 /* ignore transient accept() errors */
744 fifo_enqueue(server_data
, client_fd
);
745 pthread_cond_broadcast(&server_data
->work_available_cond
);
747 pthread_mutex_unlock(&server_data
->work_available_mutex
);
750 trace2_thread_exit();
/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not unnecessarily timeout if we can avoid it.  (The assumption is
 * that this will be used for FSMonitor and a few second wait on a
 * connection is better than having the client timeout and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value chosen at random.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`.  This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value chosen at random.
 */
#define LISTEN_BACKLOG (50)
778 static int create_listener_socket(
780 const struct ipc_server_opts
*ipc_opts
,
781 struct unix_ss_socket
**new_server_socket
)
783 struct unix_ss_socket
*server_socket
= NULL
;
784 struct unix_stream_listen_opts uslg_opts
= UNIX_STREAM_LISTEN_OPTS_INIT
;
787 uslg_opts
.listen_backlog_size
= LISTEN_BACKLOG
;
788 uslg_opts
.disallow_chdir
= ipc_opts
->uds_disallow_chdir
;
790 ret
= unix_ss_create(path
, &uslg_opts
, -1, &server_socket
);
794 if (set_socket_blocking_flag(server_socket
->fd_socket
, 1)) {
795 int saved_errno
= errno
;
796 unix_ss_free(server_socket
);
801 *new_server_socket
= server_socket
;
803 trace2_data_string("ipc-server", NULL
, "listen-with-lock", path
);
807 static int setup_listener_socket(
809 const struct ipc_server_opts
*ipc_opts
,
810 struct unix_ss_socket
**new_server_socket
)
812 int ret
, saved_errno
;
814 trace2_region_enter("ipc-server", "create-listener_socket", NULL
);
816 ret
= create_listener_socket(path
, ipc_opts
, new_server_socket
);
819 trace2_region_leave("ipc-server", "create-listener_socket", NULL
);
826 * Start IPC server in a pool of background threads.
828 int ipc_server_run_async(struct ipc_server_data
**returned_server_data
,
829 const char *path
, const struct ipc_server_opts
*opts
,
830 ipc_server_application_cb
*application_cb
,
831 void *application_data
)
833 struct unix_ss_socket
*server_socket
= NULL
;
834 struct ipc_server_data
*server_data
;
838 int nr_threads
= opts
->nr_threads
;
840 *returned_server_data
= NULL
;
843 * Create a socketpair and set sv[1] to non-blocking. This
844 * will used to send a shutdown message to the accept-thread
845 * and allows the accept-thread to wait on EITHER a client
846 * connection or a shutdown request without spinning.
848 if (socketpair(AF_UNIX
, SOCK_STREAM
, 0, sv
) < 0)
851 if (set_socket_blocking_flag(sv
[1], 1)) {
852 int saved_errno
= errno
;
859 ret
= setup_listener_socket(path
, opts
, &server_socket
);
861 int saved_errno
= errno
;
868 server_data
= xcalloc(1, sizeof(*server_data
));
869 server_data
->magic
= MAGIC_SERVER_DATA
;
870 server_data
->application_cb
= application_cb
;
871 server_data
->application_data
= application_data
;
872 strbuf_init(&server_data
->buf_path
, 0);
873 strbuf_addstr(&server_data
->buf_path
, path
);
878 pthread_mutex_init(&server_data
->work_available_mutex
, NULL
);
879 pthread_cond_init(&server_data
->work_available_cond
, NULL
);
881 server_data
->queue_size
= nr_threads
* FIFO_SCALE
;
882 CALLOC_ARRAY(server_data
->fifo_fds
, server_data
->queue_size
);
884 server_data
->accept_thread
=
885 xcalloc(1, sizeof(*server_data
->accept_thread
));
886 server_data
->accept_thread
->magic
= MAGIC_ACCEPT_THREAD_DATA
;
887 server_data
->accept_thread
->server_data
= server_data
;
888 server_data
->accept_thread
->server_socket
= server_socket
;
889 server_data
->accept_thread
->fd_send_shutdown
= sv
[0];
890 server_data
->accept_thread
->fd_wait_shutdown
= sv
[1];
892 if (pthread_create(&server_data
->accept_thread
->pthread_id
, NULL
,
893 accept_thread_proc
, server_data
->accept_thread
))
894 die_errno(_("could not start accept_thread '%s'"), path
);
896 for (k
= 0; k
< nr_threads
; k
++) {
897 struct ipc_worker_thread_data
*wtd
;
899 wtd
= xcalloc(1, sizeof(*wtd
));
900 wtd
->magic
= MAGIC_WORKER_THREAD_DATA
;
901 wtd
->server_data
= server_data
;
903 if (pthread_create(&wtd
->pthread_id
, NULL
, worker_thread_proc
,
906 die(_("could not start worker[0] for '%s'"),
909 * Limp along with the thread pool that we have.
914 wtd
->next_thread
= server_data
->worker_thread_list
;
915 server_data
->worker_thread_list
= wtd
;
918 *returned_server_data
= server_data
;
923 * Gently tell the IPC server treads to shutdown.
924 * Can be run on any thread.
926 int ipc_server_stop_async(struct ipc_server_data
*server_data
)
928 /* ASSERT NOT holding mutex */
935 trace2_region_enter("ipc-server", "server-stop-async", NULL
);
937 pthread_mutex_lock(&server_data
->work_available_mutex
);
939 server_data
->shutdown_requested
= 1;
942 * Write a byte to the shutdown socket pair to wake up the
945 if (write(server_data
->accept_thread
->fd_send_shutdown
, "Q", 1) < 0)
946 error_errno("could not write to fd_send_shutdown");
949 * Drain the queue of existing connections.
951 while ((fd
= fifo_dequeue(server_data
)) != -1)
955 * Gently tell worker threads to stop processing new connections
956 * and exit. (This does not abort in-process conversations.)
958 pthread_cond_broadcast(&server_data
->work_available_cond
);
960 pthread_mutex_unlock(&server_data
->work_available_mutex
);
962 trace2_region_leave("ipc-server", "server-stop-async", NULL
);
968 * Wait for all IPC server threads to stop.
970 int ipc_server_await(struct ipc_server_data
*server_data
)
972 pthread_join(server_data
->accept_thread
->pthread_id
, NULL
);
974 if (!server_data
->shutdown_requested
)
975 BUG("ipc-server: accept-thread stopped for '%s'",
976 server_data
->buf_path
.buf
);
978 while (server_data
->worker_thread_list
) {
979 struct ipc_worker_thread_data
*wtd
=
980 server_data
->worker_thread_list
;
982 pthread_join(wtd
->pthread_id
, NULL
);
984 server_data
->worker_thread_list
= wtd
->next_thread
;
988 server_data
->is_stopped
= 1;
993 void ipc_server_free(struct ipc_server_data
*server_data
)
995 struct ipc_accept_thread_data
* accept_thread_data
;
1000 if (!server_data
->is_stopped
)
1001 BUG("cannot free ipc-server while running for '%s'",
1002 server_data
->buf_path
.buf
);
1004 accept_thread_data
= server_data
->accept_thread
;
1005 if (accept_thread_data
) {
1006 unix_ss_free(accept_thread_data
->server_socket
);
1008 if (accept_thread_data
->fd_send_shutdown
!= -1)
1009 close(accept_thread_data
->fd_send_shutdown
);
1010 if (accept_thread_data
->fd_wait_shutdown
!= -1)
1011 close(accept_thread_data
->fd_wait_shutdown
);
1013 free(server_data
->accept_thread
);
1016 while (server_data
->worker_thread_list
) {
1017 struct ipc_worker_thread_data
*wtd
=
1018 server_data
->worker_thread_list
;
1020 server_data
->worker_thread_list
= wtd
->next_thread
;
1024 pthread_cond_destroy(&server_data
->work_available_cond
);
1025 pthread_mutex_destroy(&server_data
->work_available_mutex
);
1027 strbuf_release(&server_data
->buf_path
);
1029 free(server_data
->fifo_fds
);