]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-8406 #resolve #comment improvement to drop video packets on slow rtmp link
authorSeven Du <dujinfang@gmail.com>
Thu, 4 Feb 2016 00:32:06 +0000 (08:32 +0800)
committerSeven Du <dujinfang@gmail.com>
Fri, 18 Mar 2016 01:05:45 +0000 (09:05 +0800)
src/mod/endpoints/mod_rtmp/mod_rtmp.c
src/mod/endpoints/mod_rtmp/mod_rtmp.h
src/mod/endpoints/mod_rtmp/rtmp.c
src/mod/endpoints/mod_rtmp/rtmp_tcp.c
src/mod/endpoints/mod_rtmp/rtmp_video.c

index a838fa34237b4590003c6133ac5a95004a4d76df..715e13dc9588049e02fce3af9f3d0d6f55be12aa 100644 (file)
@@ -911,7 +911,6 @@ switch_status_t rtmp_session_request(rtmp_profile_t *profile, rtmp_session_t **n
        {
                char buf[1024];
 #ifndef _WIN32
-#else
                snprintf(buf, sizeof(buf), "/tmp/rtmp-%s-in.txt", (*newsession)->uuid);
                (*newsession)->io_debug_in = fopen(buf, "w");
                snprintf(buf, sizeof(buf), "/tmp/rtmp-%s-out.txt", (*newsession)->uuid);
index d75410c3159e2eee7c52c83d48542c06c36d1a0f..4edfb8dc45bd140206d33941f0b7983e92c68357 100644 (file)
@@ -515,6 +515,7 @@ struct rtmp_session {
        uint32_t media_streamid;                        /* < The stream id that was used for the last "play" command,
                                                                                        where we should send media */
        switch_size_t dropped_video_frame;
+       switch_queue_t *video_send_queue;
 
        uint8_t media_debug;
 };
index 8c132634e768a6f8bf0f71271f2201e6ffbbeea2..285bc81103f87635a467c542e5d5e2b17ba5cb6c 100644 (file)
@@ -41,6 +41,17 @@ typedef struct {
        size_t len;
 } buffer_helper_t;
 
+typedef struct {
+       uint8_t amfnumber;
+       uint32_t timestamp;
+       uint8_t type;
+       uint32_t stream_id;
+       switch_size_t len;
+       uint32_t flags;
+       unsigned char *message;
+} video_send_buffer_t;
+
+
 size_t my_buffer_read(void * out_buffer, size_t size, void * user_data)
 {
        buffer_helper_t *helper = (buffer_helper_t*)user_data;
@@ -561,8 +572,62 @@ switch_status_t rtmp_send_invoke_v(rtmp_session_t *rsession, uint8_t amfnumber,
        return rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, buf, helper.pos, 0);
 }
 
+static int flush_video_send_queue(rtmp_session_t *rsession, switch_bool_t lock)
+{
+       video_send_buffer_t *b;
+       void *pop;
+       switch_queue_t *q = rsession->video_send_queue;
+       int x = 0;
+
+       if (!q) return 0;
+
+       if (lock) switch_mutex_lock(rsession->socket_mutex);
+       while (switch_queue_size(q) > 0 && switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS && pop) {
+               b = (video_send_buffer_t *)pop;
+               free(b->message);
+               free(b);
+               x++;
+       }
+       if (lock) switch_mutex_unlock(rsession->socket_mutex);
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Dropped %d Video Frames\n", x);
+
+       return x;
+}
+
+static void buffer_video_send(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint8_t type, uint32_t stream_id, const unsigned char *message, switch_size_t len, uint32_t flags)
+{
+       video_send_buffer_t *vbuf;
+
+       switch_mutex_lock(rsession->socket_mutex);
+
+       if (!rsession->video_send_queue) {
+               switch_queue_create(&rsession->video_send_queue, 1000, rsession->pool);
+       }
+
+       if (*message == 0x17) {
+               flush_video_send_queue(rsession, SWITCH_FALSE);
+       }
+
+       vbuf = malloc(sizeof(video_send_buffer_t));
+       switch_assert(vbuf);
+
+       vbuf->amfnumber = amfnumber;
+       vbuf->timestamp = timestamp;
+       vbuf->type      = type;
+       vbuf->stream_id = stream_id;
+       vbuf->len = len;
+       vbuf->flags = flags;
+       vbuf->message = malloc(len);
+       switch_assert(vbuf->message);
+
+       memcpy(vbuf->message, message, len);
+
+       switch_queue_push(rsession->video_send_queue, (void *)vbuf);
+       switch_mutex_unlock(rsession->socket_mutex);
+}
+
 /* Break message down into 128 bytes chunks, add the appropriate headers and send it out */
-switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint8_t type, uint32_t stream_id, const unsigned char *message, switch_size_t len, uint32_t flags)
+switch_status_t _rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint8_t type, uint32_t stream_id, const unsigned char *message, switch_size_t len, uint32_t flags)
 {
        switch_size_t pos = 0;
        uint8_t header[12] =  { amfnumber & 0x3F, INT24(0), INT24(len), type, INT32_LE(stream_id) };
@@ -575,52 +640,6 @@ switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, u
        // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%d send_ack=%d send=%d window=%d wait_ack=%d\n",
        //      type, rsession->send_ack, rsession->send, rsession->send_ack_window, rsession->send + 3073 - rsession->send_ack);
 
-       if (type == RTMP_TYPE_VIDEO) {
-               uint32_t window = rsession->send_ack_window;
-
-               if (rsession->media_debug & RTMP_MD_VIDEO_WRITE) {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "W V ts:%u data:0x%02x len:%" SWITCH_SIZE_T_FMT "\n", timestamp, *message, len);
-               }
-
-               /* start to drop video frame on window/2 if the frame is a non-IDR video frame
-                  start to drop video frame on window * 3/4 if the frame is a IDR frame
-                  start to drop audio frame on widnow full
-                */
-
-               if (*message == 0x17) {
-                       window = window / 4 * 3;
-               } else {
-                       window /= 2;
-               }
-
-               if ((rsession->send_ack + window) < (rsession->send + 3073)) {
-                       /* We're sending too fast, drop the frame */
-                       rsession->dropped_video_frame++;
-                       switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
-                               "DROP VIDEO FRAME [amfnumber=%d type=0x%x stream_id=0x%x ftype=0x%x] len=%"SWITCH_SIZE_T_FMT
-                               " dropped=%"SWITCH_SIZE_T_FMT"\n",
-                               amfnumber, type, stream_id, *message, len, rsession->dropped_video_frame);
-                       return SWITCH_STATUS_SUCCESS;
-               }
-
-               if (rsession->dropped_video_frame) {
-                       if (*message != 0x17) {
-                               rsession->dropped_video_frame++;
-                               switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_DEBUG,
-                                       "DROP VIDEO FRAME [amfnumber=%d type=0x%x stream_id=0x%x ftype=0x%x] len=%"SWITCH_SIZE_T_FMT
-                                       " dropped=%"SWITCH_SIZE_T_FMT" waiting for the next IDR\n",
-                                       amfnumber, type, stream_id, *message, len, rsession->dropped_video_frame);
-
-                               return SWITCH_STATUS_SUCCESS;
-                       } else {
-                               switch_log_printf(SWITCH_CHANNEL_UUID_LOG(rsession->uuid), SWITCH_LOG_INFO,
-                                       "Got IDR frame after %"SWITCH_SIZE_T_FMT" frame(s) dropped\n",
-                                       rsession->dropped_video_frame);
-                               rsession->dropped_video_frame = 0;
-                       }
-               }
-       }
-
        if (type == RTMP_TYPE_AUDIO && (rsession->media_debug & RTMP_MD_AUDIO_WRITE)) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "W A ts:%u data:0x%02x len:%" SWITCH_SIZE_T_FMT "\n", timestamp, *message, len);
        }
@@ -696,6 +715,8 @@ switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, u
                header[3] = timestamp & 0xFF;
        }
 
+       // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "=== send type: %d ts: %d bytes: %zu\n", type, timestamp, len);
+
        state->ts = timestamp;
        state->type = type;
        state->origlen = len;
@@ -740,6 +761,79 @@ end:
        return status;
 }
 
+switch_status_t rtmp_send_message(rtmp_session_t *rsession, uint8_t amfnumber, uint32_t timestamp, uint8_t type, uint32_t stream_id, const unsigned char *message, switch_size_t len, uint32_t flags)
+{
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+       int window = rsession->send_ack_window;
+
+       // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "%d send_ack=%d send=%d window=%d wait_ack=%d\n",
+       //      type, rsession->send_ack, rsession->send, rsession->send_ack_window, rsession->send + 3073 - rsession->send_ack);
+
+       if (type != RTMP_TYPE_VIDEO) {
+               return _rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+       }
+
+       if (rsession->media_debug & RTMP_MD_VIDEO_WRITE) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "W V ts:%u data:0x%02x len:%" SWITCH_SIZE_T_FMT "\n", timestamp, *message, len);
+       }
+
+       window = window / 4 * 3;
+       // window = 65000;
+
+       if ((rsession->send_ack + window) < (rsession->send + 3073)) {
+               buffer_video_send(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "queued %zu bytes, ts: %d, queue size:%d\n", len, timestamp, switch_queue_size(rsession->video_send_queue));
+               return SWITCH_STATUS_SUCCESS;
+       }
+
+       if (rsession->video_send_queue && switch_queue_size(rsession->video_send_queue)) {
+               if (*message == 0x17) { // key frame
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Got a key frame, flush video queue %d\n", switch_queue_size(rsession->video_send_queue));
+                       flush_video_send_queue(rsession, SWITCH_TRUE);
+                       return _rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+               } else {
+                       int x = 0;
+                       void *pop = NULL;
+
+                       buffer_video_send(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "queued %zu bytes, ts: %d, queue size:%d\n", len, timestamp, switch_queue_size(rsession->video_send_queue));
+
+                       again:
+                       switch_mutex_lock(rsession->socket_mutex);
+                       switch_queue_trypop(rsession->video_send_queue, &pop);
+                       switch_mutex_unlock(rsession->socket_mutex);
+
+                       if (pop) {
+                               video_send_buffer_t *vbuf = (video_send_buffer_t *)pop;
+
+                               amfnumber = vbuf->amfnumber;
+                               // timestamp = vbuf->timestamp;
+                               type = vbuf->type;
+                               stream_id = vbuf->stream_id;
+                               len = vbuf->len;
+                               flags = vbuf->flags;
+                               message = vbuf->message;
+
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "pop len: %zu, ts: %d, queue size: %d\n", len, timestamp, switch_queue_size(rsession->video_send_queue));
+
+                               status = _rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+
+                               free(vbuf->message);
+                               free(vbuf);
+
+                               if (status == SWITCH_STATUS_SUCCESS && ((rsession->send_ack + window) >= (rsession->send + 3073) && (++x < 3))) {
+                                       pop = NULL;
+                                       goto again;
+                               }
+                       }
+               }
+       } else {
+               return _rtmp_send_message(rsession, amfnumber, timestamp, type, stream_id, message, len, flags);
+       }
+
+       return status;
+}
+
 /* Returns SWITCH_STATUS_SUCCESS of the connection is still active or SWITCH_STATUS_FALSE to tear it down */
 switch_status_t rtmp_handle_data(rtmp_session_t *rsession)
 {
index 4389cef0ac649afe70acf433fafecec301855cfd..f0156e406dc69b7cd63bbcbfa9022c99fb9c6ef4 100644 (file)
@@ -301,6 +301,10 @@ switch_status_t rtmp_tcp_init(rtmp_profile_t *profile, const char *bindaddr, rtm
        if (switch_socket_opt_set(io_tcp->listen_socket, SWITCH_SO_TCP_NODELAY, 1)) {
                goto fail;
        }
+       if (1) {
+               switch_socket_opt_set(io_tcp->listen_socket, SWITCH_SO_RCVBUF, 1572864);
+               switch_socket_opt_set(io_tcp->listen_socket, SWITCH_SO_SNDBUF, 1572864);
+       }
        if (switch_socket_bind(io_tcp->listen_socket, sa)) {
                goto fail;
        }
index 04b2d3c91231fc62671295388b77e7e3b0544e36..6b9ca84b965b78bd9039d42057c984b992bdb2bd 100644 (file)
@@ -583,7 +583,7 @@ switch_status_t rtmp_write_video_frame(switch_core_session_t *session, switch_fr
        rtmp_rtp2rtmpH264(helper, frame);
 
        if (helper->send) {
-               uint16_t used = switch_buffer_inuse(helper->rtmp_buf);
+               uint32_t used = switch_buffer_inuse(helper->rtmp_buf);
                const void *rtmp_data = NULL;
 
                switch_buffer_peek_zerocopy(helper->rtmp_buf, &rtmp_data);
@@ -633,6 +633,11 @@ switch_status_t rtmp_write_video_frame(switch_core_session_t *session, switch_fr
                                switch_core_session_rwunlock(other_session);
                        }
                }
+
+               if (rsession->video_send_queue && switch_queue_size(rsession->video_send_queue) > 30) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Need a key frame\n");
+                       switch_channel_set_flag(channel, CF_VIDEO_REFRESH_REQ);
+               }
 skip:
                switch_buffer_zero(helper->rtmp_buf);
                switch_buffer_zero(helper->fua_buf);