]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
fix some contention in rtmp
authorAnthony Minessale <anthm@freeswitch.org>
Sun, 17 Nov 2013 01:51:33 +0000 (06:51 +0500)
committerAnthony Minessale <anthm@freeswitch.org>
Sun, 17 Nov 2013 01:51:33 +0000 (06:51 +0500)
src/mod/endpoints/mod_rtmp/mod_rtmp.c
src/mod/endpoints/mod_rtmp/mod_rtmp.h

index e25f38b861803945ce31ea41868e7bd508862387..4e6e10db8f82cb1b8c64bcb6504a104d4de3feba 100644 (file)
@@ -40,7 +40,8 @@
 
 SWITCH_MODULE_LOAD_FUNCTION(mod_rtmp_load);
 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rtmp_shutdown);
-SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, NULL);
+SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime);
+SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, mod_rtmp_runtime);
 
 static switch_status_t config_profile(rtmp_profile_t *profile, switch_bool_t reload);
 static switch_xml_config_item_t *get_instructions(rtmp_profile_t *profile);
@@ -732,7 +733,7 @@ rtmp_session_t *rtmp_session_locate(const char *uuid)
 {
        rtmp_session_t *rsession = switch_core_hash_find_rdlock(rtmp_globals.session_hash, uuid, rtmp_globals.session_rwlock);
        
-       if (!rsession || rsession->state == RS_DESTROY) {
+       if (!rsession || rsession->state >= RS_DESTROY) {
                return NULL;
        }
        
@@ -812,53 +813,95 @@ switch_status_t rtmp_session_request(rtmp_profile_t *profile, rtmp_session_t **n
        return SWITCH_STATUS_SUCCESS;
 }
 
-switch_status_t rtmp_session_destroy(rtmp_session_t **rsession) 
+static void rtmp_garbage_colletor(void)
 {
        switch_hash_index_t *hi;
-       switch_event_t *event;
-       
-       if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) {
-               rtmp_event_fill(*rsession, event);
-               switch_event_fire(&event);
+
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP Garbage Collection\n");
+
+
+       switch_thread_rwlock_wrlock(rtmp_globals.session_rwlock);
+
+ top:
+
+       for (hi = switch_hash_first(NULL, rtmp_globals.session_hash); hi; hi = switch_hash_next(hi)) {
+               void *val;      
+               const void *key;
+               switch_ssize_t keylen;
+               rtmp_session_t *rsession;
+
+               switch_hash_this(hi, &key, &keylen, &val);
+               rsession = (rtmp_session_t *) val;
+
+               if (rsession->state == RS_DESTROY) {
+                       if (rtmp_real_session_destroy(&rsession) == SWITCH_STATUS_SUCCESS) {
+                               goto top;
+                       }
+               }
        }
        
-       switch_core_hash_delete_wrlock(rtmp_globals.session_hash, (*rsession)->uuid, rtmp_globals.session_rwlock);
-       switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock);
-       rtmp_clear_registration(*rsession, NULL, NULL);
-       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid);
-       
-       (*rsession)->state = RS_DESTROY;
+       switch_thread_rwlock_unlock(rtmp_globals.session_rwlock);
+}
+
+switch_status_t rtmp_session_destroy(rtmp_session_t **rsession) 
+{
+       switch_status_t status = SWITCH_STATUS_FALSE;
+
+       switch_mutex_lock(rtmp_globals.mutex);
+       if (rsession && *rsession) {
+               (*rsession)->state = RS_DESTROY;
+               *rsession = NULL;
+               status = SWITCH_STATUS_SUCCESS;
+       }
+       switch_mutex_unlock(rtmp_globals.mutex);
+
+       return status;
+}
+
+switch_status_t rtmp_real_session_destroy(rtmp_session_t **rsession) 
+{
+       switch_hash_index_t *hi;
+       switch_event_t *event;
+       int sess = 0;
 
        switch_thread_rwlock_rdlock((*rsession)->session_rwlock);
        for (hi = switch_hash_first(NULL, (*rsession)->session_hash); hi; hi = switch_hash_next(hi)) {
                void *val;      
                const void *key;
                switch_ssize_t keylen;
-               rtmp_private_t *tech_pvt;
                switch_channel_t *channel;
                switch_core_session_t *session;
+
                switch_hash_this(hi, &key, &keylen, &val);              
-               tech_pvt = (rtmp_private_t *)val;
                
-               /* At this point we don't know if the session still exists, so request a fresh pointer to it from the core. */
-               if ( (session = switch_core_session_locate((char *)key)) != NULL ) {
-                       /* 
-                        * This is here so that if the FS session still exists and has the FS session write(or read) lock, then we won't destroy the rsession 
-                        * until the FS session is finished with it. But if the rsession is able to get the FS session
-                        * write lock, before the FS session is hungup, then once the FS session does get the write lock
-                        * the rsession pointer will be null, and the FS session will never try and touch the already destroyed rsession.
-                        */
-
+               /* If there are any sessions attached, abort the destroy operation */
+               if ((session = switch_core_session_locate((char *)key)) != NULL ) {
                        channel = switch_core_session_get_channel(session);
-                       tech_pvt = switch_core_session_get_private(session);
-                       if ( tech_pvt && tech_pvt->rtmp_session ) {
-                               tech_pvt->rtmp_session = NULL;
-                       }
                        switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
                        switch_core_session_rwunlock(session);
+                       sess++;
                }
        }
        switch_thread_rwlock_unlock((*rsession)->session_rwlock);
+
+       if (sess) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p still busy.\n", (*rsession)->uuid, (void *) *rsession);
+               return SWITCH_STATUS_FALSE;
+       }
+       
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p will be destroyed.\n", (*rsession)->uuid, (void *) *rsession);
+       
+       
+       if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) {
+               rtmp_event_fill(*rsession, event);
+               switch_event_fire(&event);
+       }
+       
+       switch_core_hash_delete(rtmp_globals.session_hash, (*rsession)->uuid);
+       switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock);
+       rtmp_clear_registration(*rsession, NULL, NULL);
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid);
+   
        
        switch_mutex_lock((*rsession)->profile->mutex);
        if ( (*rsession)->profile->calls < 1 ) {
@@ -1137,7 +1180,7 @@ static void rtmp_clear_reg_auth(rtmp_session_t *rsession, const char *auth, cons
        switch_thread_rwlock_wrlock(rsession->profile->reg_rwlock);
        if ((reg = switch_core_hash_find(rsession->profile->reg_hash, auth))) {
                for (; reg; reg = reg->next) {
-                       if (!strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) {
+                       if (!zstr(reg->uuid) && !strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) {
                                switch_event_t *event;
                                if (prev) {
                                        prev->next = reg->next;
@@ -1495,6 +1538,20 @@ done:
        return SWITCH_STATUS_SUCCESS;
 }
 
+static const char *state2name(int state)
+{
+ switch(state) {
+ case RS_HANDSHAKE:
+        return "HANDSHAKE";
+ case RS_HANDSHAKE2:
+        return "HANDSHAKE2";
+ case RS_ESTABLISHED:
+        return "ESTABLISHED";
+ default:
+        return "DESTROY (PENDING)";
+ }
+}
+
 #define RTMP_FUNCTION_SYNTAX "profile [profilename] [start | stop | rescan | restart]\nstatus profile [profilename]\nstatus profile [profilename] [reg | sessions]\nsession [session_id] [kill | login [user@domain] | logout [user@domain]]"
 SWITCH_STANDARD_API(rtmp_function)
 {
@@ -1571,7 +1628,7 @@ SWITCH_STANDARD_API(rtmp_function)
                                {
                                        switch_hash_index_t *hi;
                                        stream->write_function(stream, "\nSessions:\n");
-                                       stream->write_function(stream, "uuid,address,user,domain,flashVer\n");
+                                       stream->write_function(stream, "uuid,address,user,domain,flashVer,state\n");
                                        switch_thread_rwlock_rdlock(profile->session_rwlock);
                                        for (hi = switch_hash_first(NULL, profile->session_hash); hi; hi = switch_hash_next(hi)) {
                                                void *val;      
@@ -1581,11 +1638,11 @@ SWITCH_STANDARD_API(rtmp_function)
                                                switch_hash_this(hi, &key, &keylen, &val);
                                                                                        
                                                item = (rtmp_session_t *)val;
-                                               stream->write_function(stream, "%s,%s:%d,%s,%s,%s\n", 
-                                                       item->uuid, item->remote_address, item->remote_port,
-                                                       item->account ? item->account->user : NULL,
-                                                       item->account ? item->account->domain : NULL,
-                                                       item->flashVer);
+                                               stream->write_function(stream, "%s,%s:%d,%s,%s,%s,%s\n", 
+                                                                                          item->uuid, item->remote_address, item->remote_port,
+                                                                                          item->account ? item->account->user : NULL,
+                                                                                          item->account ? item->account->domain : NULL,
+                                                                                          item->flashVer, state2name(item->state));
                                                
                                        }
                                        switch_thread_rwlock_unlock(profile->session_rwlock);
@@ -1862,6 +1919,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rtmp_load)
                        switch_xml_free(xml);   
                }
        }
+
+       rtmp_globals.running = 1;
        
        return SWITCH_STATUS_SUCCESS;
 }
@@ -1892,9 +1951,22 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rtmp_shutdown)
        switch_core_hash_destroy(&rtmp_globals.session_hash);
        switch_core_hash_destroy(&rtmp_globals.invoke_hash);
        
+       rtmp_globals.running = 0;
+
        return SWITCH_STATUS_SUCCESS;
 }
 
+SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime)
+{
+
+       while(rtmp_globals.running) {
+               rtmp_garbage_colletor();
+               switch_yield(10000000);
+       }
+
+       return SWITCH_STATUS_TERM;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index 8a92c5b427f63629f253a0df46e6549c828beeb8..95eb17f017e34bcd1898783583eb7b8a4a15e5c4 100644 (file)
@@ -336,6 +336,7 @@ struct mod_rtmp_globals {
        switch_hash_t *session_hash;
        switch_thread_rwlock_t *session_rwlock;
        switch_hash_t *invoke_hash;
+       int running;
 };
 
 extern struct mod_rtmp_globals rtmp_globals;
@@ -605,6 +606,7 @@ void rtmp_profile_release(rtmp_profile_t *profile);
 switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtmp_io_t **new_io, switch_memory_pool_t *pool);
 switch_status_t rtmp_session_request(rtmp_profile_t *profile, rtmp_session_t **newsession);
 switch_status_t rtmp_session_destroy(rtmp_session_t **session);
+switch_status_t rtmp_real_session_destroy(rtmp_session_t **session);
 
 /**** Protocol ****/
 void rtmp_set_chunksize(rtmp_session_t *rsession, uint32_t chunksize);