}
switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
- switch_event_t* evt, char* routingKeyEventHeaderNames[])
+ switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[])
{
- int i = 0, idx = 0;
+ int i = 0, idx = 0, x = 0;
+ char keybuffer[MAX_AMQP_ROUTING_KEY_LENGTH];
for (i = 0; i < MAX_ROUTING_KEY_FORMAT_FIELDS && idx < MAX_AMQP_ROUTING_KEY_LENGTH; i++) {
- if (routingKeyEventHeaderNames[i]) {
+ if (routingKeyEventHeaderNames[i].size) {
if (idx) {
routingKey[idx++] = '.';
}
- if ( profile->enable_fallback_format_fields) {
- int count = 0, x = 0;
- char *argv[10];
-
- count = switch_separate_string(routingKeyEventHeaderNames[i], '|', argv, (sizeof(argv) / sizeof(argv[0])));
- for( x = 0; x < count; x++) {
- if (argv[x][0] == '#') {
- strncpy(routingKey + idx, argv[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+ for( x = 0; x < routingKeyEventHeaderNames[i].size; x++) {
+ if (routingKeyEventHeaderNames[i].name[x][0] == '#') {
+ strncpy(routingKey + idx, routingKeyEventHeaderNames[i].name[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+ break;
+ } else {
+ char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i].name[x]);
+ if (value) {
+ amqp_util_encode(value, keybuffer);
+ strncpy(routingKey + idx, keybuffer, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
break;
- } else {
- char *value = switch_event_get_header(evt, argv[x]);
-
- if (!value) {
- continue;
- }
-
- strncpy(routingKey + idx, value, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
-
- /* Replace dots with underscores so that the routing key does not get corrupted */
- switch_replace_char(routingKey + idx, '.', '_', 0);
}
}
- idx += strlen(routingKey + idx);
- } else {
- if (routingKeyEventHeaderNames[i][0] == '#') {
- strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
- } else {
- char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]);
- strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx);
-
- /* Replace dots with underscores so that the routing key does not get corrupted */
- switch_replace_char(routingKey + idx, '.', '_', 0);
- }
- idx += strlen(routingKey + idx);
}
+ idx += strlen(routingKey + idx);
}
}
return SWITCH_STATUS_SUCCESS;
char *argv[SWITCH_EVENT_ALL];
switch_xml_t params, param, connections, connection;
switch_threadattr_t *thd_attr = NULL;
- char *exchange = NULL, *exchange_type = NULL;
+ char *exchange = NULL, *exchange_type = NULL, *content_type = NULL;
+ int exchange_durable = 1; /* durable */
+ int exchange_auto_delete = 0;
+ int delivery_mode = -1;
+ int delivery_timestamp = 1;
switch_memory_pool_t *pool;
+ char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
+ int format_fields_size = 0;
+
+ memset(format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1);
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
goto err;
profile->pool = pool;
profile->name = switch_core_strdup(profile->pool, name);
profile->running = 1;
- memset(profile->format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1);
+ memset(profile->format_fields, 0, (MAX_ROUTING_KEY_FORMAT_FIELDS + 1) * sizeof(mod_amqp_keypart_t));
profile->event_ids[0] = SWITCH_EVENT_ALL;
profile->event_subscriptions = 1;
profile->conn_root = NULL;
profile->conn_active = NULL;
-
/* Set reasonable defaults which may change if more reasonable defaults are found */
/* Handle defaults of non string types */
profile->circuit_breaker_ms = 10000;
int interval = atoi(val);
if ( interval && interval > 0 ) {
profile->enable_fallback_format_fields = 1;
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp fallback format fields enabled\n");
}
- } else if (!strncmp(var, "exchange_type", 13)) {
+ } else if (!strncmp(var, "exchange-type", 13)) {
exchange_type = switch_core_strdup(profile->pool, val);
- } else if (!strncmp(var, "exchange", 8)) {
+ } else if (!strncmp(var, "exchange-name", 13)) {
exchange = switch_core_strdup(profile->pool, val);
+ } else if (!strncmp(var, "exchange-durable", 16)) {
+ exchange_durable = switch_true(val);
+ } else if (!strncmp(var, "exchange-auto-delete", 20)) {
+ exchange_auto_delete = switch_true(val);
+ } else if (!strncmp(var, "delivery-mode", 13)) {
+ delivery_mode = atoi(val);
+ } else if (!strncmp(var, "delivery-timestamp", 18)) {
+ delivery_timestamp = switch_true(val);
+ } else if (!strncmp(var, "exchange_type", 13)) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange_type parameter. please change to exchange-type\n");
+ } else if (!strncmp(var, "exchange", 8)) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange parameter. please change to exchange-name\n");
+ } else if (!strncmp(var, "content-type", 12)) {
+ content_type = switch_core_strdup(profile->pool, val);
} else if (!strncmp(var, "format_fields", 13)) {
- int size = 0;
- if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp format fields : %s\n", val);
+ if ((format_fields_size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "You can have only %d routing fields in the routing key.\n",
MAX_ROUTING_KEY_FORMAT_FIELDS);
goto err;
}
/* increment size because the count returned the number of separators, not number of fields */
- size++;
-
- switch_separate_string(val, ',', profile->format_fields, size);
- profile->format_fields[size] = NULL;
+ format_fields_size++;
+ switch_separate_string(val, ',', format_fields, MAX_ROUTING_KEY_FORMAT_FIELDS);
+ format_fields[format_fields_size] = NULL;
} else if (!strncmp(var, "event_filter", 12)) {
/* Parse new events */
profile->event_subscriptions = switch_separate_string(val, ',', argv, (sizeof(argv) / sizeof(argv[0])));
/* Handle defaults of string types */
profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Events");
profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic");
+ profile->exchange_durable = exchange_durable;
+ profile->exchange_auto_delete = exchange_auto_delete;
+ profile->delivery_mode = delivery_mode;
+ profile->delivery_timestamp = delivery_timestamp;
+ profile->content_type = content_type ? content_type : switch_core_strdup(profile->pool, MOD_AMQP_DEFAULT_CONTENT_TYPE);
+
+
+ for(i = 0; i < format_fields_size; i++) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : %s\n", i, format_fields[i]);
+ if(profile->enable_fallback_format_fields) {
+ profile->format_fields[i].size = switch_separate_string(format_fields[i], '|', profile->format_fields[i].name, MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS);
+ if(profile->format_fields[i].size > 1) {
+ for(arg = 0; arg < profile->format_fields[i].size; arg++) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : sub key %d : %s\n", i, arg, profile->format_fields[i].name[arg]);
+ }
+ }
+ } else {
+ profile->format_fields[i].name[0] = format_fields[i];
+ profile->format_fields[i].size = 1;
+ }
+ }
+
if ((connections = switch_xml_child(cfg, "connections")) != NULL) {
for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) {
amqp_exchange_declare(profile->conn_active->state, 1,
amqp_cstring_bytes(profile->exchange),
amqp_cstring_bytes(profile->exchange_type),
- 0,
- 1,
+ profile->exchange_durable,
+ profile->exchange_auto_delete,
amqp_empty_table);
if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name);
return SWITCH_STATUS_NOT_INITALIZED;
}
+ memset(&props, 0, sizeof(amqp_basic_properties_t));
- props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG;
- props.content_type = amqp_cstring_bytes("text/json");
- props.delivery_mode = 1; /* non persistent delivery mode */
- props.timestamp = (uint64_t)time(NULL);
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
+ props.content_type = amqp_cstring_bytes(profile->content_type);
- props.headers.num_entries = 1;
- props.headers.entries = messageTableEntries;
+ if(profile->delivery_mode > 0) {
+ props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
+ props.delivery_mode = profile->delivery_mode;
+ }
- messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp");
- messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
- messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now();
+ if(profile->delivery_timestamp) {
+ props._flags |= AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG;
+ props.timestamp = (uint64_t)time(NULL);
+ props.headers.num_entries = 1;
+ props.headers.entries = messageTableEntries;
+ messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp");
+ messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
+ messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now();
+ }
status = amqp_basic_publish(
profile->conn_active->state,