uint8_t fir_seq;
uint16_t fir_count;
uint16_t pli_count;
-
+ uint32_t cur_nack;
ts_normalize_t ts_norm;
switch_sockaddr_t *remote_addr, *rtcp_remote_addr;
rtp_msg_t recv_msg;
uint8_t cn;
stfu_instance_t *jb;
switch_vb_t *vb;
+ switch_vb_t *vbw;
uint32_t max_missed_packets;
uint32_t missed_count;
rtp_msg_t write_msg;
rtcp_ok = 0;
}
+ if (rtp_session->flags[SWITCH_RTP_FLAG_NACK] && rtp_session->vb) {
+ rtp_session->cur_nack = switch_vb_pop_nack(rtp_session->vb);
+ }
+
if (rtp_session->rtcp_sock_output && rtp_session->flags[SWITCH_RTP_FLAG_ENABLE_RTCP] &&
!rtp_session->flags[SWITCH_RTP_FLAG_RTCP_PASSTHRU] &&
- ((now - rtp_session->rtcp_last_sent) > rtp_session->rtcp_send_rate * 1000000 || rtp_session->pli_count || rtp_session->fir_count)) {
+ ((now - rtp_session->rtcp_last_sent) > rtp_session->rtcp_send_rate * 1000000 || rtp_session->pli_count || rtp_session->fir_count || rtp_session->cur_nack)) {
switch_rtcp_numbers_t * stats = &rtp_session->stats.rtcp;
struct switch_rtcp_receiver_report *rr;
struct switch_rtcp_sender_report *sr;
rtcp_generate_report_block(rtp_session, rtcp_report_block);
rtp_session->rtcp_send_msg.header.length = htons((uint16_t)(rtcp_bytes / 4) - 1);
-
+
if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) {
if (rtp_session->remote_ssrc == 0) {
rtp_session->remote_ssrc = rtp_session->stats.rtcp.peer_ssrc;
rtcp_bytes += sizeof(switch_rtcp_ext_hdr_t);
rtp_session->pli_count = 0;
}
+
+ if (rtp_session->flags[SWITCH_RTP_FLAG_NACK] && rtp_session->cur_nack) {
+ switch_rtcp_ext_hdr_t *ext_hdr;
+ uint32_t *nack;
+ p = (uint8_t *) (&rtp_session->rtcp_send_msg) + rtcp_bytes;
+ ext_hdr = (switch_rtcp_ext_hdr_t *) p;
+
+ ext_hdr->version = 2;
+ ext_hdr->p = 0;
+ ext_hdr->fmt = 1;
+ ext_hdr->pt = 205;
+ ext_hdr->send_ssrc = htonl(rtp_session->ssrc);
+ ext_hdr->recv_ssrc = htonl(rtp_session->remote_ssrc);
+ ext_hdr->length = htons(3);
+ p += sizeof(switch_rtcp_ext_hdr_t);
+ nack = (uint32_t *) p;
+ *nack = rtp_session->cur_nack;
+
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Sending RTCP NACK %u\n",
+ ntohs(*nack & 0xFFFF));
+
+ rtcp_bytes += sizeof(switch_rtcp_ext_hdr_t) + sizeof(rtp_session->cur_nack);
+
+ rtp_session->cur_nack = 0;
+ }
if (rtp_session->fir_count) {
switch_rtcp_ext_hdr_t *ext_hdr;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Starting video timer.\n");
}
- switch_vb_create(&rtp_session->vb, 5, 30, SWITCH_FALSE);
- switch_vb_debug_level(rtp_session->vb, 10);
+ switch_vb_create(&rtp_session->vb, 5, 30);
+ //switch_vb_debug_level(rtp_session->vb, 10);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Starting video buffer.\n");
} else {
switch_vb_destroy(&(*rtp_session)->vb);
}
+ if ((*rtp_session)->vbw) {
+ switch_vb_destroy(&(*rtp_session)->vbw);
+ }
+
if ((*rtp_session)->dtls && (*rtp_session)->dtls == (*rtp_session)->rtcp_dtls) {
(*rtp_session)->rtcp_dtls = NULL;
}
my_host = switch_get_addr(bufc, sizeof(bufc), rtp_session->local_addr);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE,
- "R %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u m=%d\n",
+ "R %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n",
rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "No-Name",
(long) *bytes,
my_host, switch_sockaddr_get_port(rtp_session->local_addr),
old_host, rtp_session->remote_port,
tx_host, switch_sockaddr_get_port(rtp_session->from_addr),
- rtp_session->recv_msg.header.pt, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.header.m);
+ rtp_session->recv_msg.header.pt, ntohl(rtp_session->recv_msg.header.ts), ntohs(rtp_session->recv_msg.header.seq),
+ rtp_session->recv_msg.header.m);
}
-
+#ifdef RTP_READ_PLOSS
+ {
+ int r = (rand() % 10000) + 1;
+ if (r <= 200) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ALERT,
+ "Simulate dropped packet ......... ts: %u seq: %u\n", ntohl(rtp_session->recv_msg.header.ts), ntohs(rtp_session->recv_msg.header.seq));
+ *bytes = 0;
+ }
+ }
+#endif
+
+
if (sync) {
if (!rtp_session->flags[SWITCH_RTP_FLAG_USE_TIMER] && rtp_session->timer.interval) {
switch_core_timer_sync(&rtp_session->timer);
reset_jitter_seq(rtp_session);
}
rtp_session->hot_hits = 0;
+
goto more;
}
stfu_n_destroy(&rtp_session->jb);
}
}
-
+
if (rtp_session->recv_msg.header.version == 2 && *bytes) {
if (rtp_session->vb) {
return status;
}
+static void handle_nack(switch_rtp_t *rtp_session, uint32_t nack)
+{
+ switch_size_t bytes = 0;
+ rtp_msg_t send_msg[1] = {{{0}}};
+ uint16_t seq = (uint16_t) (nack & 0xFFFF);
+ int i;
+ const char *tx_host = NULL;
+ const char *old_host = NULL;
+ const char *my_host = NULL;
+ char bufa[30], bufb[30], bufc[30];
+
+ if (!(rtp_session->flags[SWITCH_RTP_FLAG_NACK] && rtp_session->vbw)) {
+ return; /* not enabled */
+ }
+
+ if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_WRITE]) {
+ tx_host = switch_get_addr(bufa, sizeof(bufa), rtp_session->from_addr);
+ old_host = switch_get_addr(bufb, sizeof(bufb), rtp_session->remote_addr);
+ my_host = switch_get_addr(bufc, sizeof(bufc), rtp_session->local_addr);
+ }
+
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Got NACK [%u][0x%x] for seq %u\n", nack, nack, ntohs(seq));
+
+ if (switch_vb_get_packet_by_seq(rtp_session->vbw, seq, (switch_rtp_packet_t *) send_msg, &bytes) == SWITCH_STATUS_SUCCESS) {
+
+ if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_WRITE]) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE,
+ "X %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n",
+ rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "NoName",
+ (long) bytes,
+ my_host, switch_sockaddr_get_port(rtp_session->local_addr),
+ old_host, rtp_session->remote_port,
+ tx_host, switch_sockaddr_get_port(rtp_session->from_addr),
+ send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m);
+
+ }
+ switch_rtp_write_raw(rtp_session, (void *) send_msg, &bytes, SWITCH_FALSE);
+ } else {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq));
+ }
+
+ for (i = 0; i < 16; i++) {
+ if ((nack & (1 << (16 + i)))) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Also Got NACK for seq %u\n", ntohs(seq) + i);
+ if (switch_vb_get_packet_by_seq(rtp_session->vbw, htons(ntohs(seq) + i), (switch_rtp_packet_t *) &send_msg, &bytes) == SWITCH_STATUS_SUCCESS) {
+ if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_WRITE]) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE,
+ "X %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n",
+ rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "NoName",
+ (long) bytes,
+ my_host, switch_sockaddr_get_port(rtp_session->local_addr),
+ old_host, rtp_session->remote_port,
+ tx_host, switch_sockaddr_get_port(rtp_session->from_addr),
+ send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m);
+
+ }
+ switch_rtp_write_raw(rtp_session, (void *) &send_msg, &bytes, SWITCH_FALSE);
+ } else {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq) + i);
+ }
+ }
+ }
+}
+
static switch_status_t process_rtcp_report(switch_rtp_t *rtp_session, rtcp_msg_t *msg, switch_size_t bytes)
{
switch_status_t status = SWITCH_STATUS_FALSE;
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_CRIT,
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG1,
"RTCP packet bytes %" SWITCH_SIZE_T_FMT " type %d pad %d\n",
bytes, msg->header.type, msg->header.p);
(msg->header.type == 205 || //RTPFB
msg->header.type == 206)) {//PSFB
rtcp_ext_msg_t *extp = (rtcp_ext_msg_t *) msg;
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_CRIT, "PICKED UP XRTCP type: %d fmt: %d\n",
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "PICKED UP XRTCP type: %d fmt: %d\n",
msg->header.type, extp->header.fmt);
- if ((extp->header.fmt == 4) || (extp->header.fmt == 1)) { /* FIR || PLI */
+ if (msg->header.type == 206 && (extp->header.fmt == 4 || extp->header.fmt == 1)) { /* FIR || PLI */
switch_core_media_gen_key_frame(rtp_session->session);
switch_channel_set_flag(switch_core_session_get_channel(rtp_session->session), CF_VIDEO_REFRESH_REQ);
}
+
+ if (msg->header.type == 205 && extp->header.fmt == 1) { /*NACK*/
+ uint32_t *nack = (uint32_t *) extp->body;
+ int i;
+
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Got NACK count %d\n", ntohs(extp->header.length) - 2);
+
+ for (i = 0; i < ntohs(extp->header.length) - 2; i++) {
+ handle_nack(rtp_session, *nack);
+ nack++;
+ }
+ }
+
} else
if (msg->header.type == 200 || msg->header.type == 201) {
if (report_block->lsr && !rtp_session->flags[SWITCH_RTP_FLAG_RTCP_PASSTHRU]) {
switch_time_exp_gmt(&now_hr,now);
/* Calculating RTT = A - DLSR - LSR */
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG,
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG1,
"Receiving an RTCP packet\n[%04d-%02d-%02d %02d:%02d:%02d.%d] SSRC[%u]\n"
"RTT[%f] A[%u] - DLSR[%u] - LSR[%u]\n",
1900 + now_hr.tm_year, now_hr.tm_mday, now_hr.tm_mon, now_hr.tm_hour, now_hr.tm_min, now_hr.tm_sec, now_hr.tm_usec,
pt = 200000;
}
- if (rtp_session->vb && switch_vb_poll(rtp_session->vb)) {
- pt = 1000;
- force = 1;
- }
+ //if (rtp_session->vb && switch_vb_poll(rtp_session->vb)) {
+ // pt = 1000;
+ // force = 1;
+ //}
poll_status = switch_poll(rtp_session->read_pollfd, 1, &fdr, pt);
my_host = switch_get_addr(bufc, sizeof(bufc), rtp_session->local_addr);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE,
- "W %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u m=%d\n",
+ "W %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n",
rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "NoName",
(long) bytes,
my_host, switch_sockaddr_get_port(rtp_session->local_addr),
old_host, rtp_session->remote_port,
tx_host, switch_sockaddr_get_port(rtp_session->from_addr),
- send_msg->header.pt, ntohl(send_msg->header.ts), send_msg->header.m);
+ send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m);
}
+
+ if (rtp_session->flags[SWITCH_RTP_FLAG_NACK]) {
+ if (!rtp_session->vbw) {
+ switch_vb_create(&rtp_session->vbw, 5, 5);
+ //switch_vb_debug_level(rtp_session->vbw, 10);
+ }
+ switch_vb_put_packet(rtp_session->vbw, (switch_rtp_packet_t *)send_msg, bytes);
+ }
+#ifdef RTP_WRITE_PLOSS
+ {
+ int r = (rand() % 10000) + 1;
+
+ if (r <= 200) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ALERT,
+ "Simulate dropping packet ......... ts: %u seq: %u\n", ntohl(send_msg->header.ts), ntohs(send_msg->header.seq));
+ } else {
+ if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) {
+ rtp_session->seq--;
+ ret = -1;
+ goto end;
+ }
+ }
+ }
+#else
+
if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) {
rtp_session->seq--;
ret = -1;
goto end;
}
-
+#endif
rtp_session->last_write_ts = this_ts;
rtp_session->flags[SWITCH_RTP_FLAG_RESET] = 0;
bytes = rtp_header_len + datalen;
+ if (switch_rtp_write_raw(rtp_session, (void *) &rtp_session->write_msg, &bytes, SWITCH_TRUE) != SWITCH_STATUS_SUCCESS) {
+ rtp_session->seq--;
+ ret = -1;
+ goto end;
+ }
+
+ if (((*flags) & SFF_RTP_HEADER)) {
+ rtp_session->last_write_ts = ts;
+ rtp_session->flags[SWITCH_RTP_FLAG_RESET] = 0;
+ }
+
+ ret = (int) bytes;
+
+ end:
+
+ WRITE_DEC(rtp_session);
+
+ return ret;
+}
+
+
+
+SWITCH_DECLARE(switch_status_t) switch_rtp_write_raw(switch_rtp_t *rtp_session, void *data, switch_size_t *bytes, switch_bool_t process_encryption)
+{
+ switch_status_t status = SWITCH_STATUS_FALSE;
+
+ switch_assert(bytes);
+
+ if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr || *bytes > SWITCH_RTP_MAX_BUF_LEN) {
+ return status;
+ }
+
+ if (!rtp_write_ready(rtp_session, *bytes, __LINE__)) {
+ return SWITCH_STATUS_NOT_INITALIZED;
+ }
+
+ WRITE_INC(rtp_session);
+
+ if (process_encryption) {
+ process_encryption = SWITCH_FALSE;
#ifdef ENABLE_SRTP
- if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND]) {
+ if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND]) {
- int sbytes = (int) bytes;
- err_status_t stat;
-
- if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND_RESET]) {
- switch_rtp_clear_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_SEND_RESET);
- srtp_dealloc(rtp_session->send_ctx[rtp_session->srtp_idx_rtp]);
- rtp_session->send_ctx[rtp_session->srtp_idx_rtp] = NULL;
- if ((stat = srtp_create(&rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->send_policy[rtp_session->srtp_idx_rtp]))) {
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n");
- ret = -1;
- goto end;
- } else {
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n");
+ int sbytes = (int) *bytes;
+ err_status_t stat;
+
+ if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND_RESET]) {
+ switch_rtp_clear_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_SEND_RESET);
+ srtp_dealloc(rtp_session->send_ctx[rtp_session->srtp_idx_rtp]);
+ rtp_session->send_ctx[rtp_session->srtp_idx_rtp] = NULL;
+ if ((stat = srtp_create(&rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->send_policy[rtp_session->srtp_idx_rtp]))) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n");
+ status = SWITCH_STATUS_FALSE;
+ goto end;
+ } else {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n");
+ }
}
- }
- stat = srtp_protect(rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->write_msg.header, &sbytes);
- if (stat) {
- switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error: SRTP protection failed with code %d\n", stat);
+ stat = srtp_protect(rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->write_msg.header, &sbytes);
+ if (stat) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error: SRTP protection failed with code %d\n", stat);
+ }
+ *bytes = sbytes;
}
- bytes = sbytes;
- }
#endif
#ifdef ENABLE_ZRTP
- /* ZRTP Send */
- if (zrtp_on && !rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA]) {
- unsigned int sbytes = (int) bytes;
- zrtp_status_t stat = zrtp_status_fail;
+ /* ZRTP Send */
+ if (zrtp_on && !rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA]) {
+ unsigned int sbytes = (int) *bytes;
+ zrtp_status_t stat = zrtp_status_fail;
- stat = zrtp_process_rtp(rtp_session->zrtp_stream, (void *) &rtp_session->write_msg, &sbytes);
+ stat = zrtp_process_rtp(rtp_session->zrtp_stream, (void *) &rtp_session->write_msg, &sbytes);
- switch (stat) {
- case zrtp_status_ok:
- break;
- case zrtp_status_drop:
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection drop with code %d\n", stat);
- ret = (int) bytes;
- goto end;
- break;
- case zrtp_status_fail:
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection fail with code %d\n", stat);
- break;
- default:
- break;
- }
+ switch (stat) {
+ case zrtp_status_ok:
+ break;
+ case zrtp_status_drop:
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection drop with code %d\n", stat);
+ ret = SWITCH_STATUS_SUCCESS;
+ goto end;
+ break;
+ case zrtp_status_fail:
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection fail with code %d\n", stat);
+ break;
+ default:
+ break;
+ }
- bytes = sbytes;
- }
+ *bytes = sbytes;
+ }
#endif
-
- if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) &rtp_session->write_msg, &bytes) != SWITCH_STATUS_SUCCESS) {
- rtp_session->seq--;
- ret = -1;
- goto end;
- }
-
- if (((*flags) & SFF_RTP_HEADER)) {
- rtp_session->last_write_ts = ts;
- rtp_session->flags[SWITCH_RTP_FLAG_RESET] = 0;
}
- ret = (int) bytes;
+ status = switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, data, bytes);
end:
WRITE_DEC(rtp_session);
- return ret;
+ return status;
}
SWITCH_DECLARE(uint32_t) switch_rtp_get_ssrc(switch_rtp_t *rtp_session)
#define MAX_MISSING_SEQ 20
#define vb_debug(_vb, _level, _format, ...) if (_vb->debug_level >= _level) switch_log_printf(SWITCH_CHANNEL_LOG_CLEAN, SWITCH_LOG_ALERT, "VB:%p level:%d line:%d ->" _format, (void *) _vb, _level, __LINE__, __VA_ARGS__)
+struct switch_vb_s;
+
typedef struct switch_vb_node_s {
- struct switch_vb_frame_s *parent;
+ struct switch_vb_s *parent;
switch_rtp_packet_t packet;
uint32_t len;
uint8_t visible;
struct switch_vb_node_s *next;
} switch_vb_node_t;
-typedef struct switch_vb_frame_s {
- struct switch_vb_s *parent;
- struct switch_vb_node_s *node_list;
- uint32_t ts;
- uint32_t visible_nodes;
- uint8_t visible;
- uint8_t complete;
- uint8_t mark;
- struct switch_vb_frame_s *next;
- uint16_t min_seq;
- uint16_t max_seq;
-} switch_vb_frame_t;
-
struct switch_vb_s {
- struct switch_vb_frame_s *frame_list;
- struct switch_vb_frame_s *cur_read_frame;
- struct switch_vb_frame_s *cur_write_frame;
- uint32_t last_read_ts;
- uint32_t last_read_seq;
+ struct switch_vb_node_s *node_list;
uint32_t last_target_seq;
- uint32_t last_wrote_ts;
- uint32_t last_wrote_seq;
+ uint32_t highest_read_ts;
+ uint32_t highest_read_seq;
+ uint32_t highest_wrote_ts;
+ uint32_t highest_wrote_seq;
uint16_t target_seq;
uint16_t seq_out;
- uint32_t visible_frames;
+ uint32_t visible_nodes;
uint32_t total_frames;
uint32_t complete_frames;
uint32_t frame_len;
uint32_t min_frame_len;
uint32_t max_frame_len;
+ uint8_t write_init;
+ uint8_t read_init;
uint8_t debug_level;
- switch_timer_t timer;
- int cur_errs;
+ uint16_t next_seq;
+ switch_inthash_t *missing_seq_hash;
+ switch_inthash_t *node_hash;
};
-static inline switch_vb_node_t *new_node(switch_vb_frame_t *frame)
+static inline switch_vb_node_t *new_node(switch_vb_t *vb)
{
switch_vb_node_t *np, *last = NULL;
- for (np = frame->node_list; np; np = np->next) {
+ for (np = vb->node_list; np; np = np->next) {
if (!np->visible) {
break;
}
if (!np) {
switch_zmalloc(np, sizeof(*np));
- np->parent = frame;
if (last) {
last->next = np;
} else {
- frame->node_list = np;
+ vb->node_list = np;
}
}
switch_assert(np);
np->visible = 1;
- np->parent->visible_nodes++;
+ vb->visible_nodes++;
+ np->parent = vb;
return np;
}
-static inline void add_node(switch_vb_frame_t *frame, switch_rtp_packet_t *packet, switch_size_t len)
+static inline switch_vb_node_t *find_seq(switch_vb_t *vb, uint16_t seq)
{
- switch_vb_node_t *node = new_node(frame);
- uint16_t seq = ntohs(packet->header.seq);
-
- node->packet = *packet;
- node->len = len;
- memcpy(node->packet.body, packet->body, len);
-
- if (!frame->min_seq ||seq < ntohs(frame->min_seq)) {
- frame->min_seq = packet->header.seq;
- }
-
- if (seq > ntohs(frame->max_seq)) {
- frame->max_seq = packet->header.seq;
- }
-
- vb_debug(frame->parent, (packet->header.m ? 1 : 2), "PUT packet last_ts:%u ts:%u seq:%u%s\n",
- ntohl(frame->parent->last_wrote_ts), ntohl(node->packet.header.ts), ntohs(node->packet.header.seq), packet->header.m ? " <MARK>" : "");
-
-
- if (packet->header.m) {
- frame->mark = 1;
- }
-
- if ((frame->parent->last_wrote_ts && frame->parent->last_wrote_ts != node->packet.header.ts)) {
- frame->complete = 1;
- frame->parent->complete_frames++;
+ switch_vb_node_t *np;
+ for (np = vb->node_list; np; np = np->next) {
+ if (!np->visible) continue;
+
+ if (ntohs(np->packet.header.seq) == ntohs(seq)) {
+ return np;
+ }
}
- frame->parent->last_wrote_ts = packet->header.ts;
- frame->parent->last_wrote_seq = packet->header.seq;
+ return NULL;
}
static inline void hide_node(switch_vb_node_t *node)
{
- if (node->visible) {
- node->visible = 0;
- node->parent->visible_nodes--;
- }
+ node->visible = 0;
+ node->parent->visible_nodes--;
+ switch_core_inthash_delete(node->parent->node_hash, node->packet.header.seq);
}
-static inline void hide_nodes(switch_vb_frame_t *frame)
+static inline void hide_nodes(switch_vb_t *vb)
{
switch_vb_node_t *np;
- for (np = frame->node_list; np; np = np->next) {
+ for (np = vb->node_list; np; np = np->next) {
hide_node(np);
}
}
-static inline void hide_frame(switch_vb_frame_t *frame)
+static inline void drop_ts(switch_vb_t *vb, uint32_t ts)
{
- vb_debug(frame->parent, 2, "Hide frame ts: %u\n", ntohl(frame->ts));
+ switch_vb_node_t *np;
+ int x = 0;
- if (frame->visible) {
- frame->visible = 0;
- frame->parent->visible_frames--;
- }
+ for (np = vb->node_list; np; np = np->next) {
+ if (!np->visible) continue;
- if (frame->complete) {
- frame->parent->complete_frames--;
- frame->complete = 0;
+ if (ts == np->packet.header.ts) {
+ hide_node(np);
+ x++;
+ }
}
- frame->min_seq = frame->max_seq = 0;
-
- hide_nodes(frame);
+ if (x) vb->complete_frames--;
}
-static inline switch_vb_frame_t *new_frame(switch_vb_t *vb, switch_rtp_packet_t *packet)
+static inline uint32_t vb_find_lowest_ts(switch_vb_t *vb)
{
- switch_vb_frame_t *fp = NULL, *last = NULL;
- int new = 1;
-
- if (vb->cur_write_frame) {
- if (!vb->cur_write_frame->visible) {
- vb->cur_write_frame = NULL;
- return NULL;
- } else if (vb->cur_write_frame->ts == packet->header.ts) {
- fp = vb->cur_write_frame;
- new = 0;
- }
- }
-
- if (!fp) {
- for (fp = vb->frame_list; fp; fp = fp->next) {
- if (fp->ts == packet->header.ts) {
- if (!fp->visible) {
- return NULL;
- } else {
- new = 0;
- break;
- }
- }
- }
- }
-
- if (!fp) {
- for (fp = vb->frame_list; fp; fp = fp->next) {
- if (!fp->visible) {
- break;
- }
- last = fp;
- }
- }
-
- if (!fp) {
- switch_zmalloc(fp, sizeof(*fp));
- fp->parent = vb;
- vb->total_frames++;
-
- if (last) {
- last->next = fp;
- } else {
- vb->frame_list = fp;
- }
- }
-
- switch_assert(fp);
-
- if (new) {
- vb->visible_frames++;
- fp->visible = 1;
- fp->complete = 0;
- fp->ts = packet->header.ts;
- fp->min_seq = fp->max_seq = 0;
- fp->mark = 0;
- }
-
- vb->cur_write_frame = fp;
+ switch_vb_node_t *np, *lowest = NULL;
- return fp;
-
-}
+ for (np = vb->node_list; np; np = np->next) {
+ if (!np->visible) continue;
-static inline int frame_contains_seq(switch_vb_frame_t *frame, uint16_t target_seq, switch_vb_node_t **nodep)
-{
- uint16_t seq = ntohs(target_seq);
- switch_vb_node_t *np;
-
- for (np = frame->node_list; np; np = np->next) {
- if (!np->visible) {
- continue;
- }
- //vb_debug(frame->parent, 10, " CMP %u %u/%u\n", ntohl(frame->ts), ntohs(np->packet.header.seq), seq);
- if (ntohs(np->packet.header.seq) == seq) {
- //vb_debug(frame->parent, 10, " MATCH %u %u v:%d\n", ntohs(np->packet.header.seq), seq, np->visible);
- if (nodep) {
- *nodep = np;
- }
- return 1;
+ if (!lowest || ntohl(lowest->packet.header.ts) > ntohl(np->packet.header.ts)) {
+ lowest = np;
}
}
- return 0;
+ return lowest ? lowest->packet.header.ts : 0;
}
-static inline void increment_seq(switch_vb_t *vb)
+static inline void drop_oldest_frame(switch_vb_t *vb)
{
- vb->target_seq = htons((ntohs(vb->target_seq) + 1));
-}
+ uint32_t ts = vb_find_lowest_ts(vb);
-static inline void set_read_seq(switch_vb_t *vb, uint16_t seq)
-{
- vb->last_target_seq = seq;
- vb->target_seq = htons((ntohs(vb->last_target_seq) + 1));
+ drop_ts(vb, ts);
+ vb_debug(vb, 1, "Dropping oldest frame ts:%u\n", ntohl(ts));
}
-static inline switch_status_t next_frame(switch_vb_t *vb, switch_vb_node_t **nodep)
+static inline void add_node(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len)
{
- switch_vb_frame_t *fp = NULL, *oldest = NULL, *frame_containing_seq = NULL;
-
- if ((fp = vb->cur_read_frame)) {
- if (fp->visible_nodes == 0) {
- hide_frame(fp);
- vb->cur_read_frame = NULL;
- }
- }
+ switch_vb_node_t *node = new_node(vb);
+ node->packet = *packet;
+ node->len = len;
+ memcpy(node->packet.body, packet->body, len);
- if ((fp = vb->cur_read_frame)) {
- int ok = 1;
+ switch_core_inthash_insert(vb->node_hash, node->packet.header.seq, node);
- if (!fp->visible || fp->visible_nodes == 0) {
- ok = 0;
- } else {
- if (vb->target_seq) {
- if (frame_contains_seq(fp, vb->target_seq, nodep)) {
- vb_debug(vb, 2, "CUR FRAME %u CONTAINS REQUESTED SEQ %d\n", ntohl(fp->ts), ntohs(vb->target_seq));
- frame_containing_seq = fp;
- goto end;
- } else {
- ok = 0;
- }
- }
- }
+ vb_debug(vb, (packet->header.m ? 1 : 2), "PUT packet last_ts:%u ts:%u seq:%u%s\n",
+ ntohl(vb->highest_wrote_ts), ntohl(node->packet.header.ts), ntohs(node->packet.header.seq), packet->header.m ? " <MARK>" : "");
- if (!ok) {
- vb_debug(vb, 2, "DONE WITH CUR FRAME %u v: %d c: %d\n", ntohl(fp->ts), fp->visible, fp->complete);
- vb->cur_read_frame = NULL;
- }
+ if (!vb->write_init || ntohs(packet->header.seq) > ntohs(vb->highest_wrote_seq) ||
+ (ntohs(vb->highest_wrote_seq) > USHRT_MAX - 10 && ntohs(packet->header.seq) <= 10) ) {
+ vb->highest_wrote_seq = packet->header.seq;
}
- do {
- *nodep = NULL;
-
- for (fp = vb->frame_list; fp; fp = fp->next) {
- if (!fp->visible || !fp->complete) {
- continue;
- }
-
- if (vb->target_seq) {
- if (frame_contains_seq(fp, vb->target_seq, nodep)) {
- vb_debug(vb, 2, "FOUND FRAME %u CONTAINING SEQ %d\n", ntohl(fp->ts), ntohs(vb->target_seq));
- frame_containing_seq = fp;
- goto end;
- }
- }
-
- if ((!oldest || htonl(oldest->ts) > htonl(fp->ts))) {
- oldest = fp;
- }
- }
-
- if (!frame_containing_seq && vb->target_seq) {
- if (ntohs(vb->target_seq) - ntohs(vb->last_target_seq) > MAX_MISSING_SEQ) {
- vb_debug(vb, 1, "FOUND NO FRAMES CONTAINING SEQ %d. Too many failures....\n", ntohs(vb->target_seq));
- switch_vb_reset(vb, SWITCH_FALSE);
- } else {
- vb_debug(vb, 2, "FOUND NO FRAMES CONTAINING SEQ %d. Try next one\n", ntohs(vb->target_seq));
- increment_seq(vb);
- vb->cur_errs++;
- }
- }
- } while (!frame_containing_seq && vb->target_seq);
-
- end:
-
- if (frame_containing_seq) {
- vb->cur_read_frame = frame_containing_seq;
- if (nodep && *nodep) {
- hide_node(*nodep);
- set_read_seq(vb, (*nodep)->packet.header.seq);
- }
- } else if (oldest) {
- vb->cur_read_frame = oldest;
- } else {
- vb->cur_read_frame = NULL;
+ if (vb->write_init && htons(packet->header.seq) >= htons(vb->highest_wrote_seq) && (ntohl(node->packet.header.ts) > ntohl(vb->highest_wrote_ts))) {
+ vb->complete_frames++;
+ vb_debug(vb, 2, "WRITE frame ts: %u complete=%u/%u n:%u\n", ntohl(node->packet.header.ts), vb->complete_frames , vb->frame_len, vb->visible_nodes);
+ vb->highest_wrote_ts = packet->header.ts;
+ } else if (!vb->write_init) {
+ vb->highest_wrote_ts = packet->header.ts;
}
+
+ if (!vb->write_init) vb->write_init = 1;
- if (vb->cur_read_frame) {
- return SWITCH_STATUS_SUCCESS;
+ if (vb->complete_frames > vb->max_frame_len) {
+ drop_oldest_frame(vb);
}
-
- return SWITCH_STATUS_NOTFOUND;
}
-static inline switch_vb_node_t *frame_find_next_seq(switch_vb_frame_t *frame)
+static inline void increment_seq(switch_vb_t *vb)
{
- switch_vb_node_t *np;
-
- for (np = frame->node_list; np; np = np->next) {
- if (!np->visible) continue;
-
- if (ntohs(np->packet.header.seq) == ntohs(frame->parent->target_seq)) {
- hide_node(np);
- set_read_seq(frame->parent, np->packet.header.seq);
- return np;
- }
- }
-
- return NULL;
+ vb->target_seq = htons((ntohs(vb->target_seq) + 1));
}
+static inline void set_read_seq(switch_vb_t *vb, uint16_t seq)
+{
+ vb->last_target_seq = seq;
+ vb->target_seq = htons((ntohs(vb->last_target_seq) + 1));
+}
-static inline switch_vb_node_t *frame_find_lowest_seq(switch_vb_frame_t *frame)
+static inline switch_vb_node_t *vb_find_lowest_seq(switch_vb_t *vb)
{
switch_vb_node_t *np, *lowest = NULL;
- for (np = frame->node_list; np; np = np->next) {
+ for (np = vb->node_list; np; np = np->next) {
if (!np->visible) continue;
if (!lowest || ntohs(lowest->packet.header.seq) > ntohs(np->packet.header.seq)) {
- hide_node(np);
lowest = np;
}
}
- if (lowest) {
- set_read_seq(frame->parent, lowest->packet.header.seq);
- }
-
return lowest;
}
-static inline switch_status_t next_frame_packet(switch_vb_t *vb, switch_vb_node_t **nodep)
+static inline switch_status_t vb_next_packet(switch_vb_t *vb, switch_vb_node_t **nodep)
{
- switch_vb_node_t *node = NULL;
+ switch_vb_node_t *np = NULL, *node = NULL;
switch_status_t status;
- if ((status = next_frame(vb, &node) != SWITCH_STATUS_SUCCESS)) {
- return status;
- }
-
- if (!node) {
- if (vb->target_seq) {
- vb_debug(vb, 2, "Search for next packet %u cur ts: %u\n", htons(vb->target_seq), htonl(vb->cur_read_frame->ts));
- node = frame_find_next_seq(vb->cur_read_frame);
+ if (np) status = 0, status++;
+
+ if (!vb->target_seq) {
+ if ((node = vb_find_lowest_seq(vb))) {
+ vb_debug(vb, 2, "No target seq using seq: %u as a starting point\n", ntohs(node->packet.header.seq));
} else {
- node = frame_find_lowest_seq(vb->cur_read_frame);
- vb_debug(vb, 2, "Find lowest seq frame ts: %u seq: %u\n", ntohl(vb->cur_read_frame->ts), ntohs(node->packet.header.seq));
+ vb_debug(vb, 1, "%s", "No nodes available....\n");
+ }
+ } else if ((node = switch_core_inthash_find(vb->node_hash, vb->target_seq))) {
+ vb_debug(vb, 2, "FOUND desired seq: %u\n", ntohs(vb->target_seq));
+ } else {
+ int x;
+
+ vb_debug(vb, 2, "MISSING desired seq: %u\n", ntohs(vb->target_seq));
+
+ for (x = 0; x < 10; x++) {
+ increment_seq(vb);
+ if ((node = switch_core_inthash_find(vb->node_hash, vb->target_seq))) {
+ vb_debug(vb, 2, "FOUND incremental seq: %u\n", ntohs(vb->target_seq));
+ break;
+ } else {
+ vb_debug(vb, 2, "MISSING incremental seq: %u\n", ntohs(vb->target_seq));
+ }
}
}
*nodep = node;
if (node) {
+ set_read_seq(vb, node->packet.header.seq);
return SWITCH_STATUS_SUCCESS;
}
}
-static inline void free_nodes(switch_vb_frame_t *frame)
+static inline void free_nodes(switch_vb_t *vb)
{
- switch_vb_node_t *np = frame->node_list, *cur;
+ switch_vb_node_t *np = vb->node_list, *cur;
while(np) {
cur = np;
free(cur);
}
- frame->node_list = NULL;
+ vb->node_list = NULL;
}
-static inline void free_frames(switch_vb_t *vb)
-{
- switch_vb_frame_t *fp = vb->frame_list, *cur = NULL;
-
- while(fp) {
- cur = fp;
- fp = fp->next;
- free_nodes(cur);
- free(cur);
- }
-
- vb->frame_list = NULL;
-}
-
-static inline void do_flush(switch_vb_t *vb)
-{
- switch_vb_frame_t *fp = vb->frame_list;
-
- while(fp) {
- hide_frame(fp);
- fp = fp->next;
- }
-}
SWITCH_DECLARE(int) switch_vb_poll(switch_vb_t *vb)
{
{
vb_debug(vb, 2, "RESET BUFFER flush: %d\n", (int)flush);
-
- if (vb->cur_read_frame) {
- vb->cur_read_frame = NULL;
- }
-
- vb->last_read_ts = 0;
vb->last_target_seq = 0;
vb->target_seq = 0;
if (flush) {
- do_flush(vb);
+ //do_flush(vb);
}
}
-SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len, switch_bool_t timer_compensation)
+SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len)
{
switch_vb_t *vb;
switch_zmalloc(vb, sizeof(*vb));
vb->min_frame_len = vb->frame_len = min_frame_len;
vb->max_frame_len = max_frame_len;
//vb->seq_out = (uint16_t) rand();
-
- if (timer_compensation) { /* rewrite timestamps and seq as they are read to hide packet loss */
- switch_core_timer_init(&vb->timer, "soft", 1, 90, NULL);
- }
+ switch_core_inthash_init(&vb->missing_seq_hash);
+ switch_core_inthash_init(&vb->node_hash);
*vbp = vb;
{
switch_vb_t *vb = *vbp;
*vbp = NULL;
+
+ switch_core_inthash_destroy(&vb->missing_seq_hash);
+ switch_core_inthash_destroy(&vb->node_hash);
- if (vb->timer.timer_interface) {
- switch_core_timer_destroy(&vb->timer);
+ free_nodes(vb);
+ free(vb);
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb)
+{
+ switch_hash_index_t *hi = NULL;
+ uint32_t nack = 0;
+ uint16_t least = 0;
+ int i = 0;
+
+ void *val;
+ const void *var;
+
+ for (hi = switch_core_hash_first(vb->missing_seq_hash); hi; hi = switch_core_hash_next(&hi)) {
+ uint16_t seq;
+
+ switch_core_hash_this(hi, &var, NULL, &val);
+ seq = ntohs(*((uint16_t *) var));
+
+ vb_debug(vb, 3, "WTF ENTRY %u\n", seq);
+
+ if (!least || seq < least) {
+ least = seq;
+ }
}
- free_frames(vb);
- free(vb);
+ if (least && switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least))) {
+ vb_debug(vb, 3, "Found smallest NACKABLE seq %u\n", least);
+ nack = (uint32_t) htons(least);
+
+ for (i = 1; i > 17; i++) {
+ if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least + i))) {
+ vb_debug(vb, 3, "Found addtl NACKABLE seq %u\n", least + i);
+ nack |= (1 << (16 + i));
+ } else {
+ break;
+ }
+ }
+ }
+
+ return nack;
+}
+
+SWITCH_DECLARE(switch_status_t) switch_vb_push_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len)
+{
+ add_node(vb, packet, len);
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len)
{
- switch_vb_frame_t *frame;
+ uint32_t i;
+ uint16_t want = ntohs(vb->next_seq), got = ntohs(packet->header.seq);
+
+ if (!want) want = got;
-#ifdef VB_PLOSS
- int r = (rand() % 10000) + 1;
- if (r <= 200) {
- vb_debug(vb, 1, "Simulate dropped packet ......... ts: %u seq: %u\n", ntohl(packet->header.ts), ntohs(packet->header.seq));
- return SWITCH_STATUS_SUCCESS;
+ if (got > want) {
+ for (i = want; i < got; i++) {
+ vb_debug(vb, 2, "MARK SEQ MISSING %u\n", i);
+ switch_core_inthash_insert(vb->missing_seq_hash, (uint32_t)htons(i), (void *)SWITCH_TRUE);
+ }
+ } else {
+ if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(got))) {
+ vb_debug(vb, 2, "MARK SEQ FOUND %u\n", got);
+ }
+ }
+
+ if (got >= want) {
+ vb->next_seq = htons(ntohs(packet->header.seq) + 1);
}
-#endif
+
+ add_node(vb, packet, len);
+
+ return SWITCH_STATUS_SUCCESS;
+}
+
+SWITCH_DECLARE(switch_status_t) switch_vb_get_packet_by_seq(switch_vb_t *vb, uint16_t seq, switch_rtp_packet_t *packet, switch_size_t *len)
+{
+ switch_vb_node_t *node;
- if ((frame = new_frame(vb, packet))) {
- add_node(frame, packet, len);
+ if ((node = switch_core_inthash_find(vb->node_hash, seq))) {
+ vb_debug(vb, 2, "Found buffered seq: %u\n", ntohs(seq));
+ *packet = node->packet;
+ *len = node->len;
+ memcpy(packet->body, node->packet.body, node->len);
return SWITCH_STATUS_SUCCESS;
+ } else {
+ vb_debug(vb, 2, "Missing buffered seq: %u\n", ntohs(seq));
}
- return SWITCH_STATUS_IGNORE;
+ return SWITCH_STATUS_NOTFOUND;
}
SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t *len)
switch_vb_node_t *node = NULL;
switch_status_t status;
- vb->cur_errs = 0;
+ vb_debug(vb, 2, "GET PACKET %u/%u n:%d\n", vb->complete_frames , vb->frame_len, vb->visible_nodes);
if (vb->complete_frames < vb->frame_len) {
vb_debug(vb, 2, "BUFFERING %u/%u\n", vb->complete_frames , vb->frame_len);
return SWITCH_STATUS_MORE_DATA;
}
- if ((status = next_frame_packet(vb, &node)) == SWITCH_STATUS_SUCCESS) {
- vb_debug(vb, 2, "Found next frame cur ts: %u seq: %u\n", htonl(vb->cur_read_frame->ts), htons(node->packet.header.seq));
+ if ((status = vb_next_packet(vb, &node)) == SWITCH_STATUS_SUCCESS) {
+ vb_debug(vb, 2, "Found next frame cur ts: %u seq: %u\n", htonl(node->packet.header.ts), htons(node->packet.header.seq));
+
+ if (!vb->read_init || ntohs(node->packet.header.seq) > ntohs(vb->highest_read_seq) ||
+ (ntohs(vb->highest_read_seq) > USHRT_MAX - 10 && ntohs(node->packet.header.seq) <= 10) ) {
+ vb->highest_read_seq = node->packet.header.seq;
+ }
+
+ if (vb->read_init && htons(node->packet.header.seq) >= htons(vb->highest_read_seq) && (ntohl(node->packet.header.ts) > ntohl(vb->highest_read_ts))) {
+ vb->complete_frames--;
+ vb_debug(vb, 2, "READ frame ts: %u complete=%u/%u n:%u\n", ntohl(node->packet.header.ts), vb->complete_frames , vb->frame_len, vb->visible_nodes);
+ vb->highest_read_ts = node->packet.header.ts;
+ } else if (!vb->read_init) {
+ vb->highest_read_ts = node->packet.header.ts;
+ }
+
+ if (!vb->read_init) vb->read_init = 1;
+
+
} else {
switch_vb_reset(vb, SWITCH_FALSE);
*packet = node->packet;
*len = node->len;
memcpy(packet->body, node->packet.body, node->len);
-
- if (vb->cur_errs) {
- vb_debug(vb, 1, "One or more Missing SEQ TS %u\n", ntohl(packet->header.ts));
- status = SWITCH_STATUS_BREAK;
- }
-
- vb->last_read_ts = packet->header.ts;
- vb->last_read_seq = packet->header.seq;
-
- if (vb->timer.timer_interface) {
- if (packet->header.m || !vb->timer.samplecount) {
- switch_core_timer_sync(&vb->timer);
- }
- }
-
- if (vb->cur_read_frame && vb->cur_read_frame->visible_nodes == 0 && !packet->header.m) {
- /* force mark bit */
- vb_debug(vb, 1, "LAST PACKET %u WITH NO MARK BIT, ADDIONG MARK BIT\n", ntohl(packet->header.ts));
- packet->header.m = 1;
- status = SWITCH_STATUS_BREAK;
- }
+ hide_node(node);
vb_debug(vb, 1, "GET packet ts:%u seq:%u~%u%s\n", ntohl(packet->header.ts), ntohs(packet->header.seq), vb->seq_out, packet->header.m ? " <MARK>" : "");
//packet->header.seq = htons(vb->seq_out++);
- if (vb->timer.timer_interface) {
- packet->header.ts = htonl(vb->timer.samplecount);
- }
-
return status;
}