SWITCH_MODULE_LOAD_FUNCTION(mod_rtmp_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rtmp_shutdown);
-SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, NULL);
+SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime);
+SWITCH_MODULE_DEFINITION(mod_rtmp, mod_rtmp_load, mod_rtmp_shutdown, mod_rtmp_runtime);
static switch_status_t config_profile(rtmp_profile_t *profile, switch_bool_t reload);
static switch_xml_config_item_t *get_instructions(rtmp_profile_t *profile);
{
rtmp_session_t *rsession = switch_core_hash_find_rdlock(rtmp_globals.session_hash, uuid, rtmp_globals.session_rwlock);
- if (!rsession || rsession->state == RS_DESTROY) {
+ if (!rsession || rsession->state >= RS_DESTROY) {
return NULL;
}
return SWITCH_STATUS_SUCCESS;
}
-switch_status_t rtmp_session_destroy(rtmp_session_t **rsession)
+static void rtmp_garbage_colletor(void)
{
switch_hash_index_t *hi;
- switch_event_t *event;
-
- if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) {
- rtmp_event_fill(*rsession, event);
- switch_event_fire(&event);
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP Garbage Collection\n");
+
+
+ switch_thread_rwlock_wrlock(rtmp_globals.session_rwlock);
+
+ top:
+
+ for (hi = switch_hash_first(NULL, rtmp_globals.session_hash); hi; hi = switch_hash_next(hi)) {
+ void *val;
+ const void *key;
+ switch_ssize_t keylen;
+ rtmp_session_t *rsession;
+
+ switch_hash_this(hi, &key, &keylen, &val);
+ rsession = (rtmp_session_t *) val;
+
+ if (rsession->state == RS_DESTROY) {
+ if (rtmp_real_session_destroy(&rsession) == SWITCH_STATUS_SUCCESS) {
+ goto top;
+ }
+ }
}
- switch_core_hash_delete_wrlock(rtmp_globals.session_hash, (*rsession)->uuid, rtmp_globals.session_rwlock);
- switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock);
- rtmp_clear_registration(*rsession, NULL, NULL);
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid);
-
- (*rsession)->state = RS_DESTROY;
+ switch_thread_rwlock_unlock(rtmp_globals.session_rwlock);
+}
+
+switch_status_t rtmp_session_destroy(rtmp_session_t **rsession)
+{
+ switch_status_t status = SWITCH_STATUS_FALSE;
+
+ switch_mutex_lock(rtmp_globals.mutex);
+ if (rsession && *rsession) {
+ (*rsession)->state = RS_DESTROY;
+ *rsession = NULL;
+ status = SWITCH_STATUS_SUCCESS;
+ }
+ switch_mutex_unlock(rtmp_globals.mutex);
+
+ return status;
+}
+
+switch_status_t rtmp_real_session_destroy(rtmp_session_t **rsession)
+{
+ switch_hash_index_t *hi;
+ switch_event_t *event;
+ int sess = 0;
switch_thread_rwlock_rdlock((*rsession)->session_rwlock);
for (hi = switch_hash_first(NULL, (*rsession)->session_hash); hi; hi = switch_hash_next(hi)) {
void *val;
const void *key;
switch_ssize_t keylen;
- rtmp_private_t *tech_pvt;
switch_channel_t *channel;
switch_core_session_t *session;
+
switch_hash_this(hi, &key, &keylen, &val);
- tech_pvt = (rtmp_private_t *)val;
- /* At this point we don't know if the session still exists, so request a fresh pointer to it from the core. */
- if ( (session = switch_core_session_locate((char *)key)) != NULL ) {
- /*
- * This is here so that if the FS session still exists and has the FS session write(or read) lock, then we won't destroy the rsession
- * until the FS session is finished with it. But if the rsession is able to get the FS session
- * write lock, before the FS session is hungup, then once the FS session does get the write lock
- * the rsession pointer will be null, and the FS session will never try and touch the already destroyed rsession.
- */
-
+ /* If there are any sessions attached, abort the destroy operation */
+ if ((session = switch_core_session_locate((char *)key)) != NULL ) {
channel = switch_core_session_get_channel(session);
- tech_pvt = switch_core_session_get_private(session);
- if ( tech_pvt && tech_pvt->rtmp_session ) {
- tech_pvt->rtmp_session = NULL;
- }
switch_channel_hangup(channel, SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER);
switch_core_session_rwunlock(session);
+ sess++;
}
}
switch_thread_rwlock_unlock((*rsession)->session_rwlock);
+
+ if (sess) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p still busy.\n", (*rsession)->uuid, (void *) *rsession);
+ return SWITCH_STATUS_FALSE;
+ }
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p will be destroyed.\n", (*rsession)->uuid, (void *) *rsession);
+
+
+ if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) {
+ rtmp_event_fill(*rsession, event);
+ switch_event_fire(&event);
+ }
+
+ switch_core_hash_delete(rtmp_globals.session_hash, (*rsession)->uuid);
+ switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock);
+ rtmp_clear_registration(*rsession, NULL, NULL);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid);
+
switch_mutex_lock((*rsession)->profile->mutex);
if ( (*rsession)->profile->calls < 1 ) {
switch_thread_rwlock_wrlock(rsession->profile->reg_rwlock);
if ((reg = switch_core_hash_find(rsession->profile->reg_hash, auth))) {
for (; reg; reg = reg->next) {
- if (!strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) {
+ if (!zstr(reg->uuid) && !strcmp(reg->uuid, rsession->uuid) && (zstr(nickname) || !strcmp(reg->nickname, nickname))) {
switch_event_t *event;
if (prev) {
prev->next = reg->next;
return SWITCH_STATUS_SUCCESS;
}
+static const char *state2name(int state)
+{
+ switch(state) {
+ case RS_HANDSHAKE:
+ return "HANDSHAKE";
+ case RS_HANDSHAKE2:
+ return "HANDSHAKE2";
+ case RS_ESTABLISHED:
+ return "ESTABLISHED";
+ default:
+ return "DESTROY (PENDING)";
+ }
+}
+
#define RTMP_FUNCTION_SYNTAX "profile [profilename] [start | stop | rescan | restart]\nstatus profile [profilename]\nstatus profile [profilename] [reg | sessions]\nsession [session_id] [kill | login [user@domain] | logout [user@domain]]"
SWITCH_STANDARD_API(rtmp_function)
{
{
switch_hash_index_t *hi;
stream->write_function(stream, "\nSessions:\n");
- stream->write_function(stream, "uuid,address,user,domain,flashVer\n");
+ stream->write_function(stream, "uuid,address,user,domain,flashVer,state\n");
switch_thread_rwlock_rdlock(profile->session_rwlock);
for (hi = switch_hash_first(NULL, profile->session_hash); hi; hi = switch_hash_next(hi)) {
void *val;
switch_hash_this(hi, &key, &keylen, &val);
item = (rtmp_session_t *)val;
- stream->write_function(stream, "%s,%s:%d,%s,%s,%s\n",
- item->uuid, item->remote_address, item->remote_port,
- item->account ? item->account->user : NULL,
- item->account ? item->account->domain : NULL,
- item->flashVer);
+ stream->write_function(stream, "%s,%s:%d,%s,%s,%s,%s\n",
+ item->uuid, item->remote_address, item->remote_port,
+ item->account ? item->account->user : NULL,
+ item->account ? item->account->domain : NULL,
+ item->flashVer, state2name(item->state));
}
switch_thread_rwlock_unlock(profile->session_rwlock);
switch_xml_free(xml);
}
}
+
+ rtmp_globals.running = 1;
return SWITCH_STATUS_SUCCESS;
}
switch_core_hash_destroy(&rtmp_globals.session_hash);
switch_core_hash_destroy(&rtmp_globals.invoke_hash);
+ rtmp_globals.running = 0;
+
return SWITCH_STATUS_SUCCESS;
}
+SWITCH_MODULE_RUNTIME_FUNCTION(mod_rtmp_runtime)
+{
+
+ while(rtmp_globals.running) {
+ rtmp_garbage_colletor();
+ switch_yield(10000000);
+ }
+
+ return SWITCH_STATUS_TERM;
+}
+
/* For Emacs:
* Local Variables:
* mode:c