]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
Add a level of indirection between a packet and its payload.
authorAndreas Öman <andreas@lonelycoder.com>
Thu, 24 Jun 2010 11:29:22 +0000 (11:29 +0000)
committerAndreas Öman <andreas@lonelycoder.com>
Thu, 24 Jun 2010 11:29:22 +0000 (11:29 +0000)
This allows the code to modify packets (which are non-mutable due to reference counting)
by copying them without having to also copy the packet payload.

src/dvr/mkmux.c
src/htsp.c
src/packet.c
src/packet.h
src/parser_latm.c
src/parsers.c
src/plumbing/globalheaders.c
src/plumbing/tsfix.c
src/rtsp.c
src/streaming.c
src/streaming.h

index 8234e479bb336c163e8473b391bfeebd11b92948..afa7b8fa02fabb161960c1a0e20e623f03d20e7f 100644 (file)
@@ -221,13 +221,14 @@ mk_build_tracks(mk_mux_t *mkm, const struct streaming_start *ss)
     if(ssc->ssc_lang[0])
       ebml_append_string(t, 0x22b59c, ssc->ssc_lang);
     
-    if(ssc->ssc_global_header_len) {
+    if(ssc->ssc_gh) {
       switch(ssc->ssc_type) {
       case SCT_H264:
       case SCT_MPEG2VIDEO:
       case SCT_AAC:
-       ebml_append_bin(t, 0x63a2, ssc->ssc_global_header,
-                      ssc->ssc_global_header_len);
+       ebml_append_bin(t, 0x63a2, 
+                       pktbuf_ptr(ssc->ssc_gh),
+                       pktbuf_len(ssc->ssc_gh));
        break;
       }
     }
@@ -616,8 +617,8 @@ mk_write_frame_i(mk_mux_t *mkm, mk_track *t, th_pkt_t *pkt)
   }
 
 
-  data = pkt->pkt_payload;
-  len  = pkt->pkt_payloadlen;
+  data = pktbuf_ptr(pkt->pkt_payload);
+  len  = pktbuf_len(pkt->pkt_payload);
 
   if(t->type == SCT_AAC) {
     // Skip ADTS header
@@ -659,7 +660,7 @@ mk_mux_write_pkt(mk_mux_t *mkm, struct th_pkt *pkt)
   
   if(t != NULL) {
     if(t->merge)
-      pkt = pkt_merge_global(pkt);
+      pkt = pkt_merge_header(pkt);
     mk_write_frame_i(mkm, t, pkt);
   }
   
index 54008c6b4d118889468a763cb2e7998b2e7cc7d8..2dc820b53a4841168cb000eb4e983aecb5f89605 100644 (file)
@@ -75,7 +75,7 @@ typedef struct htsp_msg {
   int hm_payloadsize;         /* For maintaining stats about streaming
                                 buffer depth */
 
-  th_pkt_t *hm_pkt;     /* For keeping reference to packet.
+  pktbuf_t *hm_pb;      /* For keeping reference to packet payload.
                           hm_msg can contain messages that points
                           to packet payload so to avoid copy we
                           keep a reference here */
@@ -194,8 +194,8 @@ static void
 htsp_msg_destroy(htsp_msg_t *hm)
 {
   htsmsg_destroy(hm->hm_msg);
-  if(hm->hm_pkt != NULL)
-    pkt_ref_dec(hm->hm_pkt);
+  if(hm->hm_pb != NULL)
+    pktbuf_ref_dec(hm->hm_pb);
   free(hm);
 }
 
@@ -247,13 +247,15 @@ htsp_subscription_destroy(htsp_connection_t *htsp, htsp_subscription_t *hs)
  *
  */
 static void
-htsp_send(htsp_connection_t *htsp, htsmsg_t *m, th_pkt_t *pkt,
+htsp_send(htsp_connection_t *htsp, htsmsg_t *m, pktbuf_t *pb,
          htsp_msg_q_t *hmq, int payloadsize)
 {
   htsp_msg_t *hm = malloc(sizeof(htsp_msg_t));
 
   hm->hm_msg = m;
-  hm->hm_pkt = pkt;
+  hm->hm_pb = pb;
+  if(pb != NULL)
+    pktbuf_ref_inc(pb);
   hm->hm_payloadsize = payloadsize;
   
   pthread_mutex_lock(&htsp->htsp_out_mutex);
@@ -1416,14 +1418,15 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
   uint32_t dur = ts_rescale(pkt->pkt_duration, 1000000);
   htsmsg_add_u32(m, "duration", dur);
   
-  pkt = pkt_merge_global(pkt);
+  pkt = pkt_merge_header(pkt);
 
   /**
    * Since we will serialize directly we use 'binptr' which is a binary
    * object that just points to data, thus avoiding a copy.
    */
-  htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen);
-  htsp_send(htsp, m, pkt, &hs->hs_q, pkt->pkt_payloadlen);
+  htsmsg_add_binptr(m, "payload", pktbuf_ptr(pkt->pkt_payload),
+                   pktbuf_len(pkt->pkt_payload));
+  htsp_send(htsp, m, pkt->pkt_payload, &hs->hs_q, pktbuf_len(pkt->pkt_payload));
 
   if(hs->hs_last_report != dispatch_clock) {
     /* Send a queue status report every second */
@@ -1460,6 +1463,7 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
 
     htsp_send_message(hs->hs_htsp, m, &hs->hs_htsp->htsp_hmq_qstatus);
   }
+  pkt_ref_dec(pkt);
 }
 
 
index 9231a6e063429681eac7efb318933c8895b50723..972c4526ce1642dd6b44b83e42a6942a1b46f419 100644 (file)
 static void
 pkt_destroy(th_pkt_t *pkt)
 {
-  free(pkt->pkt_payload);
-  free(pkt->pkt_globaldata);
+  if(pkt->pkt_payload != NULL)
+    pktbuf_ref_dec(pkt->pkt_payload);
+
+  if(pkt->pkt_header != NULL)
+    pktbuf_ref_dec(pkt->pkt_header);
   free(pkt);
 }
 
@@ -44,13 +47,8 @@ pkt_alloc(const void *data, size_t datalen, int64_t pts, int64_t dts)
   th_pkt_t *pkt;
 
   pkt = calloc(1, sizeof(th_pkt_t));
-  pkt->pkt_payloadlen = datalen;
-  if(datalen > 0) {
-    pkt->pkt_payload = malloc(datalen);
-    if(data != NULL)
-      memcpy(pkt->pkt_payload, data, datalen);
-  }
-
+  if(datalen)
+    pkt->pkt_payload = pktbuf_alloc(data, datalen);
   pkt->pkt_dts = dts;
   pkt->pkt_pts = pts;
   pkt->pkt_refcount = 1;
@@ -107,29 +105,32 @@ pktref_clear_queue(struct th_pktref_queue *q)
  *
  */
 th_pkt_t *
-pkt_merge_global(th_pkt_t *pkt)
+pkt_merge_header(th_pkt_t *pkt)
 {
   th_pkt_t *n;
+  size_t s;
 
-  if(pkt->pkt_globaldata == NULL)
+  if(pkt->pkt_header == NULL)
     return pkt;
 
   n = malloc(sizeof(th_pkt_t));
   *n = *pkt;
 
   n->pkt_refcount = 1;
-  n->pkt_globaldata = NULL;
-  n->pkt_globaldata_len = 0;
-  
-  n->pkt_payloadlen = pkt->pkt_globaldata_len + pkt->pkt_payloadlen;
+  n->pkt_header = NULL;
 
-  n->pkt_payload = malloc(n->pkt_payloadlen);
-  memcpy(n->pkt_payload, pkt->pkt_globaldata, pkt->pkt_globaldata_len);
-  memcpy(n->pkt_payload + pkt->pkt_globaldata_len, pkt->pkt_payload,
-        pkt->pkt_payloadlen);
+  s = pktbuf_len(pkt->pkt_payload) + pktbuf_len(pkt->pkt_header);
+  n->pkt_payload = pktbuf_alloc(NULL, s);
+
+  memcpy(pktbuf_ptr(n->pkt_payload),
+        pktbuf_ptr(pkt->pkt_header),
+        pktbuf_len(pkt->pkt_header));
+
+  memcpy(pktbuf_ptr(n->pkt_payload) + pktbuf_len(pkt->pkt_header),
+        pktbuf_ptr(pkt->pkt_payload),
+        pktbuf_len(pkt->pkt_payload));
 
   pkt_ref_dec(pkt);
-  
   return n;
 }
 
@@ -139,22 +140,18 @@ pkt_merge_global(th_pkt_t *pkt)
  *
  */
 th_pkt_t *
-pkt_copy(th_pkt_t *pkt)
+pkt_copy_shallow(th_pkt_t *pkt)
 {
   th_pkt_t *n = malloc(sizeof(th_pkt_t));
   *n = *pkt;
 
   n->pkt_refcount = 1;
 
-  if(pkt->pkt_globaldata_len) {
-    n->pkt_globaldata = malloc(pkt->pkt_globaldata_len);
-    memcpy(n->pkt_globaldata, pkt->pkt_globaldata, pkt->pkt_globaldata_len);
-  }
+  if(n->pkt_header)
+    pktbuf_ref_inc(n->pkt_header);
 
-  if(pkt->pkt_payloadlen) {
-    n->pkt_payload = malloc(pkt->pkt_payloadlen);
-    memcpy(n->pkt_payload, pkt->pkt_payload, pkt->pkt_payloadlen);
-  }
+  if(n->pkt_payload)
+    pktbuf_ref_inc(n->pkt_payload);
 
   return n;
 }
@@ -170,3 +167,45 @@ pktref_create(th_pkt_t *pkt)
   pr->pr_pkt = pkt;
   return pr;
 }
+
+
+
+void 
+pktbuf_ref_dec(pktbuf_t *pb)
+{
+  if((atomic_add(&pb->pb_refcount, -1)) == 1) {
+    free(pb->pb_data);
+    free(pb);
+  }
+}
+
+void
+pktbuf_ref_inc(pktbuf_t *pb)
+{
+  atomic_add(&pb->pb_refcount, 1);
+}
+
+pktbuf_t *
+pktbuf_alloc(const void *data, size_t size)
+{
+  pktbuf_t *pb = malloc(sizeof(pktbuf_t));
+  pb->pb_refcount = 1;
+  pb->pb_size = size;
+
+  if(size > 0) {
+    pb->pb_data = malloc(size);
+    if(data != NULL)
+      memcpy(pb->pb_data, data, size);
+  }
+  return pb;
+}
+
+pktbuf_t *
+pktbuf_make(void *data, size_t size)
+{
+  pktbuf_t *pb = malloc(sizeof(pktbuf_t));
+  pb->pb_refcount = 1;
+  pb->pb_size = size;
+  pb->pb_data = data;
+  return pb;
+}
index 57ed52137bf1b3a8719d8e1e8f9cdb41350d3466..0398c11221dd884773273bc63e3abed84e5b4aec 100644 (file)
 #define PACKET_H_
 
 
+typedef struct pktbuf {
+  int pb_refcount;
+  uint8_t *pb_data;
+  size_t pb_size;
+} pktbuf_t;
+
+
+
 /**
  * Packets
  */
@@ -42,11 +50,8 @@ typedef struct th_pkt {
   uint8_t pkt_channels;
   uint8_t pkt_sri;
 
-  uint8_t *pkt_payload;
-  int pkt_payloadlen;
-
-  uint8_t *pkt_globaldata;
-  int pkt_globaldata_len;
+  pktbuf_t *pkt_payload;
+  pktbuf_t *pkt_header;
 
 } th_pkt_t;
 
@@ -73,10 +78,21 @@ void pktref_clear_queue(struct th_pktref_queue *q);
 
 th_pkt_t *pkt_alloc(const void *data, size_t datalen, int64_t pts, int64_t dts);
 
-th_pkt_t *pkt_merge_global(th_pkt_t *pkt);
+th_pkt_t *pkt_merge_header(th_pkt_t *pkt);
 
-th_pkt_t *pkt_copy(th_pkt_t *pkt);
+th_pkt_t *pkt_copy_shallow(th_pkt_t *pkt);
 
 th_pktref_t *pktref_create(th_pkt_t *pkt);
 
+void pktbuf_ref_dec(pktbuf_t *pb);
+
+void pktbuf_ref_inc(pktbuf_t *pb);
+
+pktbuf_t *pktbuf_alloc(const void *data, size_t size);
+
+pktbuf_t *pktbuf_make(void *data, size_t size);
+
+#define pktbuf_len(pb) ((pb)->pb_size)
+#define pktbuf_ptr(pb) ((pb)->pb_data)
+
 #endif /* PACKET_H_ */
index d1a318f3daa56ff616b7b4c3c6dff1da1806f69f..d04cb68292f3f7e497f217a04ecb0273e13b2a0d 100644 (file)
@@ -194,17 +194,15 @@ parse_latm_audio_mux_element(th_transport_t *t, th_stream_t *st, uint8_t *data,
   if(st->st_curdts == PTS_UNSET)
     return NULL;
 
-  th_pkt_t *pkt = pkt_alloc(NULL, 0, st->st_curdts, st->st_curdts);
+  th_pkt_t *pkt = pkt_alloc(NULL, slot_len + 7, st->st_curdts, st->st_curdts);
 
   pkt->pkt_commercial = t->tht_tt_commercial_advice;
-  pkt->pkt_payloadlen = slot_len + 7;
-  pkt->pkt_payload = malloc(pkt->pkt_payloadlen);
   pkt->pkt_duration = st->st_frame_duration;
   pkt->pkt_sri = latm->sample_rate_index;
   pkt->pkt_channels = latm->channel_config;
 
   /* 7 bytes of ADTS header */
-  init_bits(&out, pkt->pkt_payload, 56);
+  init_bits(&out, pktbuf_ptr(pkt->pkt_payload), 56);
 
   put_bits(&out, 0xfff, 12); // Sync marker
   put_bits(&out, 0, 1);      // ID 0 = MPEG 4
@@ -226,7 +224,7 @@ parse_latm_audio_mux_element(th_transport_t *t, th_stream_t *st, uint8_t *data,
   assert(remaining_bits(&out) == 0);
 
   /* AAC RDB */
-  buf = pkt->pkt_payload + 7;
+  buf = pktbuf_ptr(pkt->pkt_payload) + 7;
   for(i = 0; i < slot_len; i++)
     *buf++ = read_bits(&bs, 8);
 
index 8011020424232d5af17ac287268a64d7577a007f..3e4bd082e1ebf694ad538a469988b9cf3944c8ef 100644 (file)
@@ -925,15 +925,13 @@ parse_mpeg2video(th_transport_t *t, th_stream_t *st, size_t len,
       }
 
       if(st->st_global_data) {
-       pkt->pkt_globaldata = st->st_global_data;
-       pkt->pkt_globaldata_len = st->st_global_data_len;
-       
+       pkt->pkt_header = pktbuf_make(st->st_global_data,
+                                     st->st_global_data_len);
        st->st_global_data = NULL;
        st->st_global_data_len = 0;
       }
 
-      pkt->pkt_payload = st->st_buffer;
-      pkt->pkt_payloadlen = st->st_buffer_ptr - 4;
+      pkt->pkt_payload = pktbuf_make(st->st_buffer, st->st_buffer_ptr - 4);
       pkt->pkt_duration = st->st_frame_duration;
 
       parser_deliver(t, st, pkt);
@@ -1062,14 +1060,13 @@ parse_h264(th_transport_t *t, th_stream_t *st, size_t len,
     if(pkt != NULL) {
       
       if(st->st_global_data) {
-       pkt->pkt_globaldata = st->st_global_data;
-       pkt->pkt_globaldata_len = st->st_global_data_len;
+       pkt->pkt_header = pktbuf_make(st->st_global_data,
+                                     st->st_global_data_len);
        st->st_global_data = NULL;
        st->st_global_data_len = 0;
       }
     
-      pkt->pkt_payload = st->st_buffer;
-      pkt->pkt_payloadlen = st->st_buffer_ptr - 4;
+      pkt->pkt_payload = pktbuf_make(st->st_buffer, st->st_buffer_ptr - 4);
       parser_deliver(t, st, pkt);
       
       st->st_curpkt = NULL;
@@ -1168,7 +1165,7 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
         pkt->pkt_payloadlen);
 #endif
 
-  avgstat_add(&st->st_rate, pkt->pkt_payloadlen, dispatch_clock);
+  //  avgstat_add(&st->st_rate, pkt->pkt_payloadlen, dispatch_clock);
 
   /**
    * Input is ok
index 2677d195fa2e37921e5736cc6264b1897ca7e2ca..268cb7a2e94c1323bd456c617379b922a376a4c2 100644 (file)
@@ -20,7 +20,6 @@
 #include "tvhead.h"
 #include "streaming.h"
 #include "globalheaders.h"
-#include "avc.h"
 
 typedef struct globalheaders {
   streaming_target_t gh_input;
@@ -61,13 +60,13 @@ apply_header(streaming_start_component_t *ssc, th_pkt_t *pkt)
   if(ssc->ssc_frameduration == 0 && pkt->pkt_duration != 0)
     ssc->ssc_frameduration = pkt->pkt_duration;
 
-  if(ssc->ssc_global_header != NULL)
+  if(ssc->ssc_gh != NULL)
     return;
 
   switch(ssc->ssc_type) {
   case SCT_AAC:
-    d = ssc->ssc_global_header = malloc(2);
-    ssc->ssc_global_header_len = 2;
+    ssc->ssc_gh = pktbuf_alloc(NULL, 2);
+    d = pktbuf_ptr(ssc->ssc_gh);
 
     const int profile = 2;
     d[0] = (profile << 3) | ((pkt->pkt_sri & 0xe) >> 1);
@@ -76,13 +75,9 @@ apply_header(streaming_start_component_t *ssc, th_pkt_t *pkt)
 
   case SCT_H264:
   case SCT_MPEG2VIDEO:
-    if(pkt->pkt_globaldata != NULL) {
-      ssc->ssc_global_header = malloc(pkt->pkt_globaldata_len + 
-                                     FF_INPUT_BUFFER_PADDING_SIZE);
-      
-      memcpy(ssc->ssc_global_header, pkt->pkt_globaldata,
-            pkt->pkt_globaldata_len);
-      ssc->ssc_global_header_len = pkt->pkt_globaldata_len;
+    if(pkt->pkt_header != NULL) {
+      ssc->ssc_gh = pkt->pkt_header;
+      pktbuf_ref_inc(ssc->ssc_gh);
     }
     break;
 
@@ -110,7 +105,7 @@ headers_complete(globalheaders_t *gh)
        ssc->ssc_frameduration == 0)
       return 0;
   
-    if(ssc->ssc_global_header == NULL &&
+    if(ssc->ssc_gh == NULL &&
        (ssc->ssc_type == SCT_H264 ||
        ssc->ssc_type == SCT_MPEG2VIDEO ||
        ssc->ssc_type == SCT_AAC))
index f9cf8c30410e027e5aeb10a1084077dba298cb77..feaf0a2e446d6c8d8bd632a0b926f59d86e9621c 100644 (file)
@@ -206,7 +206,7 @@ normalize_ts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt)
              pkt->pkt_dts,
              pkt->pkt_pts,
              pkt->pkt_duration,
-             pkt->pkt_payloadlen);
+             pktbuf_len(pkt->pkt_payload));
 
   streaming_message_t *sm = streaming_msg_create_pkt(pkt);
   streaming_target_deliver2(tf->tf_output, sm);
@@ -307,7 +307,7 @@ compute_pts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt)
 static void
 tsfix_input_packet(tsfix_t *tf, streaming_message_t *sm)
 {
-  th_pkt_t *pkt = pkt_copy(sm->sm_data);
+  th_pkt_t *pkt = pkt_copy_shallow(sm->sm_data);
   tfstream_t *tfs = tfs_find(tf, pkt);
   streaming_msg_free(sm);
   
index 54876d679cffb544c2a7197909e665acae836680..f74b262282682eb22996c7f503f5ee13b773f636 100644 (file)
@@ -391,7 +391,7 @@ rtsp_streaming_send(rtsp_t *rtsp, th_pkt_t *pkt)
 {
   rtsp_stream_t *rs;
 
-  pkt = pkt_merge_global(pkt);
+  pkt = pkt_merge_header(pkt);
 
   LIST_FOREACH(rs, &rtsp->rtsp_streams, rs_link)
     if(rs->rs_index == pkt->pkt_componentindex)
@@ -402,12 +402,16 @@ rtsp_streaming_send(rtsp_t *rtsp, th_pkt_t *pkt)
 
   switch(rs->rs_type) {
   case SCT_MPEG2VIDEO:
-    rtp_send_mpv(rs->rs_sender, rs, &rs->rs_rtp, pkt->pkt_payload,
-                pkt->pkt_payloadlen, pkt->pkt_pts);
+    rtp_send_mpv(rs->rs_sender, rs, &rs->rs_rtp,
+                pktbuf_ptr(pkt->pkt_payload),
+                pktbuf_len(pkt->pkt_payload),
+                pkt->pkt_pts);
     break;
   case SCT_MPEG2AUDIO:
-    rtp_send_mpa(rs->rs_sender, rs, &rs->rs_rtp, pkt->pkt_payload,
-                pkt->pkt_payloadlen, pkt->pkt_pts);
+    rtp_send_mpa(rs->rs_sender, rs, &rs->rs_rtp,
+                pktbuf_ptr(pkt->pkt_payload),
+                pktbuf_len(pkt->pkt_payload),
+                pkt->pkt_pts);
     break;
   }
 }
index c3949872e646fccd972e5a029fc0ae177d6906e3..b7b1770b5fde4aea4142b14d39f327bd75c61736 100644 (file)
@@ -204,8 +204,8 @@ streaming_start_unref(streaming_start_t *ss)
 
   transport_source_info_free(&ss->ss_si);
   for(i = 0; i < ss->ss_num_components; i++)
-    free(ss->ss_components[i].ssc_global_header);
-
+    if(ss->ss_components[i].ssc_gh)
+      pktbuf_ref_dec(ss->ss_components[i].ssc_gh);
   free(ss);
 }
 
@@ -377,10 +377,8 @@ streaming_start_copy(const streaming_start_t *src)
 
   for(i = 0; i < dst->ss_num_components; i++) {
     streaming_start_component_t *ssc = &dst->ss_components[i];
-    if(ssc->ssc_global_header != NULL)
-      ssc->ssc_global_header = memcpy(malloc(ssc->ssc_global_header_len),
-                                     ssc->ssc_global_header,
-                                     ssc->ssc_global_header_len);
+    if(ssc->ssc_gh != NULL)
+      pktbuf_ref_inc(ssc->ssc_gh);
   }
 
   dst->ss_refcount = 1;
index acc0c54df7cda1f6853e9dec86cd72bbd3603fc4..7a439bba16343408f78c426f06d30b4fd97106cf 100644 (file)
@@ -33,8 +33,7 @@ typedef struct streaming_start_component {
   int16_t ssc_width;
   int16_t ssc_height;
 
-  uint8_t *ssc_global_header;
-  int ssc_global_header_len;
+  pktbuf_t *ssc_gh;
 
   int ssc_frameduration;