#include "asterisk/stasis_message_router.h"
#include "asterisk/test.h"
-static const char *test_category = "/stasis/core/";
+#define test_category "/stasis/core/"
+
+static struct ast_event *fake_event(struct stasis_message *message)
+{
+ return ast_event_new(AST_EVENT_CUSTOM,
+ AST_EVENT_IE_DESCRIPTION, AST_EVENT_IE_PLTYPE_STR, "Dummy", AST_EVENT_IE_END);
+}
static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
{
return AST_TEST_PASS;
}
+struct test_message_types {
+ struct stasis_message_type *none;
+ struct stasis_message_type *ami;
+ struct stasis_message_type *json;
+ struct stasis_message_type *event;
+ struct stasis_message_type *amievent;
+ struct stasis_message_type *type1;
+ struct stasis_message_type *type2;
+ struct stasis_message_type *type3;
+ struct stasis_message_type *change;
+};
+
+static void destroy_message_types(void *obj)
+{
+ struct test_message_types *types = obj;
+
+ ao2_cleanup(types->none);
+ ao2_cleanup(types->ami);
+ ao2_cleanup(types->json);
+ ao2_cleanup(types->event);
+ ao2_cleanup(types->amievent);
+ ao2_cleanup(types->type1);
+ ao2_cleanup(types->type2);
+ ao2_cleanup(types->type3);
+ /* N.B. Don't cleanup types->change! */
+}
+
+static struct test_message_types *create_message_types(struct ast_test *test)
+{
+ struct stasis_message_vtable vtable = { 0 };
+ struct test_message_types *types;
+ enum ast_test_result_state __attribute__ ((unused)) rc;
+
+ types = ao2_alloc(sizeof(*types), destroy_message_types);
+ if (!types) {
+ return NULL;
+ }
+
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ vtable.to_ami = fake_ami;
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ vtable.to_ami = NULL;
+ vtable.to_json = fake_json;
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ vtable.to_ami = NULL;
+ vtable.to_json = NULL;
+ vtable.to_event = fake_event;
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ vtable.to_ami = fake_ami;
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ ast_test_validate_cleanup(test,
+ stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS,
+ rc, cleanup);
+
+ types->change = stasis_subscription_change_type();
+
+ return types;
+
+cleanup:
+ ao2_cleanup(types);
+ return NULL;
+}
+
+struct cts {
+ struct consumer *consumer;
+ struct stasis_topic *topic;
+ struct stasis_subscription *sub;
+};
+
+static void destroy_cts(void *obj)
+{
+ struct cts *c = obj;
+
+ stasis_unsubscribe(c->sub);
+ ao2_cleanup(c->topic);
+ ao2_cleanup(c->consumer);
+}
+
+static struct cts *create_cts(struct ast_test *test)
+{
+ struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts);
+ enum ast_test_result_state __attribute__ ((unused)) rc;
+
+ ast_test_validate_cleanup(test, cts, rc, cleanup);
+
+ cts->topic = stasis_topic_create("TestTopic");
+ ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
+
+ cts->consumer = consumer_create(0);
+ ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
+
+ ao2_ref(cts->consumer, +1);
+ cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer);
+ ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
+
+ return cts;
+
+cleanup:
+ ao2_cleanup(cts);
+ return NULL;
+}
+
+static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
+{
+ struct stasis_subscription_change *msg_data = stasis_message_data(msg);
+
+ if (stasis_message_type(msg) != mtype) {
+ return 0;
+ }
+
+ if (data) {
+ return (strcmp(data, msg_data->description) == 0);
+ }
+
+ return 1;
+}
+
+static void dump_consumer(struct ast_test *test, struct cts *cts)
+{
+ int i;
+ struct stasis_subscription_change *data;
+
+ ast_test_status_update(test, "Messages received: %ld Final? %s\n", cts->consumer->messages_rxed_len,
+ cts->consumer->complete ? "yes" : "no");
+ for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
+ data = stasis_message_data(cts->consumer->messages_rxed[i]);
+ ast_test_status_update(test, "Message type received: %s %s\n",
+ stasis_message_type_name(stasis_message_type(cts->consumer->messages_rxed[i])),
+ data && data->description ? data->description : "no data");
+ }
+}
+
+static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type,
+ const char *data)
+{
+ struct stasis_message *msg;
+ struct stasis_subscription_change *test_data =
+ ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL);
+
+ if (!test_data) {
+ return 0;
+ }
+ strcpy(test_data->description, S_OR(data, "no data")); /* Safe */
+
+ msg = stasis_message_create(msg_type, test_data);
+ ao2_ref(test_data, -1);
+ if (!msg) {
+ ast_test_status_update(test, "Unable to create %s message\n",
+ stasis_message_type_name(msg_type));
+ return 0;
+ }
+
+ stasis_publish(cts->topic, msg);
+ ao2_ref(msg, -1);
+
+ return 1;
+}
+
+AST_TEST_DEFINE(type_filters)
+{
+ RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+ RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
+ int ix = 0;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category "filtering/";
+ info->summary = "Test message filtering by type";
+ info->description = "Test message filtering by type";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ types = create_message_types(test);
+ ast_test_validate(test, NULL != types);
+
+ cts = create_cts(test);
+ ast_test_validate(test, NULL != cts);
+
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
+ ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
+
+ /* We should get these */
+ ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
+ /* ... but not this one */
+ ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+
+ /* Wait for change(subscribe) and "Pass" messages */
+ consumer_wait_for(cts->consumer, 3);
+
+ /* Remove type 1 */
+ ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0);
+
+ /* We should now NOT get this one */
+ ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
+ /* We should get this one (again) */
+ ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2"));
+ /* We still should NOT get this one */
+ ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+
+ /* We should now have a second type2 */
+ consumer_wait_for(cts->consumer, 4);
+
+ stasis_unsubscribe(cts->sub);
+ cts->sub = NULL;
+ consumer_wait_for_completion(cts->consumer);
+
+ dump_consumer(test, cts);
+
+ ast_test_validate(test, 1 == cts->consumer->complete);
+ ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(formatter_filters)
+{
+ RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+ RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup) ;
+ int ix = 0;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category "filtering/";
+ info->summary = "Test message filtering by formatter";
+ info->description = "Test message filtering by formatter";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ types = create_message_types(test);
+ ast_test_validate(test, NULL != types);
+
+ cts = create_cts(test);
+ ast_test_validate(test, NULL != cts);
+
+ stasis_subscription_accept_formatters(cts->sub,
+ STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
+
+ /* We should get these */
+ ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
+
+ /* ... but not these */
+ ast_test_validate(test, send_msg(test, cts, types->none, "FAIL"));
+ ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
+ ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
+
+ /* Wait for change(subscribe) and the "Pass" messages */
+ consumer_wait_for(cts->consumer, 4);
+
+ /* Change the subscription to accept only event formatters */
+ stasis_subscription_accept_formatters(cts->sub, STASIS_SUBSCRIPTION_FORMATTER_EVENT);
+
+ /* We should NOT get these now */
+ ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL"));
+ ast_test_validate(test, send_msg(test, cts, types->json, "FAIL"));
+ /* ... but we should still get this one */
+ ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2"));
+ /* ... and this one should be new */
+ ast_test_validate(test, send_msg(test, cts, types->event, "Pass"));
+
+ /* We should now have a second amievent */
+ consumer_wait_for(cts->consumer, 6);
+
+ stasis_unsubscribe(cts->sub);
+ cts->sub = NULL;
+ consumer_wait_for_completion(cts->consumer);
+
+ dump_consumer(test, cts);
+
+ ast_test_validate(test, 1 == cts->consumer->complete);
+ ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+ return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(combo_filters)
+{
+ RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+ RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
+ int ix = 0;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category "filtering/";
+ info->summary = "Test message filtering by type and formatter";
+ info->description = "Test message filtering by type and formatter";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ types = create_message_types(test);
+ ast_test_validate(test, NULL != types);
+
+ cts = create_cts(test);
+ ast_test_validate(test, NULL != cts);
+
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
+ ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
+ ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
+ stasis_subscription_accept_formatters(cts->sub,
+ STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
+
+ /* We should get these */
+ ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
+ ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
+
+ /* ... but not these */
+ ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+ ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
+
+ /* Wait for change(subscribe) and the "Pass" messages */
+ consumer_wait_for(cts->consumer, 6);
+
+ stasis_unsubscribe(cts->sub);
+ cts->sub = NULL;
+ consumer_wait_for_completion(cts->consumer);
+
+ dump_consumer(test, cts);
+
+ ast_test_validate(test, 1 == cts->consumer->complete);
+ ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
+ ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+ return AST_TEST_PASS;
+}
+
static int unload_module(void)
{
AST_TEST_UNREGISTER(message_type);
AST_TEST_UNREGISTER(to_ami);
AST_TEST_UNREGISTER(dtor_order);
AST_TEST_UNREGISTER(caching_dtor_order);
+ AST_TEST_UNREGISTER(type_filters);
+ AST_TEST_UNREGISTER(formatter_filters);
+ AST_TEST_UNREGISTER(combo_filters);
return 0;
}
AST_TEST_REGISTER(to_ami);
AST_TEST_REGISTER(dtor_order);
AST_TEST_REGISTER(caching_dtor_order);
+ AST_TEST_REGISTER(type_filters);
+ AST_TEST_REGISTER(formatter_filters);
+ AST_TEST_REGISTER(combo_filters);
return AST_MODULE_LOAD_SUCCESS;
}