2 #include "simple-ipc.h"
5 #include "thread-utils.h"
7 #ifndef SUPPORTS_SIMPLE_IPC
9 * This source file should only be compiled when Simple IPC is supported.
10 * See the top-level Makefile.
12 #error SUPPORTS_SIMPLE_IPC not defined
15 static int initialize_pipe_name(const char *path
, wchar_t *wpath
, size_t alloc
)
18 struct strbuf realpath
= STRBUF_INIT
;
20 if (!strbuf_realpath(&realpath
, path
, 0))
23 off
= swprintf(wpath
, alloc
, L
"\\\\.\\pipe\\");
24 if (xutftowcs(wpath
+ off
, realpath
.buf
, alloc
- off
) < 0)
27 /* Handle drive prefix */
28 if (wpath
[off
] && wpath
[off
+ 1] == L
':') {
29 wpath
[off
+ 1] = L
'_';
33 for (; wpath
[off
]; off
++)
34 if (wpath
[off
] == L
'/')
37 strbuf_release(&realpath
);
41 static enum ipc_active_state
get_active_state(wchar_t *pipe_path
)
43 if (WaitNamedPipeW(pipe_path
, NMPWAIT_USE_DEFAULT_WAIT
))
44 return IPC_STATE__LISTENING
;
46 if (GetLastError() == ERROR_SEM_TIMEOUT
)
47 return IPC_STATE__NOT_LISTENING
;
49 if (GetLastError() == ERROR_FILE_NOT_FOUND
)
50 return IPC_STATE__PATH_NOT_FOUND
;
52 return IPC_STATE__OTHER_ERROR
;
55 enum ipc_active_state
ipc_get_active_state(const char *path
)
57 wchar_t pipe_path
[MAX_PATH
];
59 if (initialize_pipe_name(path
, pipe_path
, ARRAY_SIZE(pipe_path
)) < 0)
60 return IPC_STATE__INVALID_PATH
;
62 return get_active_state(pipe_path
);
65 #define WAIT_STEP_MS (50)
67 static enum ipc_active_state
connect_to_server(
70 const struct ipc_client_connect_options
*options
,
73 DWORD t_start_ms
, t_waited_ms
;
75 HANDLE hPipe
= INVALID_HANDLE_VALUE
;
76 DWORD mode
= PIPE_READMODE_BYTE
;
82 hPipe
= CreateFileW(wpath
, GENERIC_READ
| GENERIC_WRITE
,
83 0, NULL
, OPEN_EXISTING
, 0, NULL
);
84 if (hPipe
!= INVALID_HANDLE_VALUE
)
90 case ERROR_FILE_NOT_FOUND
:
91 if (!options
->wait_if_not_found
)
92 return IPC_STATE__PATH_NOT_FOUND
;
94 return IPC_STATE__PATH_NOT_FOUND
;
96 step_ms
= (timeout_ms
< WAIT_STEP_MS
) ?
97 timeout_ms
: WAIT_STEP_MS
;
98 sleep_millisec(step_ms
);
100 timeout_ms
-= step_ms
;
101 break; /* try again */
103 case ERROR_PIPE_BUSY
:
104 if (!options
->wait_if_busy
)
105 return IPC_STATE__NOT_LISTENING
;
107 return IPC_STATE__NOT_LISTENING
;
109 t_start_ms
= (DWORD
)(getnanotime() / 1000000);
111 if (!WaitNamedPipeW(wpath
, timeout_ms
)) {
112 if (GetLastError() == ERROR_SEM_TIMEOUT
)
113 return IPC_STATE__NOT_LISTENING
;
115 return IPC_STATE__OTHER_ERROR
;
119 * A pipe server instance became available.
120 * Race other client processes to connect to
123 * But first decrement our overall timeout so
124 * that we don't starve if we keep losing the
125 * race. But also guard against special
126 * NPMWAIT_ values (0 and -1).
128 t_waited_ms
= (DWORD
)(getnanotime() / 1000000) - t_start_ms
;
129 if (t_waited_ms
< timeout_ms
)
130 timeout_ms
-= t_waited_ms
;
133 break; /* try again */
136 return IPC_STATE__OTHER_ERROR
;
140 if (!SetNamedPipeHandleState(hPipe
, &mode
, NULL
, NULL
)) {
142 return IPC_STATE__OTHER_ERROR
;
145 *pfd
= _open_osfhandle((intptr_t)hPipe
, O_RDWR
|O_BINARY
);
148 return IPC_STATE__OTHER_ERROR
;
151 /* fd now owns hPipe */
153 return IPC_STATE__LISTENING
;
157 * The default connection timeout for Windows clients.
159 * This is not currently part of the ipc_ API (nor the config settings)
160 * because of differences between Windows and other platforms.
162 * This value was chosen at random.
164 #define WINDOWS_CONNECTION_TIMEOUT_MS (30000)
166 enum ipc_active_state
ipc_client_try_connect(
168 const struct ipc_client_connect_options
*options
,
169 struct ipc_client_connection
**p_connection
)
171 wchar_t wpath
[MAX_PATH
];
172 enum ipc_active_state state
= IPC_STATE__OTHER_ERROR
;
175 *p_connection
= NULL
;
177 trace2_region_enter("ipc-client", "try-connect", NULL
);
178 trace2_data_string("ipc-client", NULL
, "try-connect/path", path
);
180 if (initialize_pipe_name(path
, wpath
, ARRAY_SIZE(wpath
)) < 0)
181 state
= IPC_STATE__INVALID_PATH
;
183 state
= connect_to_server(wpath
, WINDOWS_CONNECTION_TIMEOUT_MS
,
186 trace2_data_intmax("ipc-client", NULL
, "try-connect/state",
188 trace2_region_leave("ipc-client", "try-connect", NULL
);
190 if (state
== IPC_STATE__LISTENING
) {
191 (*p_connection
) = xcalloc(1, sizeof(struct ipc_client_connection
));
192 (*p_connection
)->fd
= fd
;
198 void ipc_client_close_connection(struct ipc_client_connection
*connection
)
203 if (connection
->fd
!= -1)
204 close(connection
->fd
);
209 int ipc_client_send_command_to_connection(
210 struct ipc_client_connection
*connection
,
211 const char *message
, struct strbuf
*answer
)
215 strbuf_setlen(answer
, 0);
217 trace2_region_enter("ipc-client", "send-command", NULL
);
219 if (write_packetized_from_buf_no_flush(message
, strlen(message
),
220 connection
->fd
) < 0 ||
221 packet_flush_gently(connection
->fd
) < 0) {
222 ret
= error(_("could not send IPC command"));
226 FlushFileBuffers((HANDLE
)_get_osfhandle(connection
->fd
));
228 if (read_packetized_to_strbuf(
229 connection
->fd
, answer
,
230 PACKET_READ_GENTLE_ON_EOF
| PACKET_READ_GENTLE_ON_READ_ERROR
) < 0) {
231 ret
= error(_("could not read IPC response"));
236 trace2_region_leave("ipc-client", "send-command", NULL
);
240 int ipc_client_send_command(const char *path
,
241 const struct ipc_client_connect_options
*options
,
242 const char *message
, struct strbuf
*response
)
245 enum ipc_active_state state
;
246 struct ipc_client_connection
*connection
= NULL
;
248 state
= ipc_client_try_connect(path
, options
, &connection
);
250 if (state
!= IPC_STATE__LISTENING
)
253 ret
= ipc_client_send_command_to_connection(connection
, message
, response
);
255 ipc_client_close_connection(connection
);
261 * Duplicate the given pipe handle and wrap it in a file descriptor so
262 * that we can use pkt-line on it.
264 static int dup_fd_from_pipe(const HANDLE pipe
)
266 HANDLE process
= GetCurrentProcess();
270 if (!DuplicateHandle(process
, pipe
, process
, &handle
, 0, FALSE
,
271 DUPLICATE_SAME_ACCESS
)) {
272 errno
= err_win_to_posix(GetLastError());
276 fd
= _open_osfhandle((intptr_t)handle
, O_RDWR
|O_BINARY
);
278 errno
= err_win_to_posix(GetLastError());
284 * `handle` is now owned by `fd` and will be automatically closed
285 * when the descriptor is closed.
/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_SERVER_THREAD_DATA,
	MAGIC_SERVER_DATA,
};
303 struct ipc_server_reply_data
{
306 struct ipc_server_thread_data
*server_thread_data
;
309 struct ipc_server_thread_data
{
311 struct ipc_server_thread_data
*next_thread
;
312 struct ipc_server_data
*server_data
;
313 pthread_t pthread_id
;
318 * On Windows, the conceptual "ipc-server" is implemented as a pool of
319 * n idential/peer "server-thread" threads. That is, there is no
320 * hierarchy of threads; and therefore no controller thread managing
321 * the pool. Each thread has an independent handle to the named pipe,
322 * receives incoming connections, processes the client, and re-uses
323 * the pipe for the next client connection.
325 * Therefore, the "ipc-server" only needs to maintain a list of the
326 * spawned threads for eventual "join" purposes.
328 * A single "stop-event" is visible to all of the server threads to
329 * tell them to shutdown (when idle).
331 struct ipc_server_data
{
333 ipc_server_application_cb
*application_cb
;
334 void *application_data
;
335 struct strbuf buf_path
;
336 wchar_t wpath
[MAX_PATH
];
338 HANDLE hEventStopRequested
;
339 struct ipc_server_thread_data
*thread_list
;
/* Outcome of waiting for a client to connect to our pipe instance. */
enum connect_result {
	CR_CONNECTED = 0,
	CR_CONNECT_PENDING,
	CR_CONNECT_ERROR,
	CR_WAIT_ERROR,
	CR_SHUTDOWN,
};
351 static enum connect_result
queue_overlapped_connect(
352 struct ipc_server_thread_data
*server_thread_data
,
355 if (ConnectNamedPipe(server_thread_data
->hPipe
, lpo
))
358 switch (GetLastError()) {
359 case ERROR_IO_PENDING
:
360 return CR_CONNECT_PENDING
;
362 case ERROR_PIPE_CONNECTED
:
363 SetEvent(lpo
->hEvent
);
371 error(_("ConnectNamedPipe failed for '%s' (%lu)"),
372 server_thread_data
->server_data
->buf_path
.buf
,
374 return CR_CONNECT_ERROR
;
378 * Use Windows Overlapped IO to wait for a connection or for our event
381 static enum connect_result
wait_for_connection(
382 struct ipc_server_thread_data
*server_thread_data
,
385 enum connect_result r
;
386 HANDLE waitHandles
[2];
389 r
= queue_overlapped_connect(server_thread_data
, lpo
);
390 if (r
!= CR_CONNECT_PENDING
)
393 waitHandles
[0] = server_thread_data
->server_data
->hEventStopRequested
;
394 waitHandles
[1] = lpo
->hEvent
;
396 dwWaitResult
= WaitForMultipleObjects(2, waitHandles
, FALSE
, INFINITE
);
397 switch (dwWaitResult
) {
398 case WAIT_OBJECT_0
+ 0:
401 case WAIT_OBJECT_0
+ 1:
402 ResetEvent(lpo
->hEvent
);
406 return CR_WAIT_ERROR
;
411 * Forward declare our reply callback function so that any compiler
412 * errors are reported when we actually define the function (in addition
413 * to any errors reported when we try to pass this callback function as
414 * a parameter in a function call). The former are easier to understand.
416 static ipc_server_reply_cb do_io_reply_callback
;
419 * Relay application's response message to the client process.
420 * (We do not flush at this point because we allow the caller
421 * to chunk data to the client thru us.)
423 static int do_io_reply_callback(struct ipc_server_reply_data
*reply_data
,
424 const char *response
, size_t response_len
)
426 if (reply_data
->magic
!= MAGIC_SERVER_REPLY_DATA
)
427 BUG("reply_cb called with wrong instance data");
429 return write_packetized_from_buf_no_flush(response
, response_len
,
434 * Receive the request/command from the client and pass it to the
435 * registered request-callback. The request-callback will compose
436 * a response and call our reply-callback to send it to the client.
438 * Simple-IPC only contains one round trip, so we flush and close
439 * here after the response.
441 static int do_io(struct ipc_server_thread_data
*server_thread_data
)
443 struct strbuf buf
= STRBUF_INIT
;
444 struct ipc_server_reply_data reply_data
;
447 reply_data
.magic
= MAGIC_SERVER_REPLY_DATA
;
448 reply_data
.server_thread_data
= server_thread_data
;
450 reply_data
.fd
= dup_fd_from_pipe(server_thread_data
->hPipe
);
451 if (reply_data
.fd
< 0)
452 return error(_("could not create fd from pipe for '%s'"),
453 server_thread_data
->server_data
->buf_path
.buf
);
455 ret
= read_packetized_to_strbuf(
457 PACKET_READ_GENTLE_ON_EOF
| PACKET_READ_GENTLE_ON_READ_ERROR
);
459 ret
= server_thread_data
->server_data
->application_cb(
460 server_thread_data
->server_data
->application_data
,
461 buf
.buf
, do_io_reply_callback
, &reply_data
);
463 packet_flush_gently(reply_data
.fd
);
465 FlushFileBuffers((HANDLE
)_get_osfhandle((reply_data
.fd
)));
469 * The client probably disconnected/shutdown before it
470 * could send a well-formed message. Ignore it.
474 strbuf_release(&buf
);
475 close(reply_data
.fd
);
481 * Handle IPC request and response with this connected client. And reset
482 * the pipe to prepare for the next client.
484 static int use_connection(struct ipc_server_thread_data
*server_thread_data
)
488 ret
= do_io(server_thread_data
);
490 FlushFileBuffers(server_thread_data
->hPipe
);
491 DisconnectNamedPipe(server_thread_data
->hPipe
);
497 * Thread proc for an IPC server worker thread. It handles a series of
498 * connections from clients. It cleans and reuses the hPipe between each
501 static void *server_thread_proc(void *_server_thread_data
)
503 struct ipc_server_thread_data
*server_thread_data
= _server_thread_data
;
504 HANDLE hEventConnected
= INVALID_HANDLE_VALUE
;
506 enum connect_result cr
;
509 assert(server_thread_data
->hPipe
!= INVALID_HANDLE_VALUE
);
511 trace2_thread_start("ipc-server");
512 trace2_data_string("ipc-server", NULL
, "pipe",
513 server_thread_data
->server_data
->buf_path
.buf
);
515 hEventConnected
= CreateEventW(NULL
, TRUE
, FALSE
, NULL
);
517 memset(&oConnect
, 0, sizeof(oConnect
));
518 oConnect
.hEvent
= hEventConnected
;
521 cr
= wait_for_connection(server_thread_data
, &oConnect
);
528 ret
= use_connection(server_thread_data
);
529 if (ret
== SIMPLE_IPC_QUIT
) {
530 ipc_server_stop_async(
531 server_thread_data
->server_data
);
536 * Ignore (transient) IO errors with this
537 * client and reset for the next client.
542 case CR_CONNECT_PENDING
:
543 /* By construction, this should not happen. */
544 BUG("ipc-server[%s]: unexpeced CR_CONNECT_PENDING",
545 server_thread_data
->server_data
->buf_path
.buf
);
547 case CR_CONNECT_ERROR
:
550 * Ignore these theoretical errors.
552 DisconnectNamedPipe(server_thread_data
->hPipe
);
556 BUG("unandled case after wait_for_connection");
561 CloseHandle(server_thread_data
->hPipe
);
562 CloseHandle(hEventConnected
);
564 trace2_thread_exit();
568 static HANDLE
create_new_pipe(wchar_t *wpath
, int is_first
)
571 DWORD dwOpenMode
, dwPipeMode
;
572 LPSECURITY_ATTRIBUTES lpsa
= NULL
;
574 dwOpenMode
= PIPE_ACCESS_INBOUND
| PIPE_ACCESS_OUTBOUND
|
575 FILE_FLAG_OVERLAPPED
;
577 dwPipeMode
= PIPE_TYPE_MESSAGE
| PIPE_READMODE_BYTE
| PIPE_WAIT
|
578 PIPE_REJECT_REMOTE_CLIENTS
;
581 dwOpenMode
|= FILE_FLAG_FIRST_PIPE_INSTANCE
;
584 * On Windows, the first server pipe instance gets to
585 * set the ACL / Security Attributes on the named
586 * pipe; subsequent instances inherit and cannot
589 * TODO Should we allow the application layer to
590 * specify security attributes, such as `LocalService`
591 * or `LocalSystem`, when we create the named pipe?
592 * This question is probably not important when the
593 * daemon is started by a foreground user process and
594 * only needs to talk to the current user, but may be
595 * if the daemon is run via the Control Panel as a
600 hPipe
= CreateNamedPipeW(wpath
, dwOpenMode
, dwPipeMode
,
601 PIPE_UNLIMITED_INSTANCES
, 1024, 1024, 0, lpsa
);
606 int ipc_server_run_async(struct ipc_server_data
**returned_server_data
,
607 const char *path
, const struct ipc_server_opts
*opts
,
608 ipc_server_application_cb
*application_cb
,
609 void *application_data
)
611 struct ipc_server_data
*server_data
;
612 wchar_t wpath
[MAX_PATH
];
613 HANDLE hPipeFirst
= INVALID_HANDLE_VALUE
;
616 int nr_threads
= opts
->nr_threads
;
618 *returned_server_data
= NULL
;
620 ret
= initialize_pipe_name(path
, wpath
, ARRAY_SIZE(wpath
));
626 hPipeFirst
= create_new_pipe(wpath
, 1);
627 if (hPipeFirst
== INVALID_HANDLE_VALUE
) {
632 server_data
= xcalloc(1, sizeof(*server_data
));
633 server_data
->magic
= MAGIC_SERVER_DATA
;
634 server_data
->application_cb
= application_cb
;
635 server_data
->application_data
= application_data
;
636 server_data
->hEventStopRequested
= CreateEvent(NULL
, TRUE
, FALSE
, NULL
);
637 strbuf_init(&server_data
->buf_path
, 0);
638 strbuf_addstr(&server_data
->buf_path
, path
);
639 wcscpy(server_data
->wpath
, wpath
);
644 for (k
= 0; k
< nr_threads
; k
++) {
645 struct ipc_server_thread_data
*std
;
647 std
= xcalloc(1, sizeof(*std
));
648 std
->magic
= MAGIC_SERVER_THREAD_DATA
;
649 std
->server_data
= server_data
;
650 std
->hPipe
= INVALID_HANDLE_VALUE
;
652 std
->hPipe
= (k
== 0)
654 : create_new_pipe(server_data
->wpath
, 0);
656 if (std
->hPipe
== INVALID_HANDLE_VALUE
) {
658 * If we've reached a pipe instance limit for
659 * this path, just use fewer threads.
665 if (pthread_create(&std
->pthread_id
, NULL
,
666 server_thread_proc
, std
)) {
668 * Likewise, if we're out of threads, just use
669 * fewer threads than requested.
671 * However, we just give up if we can't even get
672 * one thread. This should not happen.
675 die(_("could not start thread[0] for '%s'"),
678 CloseHandle(std
->hPipe
);
683 std
->next_thread
= server_data
->thread_list
;
684 server_data
->thread_list
= std
;
687 *returned_server_data
= server_data
;
691 int ipc_server_stop_async(struct ipc_server_data
*server_data
)
697 * Gently tell all of the ipc_server threads to shutdown.
698 * This will be seen the next time they are idle (and waiting
701 * We DO NOT attempt to force them to drop an active connection.
703 SetEvent(server_data
->hEventStopRequested
);
707 int ipc_server_await(struct ipc_server_data
*server_data
)
714 dwWaitResult
= WaitForSingleObject(server_data
->hEventStopRequested
, INFINITE
);
715 if (dwWaitResult
!= WAIT_OBJECT_0
)
716 return error(_("wait for hEvent failed for '%s'"),
717 server_data
->buf_path
.buf
);
719 while (server_data
->thread_list
) {
720 struct ipc_server_thread_data
*std
= server_data
->thread_list
;
722 pthread_join(std
->pthread_id
, NULL
);
724 server_data
->thread_list
= std
->next_thread
;
728 server_data
->is_stopped
= 1;
733 void ipc_server_free(struct ipc_server_data
*server_data
)
738 if (!server_data
->is_stopped
)
739 BUG("cannot free ipc-server while running for '%s'",
740 server_data
->buf_path
.buf
);
742 strbuf_release(&server_data
->buf_path
);
744 if (server_data
->hEventStopRequested
!= INVALID_HANDLE_VALUE
)
745 CloseHandle(server_data
->hEventStopRequested
);
747 while (server_data
->thread_list
) {
748 struct ipc_server_thread_data
*std
= server_data
->thread_list
;
750 server_data
->thread_list
= std
->next_thread
;