2 * test-simple-ipc.c: verify that the Inter-Process Communication works.
8 #include "simple-ipc.h"
9 #include "parse-options.h"
10 #include "thread-utils.h"
12 #include "run-command.h"
15 #ifndef SUPPORTS_SIMPLE_IPC
16 int cmd__simple_ipc(int argc
, const char **argv
)
18 die("simple IPC not available on this platform");
23 * The test daemon defines an "application callback" that supports a
24 * series of commands (see `test_app_cb()`).
26 * Unknown commands are caught here and we send an error message back
27 * to the client process.
29 static int app__unhandled_command(const char *command
,
30 ipc_server_reply_cb
*reply_cb
,
31 struct ipc_server_reply_data
*reply_data
)
33 struct strbuf buf
= STRBUF_INIT
;
36 strbuf_addf(&buf
, "unhandled command: %s", command
);
37 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
44 * Reply with a single very large buffer. This is to ensure that
45 * long responses are properly handled -- whether the chunking occurs
46 * in the kernel or in the (probably pkt-line) layer.
48 #define BIG_ROWS (10000)
49 static int app__big_command(ipc_server_reply_cb
*reply_cb
,
50 struct ipc_server_reply_data
*reply_data
)
52 struct strbuf buf
= STRBUF_INIT
;
56 for (row
= 0; row
< BIG_ROWS
; row
++)
57 strbuf_addf(&buf
, "big: %.75d\n", row
);
59 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
66 * Reply with a series of lines. This is to ensure that we can incrementally
67 * compute the response and chunk it to the client.
69 #define CHUNK_ROWS (10000)
70 static int app__chunk_command(ipc_server_reply_cb
*reply_cb
,
71 struct ipc_server_reply_data
*reply_data
)
73 struct strbuf buf
= STRBUF_INIT
;
77 for (row
= 0; row
< CHUNK_ROWS
; row
++) {
78 strbuf_setlen(&buf
, 0);
79 strbuf_addf(&buf
, "big: %.75d\n", row
);
80 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
89 * Slowly reply with a series of lines. This is to model a chunked response
90 * that is expensive to compute (which might happen if this callback is running
91 * in a thread and is fighting for a lock with other threads).
93 #define SLOW_ROWS (1000)
94 #define SLOW_DELAY_MS (10)
95 static int app__slow_command(ipc_server_reply_cb
*reply_cb
,
96 struct ipc_server_reply_data
*reply_data
)
98 struct strbuf buf
= STRBUF_INIT
;
102 for (row
= 0; row
< SLOW_ROWS
; row
++) {
103 strbuf_setlen(&buf
, 0);
104 strbuf_addf(&buf
, "big: %.75d\n", row
);
105 ret
= reply_cb(reply_data
, buf
.buf
, buf
.len
);
106 sleep_millisec(SLOW_DELAY_MS
);
109 strbuf_release(&buf
);
115 * The client sent a command followed by a (possibly very) large buffer.
117 static int app__sendbytes_command(const char *received
, size_t received_len
,
118 ipc_server_reply_cb
*reply_cb
,
119 struct ipc_server_reply_data
*reply_data
)
121 struct strbuf buf_resp
= STRBUF_INIT
;
129 * The test is set up to send:
130 * "sendbytes" SP <n * char>
132 if (received_len
< strlen("sendbytes "))
133 BUG("received_len is short in app__sendbytes_command");
135 if (skip_prefix(received
, "sendbytes ", &p
))
136 len_ballast
= strlen(p
);
139 * Verify that the ballast is n copies of a single letter.
140 * And that the multi-threaded IO layer didn't cross the streams.
142 for (k
= 1; k
< len_ballast
; k
++)
147 strbuf_addf(&buf_resp
, "errs:%d\n", errs
);
149 strbuf_addf(&buf_resp
, "rcvd:%c%08d\n", p
[0], len_ballast
);
151 ret
= reply_cb(reply_data
, buf_resp
.buf
, buf_resp
.len
);
153 strbuf_release(&buf_resp
);
159 * An arbitrary fixed address to verify that the application instance
160 * data is handled properly.
162 static int my_app_data
= 42;
164 static ipc_server_application_cb test_app_cb
;
167 * This is the "application callback" that sits on top of the
168 * "ipc-server". It completely defines the set of commands supported
169 * by this application.
171 static int test_app_cb(void *application_data
,
172 const char *command
, size_t command_len
,
173 ipc_server_reply_cb
*reply_cb
,
174 struct ipc_server_reply_data
*reply_data
)
177 * Verify that we received the application-data that we passed
178 * when we started the ipc-server. (We have several layers of
179 * callbacks calling callbacks and it's easy to get things mixed
180 * up (especially when some are "void*").)
182 if (application_data
!= (void*)&my_app_data
)
183 BUG("application_cb: application_data pointer wrong");
185 if (command_len
== 4 && !strncmp(command
, "quit", 4)) {
187 * The client sent a "quit" command. This is an async
188 * request for the server to shut down.
190 * We DO NOT send the client a response message
191 * (because we have nothing to say and the other
192 * server threads have not yet stopped).
194 * Tell the ipc-server layer to start shutting down.
195 * This includes: stop listening for new connections
196 * on the socket/pipe and telling all worker threads
197 * to finish/drain their outgoing responses to other
200 * This DOES NOT force an immediate sync shutdown.
202 return SIMPLE_IPC_QUIT
;
205 if (command_len
== 4 && !strncmp(command
, "ping", 4)) {
206 const char *answer
= "pong";
207 return reply_cb(reply_data
, answer
, strlen(answer
));
210 if (command_len
== 3 && !strncmp(command
, "big", 3))
211 return app__big_command(reply_cb
, reply_data
);
213 if (command_len
== 5 && !strncmp(command
, "chunk", 5))
214 return app__chunk_command(reply_cb
, reply_data
);
216 if (command_len
== 4 && !strncmp(command
, "slow", 4))
217 return app__slow_command(reply_cb
, reply_data
);
219 if (command_len
>= 10 && starts_with(command
, "sendbytes "))
220 return app__sendbytes_command(command
, command_len
,
221 reply_cb
, reply_data
);
223 return app__unhandled_command(command
, reply_cb
, reply_data
);
228 const char *subcommand
;
240 static struct cl_args cl_args
= {
254 * This process will run as a simple-ipc server and listen for IPC commands
255 * from client processes.
257 static int daemon__run_server(void)
261 struct ipc_server_opts opts
= {
262 .nr_threads
= cl_args
.nr_threads
,
266 * Synchronously run the ipc-server. We don't need any application
267 * instance data, so pass an arbitrary pointer (that we'll later
268 * verify made the round trip).
270 ret
= ipc_server_run(cl_args
.path
, &opts
, test_app_cb
, (void*)&my_app_data
);
272 error("socket/pipe already in use: '%s'", cl_args
.path
);
274 error_errno("could not start server on: '%s'", cl_args
.path
);
279 static start_bg_wait_cb bg_wait_cb
;
281 static int bg_wait_cb(const struct child_process
*cp
, void *cb_data
)
283 int s
= ipc_get_active_state(cl_args
.path
);
286 case IPC_STATE__LISTENING
:
287 /* child is "ready" */
290 case IPC_STATE__NOT_LISTENING
:
291 case IPC_STATE__PATH_NOT_FOUND
:
292 /* give child more time */
296 case IPC_STATE__INVALID_PATH
:
297 case IPC_STATE__OTHER_ERROR
:
298 /* all the time in the world won't help */
303 static int daemon__start_server(void)
305 struct child_process cp
= CHILD_PROCESS_INIT
;
306 enum start_bg_result sbgr
;
308 strvec_push(&cp
.args
, "test-tool");
309 strvec_push(&cp
.args
, "simple-ipc");
310 strvec_push(&cp
.args
, "run-daemon");
311 strvec_pushf(&cp
.args
, "--name=%s", cl_args
.path
);
312 strvec_pushf(&cp
.args
, "--threads=%d", cl_args
.nr_threads
);
318 sbgr
= start_bg_command(&cp
, bg_wait_cb
, NULL
, cl_args
.max_wait_sec
);
327 return error("daemon failed to start");
330 return error("daemon not online yet");
333 return error("daemon terminated");
338 * This process will run a quick probe to see if a simple-ipc server
339 * is active on this path.
341 * Returns 0 if the server is alive.
343 static int client__probe_server(void)
345 enum ipc_active_state s
;
347 s
= ipc_get_active_state(cl_args
.path
);
349 case IPC_STATE__LISTENING
:
352 case IPC_STATE__NOT_LISTENING
:
353 return error("no server listening at '%s'", cl_args
.path
);
355 case IPC_STATE__PATH_NOT_FOUND
:
356 return error("path not found '%s'", cl_args
.path
);
358 case IPC_STATE__INVALID_PATH
:
359 return error("invalid pipe/socket name '%s'", cl_args
.path
);
361 case IPC_STATE__OTHER_ERROR
:
363 return error("other error for '%s'", cl_args
.path
);
368 * Send an IPC command token to an already-running server daemon and
369 * print the response.
371 * This is a simple 1 word command/token that `test_app_cb()` (in the
372 * daemon process) will understand.
374 static int client__send_ipc(void)
376 const char *command
= "(no-command)";
377 struct strbuf buf
= STRBUF_INIT
;
378 struct ipc_client_connect_options options
379 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
381 if (cl_args
.token
&& *cl_args
.token
)
382 command
= cl_args
.token
;
384 options
.wait_if_busy
= 1;
385 options
.wait_if_not_found
= 0;
387 if (!ipc_client_send_command(cl_args
.path
, &options
,
388 command
, strlen(command
),
391 printf("%s\n", buf
.buf
);
394 strbuf_release(&buf
);
399 return error("failed to send '%s' to '%s'", command
, cl_args
.path
);
403 * Send an IPC command to an already-running server and ask it to
404 * shut down. "send quit" is an async request and queues a shutdown
405 * event in the server, so we spin and wait here for it to actually
406 * shut down to make the unit tests a little easier to write.
408 static int client__stop_server(void)
411 time_t time_limit
, now
;
412 enum ipc_active_state s
;
415 time_limit
+= cl_args
.max_wait_sec
;
417 cl_args
.token
= "quit";
419 ret
= client__send_ipc();
426 s
= ipc_get_active_state(cl_args
.path
);
428 if (s
!= IPC_STATE__LISTENING
) {
430 * The socket/pipe is gone and/or has stopped
431 * responding. Let's assume that the daemon
432 * process has exited too.
438 if (now
> time_limit
)
439 return error("daemon has not shutdown yet");
444 * Send an IPC command followed by ballast to confirm that a large
445 * message can be sent and that the kernel or pkt-line layers will
446 * properly chunk it and that the daemon receives the entire message.
448 static int do_sendbytes(int bytecount
, char byte
, const char *path
,
449 const struct ipc_client_connect_options
*options
)
451 struct strbuf buf_send
= STRBUF_INIT
;
452 struct strbuf buf_resp
= STRBUF_INIT
;
454 strbuf_addstr(&buf_send
, "sendbytes ");
455 strbuf_addchars(&buf_send
, byte
, bytecount
);
457 if (!ipc_client_send_command(path
, options
,
458 buf_send
.buf
, buf_send
.len
,
460 strbuf_rtrim(&buf_resp
);
461 printf("sent:%c%08d %s\n", byte
, bytecount
, buf_resp
.buf
);
463 strbuf_release(&buf_send
);
464 strbuf_release(&buf_resp
);
469 return error("client failed to sendbytes(%d, '%c') to '%s'",
470 bytecount
, byte
, path
);
474 * Send an IPC command with ballast to an already-running server daemon.
476 static int client__sendbytes(void)
478 struct ipc_client_connect_options options
479 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
481 options
.wait_if_busy
= 1;
482 options
.wait_if_not_found
= 0;
483 options
.uds_disallow_chdir
= 0;
485 return do_sendbytes(cl_args
.bytecount
, cl_args
.bytevalue
, cl_args
.path
,
489 struct multiple_thread_data
{
490 pthread_t pthread_id
;
491 struct multiple_thread_data
*next
;
500 static void *multiple_thread_proc(void *_multiple_thread_data
)
502 struct multiple_thread_data
*d
= _multiple_thread_data
;
504 struct ipc_client_connect_options options
505 = IPC_CLIENT_CONNECT_OPTIONS_INIT
;
507 options
.wait_if_busy
= 1;
508 options
.wait_if_not_found
= 0;
510 * A multi-threaded client should not be randomly calling chdir().
511 * The test will pass without this restriction because the test is
512 * not otherwise accessing the filesystem, but it makes us honest.
514 options
.uds_disallow_chdir
= 1;
516 trace2_thread_start("multiple");
518 for (k
= 0; k
< d
->batchsize
; k
++) {
519 if (do_sendbytes(d
->bytecount
+ k
, d
->letter
, d
->path
, &options
))
525 trace2_thread_exit();
530 * Start a client-side thread pool. Each thread sends a series of
531 * IPC requests. Each request is on a new connection to the server.
533 static int client__multiple(void)
535 struct multiple_thread_data
*list
= NULL
;
537 int sum_join_errors
= 0;
538 int sum_thread_errors
= 0;
541 for (k
= 0; k
< cl_args
.nr_threads
; k
++) {
542 struct multiple_thread_data
*d
= xcalloc(1, sizeof(*d
));
544 d
->path
= cl_args
.path
;
545 d
->bytecount
= cl_args
.bytecount
+ cl_args
.batchsize
*(k
/26);
546 d
->batchsize
= cl_args
.batchsize
;
549 d
->letter
= 'A' + (k
% 26);
551 if (pthread_create(&d
->pthread_id
, NULL
, multiple_thread_proc
, d
)) {
552 warning("failed to create thread[%d] skipping remainder", k
);
561 struct multiple_thread_data
*d
= list
;
563 if (pthread_join(d
->pthread_id
, NULL
))
566 sum_thread_errors
+= d
->sum_errors
;
567 sum_good
+= d
->sum_good
;
573 printf("client (good %d) (join %d), (errors %d)\n",
574 sum_good
, sum_join_errors
, sum_thread_errors
);
576 return (sum_join_errors
+ sum_thread_errors
) ? 1 : 0;
579 int cmd__simple_ipc(int argc
, const char **argv
)
581 const char * const simple_ipc_usage
[] = {
582 N_("test-helper simple-ipc is-active [<name>] [<options>]"),
583 N_("test-helper simple-ipc run-daemon [<name>] [<threads>]"),
584 N_("test-helper simple-ipc start-daemon [<name>] [<threads>] [<max-wait>]"),
585 N_("test-helper simple-ipc stop-daemon [<name>] [<max-wait>]"),
586 N_("test-helper simple-ipc send [<name>] [<token>]"),
587 N_("test-helper simple-ipc sendbytes [<name>] [<bytecount>] [<byte>]"),
588 N_("test-helper simple-ipc multiple [<name>] [<threads>] [<bytecount>] [<batchsize>]"),
592 const char *bytevalue
= NULL
;
594 struct option options
[] = {
595 #ifndef GIT_WINDOWS_NATIVE
596 OPT_STRING(0, "name", &cl_args
.path
, N_("name"), N_("name or pathname of unix domain socket")),
598 OPT_STRING(0, "name", &cl_args
.path
, N_("name"), N_("named-pipe name")),
600 OPT_INTEGER(0, "threads", &cl_args
.nr_threads
, N_("number of threads in server thread pool")),
601 OPT_INTEGER(0, "max-wait", &cl_args
.max_wait_sec
, N_("seconds to wait for daemon to start or stop")),
603 OPT_INTEGER(0, "bytecount", &cl_args
.bytecount
, N_("number of bytes")),
604 OPT_INTEGER(0, "batchsize", &cl_args
.batchsize
, N_("number of requests per thread")),
606 OPT_STRING(0, "byte", &bytevalue
, N_("byte"), N_("ballast character")),
607 OPT_STRING(0, "token", &cl_args
.token
, N_("token"), N_("command token to send to the server")),
613 usage_with_options(simple_ipc_usage
, options
);
615 if (argc
== 2 && !strcmp(argv
[1], "-h"))
616 usage_with_options(simple_ipc_usage
, options
);
618 if (argc
== 2 && !strcmp(argv
[1], "SUPPORTS_SIMPLE_IPC"))
621 cl_args
.subcommand
= argv
[1];
626 argc
= parse_options(argc
, argv
, NULL
, options
, simple_ipc_usage
, 0);
628 if (cl_args
.nr_threads
< 1)
629 cl_args
.nr_threads
= 1;
630 if (cl_args
.max_wait_sec
< 0)
631 cl_args
.max_wait_sec
= 0;
632 if (cl_args
.bytecount
< 1)
633 cl_args
.bytecount
= 1;
634 if (cl_args
.batchsize
< 1)
635 cl_args
.batchsize
= 1;
637 if (bytevalue
&& *bytevalue
)
638 cl_args
.bytevalue
= bytevalue
[0];
641 * Use '!!' on all dispatch functions to map from `error()` style
642 * (returns -1) to `test_must_fail` style (expects 1). This
643 * makes shell error messages less confusing.
646 if (!strcmp(cl_args
.subcommand
, "is-active"))
647 return !!client__probe_server();
649 if (!strcmp(cl_args
.subcommand
, "run-daemon"))
650 return !!daemon__run_server();
652 if (!strcmp(cl_args
.subcommand
, "start-daemon"))
653 return !!daemon__start_server();
656 * Client commands follow. Ensure a server is running before
657 * sending any data. This might be overkill, but then again
658 * this is a test harness.
661 if (!strcmp(cl_args
.subcommand
, "stop-daemon")) {
662 if (client__probe_server())
664 return !!client__stop_server();
667 if (!strcmp(cl_args
.subcommand
, "send")) {
668 if (client__probe_server())
670 return !!client__send_ipc();
673 if (!strcmp(cl_args
.subcommand
, "sendbytes")) {
674 if (client__probe_server())
676 return !!client__sendbytes();
679 if (!strcmp(cl_args
.subcommand
, "multiple")) {
680 if (client__probe_server())
682 return !!client__multiple();
685 die("Unhandled subcommand: '%s'", cl_args
.subcommand
);