]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
SAT>IP client: make code more robust for out-of-order RTP packets, issue #4621
authorJaroslav Kysela <perex@perex.cz>
Sun, 22 Oct 2017 13:59:47 +0000 (15:59 +0200)
committerJaroslav Kysela <perex@perex.cz>
Sun, 22 Oct 2017 14:17:37 +0000 (16:17 +0200)
src/input/mpegts/satip/satip_frontend.c
src/input/mpegts/satip/satip_private.h

index a9b2ebebc74e2e22bcb4302cd03433bf03d4009d..6dcec39e229cbc25922049bd0588a58133edd420 100644 (file)
@@ -38,6 +38,65 @@ typedef enum rtp_transport_mode
 } rtp_transport_mode_t;
 
 
+/*
+ *
+ */
+static int
+udp_rtp_packet_cmp( const void *_a, const void *_b )
+{
+  const struct satip_udppkt *a = _a, *b = _b;
+  int seq1 = a->up_data_seq, seq2 = b->up_data_seq;
+  if (seq1 < 0x4000 && seq2 > 0xc000) return 1;
+  if (seq2 < 0x4000 && seq1 > 0xc000) return -1;
+  return seq1 - seq2;
+}
+
+static void
+udp_rtp_packet_free( satip_frontend_t *lfe, struct satip_udppkt *up )
+{
+  free(up->up_data);
+  free(up);
+}
+
+static void
+udp_rtp_packet_remove( satip_frontend_t *lfe, struct satip_udppkt *up )
+{
+  TAILQ_REMOVE(&lfe->sf_udp_packets, up, up_link);
+  lfe->sf_udp_packets_count--;
+}
+
+static void
+udp_rtp_packet_destroy( satip_frontend_t *lfe, struct satip_udppkt *up )
+{
+  udp_rtp_packet_remove(lfe, up);
+  udp_rtp_packet_free(lfe, up);
+}
+
+static void
+udp_rtp_packet_destroy_all( satip_frontend_t *lfe )
+{
+  struct satip_udppkt *up;
+
+  while ((up = TAILQ_FIRST(&lfe->sf_udp_packets)) != NULL)
+    udp_rtp_packet_destroy(lfe, up);
+}
+
+static void
+udp_rtp_packet_append( satip_frontend_t *lfe, uint8_t *p, int len, uint16_t seq )
+{
+  struct satip_udppkt *up = malloc(sizeof(*up));
+  if (len > 0) {
+    up->up_data = malloc(len);
+    memcpy(up->up_data, p, len);
+  } else {
+    up->up_data = NULL;
+  }
+  up->up_data_len = len;
+  up->up_data_seq = seq;
+  TAILQ_INSERT_SORTED(&lfe->sf_udp_packets, up, up_link, udp_rtp_packet_cmp);
+  lfe->sf_udp_packets_count++;
+}
+
 /*
  *
  */
@@ -1362,12 +1421,79 @@ satip_frontend_close_rtsp
   http_client_close(rtsp);
 }
 
+static int
+satip_frontend_rtp_decode
+  ( satip_frontend_t *lfe, uint32_t *_seq, uint32_t *_unc,
+    uint8_t *p, int c )
+{
+  int pos, len, seq = *_seq, nseq;
+  struct satip_udppkt *up;
+
+  /* Strip RTP header */
+  if (c < 12)
+    return 0;
+  if ((p[0] & 0xc0) != 0x80)
+    return 0;
+  if ((p[1] & 0x7f) != 33)
+    return 0;
+  pos = ((p[0] & 0x0f) * 4) + 12;
+  if (p[0] & 0x10) {
+    if (c < pos + 4)
+      return 0;
+    pos += (((p[pos+2] << 8) | p[pos+3]) + 1) * 4;
+  }
+  len = c - pos;
+  if (c < pos || (len % 188) != 0)
+    return 0;
+  /* Use uncorrectable value to notify RTP delivery issues */
+  nseq = (p[2] << 8) | p[3];
+  if (seq == -1)
+    seq = nseq;
+  else if (((seq + 1) & 0xffff) != nseq) {
+    if (lfe->sf_udp_packets_count > 5) {
+      up = TAILQ_FIRST(&lfe->sf_udp_packets);
+      tvhtrace(LS_SATIP, "RTP discontinuity, reset sequence to %d from %d", up->up_data_seq, (seq + 1) & 0xffff);
+      *_seq = (up->up_data_seq - 1) & 0xffff;
+      *_unc += (up->up_data_len / 188) * (uint32_t)((uint16_t)up->up_data_seq-(uint16_t)(seq+1));
+    }
+    tvhtrace(LS_SATIP, "RTP discontinuity (%i != %i), queueing packet", seq + 1, nseq);
+    udp_rtp_packet_append(lfe, p, c, nseq);
+    len = 0;
+    goto next;
+  }
+  *_seq = nseq;
+  if (len == 0)
+    goto next;
+  /* Process */
+  if (lfe->sf_skip_ts > 0) {
+    if (lfe->sf_skip_ts < len) {
+      pos += lfe->sf_skip_ts;
+      lfe->sf_skip_ts = 0;
+      goto wrdata;
+    } else {
+      lfe->sf_skip_ts -= len;
+    }
+  } else {
+wrdata:
+    tsdebug_write((mpegts_mux_t *)lfe->sf_curmux, p + pos, len);
+    sbuf_append(&lfe->sf_sbuf, p + pos, len);
+  }
+next:
+  up = TAILQ_FIRST(&lfe->sf_udp_packets);
+  if (up && ((*_seq + 1) & 0xffff) == up->up_data_seq) {
+    udp_rtp_packet_remove(lfe, up);
+    tvhtrace(LS_SATIP, "RTP discontinuity, requeueing packet (%i)", up->up_data_seq);
+    len = satip_frontend_rtp_decode(lfe, _seq, _unc, up->up_data, up->up_data_len);
+    udp_rtp_packet_free(lfe, up);
+  }
+  return len;
+}
+
 static int
 satip_frontend_rtp_data_received( http_client_t *hc, void *buf, size_t len )
 {
-  int c, pos, r;
-  uint32_t nseq, unc;
-  uint8_t *b = buf, *p;
+  uint32_t unc;
+  uint8_t *b = buf;
   satip_frontend_t *lfe = hc->hc_aux;
   mpegts_mux_instance_t *mmi;
 
@@ -1376,52 +1502,9 @@ satip_frontend_rtp_data_received( http_client_t *hc, void *buf, size_t len )
 
   if (b[1] == 0) {
 
-    p = b   + 4;
-    c = len - 4;
-
-    /* Strip RTP header */
-    if (c < 12)
-      return 0;
-    if ((p[0] & 0xc0) != 0x80)
+    unc = 0;
+    if (satip_frontend_rtp_decode(lfe, &lfe->sf_seq, &unc, b + 4, len - 4) == 0)
       return 0;
-    if ((p[1] & 0x7f) != 33)
-      return 0;
-    pos = ((p[0] & 0x0f) * 4) + 12;
-    if (p[0] & 0x10) {
-      if (c < pos + 4)
-        return 0;
-      pos += (((p[pos+2] << 8) | p[pos+3]) + 1) * 4;
-    }
-    r = c - pos;
-    if (c < pos || (r % 188) != 0)
-      return 0;
-    /* Use uncorrectable value to notify RTP delivery issues */
-    nseq = (p[2] << 8) | p[3];
-    unc  = 0;
-    if (lfe->sf_seq == -1)
-      lfe->sf_seq = nseq;
-    else if (((lfe->sf_seq + 1) & 0xffff) != nseq) {
-      unc = ((c - pos) / 188) * (uint32_t)((uint16_t)nseq-(uint16_t)(lfe->sf_seq+1));
-      tvhtrace(LS_SATIP, "TCP/RTP discontinuity (%i != %i)", lfe->sf_seq + 1, nseq);
-    }
-    lfe->sf_seq = nseq;
-    if (r == 0)
-      return 0;
-
-    /* Process */
-    if (lfe->sf_skip_ts > 0) {
-      if (lfe->sf_skip_ts < r) {
-        pos += lfe->sf_skip_ts;
-        lfe->sf_skip_ts = 0;
-        goto wrdata;
-      } else {
-        lfe->sf_skip_ts -= r;
-      }
-    } else {
-wrdata:
-      tsdebug_write((mpegts_mux_t *)lfe->sf_curmux, p + pos, c - pos);
-      sbuf_append(&lfe->sf_sbuf, p + pos, c - pos);
-    }
 
     if (lfe->sf_sbuf.sb_ptr > 64 * 1024 ||
         lfe->sf_last_data_tstamp + sec2mono(1) <= mclk()) {
@@ -1468,15 +1551,14 @@ satip_frontend_input_thread ( void *aux )
   char buf[256];
   struct iovec *iovec;
   uint8_t b[2048], session[32];
-  uint8_t *p;
   sbuf_t *sb;
-  int pos, nfds, i, r, tc, rtp_port, start = 0;
+  int nfds, i, r, tc, rtp_port, start = 0;
   size_t c;
   tvhpoll_event_t ev[3];
   tvhpoll_t *efd;
   int changing, ms, fatal, running, play2, exit_flag;
   int rtsp_flags, position, reply;
-  uint32_t seq, nseq, unc;
+  uint32_t seq, unc;
   udp_multirecv_t um;
   uint64_t u64, u64_2;
   long stream_id;
@@ -1499,6 +1581,7 @@ satip_frontend_input_thread ( void *aux )
    * New tune
    */
 new_tune:
+  udp_rtp_packet_destroy_all(lfe);
   sbuf_free(sb);
   udp_multirecv_free(&um);
   udp_close(rtcp);
@@ -1774,6 +1857,7 @@ new_tune:
 
   udp_multirecv_init(&um, RTP_PKTS, RTP_PKT_SIZE);
   sbuf_init_fixed(sb, RTP_PKTS * RTP_PKT_SIZE);
+  udp_rtp_packet_destroy_all(lfe);
   lfe->sf_skip_ts = MINMAX(lfe->sf_device->sd_skip_ts, 0, 200) * 188;
   
   while ((reply || running) && !fatal) {
@@ -1963,50 +2047,8 @@ new_tune:
     }
 
     for (i = 0, unc = 0; i < tc; i++) {
-      p = iovec[i].iov_base;
-      c = iovec[i].iov_len;
-
-      /* Strip RTP header */
-      if (c < 12)
-        continue;
-      if ((p[0] & 0xc0) != 0x80)
-        continue;
-      if ((p[1] & 0x7f) != 33)
+      if (satip_frontend_rtp_decode(lfe, &seq, &unc, iovec[i].iov_base + 4, iovec[i].iov_len - 4) == 0)
         continue;
-      pos = ((p[0] & 0x0f) * 4) + 12;
-      if (p[0] & 0x10) {
-        if (c < pos + 4)
-          continue;
-        pos += (((p[pos+2] << 8) | p[pos+3]) + 1) * 4;
-      }
-      r = c - pos;
-      if (c < pos || (r % 188) != 0)
-        continue;
-      /* Use uncorrectable value to notify RTP delivery issues */
-      nseq = (p[2] << 8) | p[3];
-      if (seq == -1)
-        seq = nseq;
-      else if (((seq + 1) & 0xffff) != nseq) {
-        unc += ((c - pos) / 188) * (uint32_t)((uint16_t)nseq-(uint16_t)(seq+1));
-        tvhtrace(LS_SATIP, "RTP discontinuity (%i != %i)", seq + 1, nseq);
-      }
-      seq = nseq;
-      if (r == 0)
-        continue;
-      /* Process */
-      if (lfe->sf_skip_ts > 0) {
-        if (lfe->sf_skip_ts < r) {
-          pos += lfe->sf_skip_ts;
-          lfe->sf_skip_ts = 0;
-          goto wrdata;
-        } else {
-          lfe->sf_skip_ts -= r;
-        }
-      } else {
-wrdata:
-        tsdebug_write((mpegts_mux_t *)lm, p + pos, c - pos);
-        sbuf_append(sb, p + pos, c - pos);
-      }
     }
     pthread_mutex_lock(&lfe->sf_dvr_lock);
     if (lfe->sf_req == lfe->sf_req_thread) {
@@ -2018,6 +2060,7 @@ wrdata:
   }
 
   sbuf_free(sb);
+  udp_rtp_packet_destroy_all(lfe);
   udp_multirecv_free(&um);
   lfe->sf_curmux = NULL;
 
@@ -2354,6 +2397,8 @@ satip_frontend_delete ( satip_frontend_t *lfe )
 
   free(lfe->sf_type_override);
 
+  udp_rtp_packet_destroy_all(lfe);
+
   /* Finish */
   mpegts_input_delete((mpegts_input_t*)lfe, 0);
 }
index 64ffa90af472d8faa30388fe31a211c022d093ef..753df8a21790019904bf58ac36ca10647d3d332e 100644 (file)
@@ -115,6 +115,13 @@ struct satip_tune_req {
   int                        sf_netposhash;
 };
 
+struct satip_udppkt {
+  TAILQ_ENTRY(satip_udppkt)  up_link;
+  uint8_t                   *up_data;
+  uint16_t                   up_data_len;
+  uint16_t                   up_data_seq;
+};
+
 struct satip_frontend
 {
   mpegts_input_t;
@@ -168,6 +175,8 @@ struct satip_frontend
   int                        sf_netlimit;
   int                        sf_netgroup;
   int                        sf_netposhash;
+  TAILQ_HEAD(,satip_udppkt)  sf_udp_packets;
+  int                        sf_udp_packets_count;
  
   /*
    * Configuration