]> git.ipfire.org Git - thirdparty/git.git/blob - t/helper/test-simple-ipc.c
submodule-config.c: strengthen URL fsck check
[thirdparty/git.git] / t / helper / test-simple-ipc.c
1 /*
2 * test-simple-ipc.c: verify that the Inter-Process Communication works.
3 */
4
5 #include "test-tool.h"
6 #include "gettext.h"
7 #include "strbuf.h"
8 #include "simple-ipc.h"
9 #include "parse-options.h"
10 #include "thread-utils.h"
11 #include "strvec.h"
12 #include "run-command.h"
13 #include "trace2.h"
14
15 #ifndef SUPPORTS_SIMPLE_IPC
16 int cmd__simple_ipc(int argc, const char **argv)
17 {
18 die("simple IPC not available on this platform");
19 }
20 #else
21
22 /*
23 * The test daemon defines an "application callback" that supports a
24 * series of commands (see `test_app_cb()`).
25 *
26 * Unknown commands are caught here and we send an error message back
27 * to the client process.
28 */
29 static int app__unhandled_command(const char *command,
30 ipc_server_reply_cb *reply_cb,
31 struct ipc_server_reply_data *reply_data)
32 {
33 struct strbuf buf = STRBUF_INIT;
34 int ret;
35
36 strbuf_addf(&buf, "unhandled command: %s", command);
37 ret = reply_cb(reply_data, buf.buf, buf.len);
38 strbuf_release(&buf);
39
40 return ret;
41 }
42
43 /*
44 * Reply with a single very large buffer. This is to ensure that
45 * long response are properly handled -- whether the chunking occurs
46 * in the kernel or in the (probably pkt-line) layer.
47 */
48 #define BIG_ROWS (10000)
49 static int app__big_command(ipc_server_reply_cb *reply_cb,
50 struct ipc_server_reply_data *reply_data)
51 {
52 struct strbuf buf = STRBUF_INIT;
53 int row;
54 int ret;
55
56 for (row = 0; row < BIG_ROWS; row++)
57 strbuf_addf(&buf, "big: %.75d\n", row);
58
59 ret = reply_cb(reply_data, buf.buf, buf.len);
60 strbuf_release(&buf);
61
62 return ret;
63 }
64
65 /*
66 * Reply with a series of lines. This is to ensure that we can incrementally
67 * compute the response and chunk it to the client.
68 */
69 #define CHUNK_ROWS (10000)
70 static int app__chunk_command(ipc_server_reply_cb *reply_cb,
71 struct ipc_server_reply_data *reply_data)
72 {
73 struct strbuf buf = STRBUF_INIT;
74 int row;
75 int ret;
76
77 for (row = 0; row < CHUNK_ROWS; row++) {
78 strbuf_setlen(&buf, 0);
79 strbuf_addf(&buf, "big: %.75d\n", row);
80 ret = reply_cb(reply_data, buf.buf, buf.len);
81 }
82
83 strbuf_release(&buf);
84
85 return ret;
86 }
87
88 /*
89 * Slowly reply with a series of lines. This is to model an expensive to
90 * compute chunked response (which might happen if this callback is running
91 * in a thread and is fighting for a lock with other threads).
92 */
93 #define SLOW_ROWS (1000)
94 #define SLOW_DELAY_MS (10)
95 static int app__slow_command(ipc_server_reply_cb *reply_cb,
96 struct ipc_server_reply_data *reply_data)
97 {
98 struct strbuf buf = STRBUF_INIT;
99 int row;
100 int ret;
101
102 for (row = 0; row < SLOW_ROWS; row++) {
103 strbuf_setlen(&buf, 0);
104 strbuf_addf(&buf, "big: %.75d\n", row);
105 ret = reply_cb(reply_data, buf.buf, buf.len);
106 sleep_millisec(SLOW_DELAY_MS);
107 }
108
109 strbuf_release(&buf);
110
111 return ret;
112 }
113
114 /*
115 * The client sent a command followed by a (possibly very) large buffer.
116 */
117 static int app__sendbytes_command(const char *received, size_t received_len,
118 ipc_server_reply_cb *reply_cb,
119 struct ipc_server_reply_data *reply_data)
120 {
121 struct strbuf buf_resp = STRBUF_INIT;
122 const char *p = "?";
123 int len_ballast = 0;
124 int k;
125 int errs = 0;
126 int ret;
127
128 /*
129 * The test is setup to send:
130 * "sendbytes" SP <n * char>
131 */
132 if (received_len < strlen("sendbytes "))
133 BUG("received_len is short in app__sendbytes_command");
134
135 if (skip_prefix(received, "sendbytes ", &p))
136 len_ballast = strlen(p);
137
138 /*
139 * Verify that the ballast is n copies of a single letter.
140 * And that the multi-threaded IO layer didn't cross the streams.
141 */
142 for (k = 1; k < len_ballast; k++)
143 if (p[k] != p[0])
144 errs++;
145
146 if (errs)
147 strbuf_addf(&buf_resp, "errs:%d\n", errs);
148 else
149 strbuf_addf(&buf_resp, "rcvd:%c%08d\n", p[0], len_ballast);
150
151 ret = reply_cb(reply_data, buf_resp.buf, buf_resp.len);
152
153 strbuf_release(&buf_resp);
154
155 return ret;
156 }
157
158 /*
159 * An arbitrary fixed address to verify that the application instance
160 * data is handled properly.
161 */
162 static int my_app_data = 42;
163
164 static ipc_server_application_cb test_app_cb;
165
166 /*
167 * This is the "application callback" that sits on top of the
168 * "ipc-server". It completely defines the set of commands supported
169 * by this application.
170 */
171 static int test_app_cb(void *application_data,
172 const char *command, size_t command_len,
173 ipc_server_reply_cb *reply_cb,
174 struct ipc_server_reply_data *reply_data)
175 {
176 /*
177 * Verify that we received the application-data that we passed
178 * when we started the ipc-server. (We have several layers of
179 * callbacks calling callbacks and it's easy to get things mixed
180 * up (especially when some are "void*").)
181 */
182 if (application_data != (void*)&my_app_data)
183 BUG("application_cb: application_data pointer wrong");
184
185 if (command_len == 4 && !strncmp(command, "quit", 4)) {
186 /*
187 * The client sent a "quit" command. This is an async
188 * request for the server to shutdown.
189 *
190 * We DO NOT send the client a response message
191 * (because we have nothing to say and the other
192 * server threads have not yet stopped).
193 *
194 * Tell the ipc-server layer to start shutting down.
195 * This includes: stop listening for new connections
196 * on the socket/pipe and telling all worker threads
197 * to finish/drain their outgoing responses to other
198 * clients.
199 *
200 * This DOES NOT force an immediate sync shutdown.
201 */
202 return SIMPLE_IPC_QUIT;
203 }
204
205 if (command_len == 4 && !strncmp(command, "ping", 4)) {
206 const char *answer = "pong";
207 return reply_cb(reply_data, answer, strlen(answer));
208 }
209
210 if (command_len == 3 && !strncmp(command, "big", 3))
211 return app__big_command(reply_cb, reply_data);
212
213 if (command_len == 5 && !strncmp(command, "chunk", 5))
214 return app__chunk_command(reply_cb, reply_data);
215
216 if (command_len == 4 && !strncmp(command, "slow", 4))
217 return app__slow_command(reply_cb, reply_data);
218
219 if (command_len >= 10 && starts_with(command, "sendbytes "))
220 return app__sendbytes_command(command, command_len,
221 reply_cb, reply_data);
222
223 return app__unhandled_command(command, reply_cb, reply_data);
224 }
225
226 struct cl_args
227 {
228 const char *subcommand;
229 const char *path;
230 const char *token;
231
232 int nr_threads;
233 int max_wait_sec;
234 int bytecount;
235 int batchsize;
236
237 char bytevalue;
238 };
239
240 static struct cl_args cl_args = {
241 .subcommand = NULL,
242 .path = "ipc-test",
243 .token = NULL,
244
245 .nr_threads = 5,
246 .max_wait_sec = 60,
247 .bytecount = 1024,
248 .batchsize = 10,
249
250 .bytevalue = 'x',
251 };
252
253 /*
254 * This process will run as a simple-ipc server and listen for IPC commands
255 * from client processes.
256 */
257 static int daemon__run_server(void)
258 {
259 int ret;
260
261 struct ipc_server_opts opts = {
262 .nr_threads = cl_args.nr_threads,
263 };
264
265 /*
266 * Synchronously run the ipc-server. We don't need any application
267 * instance data, so pass an arbitrary pointer (that we'll later
268 * verify made the round trip).
269 */
270 ret = ipc_server_run(cl_args.path, &opts, test_app_cb, (void*)&my_app_data);
271 if (ret == -2)
272 error("socket/pipe already in use: '%s'", cl_args.path);
273 else if (ret == -1)
274 error_errno("could not start server on: '%s'", cl_args.path);
275
276 return ret;
277 }
278
279 static start_bg_wait_cb bg_wait_cb;
280
281 static int bg_wait_cb(const struct child_process *cp UNUSED,
282 void *cb_data UNUSED)
283 {
284 int s = ipc_get_active_state(cl_args.path);
285
286 switch (s) {
287 case IPC_STATE__LISTENING:
288 /* child is "ready" */
289 return 0;
290
291 case IPC_STATE__NOT_LISTENING:
292 case IPC_STATE__PATH_NOT_FOUND:
293 /* give child more time */
294 return 1;
295
296 default:
297 case IPC_STATE__INVALID_PATH:
298 case IPC_STATE__OTHER_ERROR:
299 /* all the time in world won't help */
300 return -1;
301 }
302 }
303
304 static int daemon__start_server(void)
305 {
306 struct child_process cp = CHILD_PROCESS_INIT;
307 enum start_bg_result sbgr;
308
309 strvec_push(&cp.args, "test-tool");
310 strvec_push(&cp.args, "simple-ipc");
311 strvec_push(&cp.args, "run-daemon");
312 strvec_pushf(&cp.args, "--name=%s", cl_args.path);
313 strvec_pushf(&cp.args, "--threads=%d", cl_args.nr_threads);
314
315 cp.no_stdin = 1;
316 cp.no_stdout = 1;
317 cp.no_stderr = 1;
318
319 sbgr = start_bg_command(&cp, bg_wait_cb, NULL, cl_args.max_wait_sec);
320
321 switch (sbgr) {
322 case SBGR_READY:
323 return 0;
324
325 default:
326 case SBGR_ERROR:
327 case SBGR_CB_ERROR:
328 return error("daemon failed to start");
329
330 case SBGR_TIMEOUT:
331 return error("daemon not online yet");
332
333 case SBGR_DIED:
334 return error("daemon terminated");
335 }
336 }
337
338 /*
339 * This process will run a quick probe to see if a simple-ipc server
340 * is active on this path.
341 *
342 * Returns 0 if the server is alive.
343 */
344 static int client__probe_server(void)
345 {
346 enum ipc_active_state s;
347
348 s = ipc_get_active_state(cl_args.path);
349 switch (s) {
350 case IPC_STATE__LISTENING:
351 return 0;
352
353 case IPC_STATE__NOT_LISTENING:
354 return error("no server listening at '%s'", cl_args.path);
355
356 case IPC_STATE__PATH_NOT_FOUND:
357 return error("path not found '%s'", cl_args.path);
358
359 case IPC_STATE__INVALID_PATH:
360 return error("invalid pipe/socket name '%s'", cl_args.path);
361
362 case IPC_STATE__OTHER_ERROR:
363 default:
364 return error("other error for '%s'", cl_args.path);
365 }
366 }
367
368 /*
369 * Send an IPC command token to an already-running server daemon and
370 * print the response.
371 *
372 * This is a simple 1 word command/token that `test_app_cb()` (in the
373 * daemon process) will understand.
374 */
375 static int client__send_ipc(void)
376 {
377 const char *command = "(no-command)";
378 struct strbuf buf = STRBUF_INIT;
379 struct ipc_client_connect_options options
380 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
381
382 if (cl_args.token && *cl_args.token)
383 command = cl_args.token;
384
385 options.wait_if_busy = 1;
386 options.wait_if_not_found = 0;
387
388 if (!ipc_client_send_command(cl_args.path, &options,
389 command, strlen(command),
390 &buf)) {
391 if (buf.len) {
392 printf("%s\n", buf.buf);
393 fflush(stdout);
394 }
395 strbuf_release(&buf);
396
397 return 0;
398 }
399
400 return error("failed to send '%s' to '%s'", command, cl_args.path);
401 }
402
403 /*
404 * Send an IPC command to an already-running server and ask it to
405 * shutdown. "send quit" is an async request and queues a shutdown
406 * event in the server, so we spin and wait here for it to actually
407 * shutdown to make the unit tests a little easier to write.
408 */
409 static int client__stop_server(void)
410 {
411 int ret;
412 time_t time_limit, now;
413 enum ipc_active_state s;
414
415 time(&time_limit);
416 time_limit += cl_args.max_wait_sec;
417
418 cl_args.token = "quit";
419
420 ret = client__send_ipc();
421 if (ret)
422 return ret;
423
424 for (;;) {
425 sleep_millisec(100);
426
427 s = ipc_get_active_state(cl_args.path);
428
429 if (s != IPC_STATE__LISTENING) {
430 /*
431 * The socket/pipe is gone and/or has stopped
432 * responding. Lets assume that the daemon
433 * process has exited too.
434 */
435 return 0;
436 }
437
438 time(&now);
439 if (now > time_limit)
440 return error("daemon has not shutdown yet");
441 }
442 }
443
444 /*
445 * Send an IPC command followed by ballast to confirm that a large
446 * message can be sent and that the kernel or pkt-line layers will
447 * properly chunk it and that the daemon receives the entire message.
448 */
449 static int do_sendbytes(int bytecount, char byte, const char *path,
450 const struct ipc_client_connect_options *options)
451 {
452 struct strbuf buf_send = STRBUF_INIT;
453 struct strbuf buf_resp = STRBUF_INIT;
454
455 strbuf_addstr(&buf_send, "sendbytes ");
456 strbuf_addchars(&buf_send, byte, bytecount);
457
458 if (!ipc_client_send_command(path, options,
459 buf_send.buf, buf_send.len,
460 &buf_resp)) {
461 strbuf_rtrim(&buf_resp);
462 printf("sent:%c%08d %s\n", byte, bytecount, buf_resp.buf);
463 fflush(stdout);
464 strbuf_release(&buf_send);
465 strbuf_release(&buf_resp);
466
467 return 0;
468 }
469
470 return error("client failed to sendbytes(%d, '%c') to '%s'",
471 bytecount, byte, path);
472 }
473
474 /*
475 * Send an IPC command with ballast to an already-running server daemon.
476 */
477 static int client__sendbytes(void)
478 {
479 struct ipc_client_connect_options options
480 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
481
482 options.wait_if_busy = 1;
483 options.wait_if_not_found = 0;
484 options.uds_disallow_chdir = 0;
485
486 return do_sendbytes(cl_args.bytecount, cl_args.bytevalue, cl_args.path,
487 &options);
488 }
489
490 struct multiple_thread_data {
491 pthread_t pthread_id;
492 struct multiple_thread_data *next;
493 const char *path;
494 int bytecount;
495 int batchsize;
496 int sum_errors;
497 int sum_good;
498 char letter;
499 };
500
501 static void *multiple_thread_proc(void *_multiple_thread_data)
502 {
503 struct multiple_thread_data *d = _multiple_thread_data;
504 int k;
505 struct ipc_client_connect_options options
506 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
507
508 options.wait_if_busy = 1;
509 options.wait_if_not_found = 0;
510 /*
511 * A multi-threaded client should not be randomly calling chdir().
512 * The test will pass without this restriction because the test is
513 * not otherwise accessing the filesystem, but it makes us honest.
514 */
515 options.uds_disallow_chdir = 1;
516
517 trace2_thread_start("multiple");
518
519 for (k = 0; k < d->batchsize; k++) {
520 if (do_sendbytes(d->bytecount + k, d->letter, d->path, &options))
521 d->sum_errors++;
522 else
523 d->sum_good++;
524 }
525
526 trace2_thread_exit();
527 return NULL;
528 }
529
530 /*
531 * Start a client-side thread pool. Each thread sends a series of
532 * IPC requests. Each request is on a new connection to the server.
533 */
534 static int client__multiple(void)
535 {
536 struct multiple_thread_data *list = NULL;
537 int k;
538 int sum_join_errors = 0;
539 int sum_thread_errors = 0;
540 int sum_good = 0;
541
542 for (k = 0; k < cl_args.nr_threads; k++) {
543 struct multiple_thread_data *d = xcalloc(1, sizeof(*d));
544 d->next = list;
545 d->path = cl_args.path;
546 d->bytecount = cl_args.bytecount + cl_args.batchsize*(k/26);
547 d->batchsize = cl_args.batchsize;
548 d->sum_errors = 0;
549 d->sum_good = 0;
550 d->letter = 'A' + (k % 26);
551
552 if (pthread_create(&d->pthread_id, NULL, multiple_thread_proc, d)) {
553 warning("failed to create thread[%d] skipping remainder", k);
554 free(d);
555 break;
556 }
557
558 list = d;
559 }
560
561 while (list) {
562 struct multiple_thread_data *d = list;
563
564 if (pthread_join(d->pthread_id, NULL))
565 sum_join_errors++;
566
567 sum_thread_errors += d->sum_errors;
568 sum_good += d->sum_good;
569
570 list = d->next;
571 free(d);
572 }
573
574 printf("client (good %d) (join %d), (errors %d)\n",
575 sum_good, sum_join_errors, sum_thread_errors);
576
577 return (sum_join_errors + sum_thread_errors) ? 1 : 0;
578 }
579
580 int cmd__simple_ipc(int argc, const char **argv)
581 {
582 const char * const simple_ipc_usage[] = {
583 N_("test-helper simple-ipc is-active [<name>] [<options>]"),
584 N_("test-helper simple-ipc run-daemon [<name>] [<threads>]"),
585 N_("test-helper simple-ipc start-daemon [<name>] [<threads>] [<max-wait>]"),
586 N_("test-helper simple-ipc stop-daemon [<name>] [<max-wait>]"),
587 N_("test-helper simple-ipc send [<name>] [<token>]"),
588 N_("test-helper simple-ipc sendbytes [<name>] [<bytecount>] [<byte>]"),
589 N_("test-helper simple-ipc multiple [<name>] [<threads>] [<bytecount>] [<batchsize>]"),
590 NULL
591 };
592
593 const char *bytevalue = NULL;
594
595 struct option options[] = {
596 #ifndef GIT_WINDOWS_NATIVE
597 OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("name or pathname of unix domain socket")),
598 #else
599 OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("named-pipe name")),
600 #endif
601 OPT_INTEGER(0, "threads", &cl_args.nr_threads, N_("number of threads in server thread pool")),
602 OPT_INTEGER(0, "max-wait", &cl_args.max_wait_sec, N_("seconds to wait for daemon to start or stop")),
603
604 OPT_INTEGER(0, "bytecount", &cl_args.bytecount, N_("number of bytes")),
605 OPT_INTEGER(0, "batchsize", &cl_args.batchsize, N_("number of requests per thread")),
606
607 OPT_STRING(0, "byte", &bytevalue, N_("byte"), N_("ballast character")),
608 OPT_STRING(0, "token", &cl_args.token, N_("token"), N_("command token to send to the server")),
609
610 OPT_END()
611 };
612
613 if (argc < 2)
614 usage_with_options(simple_ipc_usage, options);
615
616 if (argc == 2 && !strcmp(argv[1], "-h"))
617 usage_with_options(simple_ipc_usage, options);
618
619 if (argc == 2 && !strcmp(argv[1], "SUPPORTS_SIMPLE_IPC"))
620 return 0;
621
622 cl_args.subcommand = argv[1];
623
624 argc--;
625 argv++;
626
627 argc = parse_options(argc, argv, NULL, options, simple_ipc_usage, 0);
628
629 if (cl_args.nr_threads < 1)
630 cl_args.nr_threads = 1;
631 if (cl_args.max_wait_sec < 0)
632 cl_args.max_wait_sec = 0;
633 if (cl_args.bytecount < 1)
634 cl_args.bytecount = 1;
635 if (cl_args.batchsize < 1)
636 cl_args.batchsize = 1;
637
638 if (bytevalue && *bytevalue)
639 cl_args.bytevalue = bytevalue[0];
640
641 /*
642 * Use '!!' on all dispatch functions to map from `error()` style
643 * (returns -1) style to `test_must_fail` style (expects 1). This
644 * makes shell error messages less confusing.
645 */
646
647 if (!strcmp(cl_args.subcommand, "is-active"))
648 return !!client__probe_server();
649
650 if (!strcmp(cl_args.subcommand, "run-daemon"))
651 return !!daemon__run_server();
652
653 if (!strcmp(cl_args.subcommand, "start-daemon"))
654 return !!daemon__start_server();
655
656 /*
657 * Client commands follow. Ensure a server is running before
658 * sending any data. This might be overkill, but then again
659 * this is a test harness.
660 */
661
662 if (!strcmp(cl_args.subcommand, "stop-daemon")) {
663 if (client__probe_server())
664 return 1;
665 return !!client__stop_server();
666 }
667
668 if (!strcmp(cl_args.subcommand, "send")) {
669 if (client__probe_server())
670 return 1;
671 return !!client__send_ipc();
672 }
673
674 if (!strcmp(cl_args.subcommand, "sendbytes")) {
675 if (client__probe_server())
676 return 1;
677 return !!client__sendbytes();
678 }
679
680 if (!strcmp(cl_args.subcommand, "multiple")) {
681 if (client__probe_server())
682 return 1;
683 return !!client__multiple();
684 }
685
686 die("Unhandled subcommand: '%s'", cl_args.subcommand);
687 }
688 #endif