]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-7806 FS-7803 #resolve
authorLuis Azedo <luis@2600hz.com>
Mon, 3 Aug 2015 12:25:28 +0000 (13:25 +0100)
committerLuis Azedo <luis@2600hz.com>
Mon, 3 Aug 2015 12:25:28 +0000 (13:25 +0100)
added new properties to amqp configuration
fixed - enable_fallback_format_fields usage, only worked on first event
added amqp_util_encode to fix routing key

src/mod/event_handlers/mod_amqp/mod_amqp.h
src/mod/event_handlers/mod_amqp/mod_amqp_producer.c
src/mod/event_handlers/mod_amqp/mod_amqp_utils.c

index f651a1a89bfedbde89053e2faf5d8ee58b5ab55a..56e7e6372d7f505b09d2fec57db52c63f915c8c9 100644 (file)
 
 /* If you change MAX_ROUTING_KEY_FORMAT_FIELDS then you must change the implementation of makeRoutingKey where it formats the routing key using sprintf */
 #define MAX_ROUTING_KEY_FORMAT_FIELDS 10
+#define MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS 5
 #define MAX_AMQP_ROUTING_KEY_LENGTH 255
 
 #define TIME_STATS_TO_AGGREGATE 1024
 #define MOD_AMQP_DEBUG_TIMING 0
+#define MOD_AMQP_DEFAULT_CONTENT_TYPE "text/json"
 
 
 typedef struct {
@@ -74,12 +76,23 @@ typedef struct mod_amqp_connection_s {
   struct mod_amqp_connection_s *next;
 } mod_amqp_connection_t;
 
+typedef struct mod_amqp_keypart_s {
+  char *name[MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS];
+  int size;
+} mod_amqp_keypart_t;
+
 typedef struct {
   char *name;
 
   char *exchange;
   char *exchange_type;
-  char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
+  int exchange_durable;
+  int exchange_auto_delete;
+  int delivery_mode;
+  int delivery_timestamp;
+  char *content_type;
+  mod_amqp_keypart_t format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
+
   
   /* Array to store the possible event subscriptions */
   int event_subscriptions;
@@ -158,11 +171,12 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
 /* producer */
 void mod_amqp_producer_event_handler(switch_event_t* evt);
 switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
-                                             switch_event_t* evt, char* routingKeyEventHeaderNames[]);
+                                             switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[]);
 switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **profile);
 switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg);
 void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data);
 
+char *amqp_util_encode(char *key, char *dest);
 
 #endif /* MOD_AMQP_H */
 
index 0afc281e193d4fb505efb9f6a312af7344863e91..088be1e1ff15de62d72f4c966e40999a77cd6cde 100644 (file)
@@ -46,50 +46,30 @@ void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg)
 }
 
 switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
-                                                                                         switch_event_t* evt, char* routingKeyEventHeaderNames[])
+                                                                                         switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[])
 {
-       int i = 0, idx = 0;
+       int i = 0, idx = 0, x = 0;
+       char keybuffer[MAX_AMQP_ROUTING_KEY_LENGTH];
 
        for (i = 0; i < MAX_ROUTING_KEY_FORMAT_FIELDS && idx < MAX_AMQP_ROUTING_KEY_LENGTH; i++) {
-               if (routingKeyEventHeaderNames[i]) {
+               if (routingKeyEventHeaderNames[i].size) {
                        if (idx) {
                                routingKey[idx++] = '.';
                        }
-                       if ( profile->enable_fallback_format_fields) {
-                               int count = 0, x = 0;
-                               char *argv[10];
-
-                               count = switch_separate_string(routingKeyEventHeaderNames[i], '|', argv, (sizeof(argv) / sizeof(argv[0])));
-                               for( x = 0; x < count; x++) {
-                                       if (argv[x][0] == '#') {
-                                               strncpy(routingKey + idx, argv[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+                       for( x = 0; x < routingKeyEventHeaderNames[i].size; x++) {
+                               if (routingKeyEventHeaderNames[i].name[x][0] == '#') {
+                                       strncpy(routingKey + idx, routingKeyEventHeaderNames[i].name[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
+                                       break;
+                               } else {
+                                       char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i].name[x]);
+                                       if (value) {
+                                               amqp_util_encode(value, keybuffer);
+                                               strncpy(routingKey + idx, keybuffer, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
                                                break;
-                                       } else {
-                                               char *value = switch_event_get_header(evt, argv[x]);
-
-                                               if (!value) {
-                                                       continue;
-                                               }
-
-                                               strncpy(routingKey + idx, value, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
-
-                                               /* Replace dots with underscores so that the routing key does not get corrupted */
-                                               switch_replace_char(routingKey + idx, '.', '_', 0);
                                        }
                                }
-                               idx += strlen(routingKey + idx);
-                       } else {
-                               if (routingKeyEventHeaderNames[i][0] == '#') {
-                                       strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
-                               } else {
-                                       char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]);
-                                       strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx);
-
-                                       /* Replace dots with underscores so that the routing key does not get corrupted */
-                                       switch_replace_char(routingKey + idx, '.', '_', 0);
-                               }
-                               idx += strlen(routingKey + idx);
                        }
+                       idx += strlen(routingKey + idx);
                }
        }
        return SWITCH_STATUS_SUCCESS;
@@ -194,8 +174,16 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
        char  *argv[SWITCH_EVENT_ALL];
        switch_xml_t params, param, connections, connection;
        switch_threadattr_t *thd_attr = NULL;
-       char *exchange = NULL, *exchange_type = NULL;
+       char *exchange = NULL, *exchange_type = NULL, *content_type = NULL;
+       int exchange_durable = 1; /* durable */
+       int exchange_auto_delete = 0;
+       int delivery_mode = -1;
+       int delivery_timestamp = 1;
        switch_memory_pool_t *pool;
+       char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1];
+       int format_fields_size = 0;
+
+       memset(format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1);
 
        if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
                goto err;
@@ -205,12 +193,11 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
        profile->pool = pool;
        profile->name = switch_core_strdup(profile->pool, name);
        profile->running = 1;
-       memset(profile->format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1);
+       memset(profile->format_fields, 0, (MAX_ROUTING_KEY_FORMAT_FIELDS + 1) * sizeof(mod_amqp_keypart_t));
        profile->event_ids[0] = SWITCH_EVENT_ALL;
        profile->event_subscriptions = 1;
        profile->conn_root   = NULL;
        profile->conn_active = NULL;
-
        /* Set reasonable defaults which may change if more reasonable defaults are found */
        /* Handle defaults of non string types */
        profile->circuit_breaker_ms = 10000;
@@ -251,24 +238,38 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
                                int interval = atoi(val);
                                if ( interval && interval > 0 ) {
                                        profile->enable_fallback_format_fields = 1;
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp fallback format fields enabled\n");
                                }
-                       } else if (!strncmp(var, "exchange_type", 13)) {
+                       } else if (!strncmp(var, "exchange-type", 13)) {
                                exchange_type = switch_core_strdup(profile->pool, val);
-                       } else if (!strncmp(var, "exchange", 8)) {
+                       } else if (!strncmp(var, "exchange-name", 13)) {
                                exchange = switch_core_strdup(profile->pool, val);
+                       } else if (!strncmp(var, "exchange-durable", 16)) {
+                               exchange_durable = switch_true(val);
+                       } else if (!strncmp(var, "exchange-auto-delete", 20)) {
+                               exchange_auto_delete = switch_true(val);
+                       } else if (!strncmp(var, "delivery-mode", 13)) {
+                               delivery_mode = atoi(val);
+                       } else if (!strncmp(var, "delivery-timestamp", 18)) {
+                               delivery_timestamp = switch_true(val);
+                       } else if (!strncmp(var, "exchange_type", 13)) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange_type parameter. please change to exchange-type\n");
+                       } else if (!strncmp(var, "exchange", 8)) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange parameter. please change to exchange-name\n");
+                       } else if (!strncmp(var, "content-type", 12)) {
+                               content_type = switch_core_strdup(profile->pool, val);
                        } else if (!strncmp(var, "format_fields", 13)) {
-                               int size = 0;
-                               if ((size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp format fields : %s\n", val);
+                               if ((format_fields_size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) {
                                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "You can have only %d routing fields in the routing key.\n",
                                                                          MAX_ROUTING_KEY_FORMAT_FIELDS);
                                        goto err;
                                }
 
                                /* increment size because the count returned the number of separators, not number of fields */
-                               size++;
-
-                               switch_separate_string(val, ',', profile->format_fields, size);
-                               profile->format_fields[size] = NULL;
+                               format_fields_size++;
+                               switch_separate_string(val, ',', format_fields, MAX_ROUTING_KEY_FORMAT_FIELDS);
+                               format_fields[format_fields_size] = NULL;
                        } else if (!strncmp(var, "event_filter", 12)) {
                                /* Parse new events */
                                profile->event_subscriptions = switch_separate_string(val, ',', argv, (sizeof(argv) / sizeof(argv[0])));
@@ -287,6 +288,28 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
        /* Handle defaults of string types */
        profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Events");
        profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic");
+       profile->exchange_durable = exchange_durable;
+       profile->exchange_auto_delete = exchange_auto_delete;
+       profile->delivery_mode = delivery_mode;
+       profile->delivery_timestamp = delivery_timestamp;
+       profile->content_type = content_type ? content_type : switch_core_strdup(profile->pool, MOD_AMQP_DEFAULT_CONTENT_TYPE);
+
+
+       for(i = 0; i < format_fields_size; i++) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : %s\n", i, format_fields[i]);
+               if(profile->enable_fallback_format_fields) {
+                       profile->format_fields[i].size = switch_separate_string(format_fields[i], '|', profile->format_fields[i].name, MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS);
+                       if(profile->format_fields[i].size > 1) {
+                               for(arg = 0; arg < profile->format_fields[i].size; arg++) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : sub key %d : %s\n", i, arg, profile->format_fields[i].name[arg]);
+                               }
+                       }
+               } else {
+                       profile->format_fields[i].name[0] = format_fields[i];
+                       profile->format_fields[i].size = 1;
+               }
+       }
+
 
        if ((connections = switch_xml_child(cfg, "connections")) != NULL) {
                for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) {
@@ -317,8 +340,8 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
        amqp_exchange_declare(profile->conn_active->state, 1,
                                                  amqp_cstring_bytes(profile->exchange),
                                                  amqp_cstring_bytes(profile->exchange_type),
-                                                 0,
-                                                 1,
+                                                 profile->exchange_durable,
+                                                 profile->exchange_auto_delete,
                                                  amqp_empty_table);
        
        if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) {
@@ -381,18 +404,25 @@ switch_status_t mod_amqp_producer_send(mod_amqp_producer_profile_t *profile, mod
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name);
                return SWITCH_STATUS_NOT_INITALIZED;
        }
+       memset(&props, 0, sizeof(amqp_basic_properties_t));
 
-       props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG;
-       props.content_type = amqp_cstring_bytes("text/json");
-       props.delivery_mode = 1; /* non persistent delivery mode */
-       props.timestamp = (uint64_t)time(NULL);
+       props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
+       props.content_type = amqp_cstring_bytes(profile->content_type);
 
-       props.headers.num_entries = 1;
-       props.headers.entries = messageTableEntries;
+       if(profile->delivery_mode > 0) {
+               props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
+               props.delivery_mode = profile->delivery_mode;
+       }
 
-       messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp");
-       messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
-       messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now();
+       if(profile->delivery_timestamp) {
+               props._flags |= AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG;
+               props.timestamp = (uint64_t)time(NULL);
+               props.headers.num_entries = 1;
+               props.headers.entries = messageTableEntries;
+               messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp");
+               messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP;
+               messageTableEntries[0].value.value.u64 = (uint64_t)switch_micro_time_now();
+       }
 
        status = amqp_basic_publish(
                                                                profile->conn_active->state,
index caa22bf21c3bd025e1014d0993a80dff60b6ac9b..5c59cba1f7c90aa742f1a4092b5d36395a443214 100644 (file)
@@ -148,6 +148,42 @@ switch_status_t mod_amqp_do_config(switch_bool_t reload)
 }
 
 
+#define KEY_SAFE(C) ((C >= 'a' && C <= 'z') || \
+                                       (C >= 'A' && C <= 'Z') || \
+                                       (C >= '0' && C <= '9') || \
+                                       (C == '-' || C == '~' || C == '_'))
+
+#define HI4(C) (C>>4)
+#define LO4(C) (C & 0x0F)
+
+#define hexint(C) (C < 10?('0' + C):('A'+ C - 10))
+
+char *amqp_util_encode(char *key, char *dest) {
+       char *p, *end;
+       if ((strlen(key) == 1) && (key[0] == '#' || key[0] == '*')) {
+               *dest++ = key[0];
+               *dest = '\0';
+               return dest;
+    }
+       for (p = key, end = key + strlen(key); p < end; p++) {
+               if (KEY_SAFE(*p)) {
+                       *dest++ = *p;
+               } else if (*p == '.') {
+                       memcpy(dest, "%2E", 3);
+                       dest += 3;
+               } else if (*p == ' ') {
+                       *dest++ = '+';
+               } else {
+                       *dest++ = '%';
+                       sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p)));
+                       dest += 2;
+               }
+       }
+       *dest = '\0';
+       return dest;
+}
+
+
 /* For Emacs:
  * Local Variables:
  * mode:c