SFF_RAW_RTP_PARSE_FRAME = (1 << 13),
SFF_PICTURE_RESET = (1 << 14),
SFF_SAME_IMAGE = (1 << 15),
- SFF_USE_VIDEO_TIMESTAMP = (1 << 16)
+ SFF_USE_VIDEO_TIMESTAMP = (1 << 16),
+ SFF_ENCODED = (1 << 17)
} switch_frame_flag_enum_t;
typedef uint32_t switch_frame_flag_t;
struct switch_img_txt_handle_s;
typedef struct switch_img_txt_handle_s switch_img_txt_handle_t;
+struct switch_frame_buffer_s;
+typedef struct switch_frame_buffer_s switch_frame_buffer_t;
+
SWITCH_END_EXTERN_C
#endif
/* For Emacs:
SWITCH_DECLARE(void) switch_http_parse_qs(switch_http_request_t *request, char *qs);
+SWITCH_DECLARE(switch_status_t) switch_frame_buffer_free(switch_frame_buffer_t *fb, switch_frame_t **frameP);
+SWITCH_DECLARE(switch_status_t) switch_frame_buffer_dup(switch_frame_buffer_t *fb, switch_frame_t *orig, switch_frame_t **clone);
+SWITCH_DECLARE(switch_status_t) switch_frame_buffer_destroy(switch_frame_buffer_t **fbP);
+SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP);
+
SWITCH_END_EXTERN_C
#endif
/* For Emacs:
switch_rgb_color_t bgcolor;
switch_mutex_t *mutex;
switch_timer_t timer;
+ switch_frame_buffer_t *fb;
switch_memory_pool_t *pool;
} mcu_canvas_t;
char *kicked_sound;
switch_queue_t *dtmf_queue;
switch_queue_t *video_queue;
+ switch_queue_t *mux_out_queue;
+ switch_thread_t *video_muxing_write_thread;
switch_thread_t *input_thread;
cJSON *json;
cJSON *status_field;
SWITCH_STANDARD_APP(conference_function);
static void launch_conference_video_muxing_thread(conference_obj_t *conference);
static void launch_conference_thread(conference_obj_t *conference);
+static void launch_conference_video_muxing_write_thread(conference_member_t *member);
static void *SWITCH_THREAD_FUNC conference_loop_input(switch_thread_t *thread, void *obj);
static switch_status_t conference_local_play_file(conference_obj_t *conference, switch_core_session_t *session, char *path, uint32_t leadin, void *buf,
uint32_t buflen);
frame = &write_frame;
frame->img = codec_set->frame.img;
frame->packet = codec_set->frame.packet;
+ frame->packetlen = codec_set->frame.packetlen;
switch_clear_flag(frame, SFF_SAME_IMAGE);
frame->m = 0;
}
do {
-
+
frame->data = ((unsigned char *)frame->packet) + 12;
frame->datalen = SWITCH_DEFAULT_VIDEO_SIZE;
switch_mutex_lock(conference->member_mutex);
for (imember = conference->members; imember; imember = imember->next) {
+ switch_frame_t *dupframe;
+
if (switch_test_flag(imember, MFLAG_NO_MINIMIZE_ENCODING)) {
continue;
}
if (need_refresh) {
switch_core_session_request_video_refresh(imember->session);
}
+
+ //switch_core_session_write_encoded_video_frame(imember->session, frame, 0, 0);
+ switch_set_flag(frame, SFF_ENCODED);
+
+ if (switch_frame_buffer_dup(conference->canvas->fb, frame, &dupframe) == SWITCH_STATUS_SUCCESS) {
+ switch_queue_push(imember->mux_out_queue, dupframe);
+ dupframe = NULL;
+ }
- switch_core_session_write_encoded_video_frame(imember->session, frame, 0, 0);
+ switch_clear_flag(frame, SFF_ENCODED);
switch_core_session_rwunlock(imember->session);
}
}
}
+static void *SWITCH_THREAD_FUNC conference_video_muxing_write_thread_run(switch_thread_t *thread, void *obj)
+{
+ conference_member_t *member = (conference_member_t *) obj;
+ void *pop;
+
+ while(switch_test_flag(member, MFLAG_RUNNING) || switch_queue_size(member->mux_out_queue)) {
+ switch_frame_t *frame;
+
+ if (switch_test_flag(member, MFLAG_RUNNING)) {
+ if (switch_queue_pop(member->mux_out_queue, &pop) == SWITCH_STATUS_SUCCESS) {
+ if (!pop) continue;
+
+ frame = (switch_frame_t *) pop;
+ if (switch_test_flag(frame, SFF_ENCODED)) {
+ switch_core_session_write_encoded_video_frame(member->session, frame, 0, 0);
+ } else {
+ switch_core_session_write_video_frame(member->session, frame, SWITCH_IO_FLAG_NONE, 0);
+ }
+ switch_frame_buffer_free(member->conference->canvas->fb, &frame);
+ }
+ } else {
+ if (switch_queue_trypop(member->mux_out_queue, &pop) == SWITCH_STATUS_SUCCESS) {
+ if (pop) {
+ frame = (switch_frame_t *) pop;
+ switch_frame_buffer_free(member->conference->canvas->fb, &frame);
+ }
+ }
+ }
+ }
+
+ return NULL;
+}
+
static void *SWITCH_THREAD_FUNC conference_video_muxing_thread_run(switch_thread_t *thread, void *obj)
{
conference_obj_t *conference = (conference_obj_t *) obj;
video_layout_t *vlayout = NULL;
switch_codec_t *check_codec = NULL;
codec_set_t *write_codecs[MAX_MUX_CODECS] = { 0 };
- int buflen = SWITCH_RECOMMENDED_BUFFER_SIZE * 2;
+ int buflen = SWITCH_RTP_MAX_BUF_LEN;
int i = 0;
int used = 0;
uint32_t video_key_freq = 10000000;
switch_image_t *write_img = NULL, *file_img = NULL;
uint32_t timestamp = 0;
+
if (conference->video_layout_group) {
lg = switch_core_hash_find(conference->layout_group_hash, conference->video_layout_group);
vlayout = find_best_layout(conference, lg);
}
init_canvas(conference, vlayout);
+ switch_frame_buffer_create(&conference->canvas->fb);
conference->video_timer_reset = 1;
- packet = switch_core_alloc(conference->pool, SWITCH_RECOMMENDED_BUFFER_SIZE);
+ packet = switch_core_alloc(conference->pool, SWITCH_RTP_MAX_BUF_LEN);
while (globals.running && !switch_test_flag(conference, CFLAG_DESTRUCT) && switch_test_flag(conference, CFLAG_VIDEO_MUXING)) {
switch_bool_t need_refresh = SWITCH_FALSE, need_keyframe = SWITCH_FALSE, need_reset = SWITCH_FALSE;
write_codecs[i]->frame.packet = switch_core_alloc(conference->pool, buflen);
write_codecs[i]->frame.data = ((uint8_t *)write_codecs[i]->frame.packet) + 12;
- write_codecs[i]->frame.packetlen = 0;
+ write_codecs[i]->frame.packetlen = buflen;
write_codecs[i]->frame.buflen = buflen - 12;
switch_set_flag((&write_codecs[i]->frame), SFF_RAW_RTP);
switch_mutex_lock(conference->member_mutex);
for (imember = conference->members; imember; imember = imember->next) {
-
+ switch_frame_t *dupframe;
+
if (switch_test_flag(conference, CFLAG_MINIMIZE_VIDEO_ENCODING) && !switch_test_flag(imember, MFLAG_NO_MINIMIZE_ENCODING)) {
continue;
}
switch_set_flag(&write_frame, SFF_RAW_RTP);
write_frame.img = write_img;
write_frame.packet = packet;
- write_frame.data = packet + 12;
- write_frame.datalen = SWITCH_RECOMMENDED_BUFFER_SIZE - 12;
- write_frame.buflen = write_frame.datalen;
- write_frame.packetlen = SWITCH_RECOMMENDED_BUFFER_SIZE;
+ write_frame.data = ((uint8_t *)packet) + 12;
+ write_frame.datalen = 0;
+ write_frame.buflen = SWITCH_RTP_MAX_BUF_LEN - 12;
+ write_frame.packetlen = 0;
+
+ //switch_core_session_write_video_frame(imember->session, &write_frame, SWITCH_IO_FLAG_NONE, 0);
- switch_core_session_write_video_frame(imember->session, &write_frame, SWITCH_IO_FLAG_NONE, 0);
+ if (switch_frame_buffer_dup(conference->canvas->fb, &write_frame, &dupframe) == SWITCH_STATUS_SUCCESS) {
+ switch_queue_push(imember->mux_out_queue, dupframe);
+ dupframe = NULL;
+ }
if (imember->session) {
switch_core_session_rwunlock(imember->session);
}
switch_core_timer_destroy(&conference->canvas->timer);
-
+ switch_frame_buffer_destroy(&conference->canvas->fb);
destroy_canvas(&conference->canvas);
return NULL;
member->video_codec_index = -1;
switch_queue_create(&member->dtmf_queue, 100, member->pool);
- if (conference->video_layout_name) {
- switch_queue_create(&member->video_queue, 2000, member->pool);
- }
+
conference->members = member;
switch_set_flag_locked(member, MFLAG_INTREE);
switch_mutex_unlock(conference->member_mutex);
if (member->input_thread) {
switch_thread_join(&st, member->input_thread);
}
+ if (member->video_muxing_write_thread) {
+ switch_queue_push(member->mux_out_queue, NULL);
+ switch_thread_join(&st, member->video_muxing_write_thread);
+ }
}
switch_core_timer_destroy(&timer);
conference->min = 2;
}
+ if (conference->video_layout_name) {
+ switch_queue_create(&member.video_queue, 2000, member.pool);
+ switch_queue_create(&member.mux_out_queue, 2000, member.pool);
+ launch_conference_video_muxing_write_thread(&member);
+ }
+
/* Add the caller to the conference */
if (conference_add_member(conference, &member) != SWITCH_STATUS_SUCCESS) {
switch_core_codec_destroy(&member.read_codec);
}
-/* Create a thread for the conference and launch it */
+
+static void launch_conference_video_muxing_write_thread(conference_member_t *member)
+{
+ switch_threadattr_t *thd_attr = NULL;
+ switch_mutex_lock(globals.hash_mutex);
+ if (!member->video_muxing_write_thread) {
+ switch_threadattr_create(&thd_attr, member->pool);
+ switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+ switch_thread_create(&member->video_muxing_write_thread, thd_attr, conference_video_muxing_write_thread_run, member, member->pool);
+ }
+ switch_mutex_unlock(globals.hash_mutex);
+}
static void launch_conference_video_muxing_thread(conference_obj_t *conference)
{
switch_threadattr_t *thd_attr = NULL;
vid_frame.codec = &vid_codec;
vid_frame.packet = vid_buffer;
- vid_frame.data = vid_buffer + 12;
+ vid_frame.data = ((uint8_t *)vid_buffer) + 12;
vid_frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE - 12;
switch_set_flag((&vid_frame), SFF_RAW_RTP);
// switch_set_flag((&vid_frame), SFF_PROXY_PACKET);
vid_frame.m = hdr->m;
vid_frame.timestamp = ts;
- vid_frame.data = data + 12;
+ vid_frame.data = ((uint8_t *)data) + 12;
vid_frame.datalen = vid_frame.packetlen - 12;
switch_core_session_write_video_frame(session, &vid_frame, SWITCH_IO_FLAG_NONE, 0);
}
vid_frame.codec = codec;
vid_frame.packet = vid_buffer;
- vid_frame.data = vid_buffer + 12;
+ vid_frame.data = ((uint8_t *)vid_buffer) + 12;
vid_frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE - 12;
switch_set_flag((&vid_frame), SFF_RAW_RTP);
// switch_set_flag((&vid_frame), SFF_PROXY_PACKET);
} else {
context->vid_frame->img = context->img;
context->vid_frame->packet = context->video_packet;
- context->vid_frame->data = context->video_packet + 12;
+ context->vid_frame->data = ((uint8_t *)context->video_packet) + 12;
switch_core_session_write_video_frame(context->session, context->vid_frame, SWITCH_IO_FLAG_NONE, 0);
}
audio_frame.codec = &codec;
video_frame.codec = read_vid_codec;
video_frame.packet = context->video_packet;
- video_frame.data = context->video_packet + 12;
+ video_frame.data = ((uint8_t *)context->video_packet) + 12;
switch_channel_set_variable(channel, SWITCH_PLAYBACK_TERMINATOR_USED, "");
context->aud_frame = &tech_pvt->read_frame;
context->vid_frame = &tech_pvt->read_video_frame;
context->vid_frame->packet = context->video_packet;
- context->vid_frame->data = context->video_packet + 12;
+ context->vid_frame->data = ((uint8_t *)context->video_packet) + 12;
context->playing = 0;
// context->err = 0;
uint32_t len, ts = 0;
switch_payload_t payload = 0;
rtp_msg_t *send_msg = NULL;
- rtp_msg_t local_send_msg = { {0} };
+ srtp_hdr_t local_header;
+ int r = 0;
if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr) {
return -1;
if (fwd) {
send_msg = frame->packet;
- local_send_msg = *send_msg;
- send_msg = &local_send_msg;
+ local_header = send_msg->header;
len = frame->packetlen;
ts = 0;
}
*/
- return rtp_common_write(rtp_session, send_msg, data, len, payload, ts, &frame->flags);
+ r = rtp_common_write(rtp_session, send_msg, data, len, payload, ts, &frame->flags);
+
+ if (send_msg) {
+ send_msg->header = local_header;
+ }
+
+ return r;
+
}
SWITCH_DECLARE(switch_rtp_stats_t *) switch_rtp_get_stats(switch_rtp_t *rtp_session, switch_memory_pool_t *pool)
}
+typedef struct switch_frame_node_s {
+ switch_frame_t *frame;
+ int inuse;
+ struct switch_frame_node_s *next;
+} switch_frame_node_t;
+
+struct switch_frame_buffer_s {
+ switch_frame_node_t *head;
+ switch_memory_pool_t *pool;
+ switch_mutex_t *mutex;
+};
+
+static switch_frame_t *find_free_frame(switch_frame_buffer_t *fb, switch_frame_t *orig)
+{
+ switch_frame_node_t *np;
+
+ switch_mutex_lock(fb->mutex);
+ for (np = fb->head; np; np = np->next) {
+ if (!np->inuse && ((orig->packet && np->frame->packet) || (!orig->packet && !np->frame->packet))) {
+ break;
+ }
+ }
+
+ if (!np) {
+ np = switch_core_alloc(fb->pool, sizeof(*np));
+ np->frame = switch_core_alloc(fb->pool, sizeof(*np->frame));
+
+ if (orig->packet) {
+ np->frame->packet = switch_core_alloc(fb->pool, SWITCH_RTP_MAX_BUF_LEN);
+ } else {
+ np->frame->data = switch_core_alloc(fb->pool, SWITCH_RTP_MAX_BUF_LEN);
+ np->frame->buflen = SWITCH_RTP_MAX_BUF_LEN;
+ }
+ np->next = fb->head;
+ fb->head = np;
+ }
+
+
+ np->frame->samples = orig->samples;
+ np->frame->rate = orig->rate;
+ np->frame->channels = orig->channels;
+ np->frame->payload = orig->payload;
+ np->frame->timestamp = orig->timestamp;
+ np->frame->seq = orig->seq;
+ np->frame->ssrc = orig->ssrc;
+ np->frame->m = orig->m;
+ np->frame->flags = orig->flags;
+ np->frame->codec = NULL;
+ np->frame->pmap = NULL;
+ np->frame->img = NULL;
+ np->frame->extra_data = np;
+ np->inuse = 1;
+
+ switch_set_flag(np->frame, SFF_DYNAMIC);
+
+ if (orig->packet) {
+ memcpy(np->frame->packet, orig->packet, orig->packetlen);
+ np->frame->packetlen = orig->packetlen;
+ np->frame->data = ((unsigned char *)np->frame->packet) + 12;
+ np->frame->datalen = orig->datalen;
+ } else {
+ np->frame->packetlen = 0;
+ memcpy(np->frame->data, orig->data, orig->datalen);
+ np->frame->datalen = orig->datalen;
+ }
+
+ if (orig->img && !switch_test_flag(orig, SFF_ENCODED)) {
+ switch_img_copy(orig->img, &np->frame->img);
+ }
+
+ switch_mutex_unlock(fb->mutex);
+
+ return np->frame;
+}
+
+SWITCH_DECLARE(switch_status_t) switch_frame_buffer_free(switch_frame_buffer_t *fb, switch_frame_t **frameP)
+{
+ switch_frame_t *old_frame;
+ switch_frame_node_t *node;
+
+ switch_mutex_lock(fb->mutex);
+
+ old_frame = *frameP;
+ *frameP = NULL;
+
+ node = (switch_frame_node_t *) old_frame->extra_data;
+ node->inuse = 0;
+ switch_img_free(&node->frame->img);
+
+ switch_mutex_unlock(fb->mutex);
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+SWITCH_DECLARE(switch_status_t) switch_frame_buffer_dup(switch_frame_buffer_t *fb, switch_frame_t *orig, switch_frame_t **clone)
+{
+ switch_frame_t *new_frame;
+
+ if (!orig) {
+ return SWITCH_STATUS_FALSE;
+ }
+
+ switch_assert(orig->buflen);
+
+ new_frame = find_free_frame(fb, orig);
+
+ *clone = new_frame;
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+SWITCH_DECLARE(switch_status_t) switch_frame_buffer_destroy(switch_frame_buffer_t **fbP)
+{
+ switch_frame_buffer_t *fb = *fbP;
+ switch_memory_pool_t *pool;
+ *fbP = NULL;
+ pool = fb->pool;
+ switch_core_destroy_memory_pool(&pool);
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+SWITCH_DECLARE(switch_status_t) switch_frame_buffer_create(switch_frame_buffer_t **fbP)
+{
+ switch_frame_buffer_t *fb;
+ switch_memory_pool_t *pool;
+
+ switch_core_new_memory_pool(&pool);
+ fb = switch_core_alloc(pool, sizeof(*fb));
+ fb->pool = pool;
+ switch_mutex_init(&fb->mutex, SWITCH_MUTEX_NESTED, pool);
+ *fbP = fb;
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+
SWITCH_DECLARE(switch_status_t) switch_frame_dup(switch_frame_t *orig, switch_frame_t **clone)
{
switch_frame_t *new_frame;
switch_assert(orig->buflen);
new_frame = malloc(sizeof(*new_frame));
-
switch_assert(new_frame);
*new_frame = *orig;
switch_set_flag(new_frame, SFF_DYNAMIC);
- new_frame->data = malloc(new_frame->buflen);
- switch_assert(new_frame->data);
+ if (orig->packet) {
+ new_frame->packet = malloc(SWITCH_RTP_MAX_BUF_LEN);
+ memcpy(new_frame->packet, orig->packet, orig->packetlen);
+ new_frame->data = ((unsigned char *)new_frame->packet) + 12;
+ } else {
+ new_frame->data = malloc(new_frame->buflen);
+ switch_assert(new_frame->data);
+ memcpy(new_frame->data, orig->data, orig->datalen);
+ }
+
- memcpy(new_frame->data, orig->data, orig->datalen);
new_frame->codec = NULL;
new_frame->pmap = NULL;
+ new_frame->img = NULL;
+ if (orig->img && !switch_test_flag(orig, SFF_ENCODED)) {
+ switch_img_copy(orig->img, &new_frame->img);
+ }
*clone = new_frame;
return SWITCH_STATUS_SUCCESS;
return SWITCH_STATUS_FALSE;
}
- switch_safe_free((*frame)->data);
+ if ((*frame)->img) {
+ switch_img_free(&(*frame)->img);
+ }
+
+ if ((*frame)->packet) {
+ free((*frame)->packet);
+ (*frame)->packet = NULL;
+ } else {
+ switch_safe_free((*frame)->data);
+ }
+
free(*frame);
*frame = NULL;