}
END_TEST
+static bool all(void *data, stream_t *stream)
+{
+ char buf[64], *pos;
+ ssize_t len;
+ int i;
+
+ pos = buf;
+ for (i = 0; i < msglen; i++)
+ {
+ len = stream->read(stream, pos, 1, TRUE);
+ ck_assert_int_eq(len, 1);
+ pos += len;
+ }
+ pos = buf;
+ for (i = 0; i < msglen; i++)
+ {
+ len = stream->write(stream, pos, 1, TRUE);
+ ck_assert_int_eq(len, 1);
+ pos += len;
+ }
+
+ return FALSE;
+}
+
+START_TEST(test_all)
+{
+ char buf[64];
+ stream_service_t *service;
+ stream_t *stream;
+
+ lib->processor->set_threads(lib->processor, 8);
+
+ service = lib->streams->create_service(lib->streams, services[_i], 1);
+ ck_assert(service != NULL);
+ service->on_accept(service, all, NULL, JOB_PRIO_HIGH, 1);
+
+ stream = lib->streams->connect(lib->streams, services[_i]);
+ ck_assert(stream != NULL);
+ ck_assert(stream->write_all(stream, msg, msglen));
+ ck_assert(stream->read_all(stream, buf, msglen));
+ ck_assert(streq(buf, msg));
+ stream->destroy(stream);
+
+ service->destroy(service);
+}
+END_TEST
+
static void concurrency(void *data, stream_t *stream)
{
static refcount_t refs = 0;
tcase_add_loop_test(tc, test_async, 0, countof(services));
suite_add_tcase(s, tc);
+ tc = tcase_create("all");
+ tcase_add_loop_test(tc, test_all, 0, countof(services));
+ suite_add_tcase(s, tc);
+
tc = tcase_create("concurrency");
tcase_add_loop_test(tc, test_concurrency, 0, countof(services));
suite_add_tcase(s, tc);