-/*
+/*
* mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2011-2012, Barracuda Networks Inc.
*
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
- *
+ *
* Mathieu Rene <mrene@avgs.ca>
* Anthony Minessale II <anthm@freeswitch.org>
* William King <william.king@quentustech.com>
switch_channel_t *channel = switch_core_session_get_channel(session);
rtmp_private_t *tech_pvt = switch_core_session_get_private(session);
rtmp_session_t *rsession = tech_pvt->rtmp_session;
-
+
switch_channel_set_variable(channel, "rtmp_profile", rsession->profile->name);
switch_channel_set_variable(channel, "rtmp_session", rsession->uuid);
switch_channel_set_variable(channel, "rtmp_flash_version", rsession->flashVer);
switch_status_t rtmp_tech_init(rtmp_private_t *tech_pvt, rtmp_session_t *rsession, switch_core_session_t *session)
{
switch_assert(rsession && session && tech_pvt);
-
+
tech_pvt->read_frame.data = tech_pvt->databuf;
tech_pvt->read_frame.buflen = sizeof(tech_pvt->databuf);
switch_mutex_init(&tech_pvt->mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
switch_mutex_init(&tech_pvt->flag_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
switch_mutex_init(&tech_pvt->readbuf_mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));
-
+
switch_buffer_create_dynamic(&tech_pvt->readbuf, 512, 512, 1024000);
//switch_buffer_add_mutex(tech_pvt->readbuf, tech_pvt->readbuf_mutex);
-
+
switch_core_timer_init(&tech_pvt->timer, "soft", 20, (16000 / (1000 / 20)), switch_core_session_get_pool(session));
-
+
tech_pvt->session = session;
tech_pvt->rtmp_session = rsession;
tech_pvt->channel = switch_core_session_get_channel(session);
/* Initialize read & write codecs */
if (switch_core_codec_init(&tech_pvt->read_codec, /* name */ "SPEEX", /* modname */ NULL,
- /* fmtp */ NULL, /* rate */ 16000, /* ms */ 20, /* channels */ 1,
- /* flags */ SWITCH_CODEC_FLAG_ENCODE | SWITCH_CODEC_FLAG_DECODE,
+ /* fmtp */ NULL, /* rate */ 16000, /* ms */ 20, /* channels */ 1,
+ /* flags */ SWITCH_CODEC_FLAG_ENCODE | SWITCH_CODEC_FLAG_DECODE,
/* codec settings */ NULL, switch_core_session_get_pool(session)) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't initialize read codec\n");
-
+
return SWITCH_STATUS_FALSE;
}
if (switch_core_codec_init(&tech_pvt->write_codec, /* name */ "SPEEX", /* modname */ NULL,
- /* fmtp */ NULL, /* rate */ 16000, /* ms */ 20, /* channels */ 1,
- /* flags */ SWITCH_CODEC_FLAG_ENCODE | SWITCH_CODEC_FLAG_DECODE,
+ /* fmtp */ NULL, /* rate */ 16000, /* ms */ 20, /* channels */ 1,
+ /* flags */ SWITCH_CODEC_FLAG_ENCODE | SWITCH_CODEC_FLAG_DECODE,
/* codec settings */ NULL, switch_core_session_get_pool(session)) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't initialize write codec\n");
-
+
return SWITCH_STATUS_FALSE;
}
-
+
switch_core_session_set_read_codec(session, &tech_pvt->read_codec);
switch_core_session_set_write_codec(session, &tech_pvt->write_codec);
-
+
//static inline uint8_t rtmp_audio_codec(int channels, int bits, int rate, rtmp_audio_format_t format) {
tech_pvt->audio_codec = 0xB2; //rtmp_audio_codec(1, 16, 0 /* speex is always 8000 */, RTMP_AUDIO_SPEEX);
-
+
switch_core_session_set_private(session, tech_pvt);
-
+
return SWITCH_STATUS_SUCCESS;
}
-/*
- State methods they get called when the state changes to the specific state
+/*
+ State methods they get called when the state changes to the specific state
returning SWITCH_STATUS_SUCCESS tells the core to execute the standard state method next
so if you fully implement the state you can return SWITCH_STATUS_FALSE to skip it.
*/
assert(channel != NULL);
switch_channel_set_flag(channel, CF_CNG_PLC);
-
+
rtmp_notify_call_state(session);
-
+
switch_set_flag_locked(tech_pvt, TFLAG_IO);
-
+
switch_mutex_lock(rsession->profile->mutex);
rsession->profile->calls++;
switch_mutex_unlock(rsession->profile->mutex);
if (switch_core_codec_ready(&tech_pvt->write_codec)) {
switch_core_codec_destroy(&tech_pvt->write_codec);
}
-
+
switch_buffer_destroy(&tech_pvt->readbuf);
switch_core_timer_destroy(&tech_pvt->timer);
}
rtmp_notify_call_state(session);
rtmp_send_onhangup(session);
-
+
/*
* If the session_rwlock is already locked, then there is a larger possibility that the rsession
* is looping through because the rsession is trying to hang them up. If that is the case, then there
- * is really no reason to foce this hash_delete. Just timeout, and let the rsession handle the final cleanup
+ * is really no reason to foce this hash_delete. Just timeout, and let the rsession handle the final cleanup
* since it now checks for the existence of the FS session safely.
*/
if ( switch_thread_rwlock_trywrlock_timeout(rsession->session_rwlock, 10) == SWITCH_STATUS_SUCCESS) {
}
switch_thread_rwlock_unlock(rsession->session_rwlock);
}
-
+
#ifndef RTMP_DONT_HOLD
if (switch_channel_test_flag(channel, CF_HOLD)) {
switch_channel_mark_hold(channel, SWITCH_FALSE);
switch (sig) {
case SWITCH_SIG_KILL:
switch_clear_flag_locked(tech_pvt, TFLAG_IO);
-
+
break;
case SWITCH_SIG_BREAK:
switch_set_flag_locked(tech_pvt, TFLAG_BREAK);
tech_pvt = switch_core_session_get_private(session);
assert(tech_pvt != NULL);
rsession = tech_pvt->rtmp_session;
-
+
if (rsession->state >= RS_DESTROY) {
return SWITCH_STATUS_FALSE;
}
-
+
if (switch_test_flag(tech_pvt, TFLAG_DETACHED)) {
switch_core_timer_next(&tech_pvt->timer);
goto cng;
}
-
+
tech_pvt->read_frame.flags = SFF_NONE;
tech_pvt->read_frame.codec = &tech_pvt->read_codec;
-
+
switch_core_timer_next(&tech_pvt->timer);
if (switch_buffer_inuse(tech_pvt->readbuf) < 2) {
goto cng;
} else {
uint8_t codec;
-
+
if (tech_pvt->read_frame.buflen < len) {
switch_mutex_unlock(tech_pvt->readbuf_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Packet of size %u is bigger that the buffer length %u.\n",
len, tech_pvt->read_frame.buflen);
return SWITCH_STATUS_FALSE;
}
-
+
switch_buffer_toss(tech_pvt->readbuf, 2);
- switch_buffer_read(tech_pvt->readbuf, &codec, 1);
+ switch_buffer_read(tech_pvt->readbuf, &codec, 1);
switch_buffer_read(tech_pvt->readbuf, tech_pvt->read_frame.data, len-1);
tech_pvt->read_frame.datalen = len-1;
-
+
if (codec != tech_pvt->audio_codec) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Received codec 0x%x instead of 0x%x\n", codec, tech_pvt->audio_codec);
switch_mutex_unlock(tech_pvt->readbuf_mutex);
}
*frame = &tech_pvt->read_frame;
-
+
return SWITCH_STATUS_SUCCESS;
-
+
cng:
data = (switch_byte_t *) tech_pvt->read_frame.data;
//switch_core_timer_sync(&tech_pvt->timer);
*frame = &tech_pvt->read_frame;
-
+
return SWITCH_STATUS_SUCCESS;
}
//switch_frame_t *pframe;
unsigned char buf[AMF_MAX_SIZE];
switch_time_t ts;
-
+
channel = switch_core_session_get_channel(session);
assert(channel != NULL);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "TFLAG_IO not set\n");
goto error;
}
-
+
if (switch_test_flag(tech_pvt, TFLAG_DETACHED) || !switch_test_flag(rsession, SFLAG_AUDIO)) {
goto success;
}
-
+
if (!rsession || !tech_pvt->audio_codec || !tech_pvt->write_channel) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Missing mandatory value\n");
goto error;
}
-
+
if (rsession->state >= RS_DESTROY) {
goto error;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Datalen too big\n");
goto error;
}
-
+
if (frame->flags & SFF_CNG) {
goto success;
}
/* Build message */
buf[0] = tech_pvt->audio_codec;
memcpy(buf+1, frame->data, frame->datalen);
-
+
/* Send it down the socket */
if (!tech_pvt->stream_start_ts) {
tech_pvt->stream_start_ts = switch_micro_time_now() / 1000;
rtmp_send_message(rsession, RTMP_DEFAULT_STREAM_AUDIO, ts, RTMP_TYPE_AUDIO, rsession->media_streamid, buf, frame->datalen + 1, 0);
success:
- switch_thread_rwlock_unlock(rsession->rwlock);
+ switch_thread_rwlock_unlock(rsession->rwlock);
return SWITCH_STATUS_SUCCESS;
error:
switch_thread_rwlock_unlock(rsession->rwlock);
-
+
error_null:
return SWITCH_STATUS_FALSE;
}
case SWITCH_MESSAGE_INDICATE_UNHOLD:
rtmp_notify_call_state(session);
break;
-
+
case SWITCH_MESSAGE_INDICATE_DISPLAY:
{
const char *name = msg->string_array_arg[0], *number = msg->string_array_arg[1];
number = argv[1];
}
-
+
if (!zstr(name)) {
if (zstr(number)) {
switch_caller_profile_t *caller_profile = switch_channel_get_caller_profile(channel);
number = caller_profile->destination_number;
}
-
+
if (zstr(tech_pvt->display_callee_id_name) || strcmp(tech_pvt->display_callee_id_name, name)) {
tech_pvt->display_callee_id_name = switch_core_session_strdup(session, name);
}
-
+
if (zstr(tech_pvt->display_callee_id_number) || strcmp(tech_pvt->display_callee_id_number, number)) {
tech_pvt->display_callee_id_number = switch_core_session_strdup(session, number);
}
-
+
rtmp_send_display_update(session);
}
-
+
switch_safe_free(arg);
}
break;
switch_memory_pool_t *pool;
char *destination = NULL, *auth, *user, *domain;
*newsession = NULL;
-
+
if (zstr(outbound_profile->destination_number)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "No destination\n");
goto fail;
}
destination = strdup(outbound_profile->destination_number);
-
+
if ((auth = strchr(destination, '/'))) {
*auth++ = '\0';
}
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "No such session id: %s\n", outbound_profile->destination_number);
goto fail;
}
-
+
if (!(*newsession = switch_core_session_request_uuid(rtmp_globals.rtmp_endpoint_interface, flags, SWITCH_CALL_DIRECTION_OUTBOUND, inpool, switch_event_get_header(var_event, "origination_uuid")))) {
goto fail;
}
-
+
pool = switch_core_session_get_pool(*newsession);
- channel = switch_core_session_get_channel(*newsession);
+ channel = switch_core_session_get_channel(*newsession);
switch_channel_set_name(channel, switch_core_session_sprintf(*newsession, "rtmp/%s/%s", rsession->profile->name, outbound_profile->destination_number));
-
+
caller_profile = switch_caller_profile_dup(pool, outbound_profile);
switch_channel_set_caller_profile(channel, caller_profile);
-
+
tech_pvt = switch_core_alloc(pool, sizeof(rtmp_private_t));
tech_pvt->rtmp_session = rsession;
tech_pvt->write_channel = RTMP_DEFAULT_STREAM_AUDIO;
tech_pvt->session = *newsession;
- tech_pvt->caller_profile = caller_profile;
+ tech_pvt->caller_profile = caller_profile;
switch_core_session_add_stream(*newsession, NULL);
-
+
if (rtmp_tech_init(tech_pvt, rsession, *newsession) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(*newsession), SWITCH_LOG_ERROR, "tech_init failed\n");
cause = SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER;
goto fail;
}
-
+
if (!zstr(auth)) {
tech_pvt->auth = switch_core_session_strdup(*newsession, auth);
switch_split_user_domain(auth, &user, &domain);
}
/*switch_channel_mark_pre_answered(channel);*/
-
+
switch_channel_ring_ready(channel);
rtmp_send_incoming_call(*newsession, var_event);
-
+
switch_channel_set_state(channel, CS_INIT);
switch_set_flag_locked(tech_pvt, TFLAG_IO);
-
+
rtmp_set_channel_variables(*newsession);
switch_core_hash_insert_wrlock(rsession->session_hash, switch_core_session_get_uuid(*newsession), tech_pvt, rsession->session_rwlock);
if (rsession) {
rtmp_session_rwunlock(rsession);
}
-
+
return SWITCH_CAUSE_SUCCESS;
-
+
fail:
if (*newsession) {
if (!switch_core_session_running(*newsession) && !switch_core_session_started(*newsession)) {
/* Deliver the event as a custom message to the target rtmp session */
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Session", switch_core_session_get_uuid(session));
-
+
rtmp_send_event(rsession, event);
return SWITCH_STATUS_SUCCESS;
}
-rtmp_profile_t *rtmp_profile_locate(const char *name)
+rtmp_profile_t *rtmp_profile_locate(const char *name)
{
rtmp_profile_t *profile = switch_core_hash_find_rdlock(rtmp_globals.profile_hash, name, rtmp_globals.profile_rwlock);
-
+
if (profile) {
if (switch_thread_rwlock_tryrdlock(profile->rwlock) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile %s is locked\n", name);
return profile;
}
-void rtmp_profile_release(rtmp_profile_t *profile)
+void rtmp_profile_release(rtmp_profile_t *profile)
{
switch_thread_rwlock_unlock(profile->rwlock);
}
rtmp_session_t *rtmp_session_locate(const char *uuid)
{
rtmp_session_t *rsession = switch_core_hash_find_rdlock(rtmp_globals.session_hash, uuid, rtmp_globals.session_rwlock);
-
+
if (!rsession || rsession->state >= RS_DESTROY) {
return NULL;
}
-
+
switch_thread_rwlock_rdlock(rsession->rwlock);
-
+
return rsession;
}
switch_thread_rwlock_unlock(rsession->rwlock);
}
-void rtmp_event_fill(rtmp_session_t *rsession, switch_event_t *event)
+void rtmp_event_fill(rtmp_session_t *rsession, switch_event_t *event)
{
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "RTMP-Session-ID", rsession->uuid);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "RTMP-Flash-Version", rsession->flashVer);
switch_uuid_t uuid;
switch_core_new_memory_pool(&pool);
*newsession = switch_core_alloc(pool, sizeof(rtmp_session_t));
-
+
(*newsession)->pool = pool;
(*newsession)->profile = profile;
(*newsession)->in_chunksize = (*newsession)->out_chunksize = RTMP_DEFAULT_CHUNKSIZE;
(*newsession)->recv_ack_window = RTMP_DEFAULT_ACK_WINDOW;
(*newsession)->next_streamid = 1;
(*newsession)->io_private = NULL;
-
+
switch_uuid_get(&uuid);
switch_uuid_format((*newsession)->uuid, &uuid);
switch_mutex_init(&((*newsession)->socket_mutex), SWITCH_MUTEX_NESTED, pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "New RTMP session [%s]\n", (*newsession)->uuid);
switch_core_hash_insert_wrlock(rtmp_globals.session_hash, (*newsession)->uuid, *newsession, rtmp_globals.session_rwlock);
switch_core_hash_insert_wrlock(profile->session_hash, (*newsession)->uuid, *newsession, profile->session_rwlock);
-
+
switch_core_hash_init(&(*newsession)->session_hash);
switch_thread_rwlock_create(&(*newsession)->session_rwlock, pool);
(*newsession)->io_debug_out = fopen(buf, "w");
}
#endif
-
+
switch_mutex_lock(profile->mutex);
profile->clients++;
switch_mutex_unlock(profile->mutex);
-
+
{
switch_event_t *event;
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_CONNECT) == SWITCH_STATUS_SUCCESS) {
switch_event_fire(&event);
}
}
-
+
return SWITCH_STATUS_SUCCESS;
}
top:
for (hi = switch_core_hash_first_iter( rtmp_globals.session_hash, hi); hi; hi = switch_core_hash_next(&hi)) {
- void *val;
+ void *val;
const void *key;
switch_ssize_t keylen;
rtmp_session_t *rsession;
}
}
switch_safe_free(hi);
-
+
switch_thread_rwlock_unlock(rtmp_globals.session_rwlock);
}
-switch_status_t rtmp_session_destroy(rtmp_session_t **rsession)
+switch_status_t rtmp_session_destroy(rtmp_session_t **rsession)
{
switch_status_t status = SWITCH_STATUS_FALSE;
return status;
}
-switch_status_t rtmp_real_session_destroy(rtmp_session_t **rsession)
+switch_status_t rtmp_real_session_destroy(rtmp_session_t **rsession)
{
switch_hash_index_t *hi;
switch_event_t *event;
switch_thread_rwlock_rdlock((*rsession)->session_rwlock);
for (hi = switch_core_hash_first((*rsession)->session_hash); hi; hi = switch_core_hash_next(&hi)) {
- void *val;
+ void *val;
const void *key;
switch_ssize_t keylen;
switch_channel_t *channel;
switch_core_session_t *session;
- switch_core_hash_this(hi, &key, &keylen, &val);
-
+ switch_core_hash_this(hi, &key, &keylen, &val);
+
/* If there are any sessions attached, abort the destroy operation */
if ((session = switch_core_session_locate((char *)key)) != NULL ) {
channel = switch_core_session_get_channel(session);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p still busy.\n", (*rsession)->uuid, (void *) *rsession);
return SWITCH_STATUS_FALSE;
}
-
+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG1, "RTMP session [%s] %p will be destroyed.\n", (*rsession)->uuid, (void *) *rsession);
-
-
+
+
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DISCONNECT) == SWITCH_STATUS_SUCCESS) {
rtmp_event_fill(*rsession, event);
switch_event_fire(&event);
}
-
+
switch_core_hash_delete(rtmp_globals.session_hash, (*rsession)->uuid);
switch_core_hash_delete_wrlock((*rsession)->profile->session_hash, (*rsession)->uuid, (*rsession)->profile->session_rwlock);
rtmp_clear_registration(*rsession, NULL, NULL);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "RTMP session ended [%s]\n", (*rsession)->uuid);
-
-
+
+
switch_mutex_lock((*rsession)->profile->mutex);
if ( (*rsession)->profile->calls < 1 ) {
(*rsession)->profile->calls = 0;
switch_thread_rwlock_wrlock((*rsession)->rwlock);
switch_thread_rwlock_unlock((*rsession)->rwlock);
-
+
#ifdef RTMP_DEBUG_IO
fclose((*rsession)->io_debug_in);
fclose((*rsession)->io_debug_out);
-#endif
-
+#endif
+
switch_mutex_lock((*rsession)->profile->mutex);
(*rsession)->profile->clients--;
switch_mutex_unlock((*rsession)->profile->mutex);
-
+
switch_core_hash_destroy(&(*rsession)->session_hash);
-
+
switch_core_destroy_memory_pool(&(*rsession)->pool);
-
+
*rsession = NULL;
-
+
return SWITCH_STATUS_SUCCESS;
}
switch_caller_profile_t *caller_profile;
switch_channel_t *channel;
const char *dialplan, *context;
-
+
if (!(*newsession = switch_core_session_request(rtmp_globals.rtmp_endpoint_interface, SWITCH_CALL_DIRECTION_INBOUND, SOF_NONE, NULL))) {
return SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER;
}
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "New FreeSWITCH session created: %s\n",
switch_core_session_get_uuid(*newsession));
- pool = switch_core_session_get_pool(*newsession);
- channel = switch_core_session_get_channel(*newsession);
+ pool = switch_core_session_get_pool(*newsession);
+ channel = switch_core_session_get_channel(*newsession);
switch_channel_set_name(channel, switch_core_session_sprintf(*newsession, "rtmp/%s/%s", rsession->profile->name, number));
-
+
if (!zstr(auth_user) && !zstr(auth_domain)) {
const char *s = switch_core_session_sprintf(*newsession, "%s@%s", auth_user, auth_domain);
switch_ivr_set_user(*newsession, s);
switch_channel_set_variable(channel, "rtmp_authorized", "true");
}
-
+
if (!(context = switch_channel_get_variable(channel, "user_context"))) {
if (!(context = rsession->profile->context)) {
context = "public";
}
}
-
+
if (!(dialplan = switch_channel_get_variable(channel, "inbound_dialplan"))) {
if (!(dialplan = rsession->profile->dialplan)) {
dialplan = "XML";
}
}
-
- caller_profile = switch_caller_profile_new(pool, switch_str_nil(auth_user), dialplan,
- SWITCH_DEFAULT_CLID_NAME,
+
+ caller_profile = switch_caller_profile_new(pool, switch_str_nil(auth_user), dialplan,
+ SWITCH_DEFAULT_CLID_NAME,
!zstr(auth_user) ? auth_user : SWITCH_DEFAULT_CLID_NUMBER,
- rsession->remote_address /* net addr */,
- NULL /* ani */,
- NULL /* anii */,
- NULL /* rdnis */,
+ rsession->remote_address /* net addr */,
+ NULL /* ani */,
+ NULL /* anii */,
+ NULL /* rdnis */,
"mod_rtmp", context, number);
-
+
switch_channel_set_caller_profile(channel, caller_profile);
-
+
tech_pvt = switch_core_alloc(pool, sizeof(rtmp_private_t));
tech_pvt->rtmp_session = rsession;
tech_pvt->write_channel = RTMP_DEFAULT_STREAM_AUDIO;
tech_pvt->session = *newsession;
- tech_pvt->caller_profile = caller_profile;
+ tech_pvt->caller_profile = caller_profile;
switch_core_session_add_stream(*newsession, NULL);
-
+
if (rtmp_tech_init(tech_pvt, rsession, *newsession) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "tech_init failed\n");
goto fail;
}
-
+
if (!zstr(auth_user) && !zstr(auth_domain)) {
tech_pvt->auth_user = switch_core_session_strdup(*newsession, auth_user);
tech_pvt->auth_domain = switch_core_session_strdup(*newsession, auth_domain);
}
switch_core_hash_insert_wrlock(rsession->session_hash, switch_core_session_get_uuid(*newsession), tech_pvt, rsession->session_rwlock);
-
+
return SWITCH_CAUSE_SUCCESS;
-
+
fail:
if (!switch_core_session_running(*newsession) && !switch_core_session_started(*newsession)) {
switch_core_session_destroy(newsession);
}
- return SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER;
+ return SWITCH_CAUSE_DESTINATION_OUT_OF_ORDER;
}
switch_status_t rtmp_profile_start(const char *profilename)
{
switch_memory_pool_t *pool;
rtmp_profile_t *profile;
-
+
switch_assert(profilename);
-
+
switch_core_new_memory_pool(&pool);
profile = switch_core_alloc(pool, sizeof(*profile));
profile->pool = pool;
switch_thread_rwlock_create(&profile->session_rwlock, pool);
switch_thread_rwlock_create(&profile->reg_rwlock, pool);
switch_core_hash_init(&profile->reg_hash);
-
+
if (!strcmp(profile->io_name, "tcp")) {
if (rtmp_tcp_init(profile, profile->bind_address, &profile->io, pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't initialize I/O layer\n");
goto fail;
- }
+ }
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "No such I/O module [%s]\n", profile->io_name);
goto fail;
}
-
+
switch_core_hash_insert_wrlock(rtmp_globals.profile_hash, profile->name, profile, rtmp_globals.profile_rwlock);
-
+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Started profile %s\n", profile->name);
-
+
return SWITCH_STATUS_SUCCESS;
fail:
switch_core_destroy_memory_pool(&pool);
switch_hash_index_t *hi = NULL;
switch_xml_config_item_t *instructions = get_instructions(*profile);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Stopping profile: %s\n", (*profile)->name);
-
+
switch_core_hash_delete_wrlock(rtmp_globals.profile_hash, (*profile)->name, rtmp_globals.profile_rwlock);
-
+
switch_thread_rwlock_wrlock((*profile)->rwlock);
-
- /* Kill all sessions */
+
+ /* Kill all sessions */
while ((hi = switch_core_hash_first_iter((*profile)->session_hash, hi))) {
void *val;
rtmp_session_t *session;
const void *key;
switch_ssize_t keylen;
switch_core_hash_this(hi, &key, &keylen, &val);
+
+ session = val;
- session = val;
-
rtmp_session_destroy(&session);
}
-
+
if ((*profile)->io->running > 0) {
(*profile)->io->running = 0;
-
+
while (sanity++ < 100 && (*profile)->io->running == 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for thread to end\n");
switch_yield(500000);
}
}
-
+
switch_thread_rwlock_unlock((*profile)->rwlock);
-
+
switch_xml_config_cleanup(instructions);
-
+
switch_core_hash_destroy(&(*profile)->session_hash);
switch_core_hash_destroy(&(*profile)->reg_hash);
-
+
switch_core_destroy_memory_pool(&(*profile)->pool);
-
+
free(instructions);
-
+
return SWITCH_STATUS_SUCCESS;
}
-void rtmp_add_registration(rtmp_session_t *rsession, const char *auth, const char *nickname)
+void rtmp_add_registration(rtmp_session_t *rsession, const char *auth, const char *nickname)
{
rtmp_reg_t *current_reg;
rtmp_reg_t *reg;
if (zstr(auth)) {
return;
}
-
+
reg = switch_core_alloc(rsession->pool, sizeof(*reg));
reg->uuid = rsession->uuid;
if (!zstr(nickname)) {
- reg->nickname = switch_core_strdup(rsession->pool, nickname);
+ reg->nickname = switch_core_strdup(rsession->pool, nickname);
}
-
+
switch_thread_rwlock_wrlock(rsession->profile->reg_rwlock);
if ((current_reg = switch_core_hash_find(rsession->profile->reg_hash, auth))) {
for (;current_reg && current_reg->next; current_reg = current_reg->next);
switch_core_hash_insert(rsession->profile->reg_hash, auth, reg);
}
switch_thread_rwlock_unlock(rsession->profile->reg_rwlock);
-
+
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_REGISTER) == SWITCH_STATUS_SUCCESS) {
char *user, *domain, *dup;
char *url = NULL;
char network_port_c[6];
snprintf(network_port_c, sizeof(network_port_c), "%d", rsession->remote_port);
rtmp_event_fill(rsession, event);
-
+
dup = strdup(auth);
switch_split_user_domain(dup, &user, &domain);
reg->user = switch_core_strdup(rsession->pool, user);
reg->domain = switch_core_strdup(rsession->pool, domain);
-
+
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "User", user);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Domain", domain);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Nickname", switch_str_nil(nickname));
void rtmp_clear_registration(rtmp_session_t *rsession, const char *auth, const char *nickname)
-{
+{
rtmp_account_t *account;
-
+
if (zstr(auth)) {
/* Reg data is pool-allocated, no need to free them */
switch_thread_rwlock_rdlock(rsession->account_rwlock);
} else {
rtmp_clear_reg_auth(rsession, auth, nickname);
}
-
+
}
switch_status_t rtmp_session_login(rtmp_session_t *rsession, const char *user, const char *domain)
{
rtmp_account_t *account = switch_core_alloc(rsession->pool, sizeof(*account));
switch_event_t *event;
-
+
account->user = switch_core_strdup(rsession->pool, user);
account->domain = switch_core_strdup(rsession->pool, domain);
-
+
switch_thread_rwlock_wrlock(rsession->account_rwlock);
account->next = rsession->account;
rsession->account = account;
switch_thread_rwlock_unlock(rsession->account_rwlock);
-
+
rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("onLogin"),
amf0_number_new(0),
amf0_str("success"),
amf0_str(user),
amf0_str(domain), NULL);
-
+
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_LOGIN) == SWITCH_STATUS_SUCCESS) {
rtmp_event_fill(rsession, event);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "User", user);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Domain", domain);
switch_event_fire(&event);
}
-
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "RTMP Session [%s] is now logged into %s@%s\n", rsession->uuid, user, domain);
-
+
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "RTMP Session [%s] is now logged into %s@%s\n", rsession->uuid, user, domain);
+
return SWITCH_STATUS_SUCCESS;
}
{
rtmp_account_t *account;
switch_event_t *event;
-
+
switch_thread_rwlock_wrlock(rsession->account_rwlock);
for (account = rsession->account; account; account = account->next) {
if (!strcmp(account->user, user) && !strcmp(account->domain, domain)) {
}
}
switch_thread_rwlock_unlock(rsession->account_rwlock);
-
+
rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("onLogout"),
amf0_number_new(0),
amf0_null_new(),
amf0_str(user),
amf0_str(domain), NULL);
-
+
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_LOGOUT) == SWITCH_STATUS_SUCCESS) {
rtmp_event_fill(rsession, event);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "User", user);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Domain", domain);
switch_event_fire(&event);
}
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "RTMP Session [%s] is now logged out of %s@%s\n", rsession->uuid, user, domain);
-
+
return SWITCH_STATUS_SUCCESS;
}
{
rtmp_account_t *account;
switch_status_t status = SWITCH_STATUS_FALSE;
-
+
switch_thread_rwlock_rdlock(rsession->account_rwlock);
if (user && domain) {
for (account = rsession->account; account; account = account->next) {
}
}
switch_thread_rwlock_unlock(rsession->account_rwlock);
-
+
return status;
}
-void rtmp_attach_private(rtmp_session_t *rsession, rtmp_private_t *tech_pvt)
-{
+void rtmp_attach_private(rtmp_session_t *rsession, rtmp_private_t *tech_pvt)
+{
switch_event_t *event;
-
+
if (rsession->tech_pvt) {
/* Detach current call */
switch_set_flag_locked(rsession->tech_pvt, TFLAG_DETACHED);
#endif
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_DETACH) == SWITCH_STATUS_SUCCESS) {
rtmp_event_fill(rsession, event);
- switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Call-ID",
+ switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Call-ID",
switch_core_session_get_uuid(rsession->tech_pvt->session));
switch_event_fire(&event);
}
-
+
rsession->tech_pvt = NULL;
}
-
+
if (tech_pvt && switch_test_flag(tech_pvt, TFLAG_THREE_WAY)) {
const char *s = switch_channel_get_variable(tech_pvt->channel, RTMP_THREE_WAY_UUID_VARIABLE);
/* 2nd call of a three-way: attach to other call instead */
tech_pvt = NULL;
}
}
-
+
rsession->tech_pvt = tech_pvt;
-
+
if (tech_pvt) {
/* Attach new call */
switch_clear_flag_locked(tech_pvt, TFLAG_DETACHED);
-
+
#ifndef RTMP_DONT_HOLD
if (switch_channel_test_flag(tech_pvt->channel, CF_HOLD)) {
switch_channel_mark_hold(tech_pvt->channel, SWITCH_FALSE);
- switch_ivr_unhold(tech_pvt->session);
+ switch_ivr_unhold(tech_pvt->session);
}
#endif
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, RTMP_EVENT_ATTACH) == SWITCH_STATUS_SUCCESS) {
static switch_xml_config_item_t *get_instructions(rtmp_profile_t *profile) {
switch_xml_config_item_t *dup;
- static switch_xml_config_int_options_t opt_chunksize = {
+ static switch_xml_config_int_options_t opt_chunksize = {
SWITCH_TRUE, /* enforce min */
128,
SWITCH_TRUE, /* Enforce Max */
SWITCH_CONFIG_ITEM("buffer-len", SWITCH_CONFIG_INT, CONFIG_RELOADABLE, &profile->buffer_len, 500, &opt_bufferlen, "", "Length of the receiving buffer to be used by the flash clients, in miliseconds"),
SWITCH_CONFIG_ITEM_END()
};
-
+
dup = malloc(sizeof(instructions));
memcpy(dup, instructions, sizeof(instructions));
return dup;
switch_event_t *event = NULL;
int count;
const char *file = "rtmp.conf";
-
+
if (!(xml = switch_xml_open_cfg(file, &cfg, NULL))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Could not open %s\n", file);
goto done;
}
-
+
if (!(x_profiles = switch_xml_child(cfg, "profiles"))) {
goto done;
}
-
+
for (x_profile = switch_xml_child(x_profiles, "profile"); x_profile; x_profile = x_profile->next) {
const char *name = switch_xml_attr_soft(x_profile, "name");
if (strcmp(name, profile->name)) {
continue;
}
-
+
if (!(x_settings = switch_xml_child(x_profile, "settings"))) {
goto done;
}
-
-
+
+
count = switch_event_import_xml(switch_xml_child(x_settings, "param"), "name", "value", &event);
status = switch_xml_config_parse_event(event, count, reload, instructions);
}
-
-
+
+
done:
if (xml) {
- switch_xml_free(xml);
+ switch_xml_free(xml);
}
switch_safe_free(instructions);
if (event) {
{
rtmp_session_t *rsession;
const char *uuid;
-
+
if (!event) {
return;
}
-
+
uuid = switch_event_get_header(event, "RTMP-Session-ID");
if (zstr(uuid)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "RTMP Custom event without RTMP-Session-ID\n");
return;
}
-
+
if ((rsession = rtmp_session_locate(uuid))) {
rtmp_send_event(rsession, event);
rtmp_session_rwunlock(rsession);
int argc;
char *argv[5];
char *dup = NULL;
- char *szprofile = NULL, *user = NULL;
+ char *szprofile = NULL, *user = NULL;
const char *nickname = NULL;
rtmp_profile_t *profile = NULL;
rtmp_reg_t *reg;
switch_bool_t first = SWITCH_TRUE;
-
+
if (zstr(cmd)) {
goto usage;
}
-
+
dup = strdup(cmd);
argc = switch_split(dup, '/', argv);
-
+
if (argc < 2 || zstr(argv[0]) || zstr(argv[1])) {
goto usage;
}
-
+
szprofile = argv[0];
if (!strchr(argv[1], '@')) {
goto usage;
- }
-
+ }
+
user = argv[1];
nickname = argv[2];
-
+
if (!(profile = rtmp_profile_locate(szprofile))) {
stream->write_function(stream, "-ERR No such profile\n");
goto done;
}
-
+
switch_thread_rwlock_rdlock(profile->reg_rwlock);
- if ((reg = switch_core_hash_find(profile->reg_hash, user))) {
+ if ((reg = switch_core_hash_find(profile->reg_hash, user))) {
for (; reg; reg = reg->next) {
- if (zstr(nickname) ||
- (nickname[0] == '!' && (zstr(reg->nickname) || strcmp(reg->nickname, nickname+1))) ||
+ if (zstr(nickname) ||
+ (nickname[0] == '!' && (zstr(reg->nickname) || strcmp(reg->nickname, nickname+1))) ||
(!zstr(reg->nickname) && !strcmp(reg->nickname, nickname))) {
if (!first) {
stream->write_function(stream, ",");
}
switch_thread_rwlock_unlock(profile->reg_rwlock);
goto done;
-
+
usage:
stream->write_function(stream, "Usage: rtmp_contact "RTMP_CONTACT_FUNCTION_SYNTAX"\n");
int argc;
char *argv[10];
char *dup = NULL;
-
+
if (zstr(cmd)) {
goto usage;
}
-
+
dup = strdup(cmd);
argc = switch_split(dup, ' ', argv);
-
+
if (argc < 1 || zstr(argv[0])) {
goto usage;
}
-
+
if (!strcmp(argv[0], "profile")) {
if (zstr(argv[1]) || zstr(argv[2])) {
goto usage;
} else if (!strcmp(argv[0], "status")) {
if (!zstr(argv[1]) && !strcmp(argv[1], "profile") && !zstr(argv[2])) {
rtmp_profile_t *profile;
-
+
if ((profile = rtmp_profile_locate(argv[2]))) {
stream->write_function(stream, "Profile: %s\n", profile->name);
stream->write_function(stream, "I/O Backend: %s\n", profile->io->name);
stream->write_function(stream, "Bind address: %s\n", profile->io->address);
stream->write_function(stream, "Active calls: %d\n", profile->calls);
-
+
if (!zstr(argv[3]) && !strcmp(argv[3], "sessions"))
{
switch_hash_index_t *hi;
stream->write_function(stream, "uuid,address,user,domain,flashVer,state\n");
switch_thread_rwlock_rdlock(profile->session_rwlock);
for (hi = switch_core_hash_first(profile->session_hash); hi; hi = switch_core_hash_next(&hi)) {
- void *val;
+ void *val;
const void *key;
switch_ssize_t keylen;
rtmp_session_t *item;
switch_core_hash_this(hi, &key, &keylen, &val);
-
+
item = (rtmp_session_t *)val;
- stream->write_function(stream, "%s,%s:%d,%s,%s,%s,%s\n",
+ stream->write_function(stream, "%s,%s:%d,%s,%s,%s,%s\n",
item->uuid, item->remote_address, item->remote_port,
item->account ? item->account->user : NULL,
item->account ? item->account->domain : NULL,
item->flashVer, state2name(item->state));
-
+
}
switch_thread_rwlock_unlock(profile->session_rwlock);
} else if (!zstr(argv[3]) && !strcmp(argv[3], "reg")) {
switch_hash_index_t *hi;
stream->write_function(stream, "\nRegistrations:\n");
stream->write_function(stream, "user,nickname,uuid\n");
-
+
switch_thread_rwlock_rdlock(profile->reg_rwlock);
for (hi = switch_core_hash_first(profile->reg_hash); hi; hi = switch_core_hash_next(&hi)) {
- void *val;
+ void *val;
const void *key;
switch_ssize_t keylen;
rtmp_reg_t *item;
switch_core_hash_this(hi, &key, &keylen, &val);
-
+
item = (rtmp_reg_t *)val;
for (;item;item = item->next) {
stream->write_function(stream, "%s,%s,%s\n",
key, switch_str_nil(item->nickname), item->uuid);
}
}
- switch_thread_rwlock_unlock(profile->reg_rwlock);
+ switch_thread_rwlock_unlock(profile->reg_rwlock);
} else {
- stream->write_function(stream, "Dialplan: %s\n", profile->dialplan);
- stream->write_function(stream, "Context: %s\n", profile->context);
+ stream->write_function(stream, "Dialplan: %s\n", profile->dialplan);
+ stream->write_function(stream, "Context: %s\n", profile->context);
}
-
+
rtmp_profile_release(profile);
} else {
stream->write_function(stream, "-ERR No such profile [%s]\n", argv[2]);
switch_hash_index_t *hi;
switch_thread_rwlock_rdlock(rtmp_globals.profile_rwlock);
for (hi = switch_core_hash_first(rtmp_globals.profile_hash); hi; hi = switch_core_hash_next(&hi)) {
- void *val;
+ void *val;
const void *key;
switch_ssize_t keylen;
rtmp_profile_t *item;
switch_core_hash_this(hi, &key, &keylen, &val);
-
+
item = (rtmp_profile_t *)val;
stream->write_function(stream, "%s\t%s:%s\tprofile\n", item->name, item->io->name, item->io->address);
-
+
}
switch_thread_rwlock_unlock(rtmp_globals.profile_rwlock);
}
-
+
} else if (!strcmp(argv[0], "session")) {
rtmp_session_t *rsession;
-
+
if (zstr(argv[1]) || zstr(argv[2])) {
goto usage;
}
-
+
rsession = rtmp_session_locate(argv[1]);
if (!rsession) {
stream->write_function(stream, "-ERR No such session\n");
goto done;
}
-
+
if (!strcmp(argv[2], "login")) {
char *user, *domain;
if (zstr(argv[3])) {
goto usage;
}
switch_split_user_domain(argv[3], &user, &domain);
-
+
if (!zstr(user) && !zstr(domain)) {
rtmp_session_login(rsession, user, domain);
stream->write_function(stream, "+OK\n");
goto usage;
}
switch_split_user_domain(argv[3], &user, &domain);
-
+
if (!zstr(user) && !zstr(domain)) {
rtmp_session_logout(rsession, user, domain);
stream->write_function(stream, "+OK\n");
char *dest = argv[3];
char *user = argv[4];
char *domain = NULL;
-
+
if (!zstr(user) && (domain = strchr(user, '@'))) {
*domain++ = '\0';
}
-
+
if (!zstr(dest)) {
if (rtmp_session_create_call(rsession, &newsession, 0, RTMP_DEFAULT_STREAM_AUDIO, dest, user, domain, NULL) != SWITCH_CAUSE_SUCCESS) {
stream->write_function(stream, "-ERR Couldn't create new call\n");
} else {
stream->write_function(stream, "-ERR No such session action [%s]\n", argv[2]);
}
-
+
if (rsession) {
rtmp_session_rwunlock(rsession);
}
} else {
goto usage;
}
-
+
goto done;
-
+
usage:
stream->write_function(stream, "-ERR Usage: "RTMP_FUNCTION_SYNTAX"\n");
return SWITCH_STATUS_SUCCESS;
}
-static inline void rtmp_register_invoke_function(const char *name, rtmp_invoke_function_t func)
+static inline void rtmp_register_invoke_function(const char *name, rtmp_invoke_function_t func)
{
switch_core_hash_insert(rtmp_globals.invoke_hash, name, (void*)(intptr_t)func);
}
switch_core_hash_init(&rtmp_globals.invoke_hash);
switch_thread_rwlock_create(&rtmp_globals.profile_rwlock, pool);
switch_thread_rwlock_create(&rtmp_globals.session_rwlock, pool);
-
+
rtmp_register_invoke_function("connect", rtmp_i_connect);
rtmp_register_invoke_function("createStream", rtmp_i_createStream);
rtmp_register_invoke_function("closeStream", rtmp_i_noop);
rtmp_register_invoke_function("receiveAudio", rtmp_i_receiveaudio);
rtmp_register_invoke_function("receiveVideo", rtmp_i_receivevideo);
rtmp_register_invoke_function("log", rtmp_i_log);
-
+
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
rtmp_globals.rtmp_endpoint_interface = switch_loadable_module_create_interface(*module_interface, SWITCH_ENDPOINT_INTERFACE);
rtmp_globals.rtmp_endpoint_interface->interface_name = "rtmp";
rtmp_globals.rtmp_endpoint_interface->io_routines = &rtmp_io_routines;
rtmp_globals.rtmp_endpoint_interface->state_handler = &rtmp_state_handlers;
-
+
SWITCH_ADD_API(api_interface, "rtmp", "rtmp management", rtmp_function, RTMP_FUNCTION_SYNTAX);
SWITCH_ADD_API(api_interface, "rtmp_contact", "rtmp contact", rtmp_contact_function, RTMP_CONTACT_FUNCTION_SYNTAX);
switch_console_set_complete("add rtmp session ::rtmp::list_sessions kill");
switch_console_set_complete("add rtmp session ::rtmp::list_sessions login");
switch_console_set_complete("add rtmp session ::rtmp::list_sessions logout");
-
+
switch_console_add_complete_func("::rtmp::list_profiles", list_profiles);
switch_console_add_complete_func("::rtmp::list_sessions", list_sessions);
-
+
switch_event_bind("mod_rtmp", SWITCH_EVENT_CUSTOM, RTMP_EVENT_CUSTOM, rtmp_event_handler, NULL);
-
+
{
switch_xml_t cfg, xml, x_profiles, x_profile;
const char *file = "rtmp.conf";
}
done:
if (xml) {
- switch_xml_free(xml);
+ switch_xml_free(xml);
}
}
rtmp_globals.running = 1;
-
+
return SWITCH_STATUS_SUCCESS;
}
switch_mutex_lock(rtmp_globals.mutex);
while ((hi = switch_core_hash_first_iter( rtmp_globals.profile_hash, hi))) {
- void *val;
+ void *val;
const void *key;
switch_ssize_t keylen;
rtmp_profile_t *item;
switch_core_hash_this(hi, &key, &keylen, &val);
-
+
item = (rtmp_profile_t *)val;
-
+
switch_mutex_unlock(rtmp_globals.mutex);
rtmp_profile_destroy(&item);
- switch_mutex_lock(rtmp_globals.mutex);
+ switch_mutex_lock(rtmp_globals.mutex);
}
switch_mutex_unlock(rtmp_globals.mutex);
-
+
switch_event_unbind_callback(rtmp_event_handler);
-
+
switch_core_hash_destroy(&rtmp_globals.profile_hash);
switch_core_hash_destroy(&rtmp_globals.session_hash);
switch_core_hash_destroy(&rtmp_globals.invoke_hash);
-
+
rtmp_globals.running = 0;
return SWITCH_STATUS_SUCCESS;
-/*
+/*
* mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2011, Barracuda Networks Inc.
*
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
- *
+ *
* Mathieu Rene <mrene@avgs.ca>
*
* mod_rtmp.h -- RTMP Endpoint Module
#include "io.h"
#include "types.h"
-//#define RTMP_DEBUG_IO
+//#define RTMP_DEBUG_IO
#define RTMP_DONT_HOLD
#define RTMP_THREE_WAY_UUID_VARIABLE "rtmp_three_way_uuid"
frameType (byte & 0xf0) » 4 1: keyframe, 2: inter frame, 3: disposable inter frame
0x12: META
-The contents of a meta packet are two AMF packets.
-The first is almost always a short uint16_be length-prefixed UTF-8 string (AMF type 0×02),
-and the second is typically a mixed array (AMF type 0×08). However, the second chunk typically contains a variety of types,
+The contents of a meta packet are two AMF packets.
+The first is almost always a short uint16_be length-prefixed UTF-8 string (AMF type 0×02),
+and the second is typically a mixed array (AMF type 0×08). However, the second chunk typically contains a variety of types,
so a full AMF parser should be used.
*/
static inline uint8_t rtmp_audio_codec(int channels, int bits, int rate, rtmp_audio_format_t format) {
uint8_t codec = 0;
-
+
switch (channels) {
case 1:
break;
default:
return 0;
}
-
+
switch (bits) {
case 8:
break;
codec |= 2;
default:
return 0;
- }
-
+ }
+
switch (rate) {
case 0:
case 5500:
default:
return 0;
}
-
+
switch(format) {
case RTMP_AUDIO_PCM:
break;
default:
return 0;
}
-
+
return codec;
}
#define amf0_is_boolean(_x) (_x && (_x)->type == AMF0_TYPE_BOOLEAN)
#define amf0_is_object(_x) (_x && (_x)->type == AMF0_TYPE_OBJECT)
-static inline char *amf0_get_string(amf0_data *x)
+static inline char *amf0_get_string(amf0_data *x)
{
return (amf0_is_string(x) ? (char*)amf0_string_get_uint8_ts(x) : NULL);
}
-static inline int amf0_get_number(amf0_data *x)
+static inline int amf0_get_number(amf0_data *x)
{
return (amf0_is_number(x) ? amf0_number_get_value(x) : 0);
}
typedef enum {
SFLAG_AUDIO = (1 << 0), /* < Send audio */
SFLAG_VIDEO = (1 << 1) /* < Send video */
-} SFLAGS;
+} SFLAGS;
typedef enum {
PFLAG_RUNNING = (1 << 0)
const char *bind_address; /* < Bind address */
const char *io_name; /* < Name of I/O module (from config) */
int chunksize; /* < Override default chunksize (from config) */
- int buffer_len; /* < Receive buffer length the flash clients should use */
-
+ int buffer_len; /* < Receive buffer length the flash clients should use */
+
switch_hash_t *reg_hash; /* < Registration hashtable */
switch_thread_rwlock_t *reg_rwlock; /* < Registration hash rwlock */
-
+
switch_bool_t auth_calls; /* < Require authentiation */
};
rtmp_profile_t *profile;
char uuid[SWITCH_UUID_FORMATTED_LENGTH+1];
void *io_private;
-
+
rtmp_session_state_t state;
int parse_state;
uint16_t parse_remain; /* < Remaining bytes required before changing parse state */
-
+
int hdrsize; /* < The current header size */
int amfnumber; /* < The current AMF number */
rtmp_state_t amfstate[64];
rtmp_state_t amfstate_out[64];
-
+
switch_mutex_t *socket_mutex;
switch_mutex_t *count_mutex;
int active_sessions;
-
+
unsigned char hsbuf[2048];
int hspos;
uint16_t in_chunksize;
uint16_t out_chunksize;
-
+
/* Connect params */
const char *flashVer;
const char *swfUrl;
const char *tcUrl;
const char *app;
const char *pageUrl;
-
+
uint32_t capabilities;
uint32_t audioCodecs;
uint32_t videoCodecs;
uint32_t videoFunction;
-
+
switch_thread_rwlock_t *rwlock;
-
+
rtmp_private_t *tech_pvt; /* < Active call's tech_pvt */
#ifdef RTMP_DEBUG_IO
FILE *io_debug_in;
FILE *io_debug_out;
#endif
-
+
const char *remote_address;
switch_port_t remote_port;
-
+
switch_hash_t *session_hash; /* < Hash of call uuids and tech_pvt */
switch_thread_rwlock_t *session_rwlock; /* < RWLock protecting session_hash */
-
+
rtmp_account_t *account;
switch_thread_rwlock_t *account_rwlock;
uint32_t flags;
uint64_t recv_ack_window; /* < ACK Window */
uint64_t recv_ack_sent; /* < Bytes ack'd */
uint64_t recv; /* < Bytes received */
-
+
uint32_t send_ack_window;
uint32_t send_ack;
uint32_t send;
switch_time_t send_ack_ts;
-
+
uint32_t send_bw; /* < Current send bandwidth (in bytes/sec) */
-
+
uint32_t next_streamid; /* < The next stream id that will be used */
uint32_t active_streamid; /* < The stream id returned by the last call to createStream */
-
- uint32_t media_streamid; /* < The stream id that was used for the last "play" command,
+
+ uint32_t media_streamid; /* < The stream id that was used for the last "play" command,
where we should send media */
};
unsigned int flags;
switch_codec_t read_codec;
switch_codec_t write_codec;
-
+
switch_frame_t read_frame;
unsigned char databuf[SWITCH_RECOMMENDED_BUFFER_SIZE]; /* < Buffer for read_frame */
-
+
switch_caller_profile_t *caller_profile;
-
+
switch_mutex_t *mutex;
switch_mutex_t *flag_mutex;
-
+
switch_core_session_t *session;
switch_channel_t *channel;
rtmp_session_t *rtmp_session;
-
+
int read_channel; /* RTMP channel #s for read and write */
int write_channel;
uint8_t audio_codec;
uint8_t video_codec;
-
+
switch_time_t stream_start_ts;
switch_timer_t timer;
switch_buffer_t *readbuf;
switch_mutex_t *readbuf_mutex;
-
+
const char *display_callee_id_name;
const char *display_callee_id_number;
-
+
const char *auth_user;
const char *auth_domain;
const char *auth;
};
-typedef enum {
+typedef enum {
MSG_FULLHEADER = 1
} rtmp_message_send_flag_t;
-
+
/* Invokable functions from flash */
RTMP_INVOKE_FUNCTION(rtmp_i_connect);
switch_status_t rtmp_on_exchange_media(switch_core_session_t *session);
switch_status_t rtmp_on_soft_execute(switch_core_session_t *session);
switch_call_cause_t rtmp_outgoing_channel(switch_core_session_t *session, switch_event_t *var_event,
- switch_caller_profile_t *outbound_profile,
- switch_core_session_t **new_session, switch_memory_pool_t **pool, switch_originate_flag_t flags,
- switch_call_cause_t *cancel_cause);
+ switch_caller_profile_t *outbound_profile,
+ switch_core_session_t **new_session, switch_memory_pool_t **pool, switch_originate_flag_t flags,
+ switch_call_cause_t *cancel_cause);
switch_status_t rtmp_read_frame(switch_core_session_t *session, switch_frame_t **frame, switch_io_flag_t flags, int stream_id);
switch_status_t rtmp_write_frame(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags, int stream_id);
switch_status_t rtmp_kill_channel(switch_core_session_t *session, int sig);
-/*
+/*
* mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2011-2012, Barracuda Networks Inc.
*
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
- *
+ *
* Mathieu Rene <mrene@avgs.ca>
* Joao Mesquita <jmesquita@freeswitch.org>
* William King <william.king@quentustech.com>
char *p = buf;
int type = state->buf[0] << 8 | state->buf[1];
int i;
-
+
for (i = 2; i < state->origlen; i++) {
p += sprintf(p, "%02x ", state->buf[i] & 0xFF);
}
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Control (%d): %s\n", type, buf);
-
+
switch(type) {
case RTMP_CTRL_STREAM_BEGIN:
break;
{
uint32_t now = ((switch_micro_time_now()/1000) & 0xFFFFFFFF);
uint32_t sent = state->buf[2] << 24 | state->buf[3] << 16 | state->buf[4] << 8 | state->buf[5];
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Ping reply: %d ms\n", (int)(now - sent));
}
break;
int i = 0;
buffer_helper_t helper = { state->buf, 0, state->origlen };
int64_t transaction_id;
- const char *command;
+ const char *command;
int argc = 0;
amf0_data *argv[100] = { 0 };
rtmp_invoke_function_t function;
}
amf0_data_free(dump);
}
- printf("<<<<< END AMF MSG\n");
+ printf("<<<<< END AMF MSG\n");
#endif
-
+
#ifdef RTMP_DEBUG_IO
{
helper.pos = 0;
fprintf(rsession->io_debug_in, "<<<<< END AMF MSG\n");
fflush(rsession->io_debug_in);
}
-#endif
-
+#endif
+
helper.pos = 0;
while (argc < switch_arraylen(argv) && (argv[argc++] = amf0_data_read(my_buffer_read, &helper)));
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Bogus INVOKE request\n");
return;
}
-
+
transaction_id = amf0_get_number(argv[i++]);
-
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "[amfnumber=%d] Got INVOKE for %s\n", amfnumber,
+
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "[amfnumber=%d] Got INVOKE for %s\n", amfnumber,
command);
if ((function = (rtmp_invoke_function_t)(intptr_t)switch_core_hash_find(rtmp_globals.invoke_hash, command))) {
switch_xml_t xml = NULL, x_param, x_params;
switch_bool_t allow_empty_password = SWITCH_FALSE;
const char *passwd = NULL;
- switch_bool_t disallow_multiple_registration = SWITCH_FALSE;
+ switch_bool_t disallow_multiple_registration = SWITCH_FALSE;
switch_event_t *locate_params;
-
+
switch_event_create(&locate_params, SWITCH_EVENT_GENERAL);
switch_assert(locate_params);
switch_event_add_header_string(locate_params, SWITCH_STACK_BOTTOM, "source", "mod_rtmp");
-
+
/* Locate user */
if (switch_xml_locate_user_merged("id", user, domain, NULL, &xml, locate_params) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed. No such user %s@%s\n", user, domain);
if (!strcasecmp(var, "allow-empty-password")) {
allow_empty_password = switch_true(val);
}
- if (!strcasecmp(var, "disallow-multiple-registration")) {
+ if (!strcasecmp(var, "disallow-multiple-registration")) {
disallow_multiple_registration = switch_true(val);
}
}
}
-
+
if (zstr(passwd)) {
if (allow_empty_password) {
- status = SWITCH_STATUS_SUCCESS;
+ status = SWITCH_STATUS_SUCCESS;
} else {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed for %s@%s: empty password not allowed\n", user, switch_str_nil(domain));
}
goto done;
}
-
+
auth = switch_core_sprintf(rsession->pool, "%s:%s@%s:%s", rsession->uuid, user, domain, passwd);
switch_md5_string(md5, auth, strlen(auth));
-
+
if (!strncmp(md5, authmd5, SWITCH_MD5_DIGEST_STRING_SIZE)) {
status = SWITCH_STATUS_SUCCESS;
} else {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Authentication failed for %s@%s\n", user, domain);
}
-
- if (disallow_multiple_registration) {
- switch_hash_index_t *hi;
- switch_thread_rwlock_rdlock(rsession->profile->session_rwlock);
- for (hi = switch_core_hash_first(rsession->profile->session_hash); hi; hi = switch_core_hash_next(&hi)) {
- void *val;
- const void *key;
- switch_ssize_t keylen;
- rtmp_session_t *item;
- switch_core_hash_this(hi, &key, &keylen, &val);
-
- item = (rtmp_session_t *)val;
- if (rtmp_session_check_user(item, user, domain) == SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Logging out %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid);
- if (rtmp_session_logout(item, user, domain) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Unable to logout %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid);
- }
- }
-
- }
- switch_thread_rwlock_unlock(rsession->profile->session_rwlock);
- }
-
+
+ if (disallow_multiple_registration) {
+ switch_hash_index_t *hi;
+ switch_thread_rwlock_rdlock(rsession->profile->session_rwlock);
+ for (hi = switch_core_hash_first(rsession->profile->session_hash); hi; hi = switch_core_hash_next(&hi)) {
+ void *val;
+ const void *key;
+ switch_ssize_t keylen;
+ rtmp_session_t *item;
+ switch_core_hash_this(hi, &key, &keylen, &val);
+
+ item = (rtmp_session_t *)val;
+ if (rtmp_session_check_user(item, user, domain) == SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Logging out %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid);
+ if (rtmp_session_logout(item, user, domain) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Unable to logout %s@%s on RTMP sesssion [%s]\n", user, domain, item->uuid);
+ }
+ }
+
+ }
+ switch_thread_rwlock_unlock(rsession->profile->session_rwlock);
+ }
+
done:
if (xml) {
switch_xml_free(xml);
switch_status_t amf_object_to_event(amf0_data *obj, switch_event_t **event)
{
switch_status_t status = SWITCH_STATUS_SUCCESS;
-
+
if (obj && obj->type == AMF0_TYPE_OBJECT) {
amf0_node *node;
if (!*event) {
for (node = amf0_object_first(obj); node; node = amf0_object_next(node)) {
const char *name = amf0_get_string(amf0_object_get_name(node));
const char *value = amf0_get_string(amf0_object_get_data(node));
-
+
if (!zstr(name) && !zstr(value)) {
if (!strcmp(name, "_body")) {
switch_event_add_body(*event, "%s", value);
} else {
status = SWITCH_STATUS_FALSE;
}
-
+
return status;
}
-switch_status_t amf_event_to_object(amf0_data **obj, switch_event_t *event)
+switch_status_t amf_event_to_object(amf0_data **obj, switch_event_t *event)
{
switch_event_header_t *hp;
const char *body;
-
+
switch_assert(event);
switch_assert(obj);
-
+
if (!*obj) {
*obj = amf0_object_new();
}
-
+
for (hp = event->headers; hp; hp = hp->next) {
amf0_object_add(*obj, hp->name, amf0_str(hp->value));
}
-
+
body = switch_event_get_body(event);
if (!zstr(body)) {
amf0_object_add(*obj, "_body", amf0_str(body));
}
-
+
return SWITCH_STATUS_SUCCESS;
}
void rtmp_set_chunksize(rtmp_session_t *rsession, uint32_t chunksize)
{
if (rsession->out_chunksize != chunksize) {
- unsigned char buf[] = {
+ unsigned char buf[] = {
INT32(chunksize)
};
-
+
rtmp_send_message(rsession, 2 /*amfnumber*/, 0, RTMP_TYPE_CHUNKSIZE, 0, buf, sizeof(buf), MSG_FULLHEADER);
rsession->out_chunksize = chunksize;
}
{
switch_channel_t *channel = switch_core_session_get_channel(session);
switch_event_header_t *he;
-
+
if (!*event && switch_event_create(event, SWITCH_EVENT_CLONE) != SWITCH_STATUS_SUCCESS) {
return;
}
-
+
if ((he = switch_channel_variable_first(channel))) {
for (; he; he = he->next) {
if (!strncmp(he->name, RTMP_USER_VARIABLE_PREFIX, strlen(RTMP_USER_VARIABLE_PREFIX))) {
void rtmp_get_user_variables_event(switch_event_t **event, switch_event_t *var_event)
{
switch_event_header_t *he;
-
+
if (!*event && switch_event_create(event, SWITCH_EVENT_CLONE) != SWITCH_STATUS_SUCCESS) {
return;
}
void rtmp_session_send_onattach(rtmp_session_t *rsession)
{
const char *uuid = "";
-
+
if (rsession->tech_pvt) {
uuid = switch_core_session_get_uuid(rsession->tech_pvt->session);
}
amf0_number_new(0),
amf0_null_new(),
amf0_str(uuid), NULL);
-
+
}
void rtmp_send_display_update(switch_core_session_t *session)
switch_caller_profile_t *caller_profile = switch_channel_get_caller_profile(channel);
switch_event_t *event = NULL;
amf0_data *obj = NULL;
-
+
if (var_event) {
rtmp_get_user_variables_event(&event, var_event);
} else {
rtmp_get_user_variables(&event, session);
}
-
+
if (event) {
amf_event_to_object(&obj, event);
switch_event_destroy(&event);
}
-
+
rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("incomingCall"),
amf0_number_new(0),
rtmp_private_t *tech_pvt = switch_core_session_get_private(session);
rtmp_session_t *rsession = tech_pvt->rtmp_session;
switch_channel_t *channel = switch_core_session_get_channel(session);
-
+
rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("onHangup"),
amf0_number_new(0),
void rtmp_send_event(rtmp_session_t *rsession, switch_event_t *event)
{
amf0_data *obj = NULL;
-
+
switch_assert(event != NULL);
switch_assert(rsession != NULL);
-
+
if (amf_event_to_object(&obj, event) == SWITCH_STATUS_SUCCESS) {
- rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("event"), amf0_number_new(0), amf0_null_new(), obj, NULL);
+ rtmp_send_invoke_free(rsession, 3, 0, 0, amf0_str("event"), amf0_number_new(0), amf0_null_new(), obj, NULL);
}
}
void rtmp_ping(rtmp_session_t *rsession)
{
- uint32_t now = (uint32_t)((switch_micro_time_now() / 1000) & 0xFFFFFFFF);
+ uint32_t now = (uint32_t)((switch_micro_time_now() / 1000) & 0xFFFFFFFF);
unsigned char buf[] = {
INT16(RTMP_CTRL_PING_REQUEST),
INT32(now)
amf0_data *data;
unsigned char buf[AMF_MAX_SIZE];
buffer_helper_t helper = { buf, 0, AMF_MAX_SIZE };
-
+
while ((data = va_arg(list, amf0_data*))) {
//amf0_data_dump(stdout, data, 0);
//printf("\n");
switch_size_t hdrsize = 1;
switch_status_t status = SWITCH_STATUS_SUCCESS;
rtmp_state_t *state = &rsession->amfstate_out[amfnumber];
-
- if ((rsession->send_ack + rsession->send_ack_window) < rsession->send &&
- (type == RTMP_TYPE_VIDEO || type == RTMP_TYPE_AUDIO)) {
+
+ if ((rsession->send_ack + rsession->send_ack_window) < rsession->send &&
+ (type == RTMP_TYPE_VIDEO || type == RTMP_TYPE_AUDIO)) {
/* We're sending too fast, drop the frame */
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
"DROP %s FRAME [amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n",
type == RTMP_TYPE_AUDIO ? "AUDIO" : "VIDEO", amfnumber, type, stream_id, len);
return SWITCH_STATUS_SUCCESS;
}
if (type != RTMP_TYPE_AUDIO && type != RTMP_TYPE_VIDEO && type != RTMP_TYPE_ACK) {
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
- "[amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", amfnumber, type, stream_id, len);
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
+ "[amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", amfnumber, type, stream_id, len);
}
-
+
#ifdef RTMP_DEBUG_IO
{
fprintf(rsession->io_debug_out, "[amfnumber=%d type=0x%x stream_id=0x%x] len=%"SWITCH_SIZE_T_FMT" \n", amfnumber, type, stream_id, len);
fprintf(rsession->io_debug_out, "<<<<< END AMF MSG\n");
}
fflush(rsession->io_debug_out);
-
+
}
#endif
-
+
/* Find out what is the smallest header we can use */
if (!(flags & MSG_FULLHEADER) && stream_id > 0 && state->stream_id == stream_id && timestamp >= state->ts) {
if (state->type == type && state->origlen == (int)len) {
header[2] = (timestamp >> 8) & 0xFF;
header[3] = timestamp & 0xFF;
}
-
+
state->ts = timestamp;
state->type = type;
state->origlen = len;
switch_goto_status(SWITCH_STATUS_FALSE, end);
}
rsession->send += hdrsize;
-
+
/* Write one chunk of data */
if (rsession->profile->io->write(rsession, (unsigned char*)message, &chunksize) != SWITCH_STATUS_SUCCESS) {
switch_goto_status(SWITCH_STATUS_FALSE, end);
}
rsession->send += chunksize;
pos += chunksize;
-
+
/* Send more chunks if we need to */
while (((signed)len - (signed)pos) > 0) {
switch_mutex_unlock(rsession->socket_mutex);
switch_goto_status(SWITCH_STATUS_FALSE, end);
}
rsession->send += hdrsize;
-
+
chunksize = (len - pos) < rsession->out_chunksize ? (len - pos) : rsession->out_chunksize;
-
+
if (rsession->profile->io->write(rsession, message + pos, &chunksize) != SWITCH_STATUS_SUCCESS) {
switch_goto_status(SWITCH_STATUS_FALSE, end);
}
if (rsession->state == RS_HANDSHAKE) {
s = 1537 - rsession->hspos;
-
+
if (rsession->profile->io->read(rsession, rsession->hsbuf + rsession->hspos, &s) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE;
}
-
+
rsession->hspos += s;
-
+
/* Receive C0 and C1 */
if (rsession->hspos < 1537) {
/* Not quite there yet */
return SWITCH_STATUS_SUCCESS;
}
-
+
/* Send reply (S0 + S1) */
memset(buf, 0, sizeof(buf));
*buf = '\x03';
s = 1537;
rsession->profile->io->write(rsession, (unsigned char*)buf, &s);
-
+
/* Send S2 */
s = 1536;
rsession->profile->io->write(rsession, rsession->hsbuf, &s);
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Sent handshake response\n");
-
+
rsession->state++;
rsession->hspos = 0;
} else if (rsession->state == RS_HANDSHAKE2) {
s = 1536 - rsession->hspos;
-
+
/* Receive C2 */
if (rsession->profile->io->read(rsession, rsession->hsbuf + rsession->hspos, &s) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE;
}
-
+
rsession->hspos += s;
-
+
if (rsession->hspos < 1536) {
/* Not quite there yet */
return SWITCH_STATUS_SUCCESS;
}
-
+
rsession->state++;
-
+
//s = 1536;
//rsession->profile->io->write(rsession, (char*)buf, &s);
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Done with handshake\n");
-
-
+
+
return SWITCH_STATUS_SUCCESS;
} else if (rsession->state == RS_ESTABLISHED) {
/* Process RTMP packet */
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE;
}
-
+
rsession->recv += s;
-
+
switch(buf[0] >> 6) {
case 0:
rsession->hdrsize = 12;
if (rsession->amfnumber > 64) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error\n");
return SWITCH_STATUS_FALSE;
- }
+ }
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Header size: %d AMF Number: %d\n", rsession->hdrsize, rsession->amfnumber);
rsession->parse_state++;
if (rsession->hdrsize == 1) {
}
rsession->parse_remain = 0;
break;
-
+
case 1:
{
/* Read full header and decode */
rtmp_state_t *state = &rsession->amfstate[rsession->amfnumber];
uint8_t *hdr = (uint8_t*)state->header.sz;
unsigned char *readbuf = (unsigned char*)hdr;
-
+
if (!rsession->parse_remain) {
- rsession->parse_remain = s = rsession->hdrsize - 1;
+ rsession->parse_remain = s = rsession->hdrsize - 1;
} else {
s = rsession->parse_remain;
readbuf += (rsession->hdrsize - 1) - s;
}
-
+
if ( !(s < 12 && s > 0) ) { /** XXX **/
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Protocol error: Invalid header size\n");
return SWITCH_STATUS_FALSE;
}
-
+
if (rsession->profile->io->read(rsession, readbuf, &s) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE;
}
-
+
rsession->parse_remain -= s;
if (rsession->parse_remain > 0) {
/* More data please */
return SWITCH_STATUS_SUCCESS;
}
-
+
rsession->recv += s;
-
+
if (rsession->hdrsize == 12) {
state->ts = (hdr[0] << 16) | (hdr[1] << 8) | (hdr[2]);
state->ts_delta = 0;
/* Type 3: Re-use timestamp delta if we have one */
state->ts += state->ts_delta;
}
-
+
if (rsession->hdrsize >= 8) {
/* Reset length counter since its included in the header */
state->remainlen = state->origlen = (hdr[3] << 16) | (hdr[4] << 8) | (hdr[5]);
if (rsession->hdrsize == 12) {
state->stream_id = (hdr[10] << 24) | (hdr[9] << 16) | (hdr[8] << 8) | hdr[7];
}
-
+
if (rsession->hdrsize >= 8 && state->origlen == 0) {
/* Happens we sometimes get a 0 length packet */
rsession->parse_state = 0;
return SWITCH_STATUS_SUCCESS;
}
-
+
/* FIXME: Handle extended timestamps */
if (state->ts == 0x00ffffff) {
return SWITCH_STATUS_FALSE;
}
-
+
rsession->parse_state++;
}
- case 2:
+ case 2:
{
rtmp_state_t *state = &rsession->amfstate[rsession->amfnumber];
-
+
if (rsession->parse_remain > 0) {
s = rsession->parse_remain;
} else {
s = state->remainlen < rsession->in_chunksize ? state->remainlen : rsession->in_chunksize;
- rsession->parse_remain = s;
+ rsession->parse_remain = s;
}
-
+
if (!s) {
/* Restart from beginning */
s = state->remainlen = state->origlen;
return SWITCH_STATUS_FALSE;
}
}
-
+
/* Sanity check */
if ((state->buf_pos + s) > AMF_MAX_SIZE) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "WTF %"SWITCH_SIZE_T_FMT" %"SWITCH_SIZE_T_FMT"\n",
state->buf_pos, s);
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error: exceeding max AMF packet size\n");
return SWITCH_STATUS_FALSE;
}
if (s > rsession->in_chunksize) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Protocol error: invalid chunksize\n");
- return SWITCH_STATUS_FALSE;
+ return SWITCH_STATUS_FALSE;
}
-
+
if (rsession->profile->io->read(rsession, state->buf + state->buf_pos, &s) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Disconnected from flash client\n");
return SWITCH_STATUS_FALSE;
}
rsession->recv += s;
-
+
state->remainlen -= s;
rsession->parse_remain -= s;
state->buf_pos += s;
-
+
if (rsession->parse_remain > 0) {
/* Need more data */
return SWITCH_STATUS_SUCCESS;
switch_thread_rwlock_wrlock(rsession->rwlock);
if (rsession->tech_pvt) {
uint16_t len = state->origlen;
-
+
if (!rsession->tech_pvt->readbuf) {
switch_thread_rwlock_unlock(rsession->rwlock);
return SWITCH_STATUS_FALSE;
}
-
+
switch_mutex_lock(rsession->tech_pvt->readbuf_mutex);
if (rsession->tech_pvt->maxlen && switch_buffer_inuse(rsession->tech_pvt->readbuf) > (switch_size_t)(rsession->tech_pvt->maxlen * 40)) {
rsession->tech_pvt->over_size++;
rsession->tech_pvt->over_size = 0;
}
if (rsession->tech_pvt->over_size > 10) {
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
- "%s buffer > %u for 10 consecutive packets... Flushing buffer\n",
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
+ "%s buffer > %u for 10 consecutive packets... Flushing buffer\n",
switch_core_session_get_name(rsession->tech_pvt->session), rsession->tech_pvt->maxlen * 40);
switch_buffer_zero(rsession->tech_pvt->readbuf);
#ifdef RTMP_DEBUG_IO
switch_time_t now = switch_micro_time_now();
uint32_t ack = (state->buf[0] << 24) | (state->buf[1] << 16) | (state->buf[2] << 8) | (state->buf[3]);
uint32_t delta = rsession->send_ack_ts == 0 ? 0 : now - rsession->send_ack_ts;
-
+
delta /= 1000000; /* microseconds -> seconds */
-
+
if (delta) {
rsession->send_bw = (ack - rsession->send_ack) / delta;
}
-
+
rsession->send_ack = ack;
- rsession->send_ack_ts = switch_micro_time_now();
+ rsession->send_ack_ts = switch_micro_time_now();
break;
}
default:
}
rsession->parse_state = 0;
-
+
/* Send an ACK if we need to */
if (rsession->recv - rsession->recv_ack_sent >= rsession->recv_ack_window) {
unsigned char ackbuf[] = { INT32(rsession->recv) };
rtmp_send_message(rsession, 2/*chunkstream*/, 0/*ts*/, RTMP_TYPE_ACK, 0/*msg stream id */, ackbuf, sizeof(ackbuf), 0 /*flags*/);
rsession->recv_ack_sent = rsession->recv;
}
-
+
}
- }
+ }
}
-
+
return SWITCH_STATUS_SUCCESS;
}
-/*
+/*
* mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2011-2012, Barracuda Networks Inc.
*
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
- *
+ *
* Mathieu Rene <mrene@avgs.ca>
*
* rtmp.c -- RTMP Signalling functions
#include "io.h"
#include "types.h"
-/* RTMP_INVOKE_FUNCTION is a macro that expands to:
+/* RTMP_INVOKE_FUNCTION is a macro that expands to:
switch_status_t function(rtmp_session_t *rsession, rtmp_state_t *state, int amfnumber, int transaction_id, int argc, amf0_data *argv[])
*/
{
amf0_data *object1 = amf0_object_new(), *object2 = amf0_object_new(), *params = argv[0], *d;
const char *s;
-
+
if ((d = amf0_object_get(params, "app")) && (s = amf0_get_string(d))) {
rsession->app = switch_core_strdup(rsession->pool, s);
}
-
+
if ((d = amf0_object_get(params, "flashVer")) && (s = amf0_get_string(d))) {
- rsession->flashVer = switch_core_strdup(rsession->pool, s);
+ rsession->flashVer = switch_core_strdup(rsession->pool, s);
}
if ((d = amf0_object_get(params, "swfUrl")) && (s = amf0_get_string(d))) {
- rsession->swfUrl = switch_core_strdup(rsession->pool, s);
+ rsession->swfUrl = switch_core_strdup(rsession->pool, s);
}
if ((d = amf0_object_get(params, "tcUrl")) && (s = amf0_get_string(d))) {
- rsession->tcUrl = switch_core_strdup(rsession->pool, s);
+ rsession->tcUrl = switch_core_strdup(rsession->pool, s);
}
if ((d = amf0_object_get(params, "pageUrl")) && (s = amf0_get_string(d))) {
- rsession->pageUrl = switch_core_strdup(rsession->pool, s);
+ rsession->pageUrl = switch_core_strdup(rsession->pool, s);
}
if ((d = amf0_object_get(params, "capabilities"))) {
amf0_object_add(object2, "description", amf0_str("Connection succeeded"));
amf0_object_add(object2, "clientId", amf0_number_new(217834719));
amf0_object_add(object2, "objectEncoding", amf0_number_new(0));
-
+
rtmp_set_chunksize(rsession, rsession->profile->chunksize);
{
unsigned char ackbuf[] = { INT32(RTMP_DEFAULT_ACK_WINDOW) };
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_WINDOW_ACK_SIZE, 0, ackbuf, sizeof(ackbuf), MSG_FULLHEADER);
}
-
+
{
unsigned char ackbuf[] = { INT32(RTMP_DEFAULT_ACK_WINDOW), 0x1 /* Soft limit */};
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_SET_PEER_BW, 0, ackbuf, sizeof(ackbuf), MSG_FULLHEADER);
}
-
+
{
- unsigned char buf[] = {
+ unsigned char buf[] = {
INT16(RTMP_CTRL_STREAM_BEGIN),
INT32(0)
};
-
+
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0);
}
-
+
/* respond with a success message */
rtmp_send_invoke_free(rsession, amfnumber, 0, 0,
amf0_str("_result"),
- amf0_number_new(1),
- object1,
- object2,
+ amf0_number_new(1),
+ object1,
+ object2,
NULL);
-
+
rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("connected"),
amf0_number_new(0),
amf0_null_new(),
amf0_str(rsession->uuid), NULL);
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Sent connect reply\n");
-
- return SWITCH_STATUS_SUCCESS;
+
+ return SWITCH_STATUS_SUCCESS;
}
RTMP_INVOKE_FUNCTION(rtmp_i_createStream)
-{
+{
rtmp_send_invoke_free(rsession, amfnumber, 0, 0,
amf0_str("_result"),
amf0_number_new(transaction_id),
amf0_null_new(),
amf0_number_new(rsession->next_streamid),
NULL);
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Replied to createStream (%u)\n", rsession->next_streamid);
-
+
rsession->next_streamid++;
-
+
return SWITCH_STATUS_SUCCESS;
}
RTMP_INVOKE_FUNCTION(rtmp_i_receiveaudio)
{
- switch_bool_t enabled = argv[1] ? amf0_boolean_get_value(argv[1]) : SWITCH_FALSE;
+ switch_bool_t enabled = argv[1] ? amf0_boolean_get_value(argv[1]) : SWITCH_FALSE;
if (enabled) {
switch_set_flag(rsession, SFLAG_AUDIO);
switch_clear_flag(rsession, SFLAG_AUDIO);
}
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "%sending audio\n", enabled ? "S" : "Not s");
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "%sending audio\n", enabled ? "S" : "Not s");
- return SWITCH_STATUS_SUCCESS;
+ return SWITCH_STATUS_SUCCESS;
}
RTMP_INVOKE_FUNCTION(rtmp_i_receivevideo)
{
- switch_bool_t enabled = argv[1] ? amf0_boolean_get_value(argv[1]) : SWITCH_FALSE;
+ switch_bool_t enabled = argv[1] ? amf0_boolean_get_value(argv[1]) : SWITCH_FALSE;
if (enabled) {
switch_set_flag(rsession, SFLAG_VIDEO);
switch_clear_flag(rsession, SFLAG_VIDEO);
}
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "%sending video\n", enabled ? "S" : "Not s");
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "%sending video\n", enabled ? "S" : "Not s");
- return SWITCH_STATUS_SUCCESS;
+ return SWITCH_STATUS_SUCCESS;
}
{
amf0_data *obj = amf0_object_new();
amf0_data *object = amf0_object_new();
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Got play for %s on stream %d\n", switch_str_nil(amf0_get_string(argv[1])),
state->stream_id);
/* Set outgoing chunk size to 1024 bytes */
rtmp_set_chunksize(rsession, 1024);
-
+
rsession->media_streamid = state->stream_id;
/* Send StreamBegin on the current stream */
{
- unsigned char buf[] = {
+ unsigned char buf[] = {
INT16(RTMP_CTRL_STREAM_BEGIN),
INT32(rsession->media_streamid)
};
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0);
}
-
+
{
unsigned char buf[] = {
INT32(rsession->profile->buffer_len)
};
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0);
- }
+ }
/* Send onStatus */
amf0_object_add(object, "level", amf0_str("status"));
amf0_object_add(object, "description", amf0_str("description"));
amf0_object_add(object, "details", amf0_str("details"));
amf0_object_add(object, "clientid", amf0_number_new(217834719));
-
+
rtmp_send_invoke_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid,
amf0_str("onStatus"),
amf0_number_new(1),
amf0_null_new(),
object, NULL);
-
+
object = amf0_object_new();
amf0_object_add(object, "level", amf0_str("status"));
object, NULL);
amf0_object_add(obj, "code", amf0_str("NetStream.Data.Start"));
-
- rtmp_send_notify_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid,
+
+ rtmp_send_notify_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid,
amf0_str("onStatus"),
obj, NULL);
-
+
rtmp_send_notify_free(rsession, RTMP_DEFAULT_STREAM_NOTIFY, 0, rsession->media_streamid,
amf0_str("|RtmpSampleAccess"),
amf0_boolean_new(1),
amf0_boolean_new(1), NULL);
-
+
return SWITCH_STATUS_SUCCESS;
}
RTMP_INVOKE_FUNCTION(rtmp_i_publish)
{
-
+
unsigned char buf[] = {
INT16(RTMP_CTRL_STREAM_BEGIN),
INT32(state->stream_id)
};
-
+
rtmp_send_message(rsession, 2, 0, RTMP_TYPE_USERCTRL, 0, buf, sizeof(buf), 0);
-
+
rtmp_send_invoke_free(rsession, amfnumber, 0, 0,
amf0_str("_result"),
- amf0_number_new(transaction_id),
- amf0_null_new(),
+ amf0_number_new(transaction_id),
+ amf0_null_new(),
amf0_null_new(),
NULL);
-
-
+
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Got publish on stream %u.\n", state->stream_id);
-
+
return SWITCH_STATUS_SUCCESS;
}
RTMP_INVOKE_FUNCTION(rtmp_i_makeCall)
{
- switch_core_session_t *newsession = NULL;
+ switch_core_session_t *newsession = NULL;
char *number = NULL;
-
+
if ((number = amf0_get_string(argv[1]))) {
switch_event_t *event = NULL;
char *auth, *user = NULL, *domain = NULL;
-
+
if ((auth = amf0_get_string(argv[2])) && !zstr(auth)) {
switch_split_user_domain(auth, &user, &domain);
if (rtmp_session_check_user(rsession, user, domain) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_WARNING, "Unauthorized call to %s, client is not logged in\n", number);
return SWITCH_STATUS_FALSE;
}
-
+
if (amf0_is_object(argv[3])) {
amf_object_to_event(argv[3], &event);
}
-
+
if (rtmp_session_create_call(rsession, &newsession, 0, RTMP_DEFAULT_STREAM_AUDIO, number, user, domain, event) != SWITCH_CAUSE_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Couldn't create call.\n");
}
-
+
if (event) {
switch_event_destroy(&event);
}
}
-
+
if (newsession) {
rtmp_private_t *new_pvt = switch_core_session_get_private(newsession);
rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str(switch_str_nil(number)),
amf0_str(switch_str_nil(new_pvt->auth)),
NULL);
-
+
rtmp_attach_private(rsession, switch_core_session_get_private(newsession));
}
-
+
return SWITCH_STATUS_SUCCESS;
}
switch_dtmf_t dtmf = { 0 };
switch_channel_t *channel;
char *digits;
-
+
if (!rsession->tech_pvt) {
return SWITCH_STATUS_FALSE;
}
-
+
channel = switch_core_session_get_channel(rsession->tech_pvt->session);
if (amf0_is_number(argv[2])) {
- dtmf.duration = amf0_get_number(argv[2]);
+ dtmf.duration = amf0_get_number(argv[2]);
} else if (!zstr(amf0_get_string(argv[2]))) {
dtmf.duration = atoi(amf0_get_string(argv[2]));
}
-
+
if ((digits = amf0_get_string(argv[1]))) {
size_t len = strlen(digits);
size_t j;
switch_channel_queue_dtmf(channel, &dtmf);
}
}
-
+
return SWITCH_STATUS_SUCCESS;
}
RTMP_INVOKE_FUNCTION(rtmp_i_login)
{
char *user, *auth, *domain, *ddomain = NULL;
-
-
+
+
user = amf0_get_string(argv[1]);
auth = amf0_get_string(argv[2]);
-
+
if (zstr(user) || zstr(auth)) {
return SWITCH_STATUS_FALSE;
}
-
+
if ((domain = strchr(user, '@'))) {
*domain++ = '\0';
}
-
+
if (zstr(domain)) {
ddomain = switch_core_get_domain(SWITCH_TRUE);
domain = ddomain;
if (rtmp_check_auth(rsession, user, domain, auth) == SWITCH_STATUS_SUCCESS) {
- rtmp_session_login(rsession, user, domain);
+ rtmp_session_login(rsession, user, domain);
} else {
rtmp_send_invoke_free(rsession, 3, 0, 0,
amf0_str("onLogin"),
amf0_null_new(),
amf0_null_new(), NULL);
}
-
+
switch_safe_free(ddomain);
{
char *auth = amf0_get_string(argv[1]);
char *user = NULL, *domain = NULL;
-
+
/* Unregister from that user */
rtmp_clear_registration(rsession, auth, NULL);
-
+
switch_split_user_domain(auth, &user, &domain);
-
+
if (!zstr(user) && !zstr(domain)) {
rtmp_session_logout(rsession, user, domain);
}
-
+
return SWITCH_STATUS_SUCCESS;
}
const char *user = NULL, *domain = NULL;
char *dup = NULL;
switch_status_t status;
-
+
if (!rsession->account) {
return SWITCH_STATUS_FALSE;
}
-
+
if (!zstr(auth)) {
dup = strdup(auth);
switch_split_user_domain(dup, (char**)&user, (char**)&domain);
user = rsession->account->user;
domain = rsession->account->domain;
}
-
+
if (rtmp_session_check_user(rsession, user, domain) == SWITCH_STATUS_SUCCESS) {
rtmp_add_registration(rsession, auth, amf0_get_string(argv[2]));
- status = SWITCH_STATUS_SUCCESS;
+ status = SWITCH_STATUS_SUCCESS;
} else {
status = SWITCH_STATUS_FALSE;
}
-
+
switch_safe_free(dup);
-
+
return status;
}
RTMP_INVOKE_FUNCTION(rtmp_i_unregister)
{
rtmp_clear_registration(rsession, amf0_get_string(argv[1]), amf0_get_string(argv[2]));
-
+
return SWITCH_STATUS_SUCCESS;
}
{
switch_channel_t *channel = NULL;
char *uuid = amf0_get_string(argv[1]);
-
+
if (!zstr(uuid)) {
rtmp_private_t *new_tech_pvt = rtmp_locate_private(rsession, uuid);
if (new_tech_pvt) {
}
return SWITCH_STATUS_FALSE;
}
-
+
if (!rsession->tech_pvt) {
return SWITCH_STATUS_FALSE;
}
-
+
/* No UUID specified but we're attached to a channel, mark it as answered */
channel = switch_core_session_get_channel(rsession->tech_pvt->session);
switch_channel_mark_answered(channel);
rtmp_attach_private(rsession, rsession->tech_pvt);
-
+
return SWITCH_STATUS_SUCCESS;
}
{
rtmp_private_t *tech_pvt = NULL;
char *uuid = amf0_get_string(argv[1]);
-
+
if (!zstr(uuid)) {
tech_pvt = rtmp_locate_private(rsession, uuid);
}
- /* Will detach if an empty (or invalid) uuid is received */
+ /* Will detach if an empty (or invalid) uuid is received */
rtmp_attach_private(rsession, tech_pvt);
-
+
return SWITCH_STATUS_SUCCESS;
}
if (!zstr(uuid)) {
rtmp_private_t *tech_pvt = rtmp_locate_private(rsession, uuid);
if (tech_pvt) {
- channel = switch_core_session_get_channel(tech_pvt->session);
+ channel = switch_core_session_get_channel(tech_pvt->session);
}
}
-
+
if (!channel) {
if (!rsession->tech_pvt) {
return SWITCH_STATUS_FALSE;
}
- channel = switch_core_session_get_channel(rsession->tech_pvt->session);
+ channel = switch_core_session_get_channel(rsession->tech_pvt->session);
}
-
+
if (amf0_is_number(argv[2])) {
cause = amf0_get_number(argv[2]);
} else if ((scause = amf0_get_string(argv[2])) && !zstr(scause)) {
cause = switch_channel_str2cause(scause);
- }
-
+ }
+
switch_channel_hangup(channel, cause);
-
+
return SWITCH_STATUS_SUCCESS;
}
char *uuid = amf0_get_string(argv[1]);
char *dest = amf0_get_string(argv[2]);
rtmp_private_t *tech_pvt;
-
+
if (zstr(uuid) || zstr(dest)) {
return SWITCH_STATUS_FALSE;
}
-
+
if ((tech_pvt = rtmp_locate_private(rsession, uuid))) {
const char *other_uuid = switch_channel_get_partner_uuid(tech_pvt->channel);
switch_core_session_t *session;
-
+
if (!zstr(other_uuid) && (session = switch_core_session_locate(other_uuid))) {
switch_ivr_session_transfer(session, dest, NULL, NULL);
switch_core_session_rwunlock(session);
}
}
-
+
return SWITCH_STATUS_SUCCESS;
}
char *uuid[] = { amf0_get_string(argv[1]), amf0_get_string(argv[2]) };
const char *other_uuid[2];
rtmp_private_t *tech_pvt[2];
-
+
if (zstr(uuid[0]) || zstr(uuid[1])) {
return SWITCH_STATUS_SUCCESS;
}
-
+
if (!(tech_pvt[0] = rtmp_locate_private(rsession, uuid[0])) ||
- !(tech_pvt[1] = rtmp_locate_private(rsession, uuid[1]))) {
+ !(tech_pvt[1] = rtmp_locate_private(rsession, uuid[1]))) {
return SWITCH_STATUS_FALSE;
}
-
+
if (tech_pvt[0] == tech_pvt[1]) {
return SWITCH_STATUS_FALSE;
}
-
+
if ((other_uuid[0] = switch_channel_get_partner_uuid(tech_pvt[0]->channel)) &&
- (other_uuid[1] = switch_channel_get_partner_uuid(tech_pvt[1]->channel))) {
+ (other_uuid[1] = switch_channel_get_partner_uuid(tech_pvt[1]->channel))) {
#ifndef RTMP_DONT_HOLD
if (switch_test_flag(tech_pvt[0], TFLAG_DETACHED)) {
switch_ivr_unhold(tech_pvt[1]->session);
}
#endif
-
+
switch_ivr_uuid_bridge(other_uuid[0], other_uuid[1]);
}
-
+
return SWITCH_STATUS_SUCCESS;
}
- setup a state handler in other[1] to detect when it hangs up
Check list:
-tech_pvt[0] or other[0] hangs up
+tech_pvt[0] or other[0] hangs up
If we were attached to the call, switch the active call to tech_pvt[1]
-tech_pvt[1] or other[1] hangs up
+tech_pvt[1] or other[1] hangs up
Clear up any 3-way indications on the tech_pvt[0]
*/
static switch_status_t three_way_on_soft_execute(switch_core_session_t *session);
#if 0
static switch_status_t three_way_on_hangup(switch_core_session_t *session);
-#endif
+#endif
static const switch_state_handler_table_t three_way_state_handlers_remote = {
/*.on_init */ NULL,
switch_core_session_t *my_session;
switch_channel_t *my_channel;
rtmp_private_t *tech_pvt;
-
+
if (zstr(uuid) || zstr(my_uuid)) {
return SWITCH_STATUS_SUCCESS;
}
-
+
if (zstr(my_uuid) || !(my_session = switch_core_session_locate(my_uuid))) {
return SWITCH_STATUS_SUCCESS;
}
-
+
if (!switch_core_session_check_interface(my_session, rtmp_globals.rtmp_endpoint_interface)) {
/* In case someone tempers with my variables, since we get tech_pvt from there */
switch_core_session_rwunlock(my_session);
return SWITCH_STATUS_SUCCESS;
}
-
+
my_channel = switch_core_session_get_channel(my_session);
tech_pvt = switch_core_session_get_private(my_session);
-
+
switch_ivr_eavesdrop_session(other_session, uuid, NULL, ED_MUX_READ | ED_MUX_WRITE);
-
+
/* 3-way call ended, whatever the reason
* We need to go back to our original state. */
if (!switch_channel_up(other_channel)) {
}
} else {
switch_channel_hangup(my_channel, SWITCH_CAUSE_NORMAL_CLEARING);
- }
+ }
}
} else if (switch_channel_ready(other_channel)) {
/* channel[1] didn't hangup, must be channel[0] then, rebridge this one with its original partner */
switch_ivr_uuid_bridge(switch_core_session_get_uuid(other_session), my_uuid);
} else {
- /* channel[1] being taken out of our control, take the other leg out of CS_HIBERNATE if its ready, or else leave it alone */
+ /* channel[1] being taken out of our control, take the other leg out of CS_HIBERNATE if its ready, or else leave it alone */
if (switch_channel_ready(my_channel)) {
- switch_channel_set_state(my_channel, CS_EXECUTE);
+ switch_channel_set_state(my_channel, CS_EXECUTE);
}
}
-
+
switch_channel_clear_state_handler(other_channel, &three_way_state_handlers_remote);
-
+
switch_channel_set_variable(other_channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, NULL);
switch_channel_set_variable(my_channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, NULL);
switch_channel_set_variable(other_channel, RTMP_THREE_WAY_UUID_VARIABLE, NULL);
-
+
switch_clear_flag(tech_pvt, TFLAG_THREE_WAY);
if (my_session) {
- switch_core_session_rwunlock(my_session);
+ switch_core_session_rwunlock(my_session);
}
-
+
return SWITCH_STATUS_SUCCESS;
}
const char *other_uuid[2];
switch_core_session_t *other_session[2] = { 0 };
switch_channel_t *other_channel[2] = { 0 };
-
- if (zstr(uuid[0]) || zstr(uuid[1]) ||
+
+ if (zstr(uuid[0]) || zstr(uuid[1]) ||
!(tech_pvt[0] = rtmp_locate_private(rsession, uuid[0])) ||
!(tech_pvt[1] = rtmp_locate_private(rsession, uuid[1]))) {
return SWITCH_STATUS_FALSE;
}
-
+
/* Make sure we don't 3-way with the same call, and that it doesnt turn into a 4-way, we aren't that permissive */
- if (tech_pvt[0] == tech_pvt[1] || switch_test_flag(tech_pvt[0], TFLAG_THREE_WAY) ||
+ if (tech_pvt[0] == tech_pvt[1] || switch_test_flag(tech_pvt[0], TFLAG_THREE_WAY) ||
switch_test_flag(tech_pvt[1], TFLAG_THREE_WAY)) {
return SWITCH_STATUS_FALSE;
}
-
+
if (!(other_uuid[0] = switch_channel_get_partner_uuid(tech_pvt[0]->channel)) ||
- !(other_uuid[1] = switch_channel_get_partner_uuid(tech_pvt[1]->channel))) {
+ !(other_uuid[1] = switch_channel_get_partner_uuid(tech_pvt[1]->channel))) {
return SWITCH_STATUS_FALSE; /* Both calls aren't bridged */
}
if (!(other_session[0] = switch_core_session_locate(other_uuid[0])) ||
- !(other_session[1] = switch_core_session_locate(other_uuid[1]))) {
+ !(other_session[1] = switch_core_session_locate(other_uuid[1]))) {
goto done;
}
-
+
other_channel[0] = switch_core_session_get_channel(other_session[0]);
other_channel[1] = switch_core_session_get_channel(other_session[1]);
-
+
/* Save which uuid is the 3-way target */
switch_channel_set_variable(other_channel[1], RTMP_THREE_WAY_UUID_VARIABLE, uuid[0]);
- switch_channel_set_variable(tech_pvt[1]->channel, RTMP_THREE_WAY_UUID_VARIABLE, uuid[0]);
-
+ switch_channel_set_variable(tech_pvt[1]->channel, RTMP_THREE_WAY_UUID_VARIABLE, uuid[0]);
+
/* Attach redirect */
switch_set_flag(tech_pvt[1], TFLAG_THREE_WAY);
-
+
/* Set soft_holding_uuid to the uuid of the other matching channel, so they can can be bridged back when the 3-way is over */
switch_channel_set_variable(tech_pvt[1]->channel, SWITCH_SOFT_HOLDING_UUID_VARIABLE, other_uuid[1]);
switch_channel_set_variable(other_channel[1], SWITCH_SOFT_HOLDING_UUID_VARIABLE, uuid[1]);
if (other_session[0]) {
switch_core_session_rwunlock(other_session[0]);
}
-
+
if (other_session[1]) {
switch_core_session_rwunlock(other_session[1]);
}
-
+
return SWITCH_STATUS_SUCCESS;
}
switch_event_t *event = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS;
const char *uuid = NULL;
-
+
if (argv[1] && argv[1]->type == AMF0_TYPE_OBJECT) {
obj = argv[1];
} else if (argv[2] && argv[2]->type == AMF0_TYPE_OBJECT) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Bad argument for sendevent");
return SWITCH_STATUS_FALSE;
}
-
-
- if (switch_event_create_subclass(&event, zstr(uuid) ? SWITCH_EVENT_CUSTOM : SWITCH_EVENT_MESSAGE,
+
+
+ if (switch_event_create_subclass(&event, zstr(uuid) ? SWITCH_EVENT_CUSTOM : SWITCH_EVENT_MESSAGE,
zstr(uuid) ? RTMP_EVENT_CLIENTCUSTOM : NULL) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_ERROR, "Couldn't create event\n");
return SWITCH_STATUS_FALSE;
}
-
+
rtmp_event_fill(rsession, event);
/* Build event using amf array */
switch_event_destroy(&event);
return SWITCH_STATUS_FALSE;
}
-
+
if (!zstr(uuid)) {
rtmp_private_t *session_pvt = rtmp_locate_private(rsession, uuid);
if (session_pvt) {
}
}
}
-
+
switch_event_fire(&event);
-
+
return SWITCH_STATUS_SUCCESS;
}
RTMP_INVOKE_FUNCTION(rtmp_i_log)
{
const char *data = amf0_get_string(argv[1]);
-
+
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO, "Log: %s\n", data);
-
+
return SWITCH_STATUS_SUCCESS;
}
-/*
+/*
* mod_rtmp for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2011-2012, Barracuda Networks Inc.
*
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
- *
+ *
* Mathieu Rene <mrene@avgs.ca>
* William King <william.king@quentustech.com>
*
{
rtmp_tcp_io_private_t *io_pvt = rsession->io_private;
rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)rsession->profile->io;
-
+
if (pollout && (io_pvt->pollfd->reqevents & SWITCH_POLLOUT)) {
return;
} else if (!pollout && !(io_pvt->pollfd->reqevents & SWITCH_POLLOUT)) {
return;
}
-
+
switch_pollset_remove(io->pollset, io_pvt->pollfd);
io_pvt->pollfd->reqevents = SWITCH_POLLIN | SWITCH_POLLERR;
if (pollout) {
io_pvt->pollfd->reqevents |= SWITCH_POLLOUT;
}
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Pollout: %s\n",
+ switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_NOTICE, "Pollout: %s\n",
pollout ? "true" : "false");
-
+
switch_pollset_add(io->pollset, io_pvt->pollfd);
}
switch_status_t status = SWITCH_STATUS_SUCCESS;
#ifdef RTMP_DEBUG_IO
switch_size_t olen = *len;
-#endif
+#endif
switch_assert(*len > 0 && *len < 1024000);
do {
- status = switch_socket_recv(io_pvt->socket, (char*)buf, len);
+ status = switch_socket_recv(io_pvt->socket, (char*)buf, len);
} while(status != SWITCH_STATUS_SUCCESS && SWITCH_STATUS_IS_BREAK(status));
-
+
#ifdef RTMP_DEBUG_IO
{
int i;
fprintf(rsession->io_debug_in, "recv %p max=%"SWITCH_SIZE_T_FMT" got=%"SWITCH_SIZE_T_FMT"\n< ", (void*)buf, olen, *len);
-
+
for (i = 0; i < *len; i++) {
-
+
fprintf(rsession->io_debug_in, "%02X ", (uint8_t)buf[i]);
if (i != 0 && i % 32 == 0) {
//rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)rsession->profile->io;
rtmp_tcp_io_private_t *io_pvt = rsession->io_private;
switch_status_t status;
- switch_size_t orig_len = *len;
-
+ switch_size_t orig_len = *len;
+
#ifdef RTMP_DEBUG_IO
{
int i;
for (i = 0; i < *len; i++) {
fprintf(rsession->io_debug_out, "%02X ", (uint8_t)buf[i]);
-
+
if (i != 0 && i % 32 == 0) {
fprintf(rsession->io_debug_out, "\n> ");
}
}
fprintf(rsession->io_debug_out, "\n\n ");
-
+
fflush(rsession->io_debug_out);
}
#endif
-
+
if (io_pvt->sendq && switch_buffer_inuse(io_pvt->sendq) > 0) {
/* We already have queued data, append it to the sendq */
switch_buffer_write(io_pvt->sendq, buf, *len);
return SWITCH_STATUS_SUCCESS;
}
-
+
status = switch_socket_send_nonblock(io_pvt->socket, (char*)buf, len);
-
+
if (*len > 0 && *len < orig_len) {
-
+
if (rsession->state >= RS_DESTROY) {
return SWITCH_STATUS_FALSE;
}
-
- /* We didnt send it all... add it to the sendq*/
+
+ /* We didnt send it all... add it to the sendq*/
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "%"SWITCH_SIZE_T_FMT" bytes added to sendq.\n", (orig_len - *len));
-
+
switch_buffer_write(io_pvt->sendq, (buf + *len), orig_len - *len);
/* Make sure we poll-write */
rtmp_tcp_alter_pollfd(rsession, SWITCH_TRUE);
}
-
+
return status;
}
static switch_status_t rtmp_tcp_close(rtmp_session_t *rsession)
{
rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)rsession->profile->io;
- rtmp_tcp_io_private_t *io_pvt = rsession->io_private;
-
+ rtmp_tcp_io_private_t *io_pvt = rsession->io_private;
+
if (io_pvt->socket) {
switch_mutex_lock(io->mutex);
switch_pollset_remove(io->pollset, io_pvt->pollfd);
if ( io_pvt->sendq ) {
switch_buffer_destroy(&(io_pvt->sendq));
}
-
+
return SWITCH_STATUS_SUCCESS;
}
{
rtmp_io_tcp_t *io = (rtmp_io_tcp_t*)obj;
io->base.running = 1;
-
+
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s: I/O Thread starting\n", io->base.profile->name);
-
-
+
+
while(io->base.running) {
const switch_pollfd_t *fds;
int32_t numfds;
int32_t i;
switch_status_t status;
-
+
switch_mutex_lock(io->mutex);
status = switch_pollset_poll(io->pollset, 500000, &numfds, &fds);
switch_mutex_unlock(io->mutex);
-
+
if (status != SWITCH_STATUS_SUCCESS && status != SWITCH_STATUS_TIMEOUT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "pollset_poll failed\n");
continue;
} else if (status == SWITCH_STATUS_TIMEOUT) {
switch_cond_next();
}
-
+
for (i = 0; i < numfds; i++) {
- if (!fds[i].client_data) {
+ if (!fds[i].client_data) {
switch_socket_t *newsocket;
if (switch_socket_accept(&newsocket, io->listen_socket, io->base.pool) != SWITCH_STATUS_SUCCESS) {
if (io->base.running) {
/* Don't spam the logs if we are shutting down */
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error [%s]\n", strerror(errno));
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Socket Error [%s]\n", strerror(errno));
} else {
return NULL;
}
} else {
rtmp_session_t *rsession;
-
+
if (switch_socket_opt_set(newsocket, SWITCH_SO_NONBLOCK, TRUE)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n");
}
if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n");
}
-
+
if (rtmp_session_request(io->base.profile, &rsession) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "RTMP session request failed\n");
switch_socket_close(newsocket);
} else {
switch_sockaddr_t *addr = NULL;
char ipbuf[200];
-
+
/* Create out private data and attach it to the rtmp session structure */
rtmp_tcp_io_private_t *pvt = switch_core_alloc(rsession->pool, sizeof(*pvt));
rsession->io_private = pvt;
switch_socket_create_pollfd(&pvt->pollfd, newsocket, SWITCH_POLLIN | SWITCH_POLLERR, rsession, rsession->pool);
switch_pollset_add(io->pollset, pvt->pollfd);
switch_buffer_create_dynamic(&pvt->sendq, 512, 1024, 0);
-
+
/* Get the remote address/port info */
switch_socket_addr_get(&addr, SWITCH_TRUE, newsocket);
switch_get_addr(ipbuf, sizeof(ipbuf), addr);
} else {
rtmp_session_t *rsession = (rtmp_session_t*)fds[i].client_data;
rtmp_tcp_io_private_t *io_pvt = (rtmp_tcp_io_private_t*)rsession->io_private;
-
+
if (fds[i].rtnevents & SWITCH_POLLOUT && switch_buffer_inuse(io_pvt->sendq) > 0) {
/* Send as much remaining data as possible */
switch_size_t sendlen;
}
} else if (fds[i].rtnevents & SWITCH_POLLIN && rtmp_handle_data(rsession) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG, "Closing socket\n");
-
+
switch_mutex_lock(io->mutex);
switch_pollset_remove(io->pollset, io_pvt->pollfd);
switch_mutex_unlock(io->mutex);
-
+
switch_socket_close(io_pvt->socket);
io_pvt->socket = NULL;
io->base.close(rsession);
-
+
rtmp_session_destroy(&rsession);
}
}
}
}
-
+
io->base.running = -1;
switch_socket_close(io->listen_socket);
-
+
return NULL;
}
switch_sockaddr_t *sa;
switch_threadattr_t *thd_attr = NULL;
rtmp_io_tcp_t *io_tcp;
-
+
io_tcp = (rtmp_io_tcp_t*)switch_core_alloc(pool, sizeof(rtmp_io_tcp_t));
io_tcp->base.pool = pool;
io_tcp->ip = switch_core_strdup(pool, bindaddr);
-
+
*new_io = (rtmp_io_t*)io_tcp;
io_tcp->base.profile = profile;
io_tcp->base.read = rtmp_tcp_read;
io_tcp->base.close = rtmp_tcp_close;
io_tcp->base.name = "tcp";
io_tcp->base.address = switch_core_strdup(pool, io_tcp->ip);
-
+
if ((szport = strchr(io_tcp->ip, ':'))) {
*szport++ = '\0';
io_tcp->port = atoi(szport);
} else {
io_tcp->port = RTMP_DEFAULT_PORT;
}
-
+
if (switch_sockaddr_info_get(&sa, io_tcp->ip, SWITCH_INET, io_tcp->port, 0, pool)) {
goto fail;
}
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening on %s:%u (tcp)\n", io_tcp->ip, io_tcp->port);
-
+
io_tcp->base.running = 1;
-
+
if (switch_pollset_create(&io_tcp->pollset, 1000 /* max poll fds */, pool, 0) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "pollset_create failed\n");
goto fail;
}
-
+
switch_socket_create_pollfd(&(io_tcp->listen_pollfd), io_tcp->listen_socket, SWITCH_POLLIN | SWITCH_POLLERR, NULL, pool);
if (switch_pollset_add(io_tcp->pollset, io_tcp->listen_pollfd) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "pollset_add failed\n");
goto fail;
}
-
+
switch_mutex_init(&io_tcp->mutex, SWITCH_MUTEX_NESTED, pool);
-
+
switch_threadattr_create(&thd_attr, pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&io_tcp->thread, thd_attr, rtmp_io_tcp_thread, *new_io, pool);
-
+
return SWITCH_STATUS_SUCCESS;
fail:
if (io_tcp->listen_socket) {