typedef struct mpegts_input mpegts_input_t;
typedef struct mpegts_table_feed mpegts_table_feed_t;
typedef struct mpegts_network_link mpegts_network_link_t;
+typedef struct mpegts_packet mpegts_packet_t;
+typedef struct mpegts_buffer mpegts_buffer_t;
/* Lists */
typedef LIST_HEAD (,mpegts_network) mpegts_network_list_t;
typedef TAILQ_HEAD(mpegts_mux_queue,mpegts_mux) mpegts_mux_queue_t;
typedef LIST_HEAD (,mpegts_mux) mpegts_mux_list_t;
typedef LIST_HEAD (,mpegts_network_link) mpegts_network_link_list_t;
-TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed);
+typedef TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed)
+ mpegts_table_feed_queue_t;
/* Classes */
extern const idclass_t mpegts_network_class;
extern const idclass_t mpegts_input_class;
/* **************************************************************************
- * SI processing
+ * Data / SI processing
* *************************************************************************/
+struct mpegts_packet
+{
+ TAILQ_ENTRY(mpegts_packet) mp_link;
+ size_t mp_len;
+ mpegts_mux_t *mp_mux;
+ uint8_t mp_data[0];
+};
+
typedef int (*mpegts_table_callback_t)
( mpegts_table_t*, const uint8_t *buf, int len, int tableid );
mpegts_network_link_list_t mi_networks;
- LIST_HEAD(,mpegts_mux_instance) mi_mux_active;
-
LIST_HEAD(,mpegts_mux_instance) mi_mux_instances;
+
/*
* Status
*/
* Input processing
*/
- pthread_mutex_t mi_delivery_mutex;
-
- LIST_HEAD(,service) mi_transports;
+ int mi_running;
+ /* Data input */
+ // Note: this section is protected by mi_input_lock
+ pthread_t mi_input_tid;
+ pthread_mutex_t mi_input_lock;
+ pthread_cond_t mi_input_cond;
+ TAILQ_HEAD(,mpegts_packet) mi_input_queue;
- struct mpegts_table_feed_queue mi_table_feed;
- pthread_cond_t mi_table_feed_cond; // Bound to mi_delivery_mutex
+ /* Data processing/output */
+ // Note: this lock (mi_output_lock) protects all the remaining
+ // data fields (excluding the callback functions)
+ pthread_mutex_t mi_output_lock;
-
- pthread_t mi_thread_id;
- th_pipe_t mi_thread_pipe;
-
- int mi_delivery_running;
- pthread_t mi_thread_table_id;
+ /* Active sources */
+ LIST_HEAD(,mpegts_mux_instance) mi_mux_active;
+ LIST_HEAD(,service) mi_transports;
+
+ /* Table processing */
+ pthread_t mi_table_tid;
+ pthread_cond_t mi_table_cond;
+ mpegts_table_feed_queue_t mi_table_queue;
/*
* Functions
mpegts_input_create0(calloc(1, sizeof(mpegts_input_t)),\
&mpegts_input_class, u, c)
+void mpegts_input_stop_all ( mpegts_input_t *mi );
+
void mpegts_input_delete ( mpegts_input_t *mi, int delconf );
#define mpegts_input_find(u) idnode_find(u, &mpegts_input_class);
return mm->mm_last_mp;
}
-size_t mpegts_input_recv_packets
- (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, uint8_t *tsb, size_t len,
- int64_t *pcr, uint16_t *pcr_pid, const char *name);
-
-void mpegts_input_table_thread_start( mpegts_input_t *mi );
-
-void mpegts_input_table_thread_stop( mpegts_input_t *mi );
+void mpegts_input_recv_packets
+ (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, size_t off,
+ int64_t *pcr, uint16_t *pcr_pid);
int mpegts_input_is_free ( mpegts_input_t *mi );
}
/* Service subs */
- pthread_mutex_lock(&mi->mi_delivery_mutex);
+ pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MAX(w, ths->ths_weight);
}
}
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
+ pthread_mutex_unlock(&mi->mi_output_lock);
return w;
}
elementary_stream_t *st;
/* Add to list */
- pthread_mutex_lock(&mi->mi_delivery_mutex);
+ pthread_mutex_lock(&mi->mi_output_lock);
if (!s->s_dvb_active_input) {
LIST_INSERT_HEAD(&mi->mi_transports, ((service_t*)s), s_active_link);
s->s_dvb_active_input = mi;
}
-
/* Register PIDs */
pthread_mutex_lock(&s->s_stream_mutex);
mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_STREAM, s);
mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s);
pthread_mutex_unlock(&s->s_stream_mutex);
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
+ pthread_mutex_unlock(&mi->mi_output_lock);
}
void
elementary_stream_t *st;
/* Remove from list */
- pthread_mutex_lock(&mi->mi_delivery_mutex);
+ pthread_mutex_lock(&mi->mi_output_lock);
if (s->s_dvb_active_input != NULL) {
LIST_REMOVE(((service_t*)s), s_active_link);
s->s_dvb_active_input = NULL;
mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s);
pthread_mutex_unlock(&s->s_stream_mutex);
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
+ pthread_mutex_unlock(&mi->mi_output_lock);
/* Stop mux? */
s->s_dvb_mux->mm_stop(s->s_dvb_mux, 0);
{
int ret = 0;
service_t *t;
- pthread_mutex_lock(&mi->mi_delivery_mutex);
+ pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(t, &mi->mi_transports, s_active_link) {
if (((mpegts_service_t*)t)->s_dvb_mux == mm) {
ret = 1;
break;
}
}
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
+ pthread_mutex_unlock(&mi->mi_output_lock);
return ret;
}
* Data processing
* *************************************************************************/
-size_t
+static int inline
+ts_sync_count ( const uint8_t *tsb, int len )
+{
+ int i = 0;
+ while (len >= 188 && *tsb == 0x47) {
+ ++i;
+ len -= 188;
+ tsb += 188;
+ }
+ return i;
+}
+
+void
mpegts_input_recv_packets
- ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi,
- uint8_t *tsb, size_t l, int64_t *pcr, uint16_t *pcr_pid,
- const char *name )
+ ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, size_t off,
+ int64_t *pcr, uint16_t *pcr_pid )
+{
+ int i, p = 0;
+ mpegts_packet_t *mp;
+ uint8_t *tsb = sb->sb_data + off;
+ int len = sb->sb_size - off;
+#define MIN_TS_PKT 10
+
+ /* Check for sync */
+ while ( (len >= (MIN_TS_PKT * 188)) &&
+ ((p = ts_sync_count(tsb, len)) < MIN_TS_PKT) ) {
+ --len;
+ ++tsb;
+ ++off;
+ }
+
+ // Note: we check for sync here so that the buffer can always be
+ // processed in its entirety inside the processing thread
+ // without the potential need to buffer data (since that would
+ // require per mmi buffers, where this is generally not required)
+
+ /* Extract PCR */
+ // Note: this is only used by tsfile for timing the play out of packets
+ // maybe we should move it?
+ if (pcr && pcr_pid) {
+ uint8_t *tmp = tsb;
+ for (i = 0; i < p; i++) {
+ if (*pcr_pid == (((tmp[1] & 0x1f) << 8) | tmp[2]))
+ ts_recv_packet1(NULL, tmp, pcr, 0);
+ tmp += 188;
+ }
+ }
+
+ /* Pass */
+ if (p >= 10) {
+ size_t sz = sizeof(mpegts_packet_t) + (p * 188);
+
+ mp = calloc(1, sz);
+ mp->mp_mux = mmi->mmi_mux;
+ mp->mp_len = p * 188;
+ memcpy(mp->mp_data, tsb, mp->mp_len);
+
+ pthread_mutex_lock(&mi->mi_input_lock);
+ if (TAILQ_FIRST(&mi->mi_input_queue) == NULL)
+ pthread_cond_signal(&mi->mi_input_cond);
+ TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
+ pthread_mutex_unlock(&mi->mi_input_lock);
+
+ len -= mp->mp_len;
+ off += mp->mp_len;
+ }
+
+ /* Adjust buffer */
+ if (len)
+ sbuf_cut(sb, off); // cut off the bottom
+ else
+ sb->sb_ptr = 0; // clear
+}
+
+static void
+mpegts_input_process
+ ( mpegts_input_t *mi, mpegts_packet_t *mp )
{
- int len = l;
+ int len = mp->mp_len;
int i = 0, table_wakeup = 0;
- int stream = 0;
- int table = 0;
- mpegts_mux_t *mm = mmi->mmi_mux;
+ int table, stream;
+ uint8_t *tsb = mp->mp_data;
+ mpegts_mux_t *mm = mp->mp_mux;
+ mpegts_mux_instance_t *mmi = mm->mm_active;
mpegts_pid_t *last_mp = NULL;
- assert(mm != NULL);
- assert(name != NULL);
-
- // TODO: get the input name
- tvhtrace("tsdemux", "%s - recv pkts tsb=%p len=%d pcr=%p pcr_pid=%p mmi=%p",
- name, tsb, (int)len, pcr, pcr_pid, mmi);
-
- /* Not enough data */
- if (len < 188) return len;
-
- /* Streaming - lock mutex */
- pthread_mutex_lock(&mi->mi_delivery_mutex);
/* Process */
while ( len >= 188 ) {
-
- /* Sync */
- if ( tsb[i] == 0x47 ) {
- mpegts_pid_t *mp;
- mpegts_pid_sub_t *mps;
- service_t *s;
- int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2];
- int64_t *ppcr = (pcr_pid && *pcr_pid == pid) ? pcr : NULL;
- tvhtrace("tsdemux", "%s - recv pkt for pid %04X (%d) on mmi %p",
- name, pid, pid, mmi);
-
- /* Find PID */
- if ((mp = mpegts_mux_find_pid(mm, pid, 0))) {
-
- if (mp != last_mp) {
- last_mp = mp;
- stream = 0;
- table = 0;
-
- /* Stream takes pref. */
+ mpegts_pid_t *mp;
+ mpegts_pid_sub_t *mps;
+ service_t *s;
+ int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2];
+
+ /* Find PID */
+ if ((mp = mpegts_mux_find_pid(mm, pid, 0))) {
+ // Note: there is a minor danger this caching will get things
+ // wrong for a brief period of time if the registrations on
+ // the PID change
+ if (mp != last_mp) {
+ if (pid == 0)
+ stream = table = 1;
+ else {
+ stream = table = 0;
+
+ /* Determine PID type */
RB_FOREACH(mps, &mp->mp_subs, mps_link) {
if (mps->mps_type & MPS_STREAM)
stream = 1;
if (mps->mps_type & MPS_TABLE)
table = 1;
+ if (table && stream) break;
}
/* Special case streams */
- if (pid == 0) table = stream = 1;
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
if (pid == s->s_pmt_pid) stream = 1;
else if (pid == s->s_pcr_pid) stream = 1;
}
}
+ }
- /* Stream data */
- if (stream) {
- LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
- int f;
- if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
- f = table || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
- ts_recv_packet1((mpegts_service_t*)s, tsb+i, ppcr, f);
- }
+ /* Stream data */
+ if (stream) {
+ LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
+ int f;
+ if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
+ f = table || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
+ ts_recv_packet1((mpegts_service_t*)s, tsb+i, NULL, f);
}
+ }
- /* Table data */
- if (table) {
- if (!(tsb[i+1] & 0x80)) {
- mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
- memcpy(mtf->mtf_tsb, tsb+i, 188);
- mtf->mtf_mux = mm;
- TAILQ_INSERT_TAIL(&mi->mi_table_feed, mtf, mtf_link);
- table_wakeup = 1;
- } else {
- tvhdebug("tsdemux", "%s - SI packet had errors", name);
- }
+ /* Table data */
+ if (table) {
+ if (!(tsb[i+1] & 0x80)) {
+ // TODO: might be able to optimise this a bit by having slightly
+ // larger buffering and trying to aggregate data (if we get
+ // same PID multiple times in the loop)
+ mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
+ memcpy(mtf->mtf_tsb, tsb+i, 188);
+ mtf->mtf_mux = mm;
+ TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
+ table_wakeup = 1;
+ } else {
+ //tvhdebug("tsdemux", "%s - SI packet had errors", name);
}
-
- /* Force PCR extraction for tsfile */
- } else {
- if (ppcr && *ppcr == PTS_UNSET)
- ts_recv_packet1(NULL, tsb+i, ppcr, 0);
}
-
- i += 188;
- len -= 188;
-
- /* Re-sync */
- } else {
- tvhdebug("tsdemux", "%s - ts sync lost", name);
- if (ts_resync(tsb, &len, &i)) break;
- tvhdebug("tsdemux", "%s - ts sync found", name);
}
+ i += 188;
+ len -= 188;
}
/* Raw stream */
- // Note: this will include unsynced data if that's what is received
if (i > 0 && LIST_FIRST(&mmi->mmi_streaming_pad.sp_targets) != NULL) {
streaming_message_t sm;
pktbuf_t *pb = pktbuf_alloc(tsb, i);
/* Wake table */
if (table_wakeup)
- pthread_cond_signal(&mi->mi_table_feed_cond);
-
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
+ pthread_cond_signal(&mi->mi_table_cond);
/* Bandwidth monitoring */
atomic_add(&mmi->mmi_stats.bps, i);
-
- /* Reset buffer */
- if (len) memmove(tsb, tsb+i, len);
+}
+
+static void *
+mpegts_input_thread ( void * p )
+{
+ mpegts_packet_t *mp;
+ mpegts_input_t *mi = p;
+
+ pthread_mutex_lock(&mi->mi_input_lock);
+ while (mi->mi_running) {
- return len;
+ /* Wait for a packet */
+ if (!(mp = TAILQ_FIRST(&mi->mi_input_queue))) {
+ pthread_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock);
+ continue;
+ }
+ TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
+ pthread_mutex_unlock(&mi->mi_input_lock);
+
+ /* Process */
+ pthread_mutex_lock(&mi->mi_output_lock);
+ if (mp->mp_mux && mp->mp_mux->mm_active)
+ mpegts_input_process(mi, mp);
+ pthread_mutex_unlock(&mi->mi_output_lock);
+
+ /* Cleanup */
+ free(mp);
+ pthread_mutex_lock(&mi->mi_input_lock);
+ }
+
+ /* Flush */
+ while ((mp = TAILQ_FIRST(&mi->mi_input_queue))) {
+ TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
+ free(mp);
+ }
+ pthread_mutex_unlock(&mi->mi_input_lock);
+
+ return NULL;
}
static void
mpegts_table_feed_t *mtf;
mpegts_input_t *mi = aux;
- pthread_mutex_lock(&mi->mi_delivery_mutex);
- while (mi->mi_delivery_running) {
+ pthread_mutex_lock(&mi->mi_output_lock);
+ while (mi->mi_running) {
/* Wait for data */
- while(!(mtf = TAILQ_FIRST(&mi->mi_table_feed))) {
- if (!mi->mi_delivery_running)
- break;
- pthread_cond_wait(&mi->mi_table_feed_cond, &mi->mi_delivery_mutex);
+ if (!(mtf = TAILQ_FIRST(&mi->mi_table_queue))) {
+ pthread_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock);
+ continue;
}
- if (mtf)
- TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
+ TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
+ pthread_mutex_unlock(&mi->mi_output_lock);
/* Process */
- if (mtf) {
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
+ if (mtf->mtf_mux) {
pthread_mutex_lock(&global_lock);
mpegts_input_table_dispatch(mtf->mtf_mux, mtf);
pthread_mutex_unlock(&global_lock);
- free(mtf);
- pthread_mutex_lock(&mi->mi_delivery_mutex);
}
- }
- while ((mtf = TAILQ_FIRST(&mi->mi_table_feed)) != NULL) {
- TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
+
+ /* Cleanup */
free(mtf);
+ pthread_mutex_lock(&mi->mi_output_lock);
}
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
- return NULL;
-}
-void
-mpegts_input_table_thread_start( mpegts_input_t *mi )
-{
- mi->mi_delivery_running = 1;
- tvhthread_create(&mi->mi_thread_table_id, NULL,
- mpegts_input_table_thread, mi, 0);
-}
+ /* Flush */
+ while ((mtf = TAILQ_FIRST(&mi->mi_table_queue)) != NULL) {
+ TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
+ free(mtf);
+ }
+ pthread_mutex_unlock(&mi->mi_output_lock);
-void
-mpegts_input_table_thread_stop( mpegts_input_t *mi )
-{
- pthread_mutex_lock(&mi->mi_delivery_mutex);
- mi->mi_delivery_running = 0;
- pthread_cond_signal(&mi->mi_table_feed_cond);
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
- pthread_join(mi->mi_thread_table_id, NULL);
+ return NULL;
}
void
mpegts_input_flush_mux
( mpegts_input_t *mi, mpegts_mux_t *mm )
{
- mpegts_table_feed_t *mtf, *next;
+ mpegts_table_feed_t *mtf;
+ mpegts_packet_t *mp;
- pthread_mutex_lock(&mi->mi_delivery_mutex);
- mtf = TAILQ_FIRST(&mi->mi_table_feed);
- while (mtf) {
- next = TAILQ_NEXT(mtf, mtf_link);
- if (mtf->mtf_mux == mm) {
- TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
- free(mtf);
- }
- mtf = next;
+ // Note: to avoid long delays in here, rather than actually
+ // remove things from the Q, we simply invalidate by clearing
+ // the mux pointer and allow the threads to deal with the deletion
+
+ /* Flush input Q */
+ pthread_mutex_lock(&mi->mi_input_lock);
+ TAILQ_FOREACH(mp, &mi->mi_input_queue, mp_link) {
+ if (mp->mp_mux == mm)
+ mp->mp_mux = NULL;
+ }
+ pthread_mutex_unlock(&mi->mi_input_lock);
+
+ /* Flush table Q */
+ pthread_mutex_lock(&mi->mi_output_lock);
+ TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) {
+ if (mtf->mtf_mux == mm)
+ mtf->mtf_mux = NULL;
}
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
+ pthread_mutex_unlock(&mi->mi_output_lock);
}
static void
mpegts_input_t *mi = (mpegts_input_t*)i;
mpegts_mux_instance_t *mmi;
- pthread_mutex_lock(&mi->mi_delivery_mutex);
+ pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
st = calloc(1, sizeof(tvh_input_stream_t));
mpegts_input_stream_status(mmi, st);
LIST_INSERT_HEAD(isl, st, link);
}
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
+ pthread_mutex_unlock(&mi->mi_output_lock);
+}
+
+static void
+mpegts_input_thread_start ( mpegts_input_t *mi )
+{
+ mi->mi_running = 1;
+
+ tvhthread_create(&mi->mi_table_tid, NULL,
+ mpegts_input_table_thread, mi, 0);
+ tvhthread_create(&mi->mi_input_tid, NULL,
+ mpegts_input_thread, mi, 0);
+}
+
+static void
+mpegts_input_thread_stop ( mpegts_input_t *mi )
+{
+ mi->mi_running = 0;
+
+ /* Stop input thread */
+ pthread_mutex_lock(&mi->mi_input_lock);
+ pthread_cond_signal(&mi->mi_input_cond);
+ pthread_mutex_unlock(&mi->mi_input_lock);
+
+ /* Stop table thread */
+ pthread_mutex_lock(&mi->mi_output_lock);
+ pthread_cond_signal(&mi->mi_table_cond);
+ pthread_mutex_unlock(&mi->mi_output_lock);
+
+ /* Join threads */
+ pthread_join(mi->mi_input_tid, NULL);
+ pthread_join(mi->mi_table_tid, NULL);
}
/* **************************************************************************
mpegts_input_t *mi = p;
mpegts_mux_instance_t *mmi;
htsmsg_t *e;
- pthread_mutex_lock(&mi->mi_delivery_mutex);
+ pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
memset(&st, 0, sizeof(st));
mpegts_input_stream_status(mmi, &st);
notify_by_msg("input_status", e);
tvh_input_stream_destroy(&st);
}
- pthread_mutex_unlock(&mi->mi_delivery_mutex);
+ pthread_mutex_unlock(&mi->mi_output_lock);
gtimer_arm(&mi->mi_status_timer, mpegts_input_status_timer, mi, 1);
}
/* Index */
mi->mi_instance = ++mpegts_input_idx;
- /* Init mutex */
- pthread_mutex_init(&mi->mi_delivery_mutex, NULL);
-
- /* Table input */
- TAILQ_INIT(&mi->mi_table_feed);
- pthread_cond_init(&mi->mi_table_feed_cond, NULL);
+ /* Init input/output structures */
+ pthread_mutex_init(&mi->mi_input_lock, NULL);
+ pthread_cond_init(&mi->mi_input_cond, NULL);
+ TAILQ_INIT(&mi->mi_input_queue);
- /* Init input thread control */
- mi->mi_thread_pipe.rd = mi->mi_thread_pipe.wr = -1;
+ pthread_mutex_init(&mi->mi_output_lock, NULL);
+ pthread_cond_init(&mi->mi_table_cond, NULL);
+ TAILQ_INIT(&mi->mi_table_queue);
/* Add to global list */
LIST_INSERT_HEAD(&mpegts_input_all, mi, mi_global_link);
if (c)
idnode_load(&mi->ti_id, c);
+ /* Start threads */
+ mpegts_input_thread_start(mi);
+
return mi;
}
+void
+mpegts_input_stop_all ( mpegts_input_t *mi )
+{
+ mpegts_mux_instance_t *mmi;
+ while ((mmi = LIST_FIRST(&mi->mi_mux_active)))
+ mmi->mmi_mux->mm_stop(mmi->mmi_mux, 1);
+}
+
void
mpegts_input_delete ( mpegts_input_t *mi, int delconf )
{
mpegts_network_link_t *mnl;
+ mpegts_input_thread_stop(mi);
while ((mnl = LIST_FIRST(&mi->mi_networks)))
mpegts_input_del_network(mnl);
idnode_unlink(&mi->ti_id);
- pthread_mutex_destroy(&mi->mi_delivery_mutex);
- pthread_cond_destroy(&mi->mi_table_feed_cond);
- tvh_pipe_close(&mi->mi_thread_pipe);
+ pthread_mutex_destroy(&mi->mi_output_lock);
+ pthread_cond_destroy(&mi->mi_table_cond);
LIST_REMOVE(mi, ti_link);
LIST_REMOVE(mi, mi_global_link);
free(mi->mi_name);