/*
 * test-simple-ipc.c: verify that the Inter-Process Communication works.
 */
8 #include "simple-ipc.h"
9 #include "parse-options.h"
10 #include "thread-utils.h"
12 #include "run-command.h"
15 #ifndef SUPPORTS_SIMPLE_IPC
16 int cmd__simple_ipc(int argc
, const char **argv
)
18 die("simple IPC not available on this platform");
23 * The test daemon defines an "application callback" that supports a
24 * series of commands (see `test_app_cb()`).
26 * Unknown commands are caught here and we send an error message back
27 * to the client process.
29 static int app__unhandled_command(const char *command
,
30 ipc_server_reply_cb
*reply_cb
,
31 struct ipc_server_reply_data
*reply_data
)
33 struct strbuf buf
= STRBUF_INIT
;
36 strbuf_addf(&buf
, "unhandled command: %s", command
);
37 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
44 * Reply with a single very large buffer. This is to ensure that
45 * long response are properly handled -- whether the chunking occurs
46 * in the kernel or in the (probably pkt-line) layer.
48 #define BIG_ROWS (10000)
49 static int app__big_command(ipc_server_reply_cb
*reply_cb
,
50 struct ipc_server_reply_data
*reply_data
)
52 struct strbuf buf
= STRBUF_INIT
;
56 for (row
= 0; row
< BIG_ROWS
; row
++)
57 strbuf_addf(&buf
, "big: %.75d\n", row
);
59 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
66 * Reply with a series of lines. This is to ensure that we can incrementally
67 * compute the response and chunk it to the client.
69 #define CHUNK_ROWS (10000)
70 static int app__chunk_command(ipc_server_reply_cb
*reply_cb
,
71 struct ipc_server_reply_data
*reply_data
)
73 struct strbuf buf
= STRBUF_INIT
;
77 for (row
= 0; row
< CHUNK_ROWS
; row
++) {
78 strbuf_setlen(&buf
, 0);
79 strbuf_addf(&buf
, "big: %.75d\n", row
);
80 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
89 * Slowly reply with a series of lines. This is to model an expensive to
90 * compute chunked response (which might happen if this callback is running
91 * in a thread and is fighting for a lock with other threads).
93 #define SLOW_ROWS (1000)
94 #define SLOW_DELAY_MS (10)
95 static int app__slow_command(ipc_server_reply_cb
*reply_cb
,
96 struct ipc_server_reply_data
*reply_data
)
98 struct strbuf buf
= STRBUF_INIT
;
102 for (row
= 0; row
< SLOW_ROWS
; row
++) {
103 strbuf_setlen(&buf
, 0);
104 strbuf_addf(&buf
, "big: %.75d\n", row
);
105 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
106 sleep_millisec(SLOW_DELAY_MS
);
109 strbuf_release(&buf
);
115 * The client sent a command followed by a (possibly very) large buffer.
117 static int app__sendbytes_command(const char *received
, size_t received_len
,
118 ipc_server_reply_cb
*reply_cb
,
119 struct ipc_server_reply_data
*reply_data
)
121 struct strbuf buf_resp
= STRBUF_INIT
;
129 * The test is setup to send:
130 * "sendbytes" SP <n * char>
132 if (received_len
< strlen("sendbytes "))
133 BUG("received_len is short in app__sendbytes_command");
135 if (skip_prefix(received
, "sendbytes ", &p
))
136 len_ballast
= strlen(p
);
139 * Verify that the ballast is n copies of a single letter.
140 * And that the multi-threaded IO layer didn't cross the streams.
142 for (k
= 1; k
< len_ballast
; k
++)
147 strbuf_addf(&buf_resp
, "errs:%d\n", errs
);
149 strbuf_addf(&buf_resp
, "rcvd:%c%08d\n", p
[0], len_ballast
);
151 ret
= reply_cb(reply_data
, buf_resp
.buf
, buf_resp
.len
);
153 strbuf_release(&buf_resp
);
/*
 * An arbitrary fixed address to verify that the application instance
 * data is handled properly.  Only the address is compared by the
 * application callback; the value 42 itself is never read there.
 */
static int my_app_data = 42;
164 static ipc_server_application_cb test_app_cb
;
167 * This is the "application callback" that sits on top of the
168 * "ipc-server". It completely defines the set of commands supported
169 * by this application.
171 static int test_app_cb(void *application_data
,
172 const char *command
, size_t command_len
,
173 ipc_server_reply_cb
*reply_cb
,
174 struct ipc_server_reply_data
*reply_data
)
177 * Verify that we received the application-data that we passed
178 * when we started the ipc-server. (We have several layers of
179 * callbacks calling callbacks and it's easy to get things mixed
180 * up (especially when some are "void*").)
182 if (application_data
!= (void*)&my_app_data
)
183 BUG("application_cb: application_data pointer wrong");
185 if (command_len
== 4 && !strncmp(command
, "quit", 4)) {
187 * The client sent a "quit" command. This is an async
188 * request for the server to shutdown.
190 * We DO NOT send the client a response message
191 * (because we have nothing to say and the other
192 * server threads have not yet stopped).
194 * Tell the ipc-server layer to start shutting down.
195 * This includes: stop listening for new connections
196 * on the socket/pipe and telling all worker threads
197 * to finish/drain their outgoing responses to other
200 * This DOES NOT force an immediate sync shutdown.
202 return SIMPLE_IPC_QUIT
;
205 if (command_len
== 4 && !strncmp(command
, "ping", 4)) {
206 const char *answer
= "pong";
207 return reply_cb(reply_data
, answer
, strlen(answer
));
210 if (command_len
== 3 && !strncmp(command
, "big", 3))
211 return app__big_command(reply_cb
, reply_data
);
213 if (command_len
== 5 && !strncmp(command
, "chunk", 5))
214 return app__chunk_command(reply_cb
, reply_data
);
216 if (command_len
== 4 && !strncmp(command
, "slow", 4))
217 return app__slow_command(reply_cb
, reply_data
);
219 if (command_len
>= 10 && starts_with(command
, "sendbytes "))
220 return app__sendbytes_command(command
, command_len
,
221 reply_cb
, reply_data
);
223 return app__unhandled_command(command
, reply_cb
, reply_data
);
/*
 * Command line arguments shared by all subcommands.  `subcommand` is
 * taken from argv[1]; the remaining members are filled in by the option
 * parser in `cmd__simple_ipc()`.
 */
struct cl_args
{
	const char *subcommand;	/* argv[1] */
	const char *path;	/* --name: socket path or named-pipe name */
	const char *token;	/* --token: command sent by "send" */

	int nr_threads;		/* --threads */
	int max_wait_sec;	/* --max-wait */
	int bytecount;		/* --bytecount */
	int batchsize;		/* --batchsize */

	char bytevalue;		/* first character of --byte */
};

/*
 * NOTE(review): only the opening line of this initializer survived in
 * the copy this was restored from.  The default values below are
 * recalled from the canonical file, not derived from visible text --
 * confirm them before relying on them.
 */
static struct cl_args cl_args = {
	.subcommand = NULL,
	.path = "ipc-test",
	.token = NULL,

	.nr_threads = 5,
	.max_wait_sec = 60,
	.bytecount = 1024,
	.batchsize = 10,

	.bytevalue = 'x',
};
254 * This process will run as a simple-ipc server and listen for IPC commands
255 * from client processes.
257 static int daemon__run_server(void)
261 struct ipc_server_opts opts
= {
262 .nr_threads
= cl_args
.nr_threads
,
266 * Synchronously run the ipc-server. We don't need any application
267 * instance data, so pass an arbitrary pointer (that we'll later
268 * verify made the round trip).
270 ret
= ipc_server_run(cl_args
.path
, &opts
, test_app_cb
, (void*)&my_app_data
);
272 error("socket/pipe already in use: '%s'", cl_args
.path
);
274 error_errno("could not start server on: '%s'", cl_args
.path
);
279 static start_bg_wait_cb bg_wait_cb
;
281 static int bg_wait_cb(const struct child_process
*cp UNUSED
,
282 void *cb_data UNUSED
)
284 int s
= ipc_get_active_state(cl_args
.path
);
287 case IPC_STATE__LISTENING
:
288 /* child is "ready" */
291 case IPC_STATE__NOT_LISTENING
:
292 case IPC_STATE__PATH_NOT_FOUND
:
293 /* give child more time */
297 case IPC_STATE__INVALID_PATH
:
298 case IPC_STATE__OTHER_ERROR
:
299 /* all the time in world won't help */
304 static int daemon__start_server(void)
306 struct child_process cp
= CHILD_PROCESS_INIT
;
307 enum start_bg_result sbgr
;
309 strvec_push(&cp
.args
, "test-tool");
310 strvec_push(&cp
.args
, "simple-ipc");
311 strvec_push(&cp
.args
, "run-daemon");
312 strvec_pushf(&cp
.args
, "--name=%s", cl_args
.path
);
313 strvec_pushf(&cp
.args
, "--threads=%d", cl_args
.nr_threads
);
319 sbgr
= start_bg_command(&cp
, bg_wait_cb
, NULL
, cl_args
.max_wait_sec
);
328 return error("daemon failed to start");
331 return error("daemon not online yet");
334 return error("daemon terminated");
339 * This process will run a quick probe to see if a simple-ipc server
340 * is active on this path.
342 * Returns 0 if the server is alive.
344 static int client__probe_server(void)
346 enum ipc_active_state s
;
348 s
= ipc_get_active_state(cl_args
.path
);
350 case IPC_STATE__LISTENING
:
353 case IPC_STATE__NOT_LISTENING
:
354 return error("no server listening at '%s'", cl_args
.path
);
356 case IPC_STATE__PATH_NOT_FOUND
:
357 return error("path not found '%s'", cl_args
.path
);
359 case IPC_STATE__INVALID_PATH
:
360 return error("invalid pipe/socket name '%s'", cl_args
.path
);
362 case IPC_STATE__OTHER_ERROR
:
364 return error("other error for '%s'", cl_args
.path
);
369 * Send an IPC command token to an already-running server daemon and
370 * print the response.
372 * This is a simple 1 word command/token that `test_app_cb()` (in the
373 * daemon process) will understand.
375 static int client__send_ipc(void)
377 const char *command
= "(no-command)";
378 struct strbuf buf
= STRBUF_INIT
;
379 struct ipc_client_connect_options options
380 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
382 if (cl_args
.token
&& *cl_args
.token
)
383 command
= cl_args
.token
;
385 options
.wait_if_busy
= 1;
386 options
.wait_if_not_found
= 0;
388 if (!ipc_client_send_command(cl_args
.path
, &options
,
389 command
, strlen(command
),
392 printf("%s\n", buf
.buf
);
395 strbuf_release(&buf
);
400 return error("failed to send '%s' to '%s'", command
, cl_args
.path
);
404 * Send an IPC command to an already-running server and ask it to
405 * shutdown. "send quit" is an async request and queues a shutdown
406 * event in the server, so we spin and wait here for it to actually
407 * shutdown to make the unit tests a little easier to write.
409 static int client__stop_server(void)
412 time_t time_limit
, now
;
413 enum ipc_active_state s
;
416 time_limit
+= cl_args
.max_wait_sec
;
418 cl_args
.token
= "quit";
420 ret
= client__send_ipc();
427 s
= ipc_get_active_state(cl_args
.path
);
429 if (s
!= IPC_STATE__LISTENING
) {
431 * The socket/pipe is gone and/or has stopped
432 * responding. Lets assume that the daemon
433 * process has exited too.
439 if (now
> time_limit
)
440 return error("daemon has not shutdown yet");
445 * Send an IPC command followed by ballast to confirm that a large
446 * message can be sent and that the kernel or pkt-line layers will
447 * properly chunk it and that the daemon receives the entire message.
449 static int do_sendbytes(int bytecount
, char byte
, const char *path
,
450 const struct ipc_client_connect_options
*options
)
452 struct strbuf buf_send
= STRBUF_INIT
;
453 struct strbuf buf_resp
= STRBUF_INIT
;
455 strbuf_addstr(&buf_send
, "sendbytes ");
456 strbuf_addchars(&buf_send
, byte
, bytecount
);
458 if (!ipc_client_send_command(path
, options
,
459 buf_send
.buf
, buf_send
.len
,
461 strbuf_rtrim(&buf_resp
);
462 printf("sent:%c%08d %s\n", byte
, bytecount
, buf_resp
.buf
);
464 strbuf_release(&buf_send
);
465 strbuf_release(&buf_resp
);
470 return error("client failed to sendbytes(%d, '%c') to '%s'",
471 bytecount
, byte
, path
);
475 * Send an IPC command with ballast to an already-running server daemon.
477 static int client__sendbytes(void)
479 struct ipc_client_connect_options options
480 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
482 options
.wait_if_busy
= 1;
483 options
.wait_if_not_found
= 0;
484 options
.uds_disallow_chdir
= 0;
486 return do_sendbytes(cl_args
.bytecount
, cl_args
.bytevalue
, cl_args
.path
,
/*
 * Per-thread state for the "multiple" subcommand.  One instance is
 * allocated per client thread and linked into a list through `next`.
 */
struct multiple_thread_data {
	pthread_t pthread_id;
	struct multiple_thread_data *next;
	const char *path;	/* server path to send to */
	int bytecount;		/* ballast size of the first request */
	int batchsize;		/* number of requests to send */
	int sum_errors;		/* requests that failed */
	int sum_good;		/* requests that succeeded */
	char letter;		/* ballast character for this thread */
};
501 static void *multiple_thread_proc(void *_multiple_thread_data
)
503 struct multiple_thread_data
*d
= _multiple_thread_data
;
505 struct ipc_client_connect_options options
506 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
508 options
.wait_if_busy
= 1;
509 options
.wait_if_not_found
= 0;
511 * A multi-threaded client should not be randomly calling chdir().
512 * The test will pass without this restriction because the test is
513 * not otherwise accessing the filesystem, but it makes us honest.
515 options
.uds_disallow_chdir
= 1;
517 trace2_thread_start("multiple");
519 for (k
= 0; k
< d
->batchsize
; k
++) {
520 if (do_sendbytes(d
->bytecount
+ k
, d
->letter
, d
->path
, &options
))
526 trace2_thread_exit();
531 * Start a client-side thread pool. Each thread sends a series of
532 * IPC requests. Each request is on a new connection to the server.
534 static int client__multiple(void)
536 struct multiple_thread_data
*list
= NULL
;
538 int sum_join_errors
= 0;
539 int sum_thread_errors
= 0;
542 for (k
= 0; k
< cl_args
.nr_threads
; k
++) {
543 struct multiple_thread_data
*d
= xcalloc(1, sizeof(*d
));
545 d
->path
= cl_args
.path
;
546 d
->bytecount
= cl_args
.bytecount
+ cl_args
.batchsize
*(k
/26);
547 d
->batchsize
= cl_args
.batchsize
;
550 d
->letter
= 'A' + (k
% 26);
552 if (pthread_create(&d
->pthread_id
, NULL
, multiple_thread_proc
, d
)) {
553 warning("failed to create thread[%d] skipping remainder", k
);
562 struct multiple_thread_data
*d
= list
;
564 if (pthread_join(d
->pthread_id
, NULL
))
567 sum_thread_errors
+= d
->sum_errors
;
568 sum_good
+= d
->sum_good
;
574 printf("client (good %d) (join %d), (errors %d)\n",
575 sum_good
, sum_join_errors
, sum_thread_errors
);
577 return (sum_join_errors
+ sum_thread_errors
) ? 1 : 0;
580 int cmd__simple_ipc(int argc
, const char **argv
)
582 const char * const simple_ipc_usage
[] = {
583 N_("test-helper simple-ipc is-active [<name>] [<options>]"),
584 N_("test-helper simple-ipc run-daemon [<name>] [<threads>]"),
585 N_("test-helper simple-ipc start-daemon [<name>] [<threads>] [<max-wait>]"),
586 N_("test-helper simple-ipc stop-daemon [<name>] [<max-wait>]"),
587 N_("test-helper simple-ipc send [<name>] [<token>]"),
588 N_("test-helper simple-ipc sendbytes [<name>] [<bytecount>] [<byte>]"),
589 N_("test-helper simple-ipc multiple [<name>] [<threads>] [<bytecount>] [<batchsize>]"),
593 const char *bytevalue
= NULL
;
595 struct option options
[] = {
596 #ifndef GIT_WINDOWS_NATIVE
597 OPT_STRING(0, "name", &cl_args
.path
, N_("name"), N_("name or pathname of unix domain socket")),
599 OPT_STRING(0, "name", &cl_args
.path
, N_("name"), N_("named-pipe name")),
601 OPT_INTEGER(0, "threads", &cl_args
.nr_threads
, N_("number of threads in server thread pool")),
602 OPT_INTEGER(0, "max-wait", &cl_args
.max_wait_sec
, N_("seconds to wait for daemon to start or stop")),
604 OPT_INTEGER(0, "bytecount", &cl_args
.bytecount
, N_("number of bytes")),
605 OPT_INTEGER(0, "batchsize", &cl_args
.batchsize
, N_("number of requests per thread")),
607 OPT_STRING(0, "byte", &bytevalue
, N_("byte"), N_("ballast character")),
608 OPT_STRING(0, "token", &cl_args
.token
, N_("token"), N_("command token to send to the server")),
614 usage_with_options(simple_ipc_usage
, options
);
616 if (argc
== 2 && !strcmp(argv
[1], "-h"))
617 usage_with_options(simple_ipc_usage
, options
);
619 if (argc
== 2 && !strcmp(argv
[1], "SUPPORTS_SIMPLE_IPC"))
622 cl_args
.subcommand
= argv
[1];
627 argc
= parse_options(argc
, argv
, NULL
, options
, simple_ipc_usage
, 0);
629 if (cl_args
.nr_threads
< 1)
630 cl_args
.nr_threads
= 1;
631 if (cl_args
.max_wait_sec
< 0)
632 cl_args
.max_wait_sec
= 0;
633 if (cl_args
.bytecount
< 1)
634 cl_args
.bytecount
= 1;
635 if (cl_args
.batchsize
< 1)
636 cl_args
.batchsize
= 1;
638 if (bytevalue
&& *bytevalue
)
639 cl_args
.bytevalue
= bytevalue
[0];
642 * Use '!!' on all dispatch functions to map from `error()` style
643 * (returns -1) style to `test_must_fail` style (expects 1). This
644 * makes shell error messages less confusing.
647 if (!strcmp(cl_args
.subcommand
, "is-active"))
648 return !!client__probe_server();
650 if (!strcmp(cl_args
.subcommand
, "run-daemon"))
651 return !!daemon__run_server();
653 if (!strcmp(cl_args
.subcommand
, "start-daemon"))
654 return !!daemon__start_server();
657 * Client commands follow. Ensure a server is running before
658 * sending any data. This might be overkill, but then again
659 * this is a test harness.
662 if (!strcmp(cl_args
.subcommand
, "stop-daemon")) {
663 if (client__probe_server())
665 return !!client__stop_server();
668 if (!strcmp(cl_args
.subcommand
, "send")) {
669 if (client__probe_server())
671 return !!client__send_ipc();
674 if (!strcmp(cl_args
.subcommand
, "sendbytes")) {
675 if (client__probe_server())
677 return !!client__sendbytes();
680 if (!strcmp(cl_args
.subcommand
, "multiple")) {
681 if (client__probe_server())
683 return !!client__multiple();
686 die("Unhandled subcommand: '%s'", cl_args
.subcommand
);