]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-7526: add enable_fallback_format_fields for mod_amqp producer profiles if the...
authorWilliam King <william.king@quentustech.com>
Sun, 3 May 2015 18:23:30 +0000 (11:23 -0700)
committerWilliam King <william.king@quentustech.com>
Sun, 3 May 2015 18:23:30 +0000 (11:23 -0700)
conf/vanilla/autoload_configs/amqp.conf.xml
src/mod/event_handlers/mod_amqp/mod_amqp.h
src/mod/event_handlers/mod_amqp/mod_amqp_producer.c

index ccc81f51fd576c89c247ea14094044107b29fd33..0d139169a97c2725b01b3e945d4a62f456ea415f 100644 (file)
        <param name="circuit_breaker_ms" value="10000"/>
        <param name="reconnect_interval_ms" value="1000"/>
        <param name="send_queue_size" value="5000"/>
+       <param name="enable_fallback_format_fields" value="1"/>
        
        <!-- The routing key is made from the format string, using the header values in the event specified in the format_fields.-->
        <!-- Fields that are prefixed with a # are treated as literals rather than doing a header lookup -->
        <param name="format_fields" value="#FreeSWITCH,FreeSWITCH-Hostname,Event-Name,Event-Subclass,Unique-ID"/>
+
+       <!-- If enable_fallback_format_fields is enabled, then you can | separate event headers, and if the first does not exist
+            then the system will check additional configured header values.
+       -->
+       <!-- <param name="format_fields" value="#FreeSWITCH,FreeSWITCH-Hostname|#Unknown,Event-Name,Event-Subclass,Unique-ID"/> -->
        
        <!--    <param name="eventFilter" value="SWITCH_EVENT_ALL"/> -->
        <param name="event_filter" value="SWITCH_EVENT_CHANNEL_CREATE,SWITCH_EVENT_CHANNEL_DESTROY,SWITCH_EVENT_HEARTBEAT,SWITCH_EVENT_DTMF"/>
index 07528b234ebce2d18a953c7c8996166ebcd16871..f651a1a89bfedbde89053e2faf5d8ee58b5ab55a 100644 (file)
@@ -104,6 +104,7 @@ typedef struct {
   int reconnect_interval_ms;
   int circuit_breaker_ms;
   switch_time_t circuit_breaker_reset_time;
+  switch_bool_t enable_fallback_format_fields;
 
   switch_bool_t running;
   switch_memory_pool_t *pool;
@@ -156,7 +157,8 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
 
 /* producer */
 void mod_amqp_producer_event_handler(switch_event_t* evt);
-switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],switch_event_t* evt, char* routingKeyEventHeaderNames[]);
+switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
+                                             switch_event_t* evt, char* routingKeyEventHeaderNames[]);
 switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **profile);
 switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg);
 void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data);
index 067bb57d7bd6010ad780f24b0b4561ed2e488cb4..0108bdaa795e9559294633bb893cb4e551d6ccaa 100644 (file)
@@ -45,7 +45,8 @@ void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg)
        switch_safe_free(*msg);
 }
 
-switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], switch_event_t* evt, char* routingKeyEventHeaderNames[])
+switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
+                                                                                         switch_event_t* evt, char* routingKeyEventHeaderNames[])
 {
        int i = 0, idx = 0;
 
@@ -54,16 +55,41 @@ switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_K
                        if (idx) {
                                routingKey[idx++] = '.';
                        }
-                       if (routingKeyEventHeaderNames[i][0] == '#') {
-                               strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+                       if ( profile->enable_fallback_format_fields) {
+                               int count = 0, x = 0;
+                               char *argv[10];
+
+                               count = switch_separate_string(routingKeyEventHeaderNames[i], '|', argv, (sizeof(argv) / sizeof(argv[0])));
+                               for( x = 0; x < count; x++) {
+                                       if (argv[x][0] == '#') {
+                                               strncpy(routingKey + idx, argv[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+                                               break;
+                                       } else {
+                                               char *value = switch_event_get_header(evt, argv[x]);
+
+                                               if (!value) {
+                                                       continue;
+                                               }
+
+                                               strncpy(routingKey + idx, value, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+
+                                               /* Replace dots with underscores so that the routing key does not get corrupted */
+                                               switch_replace_char(routingKey + idx, '.', '_', 0);
+                                       }
+                               }
+                               idx += strlen(routingKey + idx);
                        } else {
-                               char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]);
-                               strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx);
-
-                               /* Replace dots with underscores so that the routing key does not get corrupted */
-                               switch_replace_char(routingKey + idx, '.', '_', 0);
+                               if (routingKeyEventHeaderNames[i][0] == '#') {
+                                       strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+                               } else {
+                                       char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]);
+                                       strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+
+                                       /* Replace dots with underscores so that the routing key does not get corrupted */
+                                       switch_replace_char(routingKey + idx, '.', '_', 0);
+                               }
+                               idx += strlen(routingKey + idx);
                        }
-                       idx += strlen(routingKey + idx);
                }
        }
        return SWITCH_STATUS_SUCCESS;
@@ -97,7 +123,7 @@ void mod_amqp_producer_event_handler(switch_event_t* evt)
        switch_malloc(amqp_message, sizeof(mod_amqp_message_t));
 
        switch_event_serialize_json(evt, &amqp_message->pjson);
-       mod_amqp_producer_routing_key(amqp_message->routing_key, evt, profile->format_fields);
+       mod_amqp_producer_routing_key(profile, amqp_message->routing_key, evt, profile->format_fields);
 
        /* Queue the message to be sent by the worker thread, errors are reported only once per circuit breaker interval */
        if (switch_queue_trypush(profile->send_queue, amqp_message) != SWITCH_STATUS_SUCCESS) {
@@ -221,6 +247,11 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
                                if ( interval && interval > 0 ) {
                                        profile->send_queue_size = interval;
                                }
+                       } else if (!strncmp(var, "enable_fallback_format_fields", 29)) {
+                               int interval = atoi(val);
+                               if ( interval && interval > 0 ) {
+                                       profile->enable_fallback_format_fields = 1;
+                               }
                        } else if (!strncmp(var, "exchange", 8)) {
                                exchange = switch_core_strdup(profile->pool, "TAP.Events");
                        } else if (!strncmp(var, "format_fields", 13)) {