}
} else if (!strncmp(var, "exchange", 8)) {
exchange = switch_core_strdup(profile->pool, "TAP.Events");
+ } else if (!strncmp(var, "exchange_type", 13)) {
+ exchange_type = switch_core_strdup(profile->pool, "topic");
} else if (!strncmp(var, "format_fields", 13)) {
int size = 0;
if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name);
}
+ amqp_exchange_declare(profile->conn_active->state, 1,
+ amqp_cstring_bytes(profile->exchange),
+ amqp_cstring_bytes(profile->exchange_type),
+ 0,
+ 1,
+ amqp_empty_table);
+
+ if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name);
+ goto err;
+ }
+
/* Create a bounded FIFO queue for sending messages */
if (switch_queue_create(&(profile->send_queue), profile->send_queue_size, profile->pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create send queue of size %d!\n", profile->send_queue_size);
durable,
amqp_empty_table);
- if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
+ if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n");
continue;
}