/* compat/simple-ipc/ipc-win32.c */
#include "cache.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

static int initialize_pipe_name(const char *path, wchar_t *wpath, size_t alloc)
{
	int off = 0;
	struct strbuf realpath = STRBUF_INIT;

	if (!strbuf_realpath(&realpath, path, 0))
		return -1;

	off = swprintf(wpath, alloc, L"\\\\.\\pipe\\");
	if (xutftowcs(wpath + off, realpath.buf, alloc - off) < 0) {
		strbuf_release(&realpath);
		return -1;
	}

	/* Handle drive prefix */
	if (wpath[off] && wpath[off + 1] == L':') {
		wpath[off + 1] = L'_';
		off += 2;
	}

	for (; wpath[off]; off++)
		if (wpath[off] == L'/')
			wpath[off] = L'\\';

	strbuf_release(&realpath);
	return 0;
}
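
/*
 * For example (hypothetical path), a repository at "C:/Users/me/repo"
 * would yield the pipe name "\\.\pipe\C_\Users\me\repo": the drive
 * colon is replaced with '_' and the forward slashes become
 * backslashes.
 */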

static enum ipc_active_state get_active_state(wchar_t *pipe_path)
{
	if (WaitNamedPipeW(pipe_path, NMPWAIT_USE_DEFAULT_WAIT))
		return IPC_STATE__LISTENING;

	if (GetLastError() == ERROR_SEM_TIMEOUT)
		return IPC_STATE__NOT_LISTENING;

	if (GetLastError() == ERROR_FILE_NOT_FOUND)
		return IPC_STATE__PATH_NOT_FOUND;

	return IPC_STATE__OTHER_ERROR;
}

enum ipc_active_state ipc_get_active_state(const char *path)
{
	wchar_t pipe_path[MAX_PATH];

	if (initialize_pipe_name(path, pipe_path, ARRAY_SIZE(pipe_path)) < 0)
		return IPC_STATE__INVALID_PATH;

	return get_active_state(pipe_path);
}
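
/*
 * For example, a caller might probe for a live server without
 * connecting (path hypothetical):
 *
 *     if (ipc_get_active_state("/path/to/ipc") == IPC_STATE__LISTENING)
 *             ... a server is accepting connections ...
 */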

#define WAIT_STEP_MS (50)

static enum ipc_active_state connect_to_server(
	const wchar_t *wpath,
	DWORD timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	DWORD t_start_ms, t_waited_ms;
	DWORD step_ms;
	HANDLE hPipe = INVALID_HANDLE_VALUE;
	DWORD mode = PIPE_READMODE_BYTE;
	DWORD gle;

	*pfd = -1;

	for (;;) {
		hPipe = CreateFileW(wpath, GENERIC_READ | GENERIC_WRITE,
				    0, NULL, OPEN_EXISTING, 0, NULL);
		if (hPipe != INVALID_HANDLE_VALUE)
			break;

		gle = GetLastError();

		switch (gle) {
		case ERROR_FILE_NOT_FOUND:
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;
			if (!timeout_ms)
				return IPC_STATE__PATH_NOT_FOUND;

			step_ms = (timeout_ms < WAIT_STEP_MS) ?
				timeout_ms : WAIT_STEP_MS;
			sleep_millisec(step_ms);

			timeout_ms -= step_ms;
			break; /* try again */

		case ERROR_PIPE_BUSY:
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;
			if (!timeout_ms)
				return IPC_STATE__NOT_LISTENING;

			t_start_ms = (DWORD)(getnanotime() / 1000000);

			if (!WaitNamedPipeW(wpath, timeout_ms)) {
				if (GetLastError() == ERROR_SEM_TIMEOUT)
					return IPC_STATE__NOT_LISTENING;

				return IPC_STATE__OTHER_ERROR;
			}

			/*
			 * A pipe server instance became available.
			 * Race other client processes to connect to
			 * it.
			 *
			 * But first decrement our overall timeout so
			 * that we don't starve if we keep losing the
			 * race.  And guard against the special
			 * NMPWAIT_ values (0 and -1).
			 */
			t_waited_ms = (DWORD)(getnanotime() / 1000000) - t_start_ms;
			if (t_waited_ms < timeout_ms)
				timeout_ms -= t_waited_ms;
			else
				timeout_ms = 1;
			break; /* try again */

		default:
			return IPC_STATE__OTHER_ERROR;
		}
	}

	if (!SetNamedPipeHandleState(hPipe, &mode, NULL, NULL)) {
		CloseHandle(hPipe);
		return IPC_STATE__OTHER_ERROR;
	}

	*pfd = _open_osfhandle((intptr_t)hPipe, O_RDWR|O_BINARY);
	if (*pfd < 0) {
		CloseHandle(hPipe);
		return IPC_STATE__OTHER_ERROR;
	}

	/* fd now owns hPipe */

	return IPC_STATE__LISTENING;
}

/*
 * The default connection timeout for Windows clients.
 *
 * This is not currently part of the ipc_ API (nor the config settings)
 * because of differences between Windows and other platforms.
 *
 * This value was chosen at random.
 */
#define WINDOWS_CONNECTION_TIMEOUT_MS (30000)

enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	wchar_t wpath[MAX_PATH];
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	if (initialize_pipe_name(path, wpath, ARRAY_SIZE(wpath)) < 0)
		state = IPC_STATE__INVALID_PATH;
	else
		state = connect_to_server(wpath, WINDOWS_CONNECTION_TIMEOUT_MS,
					  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}

void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}

int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, strlen(message),
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	FlushFileBuffers((HANDLE)_get_osfhandle(connection->fd));

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, struct strbuf *response)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection, message, response);

	ipc_client_close_connection(connection);

	return ret;
}
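
/*
 * Sketch of a one-shot round trip using the above (the caller, the
 * "ping" message, and its reply are hypothetical, not a built-in
 * protocol):
 *
 *     struct ipc_client_connect_options options =
 *             IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *     struct strbuf answer = STRBUF_INIT;
 *
 *     options.wait_if_busy = 1;
 *
 *     if (!ipc_client_send_command(path, &options, "ping", &answer))
 *             printf("reply: %s\n", answer.buf);
 *     strbuf_release(&answer);
 */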

/*
 * Duplicate the given pipe handle and wrap it in a file descriptor so
 * that we can use pkt-line on it.
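 *
 * Duplicating (rather than wrapping hPipe directly) means that closing
 * the resulting fd closes only the duplicate; the original pipe handle
 * stays open so the server thread can reuse it for the next client.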
 */
static int dup_fd_from_pipe(const HANDLE pipe)
{
	HANDLE process = GetCurrentProcess();
	HANDLE handle;
	int fd;

	if (!DuplicateHandle(process, pipe, process, &handle, 0, FALSE,
			     DUPLICATE_SAME_ACCESS)) {
		errno = err_win_to_posix(GetLastError());
		return -1;
	}

	fd = _open_osfhandle((intptr_t)handle, O_RDWR|O_BINARY);
	if (fd < 0) {
		errno = err_win_to_posix(GetLastError());
		CloseHandle(handle);
		return -1;
	}

	/*
	 * `handle` is now owned by `fd` and will be automatically closed
	 * when the descriptor is closed.
	 */

	return fd;
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_SERVER_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_server_thread_data *server_thread_data;
};

struct ipc_server_thread_data {
	enum magic magic;
	struct ipc_server_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
	HANDLE hPipe;
};

/*
 * On Windows, the conceptual "ipc-server" is implemented as a pool of
 * n identical/peer "server-thread" threads.  That is, there is no
 * hierarchy of threads; and therefore no controller thread managing
 * the pool.  Each thread has an independent handle to the named pipe,
 * receives incoming connections, processes the client, and re-uses
 * the pipe for the next client connection.
 *
 * Therefore, the "ipc-server" only needs to maintain a list of the
 * spawned threads for eventual "join" purposes.
 *
 * A single "stop-event" is visible to all of the server threads to
 * tell them to shut down (when idle).
 */
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;
	wchar_t wpath[MAX_PATH];

	HANDLE hEventStopRequested;
	struct ipc_server_thread_data *thread_list;
	int is_stopped;
};

enum connect_result {
	CR_CONNECTED = 0,
	CR_CONNECT_PENDING,
	CR_CONNECT_ERROR,
	CR_WAIT_ERROR,
	CR_SHUTDOWN,
};

static enum connect_result queue_overlapped_connect(
	struct ipc_server_thread_data *server_thread_data,
	OVERLAPPED *lpo)
{
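	/*
	 * With an OVERLAPPED argument, ConnectNamedPipe() is expected
	 * to return zero and report status via GetLastError(): either
	 * the connect is queued (ERROR_IO_PENDING) or a client managed
	 * to connect first (ERROR_PIPE_CONNECTED).  A non-zero return
	 * is therefore treated as an error.
	 */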
	if (ConnectNamedPipe(server_thread_data->hPipe, lpo))
		goto failed;

	switch (GetLastError()) {
	case ERROR_IO_PENDING:
		return CR_CONNECT_PENDING;

	case ERROR_PIPE_CONNECTED:
		SetEvent(lpo->hEvent);
		return CR_CONNECTED;

	default:
		break;
	}

failed:
	error(_("ConnectNamedPipe failed for '%s' (%lu)"),
	      server_thread_data->server_data->buf_path.buf,
	      GetLastError());
	return CR_CONNECT_ERROR;
}

/*
 * Use Windows Overlapped IO to wait for a connection or for our event
 * to be signalled.
 */
static enum connect_result wait_for_connection(
	struct ipc_server_thread_data *server_thread_data,
	OVERLAPPED *lpo)
{
	enum connect_result r;
	HANDLE waitHandles[2];
	DWORD dwWaitResult;

	r = queue_overlapped_connect(server_thread_data, lpo);
	if (r != CR_CONNECT_PENDING)
		return r;

	waitHandles[0] = server_thread_data->server_data->hEventStopRequested;
	waitHandles[1] = lpo->hEvent;

	dwWaitResult = WaitForMultipleObjects(2, waitHandles, FALSE, INFINITE);
	switch (dwWaitResult) {
	case WAIT_OBJECT_0 + 0:
		return CR_SHUTDOWN;

	case WAIT_OBJECT_0 + 1:
		ResetEvent(lpo->hEvent);
		return CR_CONNECTED;

	default:
		return CR_WAIT_ERROR;
	}
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call).  The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client thru us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}
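
/*
 * Sketch of an application callback replying through the callback
 * above (hypothetical; the real callback is supplied to
 * `ipc_server_run_async()` by the application):
 *
 *     static int my_app_cb(void *data, const char *command,
 *                          ipc_server_reply_cb *reply_cb,
 *                          struct ipc_server_reply_data *reply_data)
 *     {
 *             // may be called repeatedly to chunk the response;
 *             // do_io() flushes once we return
 *             reply_cb(reply_data, "pong", 4);
 *             return 0; // or SIMPLE_IPC_QUIT to shut the server down
 *     }
 */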

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback.  The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 *
 * Simple-IPC only contains one round trip, so we flush and close
 * here after the response.
 */
static int do_io(struct ipc_server_thread_data *server_thread_data)
{
	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.server_thread_data = server_thread_data;

	reply_data.fd = dup_fd_from_pipe(server_thread_data->hPipe);
	if (reply_data.fd < 0)
		return error(_("could not create fd from pipe for '%s'"),
			     server_thread_data->server_data->buf_path.buf);

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = server_thread_data->server_data->application_cb(
			server_thread_data->server_data->application_data,
			buf.buf, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);

		FlushFileBuffers((HANDLE)_get_osfhandle(reply_data.fd));
	} else {
		/*
		 * The client probably disconnected/shutdown before it
		 * could send a well-formed message.  Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Handle IPC request and response with this connected client.  And reset
 * the pipe to prepare for the next client.
 */
static int use_connection(struct ipc_server_thread_data *server_thread_data)
{
	int ret;

	ret = do_io(server_thread_data);

	FlushFileBuffers(server_thread_data->hPipe);
	DisconnectNamedPipe(server_thread_data->hPipe);

	return ret;
}

/*
 * Thread proc for an IPC server worker thread.  It handles a series of
 * connections from clients.  It cleans and reuses the hPipe between each
 * client.
 */
static void *server_thread_proc(void *_server_thread_data)
{
	struct ipc_server_thread_data *server_thread_data = _server_thread_data;
	HANDLE hEventConnected = INVALID_HANDLE_VALUE;
	OVERLAPPED oConnect;
	enum connect_result cr;
	int ret;

	assert(server_thread_data->hPipe != INVALID_HANDLE_VALUE);

	trace2_thread_start("ipc-server");
	trace2_data_string("ipc-server", NULL, "pipe",
			   server_thread_data->server_data->buf_path.buf);

	hEventConnected = CreateEventW(NULL, TRUE, FALSE, NULL);

	memset(&oConnect, 0, sizeof(oConnect));
	oConnect.hEvent = hEventConnected;

	for (;;) {
		cr = wait_for_connection(server_thread_data, &oConnect);

		switch (cr) {
		case CR_SHUTDOWN:
			goto finished;

		case CR_CONNECTED:
			ret = use_connection(server_thread_data);
			if (ret == SIMPLE_IPC_QUIT) {
				ipc_server_stop_async(
					server_thread_data->server_data);
				goto finished;
			}
			if (ret > 0) {
				/*
				 * Ignore (transient) IO errors with this
				 * client and reset for the next client.
				 */
			}
			break;

		case CR_CONNECT_PENDING:
			/* By construction, this should not happen. */
			BUG("ipc-server[%s]: unexpected CR_CONNECT_PENDING",
			    server_thread_data->server_data->buf_path.buf);

		case CR_CONNECT_ERROR:
		case CR_WAIT_ERROR:
			/*
			 * Ignore these theoretical errors.
			 */
			DisconnectNamedPipe(server_thread_data->hPipe);
			break;

		default:
			BUG("unhandled case after wait_for_connection");
		}
	}

finished:
	CloseHandle(server_thread_data->hPipe);
	CloseHandle(hEventConnected);

	trace2_thread_exit();
	return NULL;
}

static HANDLE create_new_pipe(wchar_t *wpath, int is_first)
{
	HANDLE hPipe;
	DWORD dwOpenMode, dwPipeMode;
	LPSECURITY_ATTRIBUTES lpsa = NULL;

	dwOpenMode = PIPE_ACCESS_INBOUND | PIPE_ACCESS_OUTBOUND |
		FILE_FLAG_OVERLAPPED;

	dwPipeMode = PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE | PIPE_WAIT |
		PIPE_REJECT_REMOTE_CLIENTS;

	if (is_first) {
		dwOpenMode |= FILE_FLAG_FIRST_PIPE_INSTANCE;

		/*
		 * On Windows, the first server pipe instance gets to
		 * set the ACL / Security Attributes on the named
		 * pipe; subsequent instances inherit and cannot
		 * change them.
		 *
		 * TODO Should we allow the application layer to
		 * specify security attributes, such as `LocalService`
		 * or `LocalSystem`, when we create the named pipe?
		 * This question is probably not important when the
		 * daemon is started by a foreground user process and
		 * only needs to talk to the current user, but may be
		 * important if the daemon is run via the Control Panel
		 * as a System Service.
		 */
	}

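	/*
	 * PIPE_UNLIMITED_INSTANCES lets each server thread create its
	 * own instance of the same pipe name; the 1024-byte in/out
	 * buffer sizes are only advisory hints to the OS.
	 */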
	hPipe = CreateNamedPipeW(wpath, dwOpenMode, dwPipeMode,
				 PIPE_UNLIMITED_INSTANCES, 1024, 1024, 0, lpsa);

	return hPipe;
}

int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct ipc_server_data *server_data;
	wchar_t wpath[MAX_PATH];
	HANDLE hPipeFirst = INVALID_HANDLE_VALUE;
	int k;
	int ret = 0;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	ret = initialize_pipe_name(path, wpath, ARRAY_SIZE(wpath));
	if (ret < 0) {
		errno = EINVAL;
		return -1;
	}

	hPipeFirst = create_new_pipe(wpath, 1);
	if (hPipeFirst == INVALID_HANDLE_VALUE) {
		errno = EADDRINUSE;
		return -2;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	server_data->hEventStopRequested = CreateEvent(NULL, TRUE, FALSE, NULL);
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);
	wcscpy(server_data->wpath, wpath);

	if (nr_threads < 1)
		nr_threads = 1;

	for (k = 0; k < nr_threads; k++) {
		struct ipc_server_thread_data *std;

		std = xcalloc(1, sizeof(*std));
		std->magic = MAGIC_SERVER_THREAD_DATA;
		std->server_data = server_data;
		std->hPipe = INVALID_HANDLE_VALUE;

		std->hPipe = (k == 0)
			? hPipeFirst
			: create_new_pipe(server_data->wpath, 0);

		if (std->hPipe == INVALID_HANDLE_VALUE) {
			/*
			 * If we've reached a pipe instance limit for
			 * this path, just use fewer threads.
			 */
			free(std);
			break;
		}

		if (pthread_create(&std->pthread_id, NULL,
				   server_thread_proc, std)) {
			/*
			 * Likewise, if we're out of threads, just use
			 * fewer threads than requested.
			 *
			 * However, we just give up if we can't even get
			 * one thread.  This should not happen.
			 */
			if (k == 0)
				die(_("could not start thread[0] for '%s'"),
				    path);

			CloseHandle(std->hPipe);
			free(std);
			break;
		}

		std->next_thread = server_data->thread_list;
		server_data->thread_list = std;
	}

	*returned_server_data = server_data;
	return 0;
}
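
/*
 * Sketch of the server lifecycle (hypothetical caller, with
 * `my_app_cb` as sketched above):
 *
 *     struct ipc_server_data *server = NULL;
 *     struct ipc_server_opts opts = { .nr_threads = 4 };
 *
 *     if (!ipc_server_run_async(&server, path, &opts, my_app_cb, NULL)) {
 *             ...
 *             ipc_server_stop_async(server);
 *             ipc_server_await(server);
 *             ipc_server_free(server);
 *     }
 */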

int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	if (!server_data)
		return 0;

	/*
	 * Gently tell all of the ipc_server threads to shut down.
	 * This will be seen the next time they are idle (and waiting
	 * for a connection).
	 *
	 * We DO NOT attempt to force them to drop an active connection.
	 */
	SetEvent(server_data->hEventStopRequested);
	return 0;
}

int ipc_server_await(struct ipc_server_data *server_data)
{
	DWORD dwWaitResult;

	if (!server_data)
		return 0;

	dwWaitResult = WaitForSingleObject(server_data->hEventStopRequested, INFINITE);
	if (dwWaitResult != WAIT_OBJECT_0)
		return error(_("wait for hEvent failed for '%s'"),
			     server_data->buf_path.buf);

	while (server_data->thread_list) {
		struct ipc_server_thread_data *std = server_data->thread_list;

		pthread_join(std->pthread_id, NULL);

		server_data->thread_list = std->next_thread;
		free(std);
	}

	server_data->is_stopped = 1;

	return 0;
}

void ipc_server_free(struct ipc_server_data *server_data)
{
	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	strbuf_release(&server_data->buf_path);

	if (server_data->hEventStopRequested != INVALID_HANDLE_VALUE)
		CloseHandle(server_data->hEventStopRequested);

	while (server_data->thread_list) {
		struct ipc_server_thread_data *std = server_data->thread_list;

		server_data->thread_list = std->next_thread;
		free(std);
	}

	free(server_data);
}