]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FSCORE-639 with some additional changes
authorAnthony Minessale <anthm@freeswitch.org>
Thu, 29 Jul 2010 22:41:23 +0000 (17:41 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Thu, 29 Jul 2010 22:41:33 +0000 (17:41 -0500)
src/include/switch_types.h
src/mod/endpoints/mod_sofia/mod_sofia.c
src/switch_core_timer.c
src/switch_ivr_play_say.c
src/switch_rtp.c
src/switch_time.c

index 1808aa167ec1b13d0dbab428d275007ec24a5f79..f719c108b2df85ecadc1a5939d32f3524af7d9d0 100644 (file)
@@ -1289,7 +1289,8 @@ typedef uint32_t switch_file_flag_t;
 
 typedef enum {
        SWITCH_IO_FLAG_NONE = 0,
-       SWITCH_IO_FLAG_NOBLOCK = (1 << 0)
+       SWITCH_IO_FLAG_NOBLOCK = (1 << 0),
+       SWITCH_IO_FLAG_SINGLE_READ = (1 << 1)
 } switch_io_flag_enum_t;
 typedef uint32_t switch_io_flag_t;
 
index 79a393a46448750c828c5430ab9d8136fe1a6179..ba8f7b71c7c4f2e971b8fdef0d3117fc89a89c03 100644 (file)
@@ -1369,7 +1369,7 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi
                                } else {
                                        ok = sofia_test_pflag(tech_pvt->profile, PFLAG_RTP_AUTOFLUSH_DURING_BRIDGE);
                                }
-
+                               
                                if (ok) {
                                        rtp_flush_read_buffer(tech_pvt->rtp_session, SWITCH_RTP_FLUSH_STICK);
                                } else {
index e7f908cc0e96fcdeec3e3d3ed6e940c98c9b5fa1..8d563b08b5edff918b23c94bef243ce5af0ee9af 100644 (file)
@@ -48,7 +48,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_timer_init(switch_timer_t *timer, co
 
        timer->interval = interval;
        timer->samples = samples;
-       timer->samplecount = 0;
+       timer->samplecount = samples;
        timer->timer_interface = timer_interface;
 
        if (pool) {
index bc1fbadd0007d34fcf00ec9ebe7e515fa351541e..200126c1789d934a3c585e65ff5f31b0cc0ffa8f 100644 (file)
@@ -915,6 +915,7 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess
        int done = 0;
        int timeout_samples = 0;
        const char *var;
+       int more_data = 0;
 
        if (switch_channel_pre_answer(channel) != SWITCH_STATUS_SUCCESS) {
                return SWITCH_STATUS_FALSE;
@@ -1190,6 +1191,7 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess
                                status = SWITCH_STATUS_GENERR;
                                continue;
                        }
+                       switch_core_timer_sync(&timer); // Sync timer
                        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Setup timer success %u bytes per %d ms!\n", len, interval);
                }
                write_frame.rate = fh->samplerate;
@@ -1384,6 +1386,41 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess
                                olen = llen;
                        }
 
+                       if (!more_data) {
+                               if (timer_name) {
+                                       if (switch_core_timer_next(&timer) != SWITCH_STATUS_SUCCESS) {
+                                               break;
+                                       }
+                               } else {                        /* time off the channel (if you must) */
+                                       switch_frame_t *read_frame;
+                                       switch_status_t tstatus;
+                                       
+                                       while (switch_channel_ready(channel) && switch_channel_test_flag(channel, CF_HOLD)) {
+                                               switch_yield(10000);
+                                       }
+
+                                       tstatus = switch_core_session_read_frame(session, &read_frame, SWITCH_IO_FLAG_SINGLE_READ, 0);
+
+                                       if (!SWITCH_READ_ACCEPTABLE(tstatus)) {
+                                               break;
+                                       }
+
+                                       if (args && (args->read_frame_callback)) {
+                                               int ok = 1;
+                                               switch_set_flag(fh, SWITCH_FILE_CALLBACK);
+                                               if (args->read_frame_callback(session, read_frame, args->user_data) != SWITCH_STATUS_SUCCESS) {
+                                                       ok = 0;
+                                               }
+                                               switch_clear_flag(fh, SWITCH_FILE_CALLBACK);
+                                               if (!ok) {
+                                                       break;
+                                               }
+                                       }
+                               }
+                       }
+
+                       more_data = 0;
+
                        write_frame.samples = (uint32_t) olen;
 
                        if (asis) {
@@ -1424,6 +1461,7 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess
 
                        if (status == SWITCH_STATUS_MORE_DATA) {
                                status = SWITCH_STATUS_SUCCESS;
+                               more_data = 1;
                                continue;
                        } else if (status != SWITCH_STATUS_SUCCESS) {
                                done = 1;
@@ -1433,36 +1471,6 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess
                        if (done) {
                                break;
                        }
-
-                       if (timer_name) {
-                               if (switch_core_timer_next(&timer) != SWITCH_STATUS_SUCCESS) {
-                                       break;
-                               }
-                       } else {                        /* time off the channel (if you must) */
-                               switch_frame_t *read_frame;
-                               switch_status_t tstatus;
-                               while (switch_channel_ready(channel) && switch_channel_test_flag(channel, CF_HOLD)) {
-                                       switch_yield(10000);
-                               }
-
-                               tstatus = switch_core_session_read_frame(session, &read_frame, SWITCH_IO_FLAG_NONE, 0);
-
-                               if (!SWITCH_READ_ACCEPTABLE(tstatus)) {
-                                       break;
-                               }
-
-                               if (args && (args->read_frame_callback)) {
-                                       int ok = 1;
-                                       switch_set_flag(fh, SWITCH_FILE_CALLBACK);
-                                       if (args->read_frame_callback(session, read_frame, args->user_data) != SWITCH_STATUS_SUCCESS) {
-                                               ok = 0;
-                                       }
-                                       switch_clear_flag(fh, SWITCH_FILE_CALLBACK);
-                                       if (!ok) {
-                                               break;
-                                       }
-                               }
-                       }
                }
 
                switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "done playing file\n");
@@ -2177,6 +2185,8 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_speak_text(switch_core_session_t *ses
                        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Setup timer success %u bytes per %d ms!\n", sh->samples * 2,
                                                          interval);
                }
+               switch_core_timer_sync(timer); // Sync timer
+
                /* start a thread to absorb incoming audio */
                switch_core_service_session(session);
 
index a123d93eea8fb6a48f19d850fa607693c322ae69..cec5c6be358b2ee1128db012b80c0aba18505c2e 100644 (file)
@@ -135,6 +135,17 @@ struct switch_rtp_rfc2833_data {
        switch_mutex_t *dtmf_mutex;
 };
 
+
+#define FLUSH_MAX 5
+#define MAX_MSG 6
+
+struct rtp_packet {
+       rtp_msg_t recv_msg;
+       switch_size_t bytes;
+};
+
+typedef struct rtp_packet rtp_packet_t;
+
 struct switch_rtp {
        /* 
         * Two sockets are needed because we might be transcoding protocol families
@@ -151,7 +162,12 @@ struct switch_rtp {
        rtcp_msg_t rtcp_send_msg;
 
        switch_sockaddr_t *remote_addr, *rtcp_remote_addr;
+
        rtp_msg_t recv_msg;
+       rtp_packet_t recv_msg_array[MAX_MSG];
+       int recv_msg_idx;
+
+
        rtcp_msg_t rtcp_recv_msg;
 
        switch_sockaddr_t *remote_stun_addr;
@@ -225,7 +241,7 @@ struct switch_rtp {
        uint32_t cng_count;
        switch_rtp_bug_flag_t rtp_bugs;
        switch_rtp_stats_t stats;
-       uint32_t hot_hits;
+       //uint32_t hot_hits;
        uint32_t sync_packets;
        int rtcp_interval;
        switch_bool_t rtcp_fresh_frame;
@@ -239,6 +255,7 @@ struct switch_rtp {
 #endif
 
        switch_time_t send_time;
+       //int more_data;
 };
 
 struct switch_rtcp_senderinfo {
@@ -2057,13 +2074,59 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t
 {
        switch_status_t status = SWITCH_STATUS_FALSE;
        stfu_frame_t *jb_frame;
+       int i = 0;
 
+       
        switch_assert(bytes);
 
-       *bytes = sizeof(rtp_msg_t);
-       status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, bytes);
+       *bytes = 0;
+
+ top:
+
+       if ((switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_STICKY_FLUSH))) {
+               if (rtp_session->recv_msg_idx) {
+                       rtp_session->recv_msg = rtp_session->recv_msg_array[0].recv_msg;
+                       *bytes = rtp_session->recv_msg_array[0].bytes;
+                       
+                       for (i = 1; i < MAX_MSG - 1; i++) {
+                               rtp_session->recv_msg_array[i-1] = rtp_session->recv_msg_array[i];
+                       }
+                       rtp_session->recv_msg_idx--;
+                       status = SWITCH_STATUS_SUCCESS;
+                       goto got_data;
+               }
+
+               
+               while(rtp_session->recv_msg_idx < MAX_MSG) {
+                       switch_status_t rstatus;
+                       switch_size_t rb = sizeof(rtp_msg_t);
+                       
+                       rstatus = switch_socket_recvfrom(rtp_session->from_addr,
+                                                                                        rtp_session->sock_input, 0,
+                                                                                        (void *) &rtp_session->recv_msg_array[rtp_session->recv_msg_idx].recv_msg, 
+                                                                                        &rb);
+                       
+                       if ((rstatus != SWITCH_STATUS_SUCCESS && rstatus != SWITCH_STATUS_BREAK) || rb < 0) {
+                               *bytes = rb;
+                               return rstatus;
+                       }
+                       
+                       if (!rb) break;
+                       
+                       rtp_session->recv_msg_array[rtp_session->recv_msg_idx].bytes = rb;
+                       rtp_session->recv_msg_idx++;
+               }
+
+               if (!*bytes && rtp_session->recv_msg_idx) goto top;
+       } else {
+               *bytes = sizeof(rtp_msg_t);
+               status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, bytes);
+       }
+       
+ got_data:
 
        if (*bytes) {
+               
                rtp_session->stats.inbound.raw_bytes += *bytes;
                if (rtp_session->recv_te && rtp_session->recv_msg.header.pt == rtp_session->recv_te) {
                        rtp_session->stats.inbound.dtmf_packet_count++;
@@ -2235,6 +2298,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
        int fdr = 0;
        int rtcp_fdr = 0;
        int hot_socket = 0;
+       int read_loops = 0;
 
        if (session) {
                channel = switch_core_session_get_channel(session);
@@ -2253,36 +2317,36 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
        while (switch_rtp_ready(rtp_session)) {
                int do_cng = 0;
                bytes = 0;
+               read_loops++;
 
                if (rtp_session->timer.interval) {
-                       if ((switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_STICKY_FLUSH)) &&
-                               rtp_session->read_pollfd) {
-                               if (switch_poll(rtp_session->read_pollfd, 1, &fdr, 0) == SWITCH_STATUS_SUCCESS) {
-                                       rtp_session->hot_hits += rtp_session->samples_per_interval;
-
-                                       if (rtp_session->hot_hits >= rtp_session->samples_per_second * 5) {
-                                               switch_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH);
-                                               hot_socket = 1;
-                                       }
-                               } else {
-                                       rtp_session->hot_hits = 0;
-                               }
+                       if ((switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_STICKY_FLUSH)) && 
+                               rtp_session->recv_msg_idx > FLUSH_MAX) {
+                               hot_socket = 1;
+                       } else {
+                               hot_socket = 0;
                        }
-
+                       
                        if (hot_socket) {
                                rtp_session->sync_packets++;
                                switch_core_timer_sync(&rtp_session->timer);
                        } else {
                                if (rtp_session->sync_packets) {
 #if 0
-                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
                                                                          "Auto-Flush catching up %d packets (%d)ms.\n",
                                                                          rtp_session->sync_packets, (rtp_session->ms_per_packet * rtp_session->sync_packets) / 1000);
 #endif
                                        rtp_session->sync_packets = 0;
                                }
+
                                switch_core_timer_next(&rtp_session->timer);
                        }
+
+
+
+
+                       
                }
 
        recvfrom:
@@ -2405,7 +2469,13 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
                        goto end;
                }
 
-               if (rtp_session->max_missed_packets) {
+               if (!bytes && (io_flags & SWITCH_IO_FLAG_NOBLOCK)) {
+                       rtp_session->missed_count = 0;
+                       ret = 0;
+                       goto end;
+               }
+
+               if (rtp_session->max_missed_packets && read_loops == 1) {
                        if (bytes) {
                                rtp_session->missed_count = 0;
                        } else if (++rtp_session->missed_count >= rtp_session->max_missed_packets) {
@@ -2553,10 +2623,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
                        }
                }
 
-               if (!bytes && (io_flags & SWITCH_IO_FLAG_NOBLOCK)) {
-                       return_cng_frame();
-               }
-
 
                if (check && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTO_CNG) &&
                        rtp_session->timer.samplecount >= (rtp_session->last_write_samplecount + (rtp_session->samples_per_interval * 50))) {
@@ -2814,17 +2880,16 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
                                        switch_cond_next();
                                        continue;
                                }
-
+                               
                                return_cng_frame();
                        }
                }
-
+               
                if (status == SWITCH_STATUS_BREAK || bytes == 0) {
-                       if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_DATAWAIT)) {
+                       if (!(io_flags & SWITCH_IO_FLAG_SINGLE_READ) && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_DATAWAIT)) {
                                goto do_continue;
                        }
-                       ret = 0;
-                       goto end;
+                       return_cng_frame();
                }
 
                if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_GOOGLEHACK) && rtp_session->recv_msg.header.pt == 102) {
@@ -2856,7 +2921,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
        }
 
  end:
-
+       
        READ_DEC(rtp_session);
 
        return ret;
@@ -2960,6 +3025,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_read(switch_rtp_t *rtp_session, void
 
        bytes = rtp_common_read(rtp_session, payload_type, flags, io_flags);
 
+
        if (bytes < 0) {
                *datalen = 0;
                return bytes == -2 ? SWITCH_STATUS_TIMEOUT : SWITCH_STATUS_GENERR;
@@ -3153,10 +3219,14 @@ static int rtp_common_write(switch_rtp_t *rtp_session,
 
                if (timestamp) {
                        rtp_session->ts = (uint32_t) timestamp;
+                       /* Send marker bit if timestamp is lower/same as before (resetted/new timer) */
+                       if (rtp_session->ts <= rtp_session->last_write_ts) {
+                               m++;
+                       }
                } else if (rtp_session->timer.timer_interface) {
                        rtp_session->ts = rtp_session->timer.samplecount;
 
-                       if (rtp_session->ts <= rtp_session->last_write_ts) {
+                       if (rtp_session->ts <= rtp_session->last_write_ts && rtp_session->ts > 0) {
                                rtp_session->ts = rtp_session->last_write_ts + rtp_session->samples_per_interval;
                        }
                } else {
index 5c07409d93ce0b3a8dc900910f2aa580d19f2cc3..7ffdf4e4d7d7010ae87ad2983bc7b60aa2c34a27 100644 (file)
@@ -550,10 +550,7 @@ static switch_status_t timer_sync(switch_timer_t *timer)
        private_info->reference = timer->tick = TIMER_MATRIX[timer->interval].tick;
 
        /* apply timestamp */
-       if (timer_step(timer) == SWITCH_STATUS_SUCCESS) {
-               /* push the reference into the future to prevent collision */
-               private_info->reference++;
-       }
+       timer_step(timer);
 
        return SWITCH_STATUS_SUCCESS;
 }