]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-7564 #resolve #comment [mod_rayo] Added new algorithms for offering calls to clients.
authorChris Rienzo <chris@rienzo.com>
Tue, 2 Jun 2015 14:48:57 +0000 (10:48 -0400)
committerChris Rienzo <chris@rienzo.com>
Tue, 2 Jun 2015 14:53:54 +0000 (10:53 -0400)
   Two new params added to autoload_configs/rayo.conf.xml
     offer-algorithm
       all: offer to all clients (default and old behavior)
       first: offer to first client, fails over to next client in list
       random: offer to random client, fails over to next random client

     offer-timeout-ms
       0: disable
       > 0 and < 120000: time to wait for reply from offer.  On timeout, next client is offered call.
                         If no other clients available, call is rejected.  5000 is default.

conf/rayo/autoload_configs/rayo.conf.xml
src/mod/event_handlers/mod_rayo/conf/autoload_configs/rayo.conf.xml
src/mod/event_handlers/mod_rayo/mod_rayo.c

index 54aa38867518c565e84b48c73ce6627a7d7561dd..2ebcdbcffdc0cf7f2cf16f8701660915323cc70d 100644 (file)
@@ -8,6 +8,11 @@
                <param name="mixer-conf-profile" value="sla"/>
                <!-- if true, to attribute in offer uses URI instead of name/number -->
                <param name="offer-uri" value="true"/>
+               <!-- how offers are distributed to clients (all, first, random). -->
+               <param name="offer-algorithm" value="all"/>
+               <!-- If offer is not answered after timeout, next client is offered (based on algorithm picked).
+                    If no other clients are available, the call is rejected.  Set to 0 to disable -->
+               <param name="offer-timeout-ms" value="5000"/>
                <!-- if true, channel variables are added to rayo client offer -->
                <param name="add-variables-to-offer" value="false"/>
                 <!-- if true, channel variables are added to offer, ringing, answered, and end events sent to rayo clients -->
index 54aa38867518c565e84b48c73ce6627a7d7561dd..2ebcdbcffdc0cf7f2cf16f8701660915323cc70d 100644 (file)
@@ -8,6 +8,11 @@
                <param name="mixer-conf-profile" value="sla"/>
                <!-- if true, to attribute in offer uses URI instead of name/number -->
                <param name="offer-uri" value="true"/>
+               <!-- how offers are distributed to clients (all, first, random). -->
+               <param name="offer-algorithm" value="all"/>
+               <!-- If offer is not answered after timeout, next client is offered (based on algorithm picked).
+                    If no other clients are available, the call is rejected.  Set to 0 to disable -->
+               <param name="offer-timeout-ms" value="5000"/>
                <!-- if true, channel variables are added to rayo client offer -->
                <param name="add-variables-to-offer" value="false"/>
                 <!-- if true, channel variables are added to offer, ringing, answered, and end events sent to rayo clients -->
index aa213ff168f7deda5d7551bfc9c0c40e5392d675..28d7ae1013ea2e952c15b177e67e2285425d4ec5 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
- * Copyright (C) 2013-2014, Grasshopper
+ * Copyright (C) 2013-2015, Grasshopper
  *
  * Version: MPL 1.1
  *
@@ -61,6 +61,10 @@ SWITCH_MODULE_DEFINITION(mod_rayo, mod_rayo_load, mod_rayo_shutdown, mod_rayo_ru
 #define JOINED_CALL 1
 #define JOINED_MIXER 2
 
+#define OFFER_ALL 0
+#define OFFER_FIRST 1
+#define OFFER_RANDOM 2
+
 struct rayo_actor;
 struct rayo_client;
 struct rayo_call;
@@ -123,8 +127,12 @@ struct rayo_call {
        struct rayo_actor base;
        /** Definitive controlling party JID */
        char *dcp_jid;
-       /** Potential controlling parties */
+       /** Potential controlling parties (have sent offers to) */
        switch_hash_t *pcps;
+       /** Available controlling parties (not sent offers to) */
+       switch_hash_t *acps;
+       /** Number of available controlling parties */
+       int num_acps;
        /** current idle start time */
        switch_time_t idle_start_time;
        /** true if fax is in progress */
@@ -223,6 +231,8 @@ static struct {
        int num_message_threads;
        /** message delivery queue */
        switch_queue_t *msg_queue;
+       /** in progress offer queue */
+       switch_queue_t *offer_queue;
        /** shutdown flag */
        int shutdown;
        /** prevents context shutdown until all threads are finished */
@@ -237,6 +247,10 @@ static struct {
        int add_variables_to_offer;
        /** if true, channel variables are added to answered, ringing, end events */
        int add_variables_to_events;
+       /** How to distribute offers to clients */
+       int offer_algorithm;
+       /** How long to wait for offer response before retrying */
+       int offer_timeout_us;
 } globals;
 
 /**
@@ -866,12 +880,13 @@ static void start_deliver_message_thread(switch_memory_pool_t *pool)
 }
 
 /**
- * Stop all message threads
+ * Stop all threads
  */
-static void stop_deliver_message_threads(void)
+static void stop_all_threads(void)
 {
        globals.shutdown = 1;
        switch_queue_interrupt_all(globals.msg_queue);
+       switch_queue_interrupt_all(globals.offer_queue);
        switch_thread_rwlock_wrlock(globals.shutdown_rwlock);
 }
 
@@ -1219,6 +1234,7 @@ done:
                switch_event_destroy(&call->answer_event);
        }
        switch_core_hash_destroy(&call->pcps);
+       switch_core_hash_destroy(&call->acps);
 }
 
 /**
@@ -1404,6 +1420,8 @@ static struct rayo_call *rayo_call_init(struct rayo_call *call, switch_memory_po
                call->rayo_app_started = 0;
                call->answer_event = NULL;
                switch_core_hash_init(&call->pcps);
+               switch_core_hash_init(&call->acps);
+               call->num_acps = 0;
        }
 
        switch_safe_free(call_jid);
@@ -3825,6 +3843,171 @@ static int should_offer_to_client(struct rayo_client *rclient, char **offer_filt
        return 0;
 }
 
+/**
+ * Offered call information
+ */
+struct offered_call_info {
+       /** Call JID */
+       char *call_jid;
+       /** Time this offer expires */
+       switch_time_t offer_time;
+};
+
+/**
+ * Deliver offer message to next available client(s)
+ */
+static int send_offer_to_clients(struct rayo_call *from_call, switch_core_session_t *session)
+{
+       int i = 0;
+       int selection = 0;
+       int sent = 0;
+       switch_hash_index_t *hi = NULL;
+       iks *offer = NULL;
+
+       if (from_call->num_acps <= 0) {
+               return 0;
+       }
+
+       if (globals.offer_algorithm == OFFER_RANDOM) {
+               /* pick client at (not really) random */
+               selection = rand() % from_call->num_acps;
+       } else if (globals.offer_algorithm == OFFER_FIRST) {
+               /* send to first client */
+               selection = 0;
+       } else {
+               /* send to all clients */
+               selection = -1;
+       }
+
+       for (hi = switch_core_hash_first(from_call->acps); hi; hi = switch_core_hash_next(&hi)) {
+               if (i++ == selection || selection == -1) {
+                       const char *to_client_jid = NULL;
+                       const void *key;
+                       void *val;
+
+                       /* get client jid to send to */
+                       switch_core_hash_this(hi, &key, NULL, &val);
+                       to_client_jid = (const char *)key;
+                       switch_assert(to_client_jid);
+
+                       /* send offer to client, remembering jid as PCP */
+                       if (!offer) {
+                               offer = rayo_create_offer(from_call, session);
+                       }
+                       switch_core_hash_insert(from_call->pcps, to_client_jid, "1");
+                       iks_insert_attrib(offer, "to", to_client_jid);
+                       RAYO_SEND_MESSAGE_DUP(from_call, to_client_jid, offer);
+
+                       /* remove client JID from list of available clients */
+                       switch_core_hash_delete(from_call->acps, to_client_jid);
+                       from_call->num_acps--;
+                       sent = 1;
+
+                       if (selection != -1) {
+                               break;
+                       }
+               }
+       }
+       switch_safe_free(hi);
+
+       /* queue offer information */
+       if (globals.offer_timeout_us > 0 && sent) {
+               struct offered_call_info *offered_call;
+               switch_zmalloc(offered_call, sizeof(*offered_call));
+               offered_call->offer_time = switch_micro_time_now();
+               offered_call->call_jid = strdup(RAYO_JID(from_call));
+               if (switch_queue_trypush(globals.offer_queue, offered_call) != SWITCH_STATUS_SUCCESS) {
+                       switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to queue offered call info!  Offer timeout won't work on this call\n");
+                       switch_safe_free(offered_call->call_jid);
+                       switch_safe_free(offered_call);
+               }
+       }
+
+       if (offer) {
+               iks_delete(offer);
+       }
+
+       return sent;
+}
+
+/**
+ * Thread that monitors for timed out offers
+ * @param thread this thread
+ * @param obj unused
+ * @return NULL
+ */
+static void *SWITCH_THREAD_FUNC offer_timeout_thread(switch_thread_t *thread, void *obj)
+{
+       struct offered_call_info *next_offer;
+       switch_thread_rwlock_rdlock(globals.shutdown_rwlock);
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "New offer timeout thread\n");
+       while (!globals.shutdown) {
+               if (switch_queue_pop(globals.offer_queue, (void *)&next_offer) == SWITCH_STATUS_SUCCESS) {
+                       switch_time_t now = switch_micro_time_now();
+                       switch_time_t offer_timeout = next_offer->offer_time + globals.offer_timeout_us;
+
+                       /* wait for timeout */
+                       while (offer_timeout > now && !globals.shutdown) {
+                               switch_time_t remain = offer_timeout - now;
+                               remain = remain > 500000 ? 500000 : remain;
+                               switch_sleep(remain);
+                               now = switch_micro_time_now();
+                       }
+
+                       /* check if offer was accepted - it is accepted if the call has a DCP (definitive controlling party) */
+                       if (!globals.shutdown) {
+                               struct rayo_call *call = RAYO_CALL_LOCATE(next_offer->call_jid);
+                               if (call) {
+                                       switch_mutex_lock(RAYO_ACTOR(call)->mutex);
+                                       if (zstr(rayo_call_get_dcp_jid(call))) {
+                                               switch_core_session_t *session = switch_core_session_locate(rayo_call_get_uuid(call));
+                                               if (session) {
+                                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, offer timeout\n", RAYO_JID(call));
+                                                       if (!send_offer_to_clients(call, session)) {
+                                                               /* nobody to offer to, end call */
+                                                               switch_channel_t *channel = switch_core_session_get_channel(session);
+                                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no more clients to offer, ending call\n", RAYO_JID(call));
+                                                               switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_TEMPORARY_FAILURE);
+                                                       }
+                                                       switch_core_session_rwunlock(session);
+                                               }
+                                       }
+                                       switch_mutex_unlock(RAYO_ACTOR(call)->mutex);
+                                       RAYO_RELEASE(call);
+                               }
+                       }
+
+                       switch_safe_free(next_offer->call_jid);
+                       switch_safe_free(next_offer);
+               }
+       }
+
+       /* clean up queue */
+       while(switch_queue_trypop(globals.offer_queue, (void *)&next_offer) == SWITCH_STATUS_SUCCESS) {
+               switch_safe_free(next_offer->call_jid);
+               switch_safe_free(next_offer);
+       }
+
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Offer timeout thread finished\n");
+       switch_thread_rwlock_unlock(globals.shutdown_rwlock);
+
+       return NULL;
+}
+
+/**
+ * Create a new offer timeout thread
+ * @param pool to use
+ */
+static void start_offer_timeout_thread(switch_memory_pool_t *pool)
+{
+       switch_thread_t *thread;
+       switch_threadattr_t *thd_attr = NULL;
+       switch_threadattr_create(&thd_attr, pool);
+       switch_threadattr_detach_set(thd_attr, 1);
+       switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+       switch_thread_create(&thread, thd_attr, offer_timeout_thread, NULL, pool);
+}
+
 #define RAYO_USAGE "[client username 1,client username n]"
 /**
  * Offer call and park channel
@@ -3874,7 +4057,6 @@ SWITCH_STANDARD_APP(rayo_app)
        if (!call) {
                /* offer control */
                switch_hash_index_t *hi = NULL;
-               iks *offer = NULL;
                char *clients_to_offer[16] = { 0 };
                int clients_to_offer_count = 0;
 
@@ -3888,7 +4070,6 @@ SWITCH_STANDARD_APP(rayo_app)
 
                switch_channel_set_variable(switch_core_session_get_channel(session), "rayo_call_jid", RAYO_JID(call));
 
-               offer = rayo_create_offer(call, session);
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Offering call for Rayo 3PCC\n");
 
                if (!zstr(data)) {
@@ -3904,7 +4085,6 @@ SWITCH_STANDARD_APP(rayo_app)
                }
 
                /* Offer call to all (or specified) ONLINE clients */
-               /* TODO load balance offers so first session doesn't always get offer first? */
                switch_mutex_lock(globals.clients_mutex);
                for (hi = switch_core_hash_first(globals.clients_roster); hi; hi = switch_core_hash_next(&hi)) {
                        struct rayo_client *rclient;
@@ -3914,16 +4094,15 @@ SWITCH_STANDARD_APP(rayo_app)
                        rclient = (struct rayo_client *)val;
                        switch_assert(rclient);
 
-                       /* is session available to take call? */
+                       /* find clients available to take calls */
                        if (should_offer_to_client(rclient, clients_to_offer, clients_to_offer_count)) {
-                               ok = 1;
-                               switch_core_hash_insert(call->pcps, RAYO_JID(rclient), "1");
-                               iks_insert_attrib(offer, "to", RAYO_JID(rclient));
-                               RAYO_SEND_MESSAGE_DUP(call, RAYO_JID(rclient), offer);
+                               switch_core_hash_insert(call->acps, RAYO_JID(rclient), "1");
+                               call->num_acps++;
                        }
                }
+               ok = send_offer_to_clients(call, session);
+
                switch_mutex_unlock(globals.clients_mutex);
-               iks_delete(offer);
 
                /* nobody to offer to */
                if (!ok) {
@@ -4158,6 +4337,8 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_
        globals.pause_when_offline = 0;
        globals.add_variables_to_offer = 0;
        globals.add_variables_to_events = 0;
+       globals.offer_timeout_us = 5000000;
+       globals.offer_algorithm = OFFER_ALL;
 
        /* get params */
        {
@@ -4203,6 +4384,25 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_
                                                globals.add_variables_to_offer = 1;
                                                globals.add_variables_to_events = 1;
                                        }
+                               } else if (!strcasecmp(var, "offer-timeout-ms")) {
+                                       int offer_timeout_ms = 0;
+                                       if (switch_is_number(val) && (offer_timeout_ms = atoi(val)) >= 0  && offer_timeout_ms < 120000) {
+                                               globals.offer_timeout_us = offer_timeout_ms * 1000;
+                                       } else {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring invalid value for offer-timeout-ms \"%s\"\n", val);
+                                       }
+                               } else if (!strcasecmp(var, "offer-algorithm")) {
+                                       if (zstr(val)) {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No value for offer-algorithm\n");
+                                       } else if (!strcasecmp(val, "all")) {
+                                               globals.offer_algorithm = OFFER_ALL;
+                                       } else if (!strcasecmp(val, "first")) {
+                                               globals.offer_algorithm = OFFER_FIRST;
+                                       } else if (!strcasecmp(val, "random")) {
+                                               globals.offer_algorithm = OFFER_RANDOM;
+                                       } else {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring invalid value for offer-algorithm \"%s\"\n", val);
+                                       }
                                } else {
                                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unsupported param: %s\n", var);
                                }
@@ -4881,6 +5081,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load)
        switch_core_hash_init(&globals.cmd_aliases);
        switch_thread_rwlock_create(&globals.shutdown_rwlock, pool);
        switch_queue_create(&globals.msg_queue, 25000, pool);
+       switch_queue_create(&globals.offer_queue, 25000, pool);
        globals.offline_logged = 1;
 
        /* server commands */
@@ -4930,6 +5131,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load)
                        start_deliver_message_thread(pool);
                }
        }
+       start_offer_timeout_thread(pool);
 
        /* create admin client */
        globals.console = rayo_console_client_create();
@@ -4979,9 +5181,9 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rayo_shutdown)
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for XMPP threads to stop\n");
        xmpp_stream_context_destroy(globals.xmpp_context);
 
-       /* stop message threads */
-       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for message threads to stop\n");
-       stop_deliver_message_threads();
+       /* stop threads */
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for message and offer timeout threads to stop\n");
+       stop_all_threads();
 
        if (globals.console) {
                RAYO_RELEASE(globals.console);