From a78cce2d140e2ef1ea3f5480d444b1bcc7517471 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Sun, 14 May 2017 20:25:41 +0200 Subject: [PATCH] ts: rework PCR clock - interpolate the PCR clock gaps from the audio tracks - report dropped out packets by PCR checks - use DTS instead PTS for PCR clock modifications - use DTS instead PTS for invalid packet clock checks --- src/input/mpegts/tsdemux.c | 135 ++++++++++++++++++++++++++++++------- src/parsers.h | 21 ++++++ src/parsers/parsers.c | 63 ++++++++--------- src/service.c | 4 ++ src/service.h | 7 +- 5 files changed, 175 insertions(+), 55 deletions(-) diff --git a/src/input/mpegts/tsdemux.c b/src/input/mpegts/tsdemux.c index a8e812e38..b8d422a25 100644 --- a/src/input/mpegts/tsdemux.c +++ b/src/input/mpegts/tsdemux.c @@ -45,6 +45,109 @@ static void ts_remux(mpegts_service_t *t, const uint8_t *tsb, int len, int errors); static void ts_skip(mpegts_service_t *t, const uint8_t *tsb, int len); +/** + * Extract PCR clocks + */ +static void ts_recv_pcr(mpegts_service_t *t, const uint8_t *tsb) +{ + int64_t pcr; + pcr = (uint64_t)tsb[6] << 25; + pcr |= (uint64_t)tsb[7] << 17; + pcr |= (uint64_t)tsb[8] << 9; + pcr |= (uint64_t)tsb[9] << 1; + pcr |= ((uint64_t)tsb[10] >> 7) & 0x01; + /* handle the broken info using candidate variable */ + if (t->s_current_pcr == PTS_UNSET || t->s_current_pcr_guess || + pts_diff(t->s_current_pcr, pcr) <= 90000 || + (t->s_candidate_pcr != PTS_UNSET && pts_diff(t->s_candidate_pcr, pcr) <= 90000)) { + if (pcr != t->s_current_pcr) { + if (t->s_current_pcr == PTS_UNSET) + tvhtrace(LS_PCR, "%s: initial : %"PRId64, service_nicename((service_t*)t), pcr); + else + tvhtrace(LS_PCR, "%s: change : %"PRId64"%s", service_nicename((service_t*)t), pcr, + t->s_candidate_pcr != PTS_UNSET ? " (candidate)" : ""); + t->s_current_pcr = pcr; + t->s_current_pcr_guess = 0; + } + t->s_candidate_pcr = PTS_UNSET; + } else { + tvhtrace(LS_PCR, "%s: candidate: %"PRId64, service_nicename((service_t*)t), pcr); + t->s_candidate_pcr = pcr; + } +} + +static void ts_recv_pcr_audio + (mpegts_service_t *t, elementary_stream_t *st, const uint8_t *buf, int len) +{ + int64_t dts, pts, d; + int hdr, flags, hlen; + + if (buf[3] != 0xbd && buf[3] != 0xc0) + return; + + if (len < 9) + return; + + buf += 6; + len -= 6; + hdr = buf[0]; + flags = buf[1]; + hlen = buf[2]; + buf += 3; + len -= 3; + + if (len < hlen || (hdr & 0xc0) != 0x80) + return; + + if ((flags & 0xc0) == 0xc0) { + if (hlen < 10) + return; + + pts = getpts(buf); + dts = getpts(buf + 5); + + d = (pts - dts) & PTS_MASK; + if (d > 180000) // More than two seconds of PTS/DTS delta, probably corrupt + return; + + } else if ((flags & 0xc0) == 0x80) { + if (hlen < 5) + return; + + dts = pts = getpts(buf); + } else + return; + + if (st->es_last_pcr == PTS_UNSET) { + d = pts_diff(st->es_last_pcr_dts, dts); + goto set; + } + + if (st->es_last_pcr != t->s_current_pcr) { + st->es_last_pcr = t->s_current_pcr; + st->es_last_pcr_dts = dts; + return; + } + + d = pts_diff(st->es_last_pcr_dts, dts); + if (d == PTS_UNSET || d < 30000) + return; + if (d < 10*90000) + t->s_current_pcr += d; + +set: + if (t->s_current_pcr == PTS_UNSET && d != PTS_UNSET && d < 30000) { + t->s_current_pcr = (dts - 2*90000) & PTS_MASK; + t->s_current_pcr_guess = 1; + } + + tvhtrace(LS_PCR, "%s: audio DTS: %"PRId64" dts %"PRId64" [%s/%d]", + service_nicename((service_t*)t), t->s_current_pcr, dts, + streaming_component_type2txt(st->es_type), st->es_pid); + st->es_last_pcr = t->s_current_pcr; + st->es_last_pcr_dts = dts; +} + /** * Continue processing of transport stream packets */ @@ -55,7 +158,6 @@ ts_recv_packet0 mpegts_service_t *m; int len2, off, pusi, cc, pid, error, errors = 0; const uint8_t *tsb2; - int64_t pcr; service_set_streaming_status_flags((service_t*)t, TSS_MUX_PACKETS); @@ -90,33 +192,18 @@ ts_recv_packet0 if (tsb2[3] & 0x20) { off = tsb2[4] + 5; - if (st->es_pid == t->s_pcr_pid && !error && off > 10 && (tsb2[5] & 0x10) != 0) { - pcr = (uint64_t)tsb2[6] << 25; - pcr |= (uint64_t)tsb2[7] << 17; - pcr |= (uint64_t)tsb2[8] << 9; - pcr |= (uint64_t)tsb2[9] << 1; - pcr |= ((uint64_t)tsb2[10] >> 7) & 0x01; - /* handle the broken info using candidate variable */ - if (t->s_current_pcr == PTS_UNSET || pts_diff(t->s_current_pcr, pcr) <= 4*90000 || - (t->s_candidate_pcr != PTS_UNSET && pts_diff(t->s_candidate_pcr, pcr) <= 4*90000)) { - if (pcr != t->s_current_pcr) { - if (t->s_current_pcr == PTS_UNSET) - tvhtrace(LS_PCR, "%s: PCR initial: %"PRId64, service_nicename((service_t*)t), pcr); - else - tvhtrace(LS_PCR, "%s: PCR change : %"PRId64"%s", service_nicename((service_t*)t), pcr, - t->s_candidate_pcr != PTS_UNSET ? " (candidate)" : ""); - t->s_current_pcr = pcr; - } - t->s_candidate_pcr = PTS_UNSET; - } else { - tvhtrace(LS_PCR, "%s: PCR candidate: %"PRId64, service_nicename((service_t*)t), pcr); - t->s_candidate_pcr = pcr; - } - } + if (st->es_pid == t->s_pcr_pid && !error && off > 10 && + (tsb2[5] & 0x10) != 0 && off <= 188) + ts_recv_pcr(t, tsb2); } else { off = 4; } + if (pusi && !error && off < 188 - 16 && + tsb2[off] == 0 && tsb2[off+1] == 0 && tsb2[off+2] == 1 && + SCT_ISAUDIO(st->es_type)) + ts_recv_pcr_audio(t, st, tsb2 + off, 188 - off); + if (!streaming_pad_probe_type(&t->s_streaming_pad, SMT_PACKET)) continue; diff --git a/src/parsers.h b/src/parsers.h index 1a4da09b5..daf0701e0 100644 --- a/src/parsers.h +++ b/src/parsers.h @@ -21,6 +21,27 @@ #include "packet.h" +static inline int64_t +getpts(const uint8_t *p) +{ + int a = p[0]; + int b = (p[1] << 8) | p[2]; + int c = (p[3] << 8) | p[4]; + + if ((a & 1) && (b & 1) && (c & 1)) { + + return + ((int64_t)((a >> 1) & 0x07) << 30) | + ((int64_t)((b >> 1) ) << 15) | + ((int64_t)((c >> 1) )) + ; + + } else { + // Marker bits not present + return PTS_UNSET; + } +} + void parse_mpeg_ts(struct service *t, struct elementary_stream *st, const uint8_t *data, int len, int start, int err); diff --git a/src/parsers/parsers.c b/src/parsers/parsers.c index 34421eabf..17cc33d3d 100644 --- a/src/parsers/parsers.c +++ b/src/parsers/parsers.c @@ -66,28 +66,6 @@ pts_no_backlog(int64_t pts) return pts_is_backlog(pts) ? (pts & ~PTS_BACKLOG) : pts; } -static int64_t -getpts(const uint8_t *p) -{ - int a = p[0]; - int b = (p[1] << 8) | p[2]; - int c = (p[3] << 8) | p[4]; - - if((a & 1) && (b & 1) && (c & 1)) { - - return - ((int64_t)((a >> 1) & 0x07) << 30) | - ((int64_t)((b >> 1) ) << 15) | - ((int64_t)((c >> 1) )) - ; - - } else { - // Marker bits not present - return PTS_UNSET; - } -} - - static int parse_mpeg2video(service_t *t, elementary_stream_t *st, size_t len, uint32_t next_startcode, int sc_offset); @@ -1265,7 +1243,7 @@ parse_mpeg2video(service_t *t, elementary_stream_t *st, size_t len, } if (st->es_buf.sb_err) { - pkt->pkt_err = 1; + pkt->pkt_err = st->es_buf.sb_err; st->es_buf.sb_err = 0; } if (metalen) { @@ -1822,25 +1800,52 @@ parse_teletext(service_t *t, elementary_stream_t *st, const uint8_t *data, } } +/** + * + */ +static th_pkt_t * +parser_error_packet(service_t *t, elementary_stream_t *st, int error) +{ + th_pkt_t *pkt; + + pkt = pkt_alloc(st->es_type, NULL, 0, PTS_UNSET, PTS_UNSET, t->s_current_pcr); + pkt->pkt_err = error; + return pkt; +} + /** * */ static void parser_deliver(service_t *t, elementary_stream_t *st, th_pkt_t *pkt) { - int64_t diff; + int64_t d, diff; assert(pkt->pkt_type == st->es_type); + if (pkt->pkt_dts == PTS_UNSET) { + if (pkt->pkt_pts == PTS_UNSET && !pkt->pkt_payload) /* error notification */ + goto deliver; + pkt_trace(LS_PARSER, pkt, "dts drop"); + pkt_ref_dec(pkt); + pkt = parser_error_packet(t, st, 1); + goto deliver; + } + diff = st->es_type == SCT_DVBSUB ? 6*90000 : 4*90000; + d = pts_diff(pkt->pkt_pcr, (pkt->pkt_dts + 30000) & PTS_MASK); - if (pts_diff(pkt->pkt_pcr, pkt->pkt_pts) > diff) { + if (d > diff || d == PTS_UNSET) { if (tvhlog_limit(&st->es_pcr_log, 2)) - tvhwarn(LS_PARSER, "%s: PTS and PCR diff is very big (%"PRId64")", + tvhwarn(LS_PARSER, "%s: DTS and PCR diff is very big (%"PRId64")", service_component_nicename(st), pts_diff(pkt->pkt_pcr, pkt->pkt_pts)); - goto end; + pkt_trace(LS_PARSER, pkt, "pcr drop"); + pkt_ref_dec(pkt); + pkt = parser_error_packet(t, st, 1); + goto deliver; } +deliver: pkt->pkt_componentindex = st->es_index; pkt_trace(LS_PARSER, pkt, "deliver"); @@ -1861,7 +1866,6 @@ parser_deliver(service_t *t, elementary_stream_t *st, th_pkt_t *pkt) /* Forward packet */ streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_pkt(pkt)); -end: /* Decrease our own reference to the packet */ pkt_ref_dec(pkt); @@ -1877,8 +1881,7 @@ parser_deliver_error(service_t *t, elementary_stream_t *st) if (!st->es_buf.sb_err) return; - pkt = pkt_alloc(st->es_type, NULL, 0, PTS_UNSET, PTS_UNSET, t->s_current_pcr); - pkt->pkt_err = st->es_buf.sb_err; + pkt = parser_error_packet(t, st, st->es_buf.sb_err); parser_deliver(t, st, pkt); st->es_buf.sb_err = 0; } diff --git a/src/service.c b/src/service.c index 5063a8078..c03364ffe 100644 --- a/src/service.c +++ b/src/service.c @@ -264,6 +264,9 @@ stream_init(elementary_stream_t *st) { st->es_cc = -1; + st->es_last_pcr = PTS_UNSET; + st->es_last_pcr_dts = PTS_UNSET; + st->es_incomplete = 0; st->es_header_mode = 0; st->es_parser_state = 0; @@ -707,6 +710,7 @@ service_start(service_t *t, int instance, int weight, int flags, t->s_status = SERVICE_RUNNING; t->s_current_pcr = PTS_UNSET; t->s_candidate_pcr = PTS_UNSET; + t->s_current_pcr_guess = 0; /** * Initialize stream diff --git a/src/service.h b/src/service.h index 8bb6794dd..be0a44d80 100644 --- a/src/service.h +++ b/src/service.h @@ -85,6 +85,10 @@ typedef struct elementary_stream { int es_peak_presentation_delay; /* Max seen diff. of DTS and PTS */ + /* PCR clocks */ + int64_t es_last_pcr; + int64_t es_last_pcr_dts; + /* For service stream packet reassembly */ sbuf_t es_buf; @@ -477,7 +481,7 @@ typedef struct service { */ struct elementary_stream_queue s_components; struct elementary_stream_queue s_filt_components; - int s_last_pid; + short s_last_pid; elementary_stream_t *s_last_es; /** @@ -489,6 +493,7 @@ typedef struct service { int64_t s_current_pcr; int64_t s_candidate_pcr; + uint8_t s_current_pcr_guess; /* * Local channel numbers per bouquet -- 2.47.3