return FALSE;
}
-
START_TEST(test_async)
{
stream_service_t *service;
}
END_TEST
+static void concurrency(void *data, stream_t *stream)
+{
+ static refcount_t refs = 0;
+ u_int current;
+ ssize_t len;
+
+ current = ref_get(&refs);
+ ck_assert(current <= 3);
+ len = stream->write(stream, "x", 1, TRUE);
+ ck_assert_int_eq(len, 1);
+ usleep(1000);
+ ignore_result(ref_put(&refs));
+}
+
+START_TEST(test_concurrency)
+{
+ stream_service_t *service;
+ stream_t *streams[10];
+ ssize_t len;
+ char x;
+ int i;
+
+ lib->processor->set_threads(lib->processor, 8);
+
+ service = lib->streams->create_service(lib->streams, services[_i], 10);
+ ck_assert(service != NULL);
+ service->on_accept(service, concurrency, NULL, JOB_PRIO_HIGH, 3);
+
+ for (i = 0; i < countof(streams); i++)
+ {
+ streams[i] = lib->streams->connect(lib->streams, services[_i]);
+ ck_assert(streams[i] != NULL);
+ }
+ for (i = 0; i < countof(streams); i++)
+ {
+ len = streams[i]->read(streams[i], &x, 1, TRUE);
+ ck_assert_int_eq(len, 1);
+ ck_assert_int_eq(x, 'x');
+ }
+ for (i = 0; i < countof(streams); i++)
+ {
+ streams[i]->destroy(streams[i]);
+ }
+ service->destroy(service);
+}
+END_TEST
+
Suite *stream_suite_create()
{
Suite *s;
tcase_add_loop_test(tc, test_async, 0, countof(services));
suite_add_tcase(s, tc);
+ tc = tcase_create("concurrency");
+ tcase_add_loop_test(tc, test_concurrency, 0, countof(services));
+ suite_add_tcase(s, tc);
+
return s;
}