uint32_t last_read_ts;
uint32_t last_read_seq;
uint16_t target_seq;
+ uint16_t seq_out;
uint32_t visible_frames;
uint32_t total_frames;
uint32_t complete_frames;
uint32_t min_frame_len;
uint32_t max_frame_len;
uint8_t debug_level;
+ switch_timer_t timer;
};
static inline switch_vb_node_t *new_node(switch_vb_frame_t *frame)
node->len = len;
memcpy(node->packet.body, packet->body, len);
- if (seq < ntohs(frame->min_seq)) {
+ if (!frame->min_seq ||seq < ntohs(frame->min_seq)) {
frame->min_seq = packet->header.seq;
- } else if (seq > ntohs(frame->max_seq)) {
+ }
+
+ if (seq > ntohs(frame->max_seq)) {
frame->max_seq = packet->header.seq;
}
static inline void hide_frame(switch_vb_frame_t *frame)
{
+ vb_debug(frame->parent, 2, "Hide frame ts: %u\n", ntohl(frame->ts));
+
frame->visible = 0;
frame->min_seq = frame->max_seq = 0;
frame->parent->visible_frames--;
if (frame->complete) {
frame->parent->complete_frames--;
+ frame->complete = 0;
}
hide_nodes(frame);
static inline switch_vb_frame_t *new_frame(switch_vb_t *vb, switch_rtp_packet_t *packet)
{
switch_vb_frame_t *fp, *last = NULL;
+ int new = 1;
for (fp = vb->frame_list; fp; fp = fp->next) {
if (fp->ts == packet->header.ts) {
if (fp->complete || !fp->visible) {
return NULL;
} else {
+ new = 0;
break;
}
}
switch_assert(fp);
- vb->visible_frames++;
- fp->visible = 1;
- fp->complete = 0;
- fp->ts = packet->header.ts;
- fp->min_seq = fp->max_seq = 0;
+ if (new) {
+ vb->visible_frames++;
+ fp->visible = 1;
+ fp->complete = 0;
+ fp->ts = packet->header.ts;
+ fp->min_seq = fp->max_seq = 0;
+ }
return fp;
return 0;
}
-static inline switch_vb_frame_t *next_frame(switch_vb_t *vb)
+/* seq_check verifies that every seq is present in the frame but is probably heavy */
+static inline int check_frame(switch_vb_frame_t *frame, switch_bool_t seq_check)
+{
+ switch_vb_node_t *np;
+ int m = 0;
+ uint16_t x = 0, min = ntohs(frame->min_seq), max = ntohs(frame->max_seq);
+
+ for (x = min; x < max; x++) {
+ int ok = 0;
+ for (np = frame->node_list; np; np = np->next) {
+ if (!np->visible) continue;
+ if (np->packet.header.m) {
+ if (!seq_check) {
+ return 1;
+ }
+ m++;
+ }
+
+ if (!seq_check) {
+ ok++;
+ } else {
+ if (ntohs(np->packet.header.seq) == x) {
+ ok++;
+ }
+ }
+ }
+
+ if (!m || !ok) {
+ goto bail;
+ }
+ }
+
+ return 1;
+
+ bail:
+
+
+ vb_debug(frame->parent, 1, "Frame %u failed frame check seq: %u m: %d, discarding\n", ntohl(frame->ts), x, m);
+ hide_frame(frame);
+ return 0;
+
+}
+
+
+static inline switch_status_t next_frame(switch_vb_t *vb)
{
switch_vb_frame_t *fp = NULL, *oldest = NULL, *frame_containing_seq = NULL;
+ vb->cur_read_frame = NULL;
+
for (fp = vb->frame_list; fp; fp = fp->next) {
if (!fp->visible || !fp->complete) {
}
}
+ if (!check_frame(fp, SWITCH_FALSE)) {
+ return SWITCH_STATUS_RESTART;
+ }
+
if ((!oldest || htonl(oldest->ts) > htonl(fp->ts))) {
oldest = fp;
}
}
-
+
if (frame_containing_seq) {
- return frame_containing_seq;
+ vb->cur_read_frame = frame_containing_seq;
+ } else if (oldest) {
+ vb->cur_read_frame = oldest;
}
- return oldest;
+ if (vb->cur_read_frame) {
+ return SWITCH_STATUS_SUCCESS;
+ }
+
+ return SWITCH_STATUS_NOTFOUND;
}
static inline void set_read_seq(switch_vb_t *vb, uint16_t seq)
switch_vb_node_t *node;
if (vb->last_read_seq) {
+ vb_debug(vb, 2, "Search for next packet %u cur ts: %u\n", htons(vb->target_seq), htonl(vb->cur_read_frame->ts));
node = frame_find_next_seq(vb->cur_read_frame);
} else {
- printf("WTF LOWEST ==============\n");
+ vb_debug(vb, 2, "Find lowest seq frame ts: %u\n", ntohl(vb->cur_read_frame->ts));
node = frame_find_lowest_seq(vb->cur_read_frame);
}
}
}
+SWITCH_DECLARE(int) switch_vb_poll(switch_vb_t *vb)
+{
+ return (vb->complete_frames >= vb->frame_len);
+}
SWITCH_DECLARE(int) switch_vb_frame_count(switch_vb_t *vb)
{
}
}
-SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len)
+SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len, switch_bool_t timer_compensation)
{
switch_vb_t *vb;
switch_zmalloc(vb, sizeof(*vb));
vb->min_frame_len = vb->frame_len = min_frame_len;
vb->max_frame_len = max_frame_len;
+ //vb->seq_out = (uint16_t) rand();
+
+ if (timer_compensation) { /* rewrite timestamps and seq as they are read to hide packet loss */
+ switch_core_timer_init(&vb->timer, "soft", 1, 90, NULL);
+ }
*vbp = vb;
switch_vb_t *vb = *vbp;
*vbp = NULL;
+ if (vb->timer.timer_interface) {
+ switch_core_timer_destroy(&vb->timer);
+ }
+
free_frames(vb);
free(vb);
SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len)
{
switch_vb_frame_t *frame;
-
+
+#ifndef VB_PLOSS
+ int r = (rand() % 100000) + 1;
+ if (r <= 20) {
+ vb_debug(vb, 1, "Simulate dropped packet ......... ts: %u seq: %u\n", ntohl(packet->header.ts), ntohs(packet->header.seq));
+ return SWITCH_STATUS_SUCCESS;
+ }
+#endif
+
if ((frame = new_frame(vb, packet))) {
add_node(frame, packet, len);
return SWITCH_STATUS_SUCCESS;
{
switch_vb_node_t *node = NULL;
int fail = 0;
-
+
if (vb->complete_frames < vb->frame_len) {
vb_debug(vb, 2, "BUFFERING %u/%u\n", vb->complete_frames , vb->frame_len);
return SWITCH_STATUS_MORE_DATA;
do {
if (vb->cur_read_frame) {
- vb_debug(vb, 2, "Search for next frame cur ts: %u\n", htonl(vb->cur_read_frame->ts));
if (!(node = next_frame_packet(vb))) {
- vb_debug(vb, 1, "Cannot find frame cur ts %u ... RESET!\n", htonl(vb->cur_read_frame->ts));
+ vb_debug(vb, 2, "Cannot find next packet %u cur ts: %u\n", htons(vb->target_seq), htonl(vb->cur_read_frame->ts));
switch_vb_reset(vb, SWITCH_FALSE);
fail++;
}
} else {
- if (!(vb->cur_read_frame = next_frame(vb))) {
+ switch_status_t status = next_frame(vb);
+
+ switch(status) {
+ case SWITCH_STATUS_RESTART:
+ vb_debug(vb, 2, "%s", "Error encountered ask for new keyframe\n");
+ return SWITCH_STATUS_RESTART;
+ case SWITCH_STATUS_NOTFOUND:
+ vb_debug(vb, 2, "%s", "No frames found wait for more\n");
+ return SWITCH_STATUS_MORE_DATA;
+ default:
+ vb_debug(vb, 2, "Found next frame cur ts: %u\n", htonl(vb->cur_read_frame->ts));
break;
}
}
- } while (!node && fail < 2);
-
+ } while (!node && fail < 2);
+
if (node) {
*packet = node->packet;
*len = node->len;
memcpy(packet->body, node->packet.body, node->len);
- vb->last_read_ts = node->packet.header.ts;
+ vb->last_read_ts = packet->header.ts;
- vb_debug(vb, 1, "GET packet ts:%u seq:%u\n", ntohl(packet->header.ts), ntohs(packet->header.seq));
+ if (vb->timer.timer_interface) {
+ if (packet->header.m || !vb->timer.samplecount) {
+ switch_core_timer_sync(&vb->timer);
+ }
+ }
+
+ packet->header.ts = htonl(vb->timer.samplecount);
+
+ vb_debug(vb, 1, "GET packet ts:%u seq:%u~%u\n", ntohl(packet->header.ts), ntohs(packet->header.seq), vb->seq_out);
+
+ if (vb->timer.timer_interface) {
+ packet->header.seq = htons(vb->seq_out++);
+ }
if (vb->cur_read_frame && node->packet.header.m) {
hide_frame(vb->cur_read_frame);
}
return SWITCH_STATUS_SUCCESS;
+ } else if (fail) {
+ return SWITCH_STATUS_NOTFOUND;
}
return SWITCH_STATUS_MORE_DATA;