size_t len;
} buffer_helper_t;
+typedef struct {
+ uint8_t amfnumber;
+ uint32_t timestamp;
+ uint8_t type;
+ uint32_t stream_id;
+ switch_size_t len;
+ uint32_t flags;
+ unsigned char *message;
+} video_send_buffer_t;
+
+
size_t my_buffer_read(void * out_buffer, size_t size, void * user_data)
{
buffer_helper_t *helper = (buffer_helper_t*)user_data;
return rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, buf, helper.pos, 0);
}
+static int flush_video_send_queue(rtmp_session_t *rsession, switch_bool_t lock)
+{
+ video_send_buffer_t *b;
+ void *pop;
+ switch_queue_t *q = rsession->video_send_queue;
+ int x = 0;
+
+ if (!q) return 0;
+
+ if (lock) switch_mutex_lock(rsession->socket_mutex);
+ while (switch_queue_size(q) > 0 && switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS && pop) {
+ b = (video_send_buffer_t *)pop;
+ free(b->message);
+ free(b);
+ x++;
+ }
+ if (lock) switch_mutex_unlock(rsession->socket_mutex);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Dropped %d Video Frames\n", x);
+
+ return x;
+}
+
+static void buffer_video_send(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint8_t type, uint32_t stream_id, const unsigned char *message, switch_size_t len, uint32_t flags)
+{
+ video_send_buffer_t *vbuf;
+
+ switch_mutex_lock(rsession->socket_mutex);
+
+ if (!rsession->video_send_queue) {
+ switch_queue_create(&rsession->video_send_queue, 1000, rsession->pool);
+ }
+
+ if (*message == 0x17) {
+ flush_video_send_queue(rsession, SWITCH_FALSE);
+ }
+
+ vbuf = malloc(sizeof(video_send_buffer_t));
+ switch_assert(vbuf);
+
+ vbuf->amfnumber = amfnumber;
+ vbuf->timestamp = timestamp;
+ vbuf->type = type;
+ vbuf->stream_id = stream_id;
+ vbuf->len = len;
+ vbuf->flags = flags;
+ vbuf->message = malloc(len);
+ switch_assert(vbuf->message);
+
+ memcpy(vbuf->message, message, len);
+
+ switch_queue_push(rsession->video_send_queue, (void *)vbuf);
+ switch_mutex_unlock(rsession->socket_mutex);
+}
+
/* Break message down into 128 bytes chunks, add the appropriate headers and send it out */
-switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint8_t type, uint32_t stream_id, const unsigned char *message, switch_size_t len, uint32_t flags)
+switch_status_t _rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint8_t type, uint32_t stream_id, const unsigned char *message, switch_size_t len, uint32_t flags)
{
switch_size_t pos = 0;
uint8_t header[12] = { amfnumber & 0x3F, INT24(0), INT24(len), type, INT32_LE(stream_id) };
// switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%d send_ack=%d send=%d window=%d wait_ack=%d\n",
// type, rsession->send_ack, rsession->send, rsession->send_ack_window, rsession->send + 3073 - rsession->send_ack);
- if (type == RTMP_TYPE_VIDEO) {
- uint32_t window = rsession->send_ack_window;
-
- if (rsession->media_debug & RTMP_MD_VIDEO_WRITE) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "W V ts:%u data:0x%02x len:%" SWITCH_SIZE_T_FMT "\n", timestamp, *message, len);
- }
-
- /* start to drop video frame on window/2 if the frame is a non-IDR video frame
- start to drop video frame on window * 3/4 if the frame is a IDR frame
- start to drop audio frame on widnow full
- */
-
- if (*message == 0x17) {
- window = window / 4 * 3;
- } else {
- window /= 2;
- }
-
- if ((rsession->send_ack + window) < (rsession->send + 3073)) {
- /* We're sending too fast, drop the frame */
- rsession->dropped_video_frame++;
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
- "DROP VIDEO FRAME [amfnumber=%d type=0x%x stream_id=0x%x ftype=0x%x] len=%"SWITCH_SIZE_T_FMT
- " dropped=%"SWITCH_SIZE_T_FMT"\n",
- amfnumber, type, stream_id, *message, len, rsession->dropped_video_frame);
- return SWITCH_STATUS_SUCCESS;
- }
-
- if (rsession->dropped_video_frame) {
- if (*message != 0x17) {
- rsession->dropped_video_frame++;
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
- "DROP VIDEO FRAME [amfnumber=%d type=0x%x stream_id=0x%x ftype=0x%x] len=%"SWITCH_SIZE_T_FMT
- " dropped=%"SWITCH_SIZE_T_FMT" waiting for the next IDR\n",
- amfnumber, type, stream_id, *message, len, rsession->dropped_video_frame);
-
- return SWITCH_STATUS_SUCCESS;
- } else {
- switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO,
- "Got IDR frame after %"SWITCH_SIZE_T_FMT" frame(s) dropped\n",
- rsession->dropped_video_frame);
- rsession->dropped_video_frame = 0;
- }
- }
- }
-
if (type == RTMP_TYPE_AUDIO && (rsession->media_debug & RTMP_MD_AUDIO_WRITE)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "W A ts:%u data:0x%02x len:%" SWITCH_SIZE_T_FMT "\n", timestamp, *message, len);
}
header[3] = timestamp & 0xFF;
}
+ // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "=== send type: %d ts: %d bytes: %zu\n", type, timestamp, len);
+
state->ts = timestamp;
state->type = type;
state->origlen = len;
return status;
}
+switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint8_t type, uint32_t stream_id, const unsigned char *message, switch_size_t len, uint32_t flags)
+{
+ switch_status_t status = SWITCH_STATUS_SUCCESS;
+ int window = rsession->send_ack_window;
+
+ // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%d send_ack=%d send=%d window=%d wait_ack=%d\n",
+ // type, rsession->send_ack, rsession->send, rsession->send_ack_window, rsession->send + 3073 - rsession->send_ack);
+
+ if (type != RTMP_TYPE_VIDEO) {
+ return _rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+ }
+
+ if (rsession->media_debug & RTMP_MD_VIDEO_WRITE) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "W V ts:%u data:0x%02x len:%" SWITCH_SIZE_T_FMT "\n", timestamp, *message, len);
+ }
+
+ window = window / 4 * 3;
+ // window = 65000;
+
+ if ((rsession->send_ack + window) < (rsession->send + 3073)) {
+ buffer_video_send(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "queued %zu bytes, ts: %d, queue size:%d\n", len, timestamp, switch_queue_size(rsession->video_send_queue));
+ return SWITCH_STATUS_SUCCESS;
+ }
+
+ if (rsession->video_send_queue && switch_queue_size(rsession->video_send_queue)) {
+ if (*message == 0x17) { // key frame
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Got a key frame, flush video queue %d\n", switch_queue_size(rsession->video_send_queue));
+ flush_video_send_queue(rsession, SWITCH_TRUE);
+ return _rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+ } else {
+ int x = 0;
+ void *pop = NULL;
+
+ buffer_video_send(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "queued %zu bytes, ts: %d, queue size:%d\n", len, timestamp, switch_queue_size(rsession->video_send_queue));
+
+ again:
+ switch_mutex_lock(rsession->socket_mutex);
+ switch_queue_trypop(rsession->video_send_queue, &pop);
+ switch_mutex_unlock(rsession->socket_mutex);
+
+ if (pop) {
+ video_send_buffer_t *vbuf = (video_send_buffer_t *)pop;
+
+ amfnumber = vbuf->amfnumber;
+ // timestamp = vbuf->timestamp;
+ type = vbuf->type;
+ stream_id = vbuf->stream_id;
+ len = vbuf->len;
+ flags = vbuf->flags;
+ message = vbuf->message;
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "pop len: %zu, ts: %d, queue size: %d\n", len, timestamp, switch_queue_size(rsession->video_send_queue));
+
+ status = _rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+
+ free(vbuf->message);
+ free(vbuf);
+
+ if (status == SWITCH_STATUS_SUCCESS && ((rsession->send_ack + window) >= (rsession->send + 3073) && (++x < 3))) {
+ pop = NULL;
+ goto again;
+ }
+ }
+ }
+ } else {
+ return _rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+ }
+
+ return status;
+}
+
/* Returns SWITCH_STATUS_SUCCESS of the connection is still active or SWITCH_STATUS_FALSE to tear it down */
switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
{