]> git.ipfire.org Git - thirdparty/git.git/blob - t/helper/test-simple-ipc.c
git-config: fix misworded --type=path explanation
[thirdparty/git.git] / t / helper / test-simple-ipc.c
1 /*
2 * test-simple-ipc.c: verify that the Inter-Process Communication works.
3 */
4
5 #include "test-tool.h"
6 #include "cache.h"
7 #include "strbuf.h"
8 #include "simple-ipc.h"
9 #include "parse-options.h"
10 #include "thread-utils.h"
11 #include "strvec.h"
12 #include "run-command.h"
13
14 #ifndef SUPPORTS_SIMPLE_IPC
15 int cmd__simple_ipc(int argc, const char **argv)
16 {
17 die("simple IPC not available on this platform");
18 }
19 #else
20
21 /*
22 * The test daemon defines an "application callback" that supports a
23 * series of commands (see `test_app_cb()`).
24 *
25 * Unknown commands are caught here and we send an error message back
26 * to the client process.
27 */
28 static int app__unhandled_command(const char *command,
29 ipc_server_reply_cb *reply_cb,
30 struct ipc_server_reply_data *reply_data)
31 {
32 struct strbuf buf = STRBUF_INIT;
33 int ret;
34
35 strbuf_addf(&buf, "unhandled command: %s", command);
36 ret = reply_cb(reply_data, buf.buf, buf.len);
37 strbuf_release(&buf);
38
39 return ret;
40 }
41
42 /*
43 * Reply with a single very large buffer. This is to ensure that
44 * long response are properly handled -- whether the chunking occurs
45 * in the kernel or in the (probably pkt-line) layer.
46 */
47 #define BIG_ROWS (10000)
48 static int app__big_command(ipc_server_reply_cb *reply_cb,
49 struct ipc_server_reply_data *reply_data)
50 {
51 struct strbuf buf = STRBUF_INIT;
52 int row;
53 int ret;
54
55 for (row = 0; row < BIG_ROWS; row++)
56 strbuf_addf(&buf, "big: %.75d\n", row);
57
58 ret = reply_cb(reply_data, buf.buf, buf.len);
59 strbuf_release(&buf);
60
61 return ret;
62 }
63
64 /*
65 * Reply with a series of lines. This is to ensure that we can incrementally
66 * compute the response and chunk it to the client.
67 */
68 #define CHUNK_ROWS (10000)
69 static int app__chunk_command(ipc_server_reply_cb *reply_cb,
70 struct ipc_server_reply_data *reply_data)
71 {
72 struct strbuf buf = STRBUF_INIT;
73 int row;
74 int ret;
75
76 for (row = 0; row < CHUNK_ROWS; row++) {
77 strbuf_setlen(&buf, 0);
78 strbuf_addf(&buf, "big: %.75d\n", row);
79 ret = reply_cb(reply_data, buf.buf, buf.len);
80 }
81
82 strbuf_release(&buf);
83
84 return ret;
85 }
86
87 /*
88 * Slowly reply with a series of lines. This is to model an expensive to
89 * compute chunked response (which might happen if this callback is running
90 * in a thread and is fighting for a lock with other threads).
91 */
92 #define SLOW_ROWS (1000)
93 #define SLOW_DELAY_MS (10)
94 static int app__slow_command(ipc_server_reply_cb *reply_cb,
95 struct ipc_server_reply_data *reply_data)
96 {
97 struct strbuf buf = STRBUF_INIT;
98 int row;
99 int ret;
100
101 for (row = 0; row < SLOW_ROWS; row++) {
102 strbuf_setlen(&buf, 0);
103 strbuf_addf(&buf, "big: %.75d\n", row);
104 ret = reply_cb(reply_data, buf.buf, buf.len);
105 sleep_millisec(SLOW_DELAY_MS);
106 }
107
108 strbuf_release(&buf);
109
110 return ret;
111 }
112
113 /*
114 * The client sent a command followed by a (possibly very) large buffer.
115 */
116 static int app__sendbytes_command(const char *received, size_t received_len,
117 ipc_server_reply_cb *reply_cb,
118 struct ipc_server_reply_data *reply_data)
119 {
120 struct strbuf buf_resp = STRBUF_INIT;
121 const char *p = "?";
122 int len_ballast = 0;
123 int k;
124 int errs = 0;
125 int ret;
126
127 /*
128 * The test is setup to send:
129 * "sendbytes" SP <n * char>
130 */
131 if (received_len < strlen("sendbytes "))
132 BUG("received_len is short in app__sendbytes_command");
133
134 if (skip_prefix(received, "sendbytes ", &p))
135 len_ballast = strlen(p);
136
137 /*
138 * Verify that the ballast is n copies of a single letter.
139 * And that the multi-threaded IO layer didn't cross the streams.
140 */
141 for (k = 1; k < len_ballast; k++)
142 if (p[k] != p[0])
143 errs++;
144
145 if (errs)
146 strbuf_addf(&buf_resp, "errs:%d\n", errs);
147 else
148 strbuf_addf(&buf_resp, "rcvd:%c%08d\n", p[0], len_ballast);
149
150 ret = reply_cb(reply_data, buf_resp.buf, buf_resp.len);
151
152 strbuf_release(&buf_resp);
153
154 return ret;
155 }
156
157 /*
158 * An arbitrary fixed address to verify that the application instance
159 * data is handled properly.
160 */
161 static int my_app_data = 42;
162
163 static ipc_server_application_cb test_app_cb;
164
165 /*
166 * This is the "application callback" that sits on top of the
167 * "ipc-server". It completely defines the set of commands supported
168 * by this application.
169 */
170 static int test_app_cb(void *application_data,
171 const char *command, size_t command_len,
172 ipc_server_reply_cb *reply_cb,
173 struct ipc_server_reply_data *reply_data)
174 {
175 /*
176 * Verify that we received the application-data that we passed
177 * when we started the ipc-server. (We have several layers of
178 * callbacks calling callbacks and it's easy to get things mixed
179 * up (especially when some are "void*").)
180 */
181 if (application_data != (void*)&my_app_data)
182 BUG("application_cb: application_data pointer wrong");
183
184 if (command_len == 4 && !strncmp(command, "quit", 4)) {
185 /*
186 * The client sent a "quit" command. This is an async
187 * request for the server to shutdown.
188 *
189 * We DO NOT send the client a response message
190 * (because we have nothing to say and the other
191 * server threads have not yet stopped).
192 *
193 * Tell the ipc-server layer to start shutting down.
194 * This includes: stop listening for new connections
195 * on the socket/pipe and telling all worker threads
196 * to finish/drain their outgoing responses to other
197 * clients.
198 *
199 * This DOES NOT force an immediate sync shutdown.
200 */
201 return SIMPLE_IPC_QUIT;
202 }
203
204 if (command_len == 4 && !strncmp(command, "ping", 4)) {
205 const char *answer = "pong";
206 return reply_cb(reply_data, answer, strlen(answer));
207 }
208
209 if (command_len == 3 && !strncmp(command, "big", 3))
210 return app__big_command(reply_cb, reply_data);
211
212 if (command_len == 5 && !strncmp(command, "chunk", 5))
213 return app__chunk_command(reply_cb, reply_data);
214
215 if (command_len == 4 && !strncmp(command, "slow", 4))
216 return app__slow_command(reply_cb, reply_data);
217
218 if (command_len >= 10 && starts_with(command, "sendbytes "))
219 return app__sendbytes_command(command, command_len,
220 reply_cb, reply_data);
221
222 return app__unhandled_command(command, reply_cb, reply_data);
223 }
224
225 struct cl_args
226 {
227 const char *subcommand;
228 const char *path;
229 const char *token;
230
231 int nr_threads;
232 int max_wait_sec;
233 int bytecount;
234 int batchsize;
235
236 char bytevalue;
237 };
238
239 static struct cl_args cl_args = {
240 .subcommand = NULL,
241 .path = "ipc-test",
242 .token = NULL,
243
244 .nr_threads = 5,
245 .max_wait_sec = 60,
246 .bytecount = 1024,
247 .batchsize = 10,
248
249 .bytevalue = 'x',
250 };
251
252 /*
253 * This process will run as a simple-ipc server and listen for IPC commands
254 * from client processes.
255 */
256 static int daemon__run_server(void)
257 {
258 int ret;
259
260 struct ipc_server_opts opts = {
261 .nr_threads = cl_args.nr_threads,
262 };
263
264 /*
265 * Synchronously run the ipc-server. We don't need any application
266 * instance data, so pass an arbitrary pointer (that we'll later
267 * verify made the round trip).
268 */
269 ret = ipc_server_run(cl_args.path, &opts, test_app_cb, (void*)&my_app_data);
270 if (ret == -2)
271 error("socket/pipe already in use: '%s'", cl_args.path);
272 else if (ret == -1)
273 error_errno("could not start server on: '%s'", cl_args.path);
274
275 return ret;
276 }
277
278 static start_bg_wait_cb bg_wait_cb;
279
280 static int bg_wait_cb(const struct child_process *cp, void *cb_data)
281 {
282 int s = ipc_get_active_state(cl_args.path);
283
284 switch (s) {
285 case IPC_STATE__LISTENING:
286 /* child is "ready" */
287 return 0;
288
289 case IPC_STATE__NOT_LISTENING:
290 case IPC_STATE__PATH_NOT_FOUND:
291 /* give child more time */
292 return 1;
293
294 default:
295 case IPC_STATE__INVALID_PATH:
296 case IPC_STATE__OTHER_ERROR:
297 /* all the time in world won't help */
298 return -1;
299 }
300 }
301
302 static int daemon__start_server(void)
303 {
304 struct child_process cp = CHILD_PROCESS_INIT;
305 enum start_bg_result sbgr;
306
307 strvec_push(&cp.args, "test-tool");
308 strvec_push(&cp.args, "simple-ipc");
309 strvec_push(&cp.args, "run-daemon");
310 strvec_pushf(&cp.args, "--name=%s", cl_args.path);
311 strvec_pushf(&cp.args, "--threads=%d", cl_args.nr_threads);
312
313 cp.no_stdin = 1;
314 cp.no_stdout = 1;
315 cp.no_stderr = 1;
316
317 sbgr = start_bg_command(&cp, bg_wait_cb, NULL, cl_args.max_wait_sec);
318
319 switch (sbgr) {
320 case SBGR_READY:
321 return 0;
322
323 default:
324 case SBGR_ERROR:
325 case SBGR_CB_ERROR:
326 return error("daemon failed to start");
327
328 case SBGR_TIMEOUT:
329 return error("daemon not online yet");
330
331 case SBGR_DIED:
332 return error("daemon terminated");
333 }
334 }
335
336 /*
337 * This process will run a quick probe to see if a simple-ipc server
338 * is active on this path.
339 *
340 * Returns 0 if the server is alive.
341 */
342 static int client__probe_server(void)
343 {
344 enum ipc_active_state s;
345
346 s = ipc_get_active_state(cl_args.path);
347 switch (s) {
348 case IPC_STATE__LISTENING:
349 return 0;
350
351 case IPC_STATE__NOT_LISTENING:
352 return error("no server listening at '%s'", cl_args.path);
353
354 case IPC_STATE__PATH_NOT_FOUND:
355 return error("path not found '%s'", cl_args.path);
356
357 case IPC_STATE__INVALID_PATH:
358 return error("invalid pipe/socket name '%s'", cl_args.path);
359
360 case IPC_STATE__OTHER_ERROR:
361 default:
362 return error("other error for '%s'", cl_args.path);
363 }
364 }
365
366 /*
367 * Send an IPC command token to an already-running server daemon and
368 * print the response.
369 *
370 * This is a simple 1 word command/token that `test_app_cb()` (in the
371 * daemon process) will understand.
372 */
373 static int client__send_ipc(void)
374 {
375 const char *command = "(no-command)";
376 struct strbuf buf = STRBUF_INIT;
377 struct ipc_client_connect_options options
378 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
379
380 if (cl_args.token && *cl_args.token)
381 command = cl_args.token;
382
383 options.wait_if_busy = 1;
384 options.wait_if_not_found = 0;
385
386 if (!ipc_client_send_command(cl_args.path, &options,
387 command, strlen(command),
388 &buf)) {
389 if (buf.len) {
390 printf("%s\n", buf.buf);
391 fflush(stdout);
392 }
393 strbuf_release(&buf);
394
395 return 0;
396 }
397
398 return error("failed to send '%s' to '%s'", command, cl_args.path);
399 }
400
401 /*
402 * Send an IPC command to an already-running server and ask it to
403 * shutdown. "send quit" is an async request and queues a shutdown
404 * event in the server, so we spin and wait here for it to actually
405 * shutdown to make the unit tests a little easier to write.
406 */
407 static int client__stop_server(void)
408 {
409 int ret;
410 time_t time_limit, now;
411 enum ipc_active_state s;
412
413 time(&time_limit);
414 time_limit += cl_args.max_wait_sec;
415
416 cl_args.token = "quit";
417
418 ret = client__send_ipc();
419 if (ret)
420 return ret;
421
422 for (;;) {
423 sleep_millisec(100);
424
425 s = ipc_get_active_state(cl_args.path);
426
427 if (s != IPC_STATE__LISTENING) {
428 /*
429 * The socket/pipe is gone and/or has stopped
430 * responding. Lets assume that the daemon
431 * process has exited too.
432 */
433 return 0;
434 }
435
436 time(&now);
437 if (now > time_limit)
438 return error("daemon has not shutdown yet");
439 }
440 }
441
442 /*
443 * Send an IPC command followed by ballast to confirm that a large
444 * message can be sent and that the kernel or pkt-line layers will
445 * properly chunk it and that the daemon receives the entire message.
446 */
447 static int do_sendbytes(int bytecount, char byte, const char *path,
448 const struct ipc_client_connect_options *options)
449 {
450 struct strbuf buf_send = STRBUF_INIT;
451 struct strbuf buf_resp = STRBUF_INIT;
452
453 strbuf_addstr(&buf_send, "sendbytes ");
454 strbuf_addchars(&buf_send, byte, bytecount);
455
456 if (!ipc_client_send_command(path, options,
457 buf_send.buf, buf_send.len,
458 &buf_resp)) {
459 strbuf_rtrim(&buf_resp);
460 printf("sent:%c%08d %s\n", byte, bytecount, buf_resp.buf);
461 fflush(stdout);
462 strbuf_release(&buf_send);
463 strbuf_release(&buf_resp);
464
465 return 0;
466 }
467
468 return error("client failed to sendbytes(%d, '%c') to '%s'",
469 bytecount, byte, path);
470 }
471
472 /*
473 * Send an IPC command with ballast to an already-running server daemon.
474 */
475 static int client__sendbytes(void)
476 {
477 struct ipc_client_connect_options options
478 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
479
480 options.wait_if_busy = 1;
481 options.wait_if_not_found = 0;
482 options.uds_disallow_chdir = 0;
483
484 return do_sendbytes(cl_args.bytecount, cl_args.bytevalue, cl_args.path,
485 &options);
486 }
487
488 struct multiple_thread_data {
489 pthread_t pthread_id;
490 struct multiple_thread_data *next;
491 const char *path;
492 int bytecount;
493 int batchsize;
494 int sum_errors;
495 int sum_good;
496 char letter;
497 };
498
499 static void *multiple_thread_proc(void *_multiple_thread_data)
500 {
501 struct multiple_thread_data *d = _multiple_thread_data;
502 int k;
503 struct ipc_client_connect_options options
504 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
505
506 options.wait_if_busy = 1;
507 options.wait_if_not_found = 0;
508 /*
509 * A multi-threaded client should not be randomly calling chdir().
510 * The test will pass without this restriction because the test is
511 * not otherwise accessing the filesystem, but it makes us honest.
512 */
513 options.uds_disallow_chdir = 1;
514
515 trace2_thread_start("multiple");
516
517 for (k = 0; k < d->batchsize; k++) {
518 if (do_sendbytes(d->bytecount + k, d->letter, d->path, &options))
519 d->sum_errors++;
520 else
521 d->sum_good++;
522 }
523
524 trace2_thread_exit();
525 return NULL;
526 }
527
528 /*
529 * Start a client-side thread pool. Each thread sends a series of
530 * IPC requests. Each request is on a new connection to the server.
531 */
532 static int client__multiple(void)
533 {
534 struct multiple_thread_data *list = NULL;
535 int k;
536 int sum_join_errors = 0;
537 int sum_thread_errors = 0;
538 int sum_good = 0;
539
540 for (k = 0; k < cl_args.nr_threads; k++) {
541 struct multiple_thread_data *d = xcalloc(1, sizeof(*d));
542 d->next = list;
543 d->path = cl_args.path;
544 d->bytecount = cl_args.bytecount + cl_args.batchsize*(k/26);
545 d->batchsize = cl_args.batchsize;
546 d->sum_errors = 0;
547 d->sum_good = 0;
548 d->letter = 'A' + (k % 26);
549
550 if (pthread_create(&d->pthread_id, NULL, multiple_thread_proc, d)) {
551 warning("failed to create thread[%d] skipping remainder", k);
552 free(d);
553 break;
554 }
555
556 list = d;
557 }
558
559 while (list) {
560 struct multiple_thread_data *d = list;
561
562 if (pthread_join(d->pthread_id, NULL))
563 sum_join_errors++;
564
565 sum_thread_errors += d->sum_errors;
566 sum_good += d->sum_good;
567
568 list = d->next;
569 free(d);
570 }
571
572 printf("client (good %d) (join %d), (errors %d)\n",
573 sum_good, sum_join_errors, sum_thread_errors);
574
575 return (sum_join_errors + sum_thread_errors) ? 1 : 0;
576 }
577
578 int cmd__simple_ipc(int argc, const char **argv)
579 {
580 const char * const simple_ipc_usage[] = {
581 N_("test-helper simple-ipc is-active [<name>] [<options>]"),
582 N_("test-helper simple-ipc run-daemon [<name>] [<threads>]"),
583 N_("test-helper simple-ipc start-daemon [<name>] [<threads>] [<max-wait>]"),
584 N_("test-helper simple-ipc stop-daemon [<name>] [<max-wait>]"),
585 N_("test-helper simple-ipc send [<name>] [<token>]"),
586 N_("test-helper simple-ipc sendbytes [<name>] [<bytecount>] [<byte>]"),
587 N_("test-helper simple-ipc multiple [<name>] [<threads>] [<bytecount>] [<batchsize>]"),
588 NULL
589 };
590
591 const char *bytevalue = NULL;
592
593 struct option options[] = {
594 #ifndef GIT_WINDOWS_NATIVE
595 OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("name or pathname of unix domain socket")),
596 #else
597 OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("named-pipe name")),
598 #endif
599 OPT_INTEGER(0, "threads", &cl_args.nr_threads, N_("number of threads in server thread pool")),
600 OPT_INTEGER(0, "max-wait", &cl_args.max_wait_sec, N_("seconds to wait for daemon to start or stop")),
601
602 OPT_INTEGER(0, "bytecount", &cl_args.bytecount, N_("number of bytes")),
603 OPT_INTEGER(0, "batchsize", &cl_args.batchsize, N_("number of requests per thread")),
604
605 OPT_STRING(0, "byte", &bytevalue, N_("byte"), N_("ballast character")),
606 OPT_STRING(0, "token", &cl_args.token, N_("token"), N_("command token to send to the server")),
607
608 OPT_END()
609 };
610
611 if (argc < 2)
612 usage_with_options(simple_ipc_usage, options);
613
614 if (argc == 2 && !strcmp(argv[1], "-h"))
615 usage_with_options(simple_ipc_usage, options);
616
617 if (argc == 2 && !strcmp(argv[1], "SUPPORTS_SIMPLE_IPC"))
618 return 0;
619
620 cl_args.subcommand = argv[1];
621
622 argc--;
623 argv++;
624
625 argc = parse_options(argc, argv, NULL, options, simple_ipc_usage, 0);
626
627 if (cl_args.nr_threads < 1)
628 cl_args.nr_threads = 1;
629 if (cl_args.max_wait_sec < 0)
630 cl_args.max_wait_sec = 0;
631 if (cl_args.bytecount < 1)
632 cl_args.bytecount = 1;
633 if (cl_args.batchsize < 1)
634 cl_args.batchsize = 1;
635
636 if (bytevalue && *bytevalue)
637 cl_args.bytevalue = bytevalue[0];
638
639 /*
640 * Use '!!' on all dispatch functions to map from `error()` style
641 * (returns -1) style to `test_must_fail` style (expects 1). This
642 * makes shell error messages less confusing.
643 */
644
645 if (!strcmp(cl_args.subcommand, "is-active"))
646 return !!client__probe_server();
647
648 if (!strcmp(cl_args.subcommand, "run-daemon"))
649 return !!daemon__run_server();
650
651 if (!strcmp(cl_args.subcommand, "start-daemon"))
652 return !!daemon__start_server();
653
654 /*
655 * Client commands follow. Ensure a server is running before
656 * sending any data. This might be overkill, but then again
657 * this is a test harness.
658 */
659
660 if (!strcmp(cl_args.subcommand, "stop-daemon")) {
661 if (client__probe_server())
662 return 1;
663 return !!client__stop_server();
664 }
665
666 if (!strcmp(cl_args.subcommand, "send")) {
667 if (client__probe_server())
668 return 1;
669 return !!client__send_ipc();
670 }
671
672 if (!strcmp(cl_args.subcommand, "sendbytes")) {
673 if (client__probe_server())
674 return 1;
675 return !!client__sendbytes();
676 }
677
678 if (!strcmp(cl_args.subcommand, "multiple")) {
679 if (client__probe_server())
680 return 1;
681 return !!client__multiple();
682 }
683
684 die("Unhandled subcommand: '%s'", cl_args.subcommand);
685 }
686 #endif