]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9052 [mod_hiredis] add connection pooling, improve dropped connection resiliency...
authorChris Rienzo <chris.rienzo@citrix.com>
Mon, 11 Apr 2016 15:54:38 +0000 (11:54 -0400)
committerChris Rienzo <chris.rienzo@citrix.com>
Mon, 11 Apr 2016 15:54:46 +0000 (11:54 -0400)
configure.ac
src/mod/applications/mod_hiredis/hiredis_profile.c
src/mod/applications/mod_hiredis/hiredis_utils.c
src/mod/applications/mod_hiredis/mod_hiredis.c
src/mod/applications/mod_hiredis/mod_hiredis.h

index 8746b0c75f6e8728bfc9addfb36554bc953c67af..cac1f02b8cde0acd6ec770528ac6f3f1ae572d48 100644 (file)
@@ -1406,7 +1406,7 @@ PKG_CHECK_MODULES([SMPP34], [libsmpp34 >= 1.10],[
   AM_CONDITIONAL([HAVE_SMPP34],[true])],[
   AC_MSG_RESULT([no]); AM_CONDITIONAL([HAVE_SMPP34],[false])])
 
-PKG_CHECK_MODULES([HIREDIS], [hiredis >= 0.11.0],[
+PKG_CHECK_MODULES([HIREDIS], [hiredis >= 0.10.0],[
   AM_CONDITIONAL([HAVE_HIREDIS],[true])],[
   AC_MSG_RESULT([no]); AM_CONDITIONAL([HAVE_HIREDIS],[false])])
 
index ab2d152e0c53d522c64cf16ab5699da7ad49210f..ce63d881d68b2f627dd27c3ad53be66aa92e4c3c 100644 (file)
@@ -1,6 +1,6 @@
 /*
 * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
-* Copyright (C) 2005-2015, Anthony Minessale II <anthm@freeswitch.org>
+* Copyright (C) 2005-2016, Anthony Minessale II <anthm@freeswitch.org>
 *
 * Version: MPL 1.1
 *
 * Contributor(s):
 *
 * William King <william.king@quentustech.com>
+* Christopher Rienzo <chris.rienzo@citrix.com>
 *
 * mod_hiredis.c -- redis client built using the C client library hiredis
 *
 */
 
 #include <mod_hiredis.h>
-       
+
+/* reconnect to redis server */
+static switch_status_t hiredis_context_reconnect(hiredis_context_t *context)
+{
+       redisFree(context->context);
+       context->context = redisConnectWithTimeout(context->connection->host, context->connection->port, context->connection->timeout);
+       if ( context->context && !context->context->err ) {
+               return SWITCH_STATUS_SUCCESS;
+       }
+       return SWITCH_STATUS_FALSE;
+}
+
+/* Return a context back to the pool */
+static void hiredis_context_release(hiredis_context_t *context)
+{
+       if (switch_queue_push(context->connection->context_pool, context) != SWITCH_STATUS_SUCCESS) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "hiredis: failed to release back to pool [%s, %d]\n", context->connection->host, context->connection->port);
+       } else {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: release back to pool [%s, %d]\n", context->connection->host, context->connection->port);
+       }
+}
+
+/* Grab a context from the pool, reconnect/connect as needed */
+static hiredis_context_t *hiredis_connection_get_context(hiredis_connection_t *conn)
+{
+       void *val = NULL;
+
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: waiting for [%s, %d]\n", conn->host, conn->port);
+       if ( switch_queue_pop_timeout(conn->context_pool, &val, conn->timeout_us ) == SWITCH_STATUS_SUCCESS ) {
+               hiredis_context_t *context = (hiredis_context_t *)val;
+               if ( !context->context ) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "hiredis: attempting[%s, %d]\n", conn->host, conn->port);
+                       context->context = redisConnectWithTimeout(conn->host, conn->port, conn->timeout);
+                       if ( context->context && !context->context->err ) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: connection success[%s, %d]\n", conn->host, conn->port);
+                               return context;
+                       } else {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: connection error[%s, %d] (%s)\n", conn->host, conn->port, context->context->errstr);
+                               hiredis_context_release(context);
+                               return NULL;
+                       }
+               } else if ( context->context->err ) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "hiredis: reconnecting[%s, %d]\n", conn->host, conn->port);
+                       if (hiredis_context_reconnect(context) == SWITCH_STATUS_SUCCESS) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: reconnection success[%s, %d]\n", conn->host, conn->port);
+                               return context;
+                       } else {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: reconnection error[%s, %d] (%s)\n", conn->host, conn->port, context->context->errstr);
+                               hiredis_context_release(context);
+                               return NULL;
+                       }
+               } else {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "hiredis: recycled from pool[%s, %d]\n", conn->host, conn->port);
+                       return context;
+               }
+       } else {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: timed out waiting for [%s, %d]\n", conn->host, conn->port);
+       }
+
+       return NULL;
+}
+
 switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t port)
 {
        hiredis_profile_t *profile = NULL;
        switch_memory_pool_t *pool = NULL;
        
-       switch_core_new_memory_pool(&pool);
+       switch_core_new_memory_pool(&pool);
 
        profile = switch_core_alloc(pool, sizeof(hiredis_profile_t));
 
        profile->pool = pool;
        profile->name = name ? switch_core_strdup(profile->pool, name) : "default";
-       profile->conn = NULL;
        profile->conn_head = NULL;
 
        switch_core_hash_insert(mod_hiredis_globals.profiles, name, (void *) profile);
 
        *new_profile = profile;
-       
+
        return SWITCH_STATUS_SUCCESS;
 }
 
@@ -69,7 +130,7 @@ switch_status_t hiredis_profile_destroy(hiredis_profile_t **old_profile)
        return SWITCH_STATUS_SUCCESS;
 }
 
-switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms)
+switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms, uint32_t max_contexts)
 {
        hiredis_connection_t *connection = NULL, *new_conn = NULL;
 
@@ -77,21 +138,39 @@ switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char
        new_conn->host = host ? switch_core_strdup(profile->pool, host) : "localhost";
        new_conn->password = password ? switch_core_strdup(profile->pool, password) : NULL;
        new_conn->port = port ? port : 6379;
+       new_conn->pool = profile->pool;
 
-       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "hiredis: adding conn[%d]\n", new_conn->port);
+       /* create fixed size context pool */
+       max_contexts = max_contexts > 0 ? max_contexts : 3;
+       if (switch_queue_create(&new_conn->context_pool, max_contexts, new_conn->pool) != SWITCH_STATUS_SUCCESS) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "hiredis: failed to allocate context pool\n");
+               return SWITCH_STATUS_GENERR;
+       } else {
+               int i = 0;
+               for (i = 0; i < max_contexts; i++) {
+                       hiredis_context_t *new_context = switch_core_alloc(new_conn->pool, sizeof(hiredis_context_t));
+                       new_context->connection = new_conn;
+                       new_context->context = NULL;
+                       switch_queue_push(new_conn->context_pool, new_context);
+               }
+       }
 
        if ( timeout_ms ) {
-               new_conn->timeout.tv_sec = 0;
-               new_conn->timeout.tv_usec = timeout_ms * 1000;
+               new_conn->timeout_us = timeout_ms * 1000;
+               new_conn->timeout.tv_sec = timeout_ms / 1000;
+               new_conn->timeout.tv_usec = (timeout_ms % 1000) * 1000;
        } else {
+               new_conn->timeout_us = 500 * 1000;
                new_conn->timeout.tv_sec = 0;
                new_conn->timeout.tv_usec = 500 * 1000;
        }
-       
+
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "hiredis: adding conn[%s,%d], pool size = %d\n", new_conn->host, new_conn->port, max_contexts);
+
        if ( profile->conn_head != NULL ){
                /* Adding 'another' connection */
                connection = profile->conn_head;
-               while ( connection->next != NULL ){
+               while ( connection->next != NULL ) {
                        connection = connection->next;
                }
                connection->next = new_conn;
@@ -102,72 +181,88 @@ switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char
        return SWITCH_STATUS_SUCCESS;
 }
 
-switch_status_t hiredis_profile_reconnect(hiredis_profile_t *profile)
+static hiredis_context_t *hiredis_profile_get_context(hiredis_profile_t *profile, hiredis_connection_t *initial_conn)
 {
-       hiredis_connection_t *conn = profile->conn_head;
-       profile->conn = NULL;
+       hiredis_connection_t *conn = initial_conn ? initial_conn : profile->conn_head;
+       hiredis_context_t *context;
 
-       /* TODO: Needs thorough expansion to handle all disconnection scenarios */
-       
        while ( conn ) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: attempting[%s, %d]\n", conn->host, conn->port);
-               conn->context = redisConnectWithTimeout(conn->host, conn->port, conn->timeout);
-       
-               if ( conn->context && conn->context->err) {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: connection error[%s]\n", conn->context->errstr);
-                       conn = conn->next;
-                       continue;
+               context = hiredis_connection_get_context(conn);
+               if (context) {
+                       /* successful redis connection */
+                       return context;
                }
-
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "hiredis: connection success[%s]\n", conn->host);
-                       
-               /* successful redis connection */
-               profile->conn = conn;
-               return SWITCH_STATUS_SUCCESS;
+               conn = conn->next;
        }
 
-       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: unable to reconnect\n");
-       return SWITCH_STATUS_GENERR;
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: unable to connect\n");
+       return NULL;
 }
 
-switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const char *data, char **resp)
+static switch_status_t hiredis_context_execute_sync(hiredis_context_t *context, const char *data, char **resp)
 {
-       char *str = NULL;
-       redisReply *response = NULL;
-
-       /* Check connection */
-       if ( !profile->conn && hiredis_profile_reconnect(profile) != SWITCH_STATUS_SUCCESS ) {
-               *resp = strdup("hiredis profile unable to establish connection");
-               return SWITCH_STATUS_GENERR;
-       }
-       
-       response = redisCommand(profile->conn->context, data);
-
+       redisReply *response = redisCommand(context->context, data);
        if ( !response ) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: empty response received\n");
+               *resp = NULL;
                return SWITCH_STATUS_GENERR;
        }
-       
+
        switch(response->type) {
        case REDIS_REPLY_STATUS: /* fallthrough */
        case REDIS_REPLY_STRING:
-               str = strdup(response->str);
+               *resp = strdup(response->str);
                break;
        case REDIS_REPLY_INTEGER:
-               str = switch_mprintf("%lld", response->integer);
+               *resp = switch_mprintf("%lld", response->integer);
                break;
        default:
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: response error[%s][%d]\n", response->str, response->type);
                freeReplyObject(response);
+               *resp = NULL;
                return SWITCH_STATUS_GENERR;
        }
 
        freeReplyObject(response);
-
-       *resp = str;
        return SWITCH_STATUS_SUCCESS;
 }
 
+switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const char *data, char **resp)
+{
+       hiredis_context_t *context = NULL;
+       int reconnected = 0;
+
+       context = hiredis_profile_get_context(profile, NULL);
+       while (context) {
+               if (hiredis_context_execute_sync(context, data, resp) == SWITCH_STATUS_SUCCESS) {
+                       /* got result */
+                       hiredis_context_release(context);
+                       return SWITCH_STATUS_SUCCESS;
+               } else if (context->context->err) {
+                       /* have a bad connection, try a single reconnect attempt before moving on to alternate connection */
+                       if (reconnected || hiredis_context_reconnect(context) != SWITCH_STATUS_SUCCESS) {
+                               /* try alternate connection */
+                               hiredis_context_t *new_context = hiredis_profile_get_context(profile, context->connection);
+                               hiredis_context_release(context);
+                               context = new_context;
+                               if (context) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: got alternate connection to [%s, %d]\n", context->connection->host, context->connection->port);
+                               } else {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: no more alternate connections to try\n");
+                               }
+                               reconnected = 0;
+                       } else {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: reconnection success[%s, %d]\n", context->connection->host, context->connection->port);
+                               reconnected = 1;
+                       }
+               } else {
+                       /* no problem with context, so don't retry */
+                       hiredis_context_release(context);
+                       return SWITCH_STATUS_GENERR;
+               }
+       }
+       return SWITCH_STATUS_GENERR;
+}
+
 
 /* For Emacs:
  * Local Variables:
index 8b552af569bc41294251ff8230f929d8ad719d7f..ad6d625e419d424fba3273c8b08d770d7b65cbe7 100644 (file)
@@ -24,6 +24,7 @@
 * Contributor(s):
 *
 * William King <william.king@quentustech.com>
+* Christopher Rienzo <chris.rienzo@citrix.com>
 *
 * mod_hiredis.c -- redis client built using the C client library hiredis
 *
@@ -67,7 +68,7 @@ switch_status_t mod_hiredis_do_config()
                        if ( (connections = switch_xml_child(profile, "connections")) != NULL) {
                                for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) {             
                                        char *host = NULL, *password = NULL;
-                                       uint32_t port = 0, timeout_ms = 0;
+                                       uint32_t port = 0, timeout_ms = 0, max_connections = 0;
                                        
                                        for (param = switch_xml_child(connection, "param"); param; param = param->next) {
                                                char *var = (char *) switch_xml_attr_soft(param, "name");
@@ -75,15 +76,17 @@ switch_status_t mod_hiredis_do_config()
                                                        host = (char *) switch_xml_attr_soft(param, "value");
                                                } else if ( !strncmp(var, "port", 4) ) {
                                                        port = atoi(switch_xml_attr_soft(param, "value"));
-                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "hiredis: adding conn[%u == %s]\n", port, switch_xml_attr_soft(param, "value"));
+                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "hiredis: adding conn[%u == %s]\n", port, switch_xml_attr_soft(param, "value"));
                                                } else if ( !strncmp(var, "timeout_ms", 10) ) {
                                                        timeout_ms = atoi(switch_xml_attr_soft(param, "value"));
                                                } else if ( !strncmp(var, "password", 8) ) {
                                                        password = (char *) switch_xml_attr_soft(param, "value");
+                                               } else if ( !strncmp(var, "max-connections", 15) ) {
+                                                       max_connections = atoi(switch_xml_attr_soft(param, "value"));
                                                }
                                        }
 
-                                       if ( hiredis_profile_connection_add(new_profile, host, password, port, timeout_ms) == SWITCH_STATUS_SUCCESS) {
+                                       if ( hiredis_profile_connection_add(new_profile, host, password, port, timeout_ms, max_connections) == SWITCH_STATUS_SUCCESS) {
                                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Created profile[%s]\n", name);
                                        } else {
                                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create profile[%s]\n", name);
index d3b5ab7e1117b080dd228cdd112657c9e7b44b21..efe0a034435eddc6e6762d32e323c41cc1d97ac0 100644 (file)
@@ -103,7 +103,7 @@ SWITCH_STANDARD_API(raw_api)
        }
 
        if ( hiredis_profile_execute_sync(profile, data, &response) != SWITCH_STATUS_SUCCESS) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] reason:[%s]\n", input, data, response);
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] reason:[%s]\n", input, data, response ? response : "");
                switch_goto_status(SWITCH_STATUS_GENERR, done);
        }
 
index a73a5086881f574de3ae6c0465f345bb168b96d7..2403564b062b53ef9fe1636919f03e16b56ef042 100644 (file)
@@ -13,12 +13,19 @@ typedef struct mod_hiredis_global_s {
 
 extern mod_hiredis_global_t mod_hiredis_globals;
 
+typedef struct mod_hiredis_context_s {
+  struct hiredis_connection_s *connection;
+  redisContext *context;
+} hiredis_context_t;
+
 typedef struct hiredis_connection_s {
   char *host;
   char *password;
   uint32_t port;
-  redisContext *context;
+  switch_interval_time_t timeout_us;
   struct timeval timeout;
+  switch_memory_pool_t *pool;
+  switch_queue_t *context_pool;
 
   struct hiredis_connection_s *next;
 } hiredis_connection_t;
@@ -28,7 +35,6 @@ typedef struct hiredis_profile_s {
   char *name;
   int debug;
 
-  hiredis_connection_t *conn;
   hiredis_connection_t *conn_head;
 } hiredis_profile_t;
 
@@ -44,7 +50,7 @@ typedef struct hiredis_limit_pvt_s {
 switch_status_t mod_hiredis_do_config();
 switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t port);
 switch_status_t hiredis_profile_destroy(hiredis_profile_t **old_profile);
-switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms);
+switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms, uint32_t max_connections);
 
 switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const char *data, char **response);