]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-7060 expanded configuration for amqp command configuration
authorWilliam King <william.king@quentustech.com>
Tue, 7 Apr 2015 23:11:34 +0000 (16:11 -0700)
committerWilliam King <william.king@quentustech.com>
Tue, 7 Apr 2015 23:11:34 +0000 (16:11 -0700)
conf/vanilla/autoload_configs/amqp.conf.xml
src/mod/event_handlers/mod_amqp/mod_amqp.h
src/mod/event_handlers/mod_amqp/mod_amqp_command.c
src/mod/event_handlers/mod_amqp/mod_amqp_producer.c

index 5ae9dc944a8e329868eca8c2408ecbc1248e5ec4..06bbb0d39529b07bc554f2ca318dbe04cadab9ba 100644 (file)
@@ -48,8 +48,8 @@
        </connection>
       </connections>
       <params>
-       <param name="eventExchange" value="TAP.Events"/>
-       <param name="eventExchangetype" value="topic"/>
+       <param name="exchange" value="TAP.Commands"/>
+       <param name="reconnect_interval_ms" value="1000"/>
       </params>
     </profile>
   </commands>
index 808415e7adb8ca0d671bd75a1774561030eec145..07528b234ebce2d18a953c7c8996166ebcd16871 100644 (file)
@@ -114,15 +114,8 @@ typedef struct {
   char *name;
   
   char *exchange;
-  char *exchange_type;
   char *binding_key;
 
-  /* Array to store the possible event subscriptions */
-  char *event_filter;
-  unsigned int number_of_event_filters;
-  switch_event_node_t *event_nodes[SWITCH_EVENT_ALL];
-  switch_event_types_t event_ids[SWITCH_EVENT_ALL];
-
   /* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */
   mod_amqp_connection_t *conn_root;
   mod_amqp_connection_t *conn_active;
index 235983bae0d57e6f9e1826a9ca6118bd22df3baa..48eec1da1e157b064b113cf8f75fec8c00e36134 100644 (file)
@@ -83,8 +83,10 @@ switch_status_t mod_amqp_command_destroy(mod_amqp_command_profile_t **prof)
 switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg)
 {
        mod_amqp_command_profile_t *profile = NULL;
+       switch_xml_t params, param, connections, connection;
        switch_threadattr_t *thd_attr = NULL;
        switch_memory_pool_t *pool;
+       char *exchange = NULL;
 
        if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
                goto err;
@@ -95,6 +97,61 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg)
        profile->pool = pool;
        profile->name = switch_core_strdup(profile->pool, name);
        profile->running = 1;
+       profile->reconnect_interval_ms = 1000;
+
+       if ((params = switch_xml_child(cfg, "params")) != NULL) {
+               for (param = switch_xml_child(params, "param"); param; param = param->next) {
+                       char *var = (char *) switch_xml_attr_soft(param, "name");
+                       char *val = (char *) switch_xml_attr_soft(param, "value");
+
+                       if (!var) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param missing 'name' attribute\n", profile->name);
+                               continue;
+                       }
+
+                       if (!val) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param[%s] missing 'value' attribute\n", profile->name, var);
+                               continue;
+                       }
+
+                       if (!strncmp(var, "reconnect_interval_ms", 21)) {
+                               int interval = atoi(val);
+                               if ( interval && interval > 0 ) {
+                                       profile->reconnect_interval_ms = interval;
+                               }
+                       } else if (!strncmp(var, "exchange", 8)) {
+                               exchange = switch_core_strdup(profile->pool, "TAP.Commands");
+                       }
+               }
+       }
+
+       /* Handle defaults of string types */
+       profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Commands");
+
+       if ((connections = switch_xml_child(cfg, "connections")) != NULL) {
+               for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) {
+                       if ( ! profile->conn_root ) { /* Handle first root node */
+                               if (mod_amqp_connection_create(&(profile->conn_root), connection, profile->pool) != SWITCH_STATUS_SUCCESS) {
+                                       /* Handle connection create failure */
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name);
+                                       continue;
+                               }
+                               profile->conn_active = profile->conn_root;
+                       } else {
+                               if (mod_amqp_connection_create(&(profile->conn_active->next), connection, profile->pool) != SWITCH_STATUS_SUCCESS) {
+                                       /* Handle connection create failure */
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name);
+                                       continue;
+                               }
+                               profile->conn_active = profile->conn_active->next;
+                       }
+               }
+       }
+       profile->conn_active = NULL;
+
+       if ( mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr) != SWITCH_STATUS_SUCCESS) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name);
+       }
 
        /* Start the worker threads */
        switch_threadattr_create(&thd_attr, profile->pool);
index 136bbf4b484a420c927d9bdba293b956d793d793..067bb57d7bd6010ad780f24b0b4561ed2e488cb4 100644 (file)
@@ -221,6 +221,8 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
                                if ( interval && interval > 0 ) {
                                        profile->send_queue_size = interval;
                                }
+                       } else if (!strncmp(var, "exchange", 8)) {
+                               exchange = switch_core_strdup(profile->pool, "TAP.Events");
                        } else if (!strncmp(var, "format_fields", 13)) {
                                int size = 0;
                                if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {