#define CAPMT_DESC_DEMUX 0x82
#define CAPMT_DESC_PID 0x84
+// message type
+#define CAPMT_MSG_FAST 0x01
+#define CAPMT_MSG_CLEAR 0x02
+
// limits
#define MAX_CA 16
#define MAX_INDEX 64
*
*/
static void
-capmt_queue_msg(capmt_t *capmt, int sid, const uint8_t *buf, size_t len)
+capmt_queue_msg
+ (capmt_t *capmt, int sid, const uint8_t *buf, size_t len, int flags)
{
- capmt_message_t *msg = malloc(sizeof(*msg));
+ capmt_message_t *msg;
+ if (flags & CAPMT_MSG_CLEAR) {
+ while ((msg = TAILQ_FIRST(&capmt->capmt_writeq)) != NULL) {
+ TAILQ_REMOVE(&capmt->capmt_writeq, msg, cm_link);
+ sbuf_free(&msg->cm_sb);
+ free(msg);
+ }
+ }
+ msg = malloc(sizeof(*msg));
sbuf_init_fixed(&msg->cm_sb, len);
sbuf_append(&msg->cm_sb, buf, len);
msg->cm_sid = sid;
- pthread_mutex_lock(&capmt->capmt_mutex);
- TAILQ_INSERT_TAIL(&capmt->capmt_writeq, msg, cm_link);
- pthread_mutex_unlock(&capmt->capmt_mutex);
+ if (flags & CAPMT_MSG_FAST)
+ TAILQ_INSERT_HEAD(&capmt->capmt_writeq, msg, cm_link);
+ else
+ TAILQ_INSERT_TAIL(&capmt->capmt_writeq, msg, cm_link);
tvh_write(capmt->capmt_pipe.wr, "c", 1);
}
buf[10] = ((pos - 5 - 12) & 0xF00) >> 8;
buf[11] = ((pos - 5 - 12) & 0xFF);
- capmt_queue_msg(t->ct_capmt, s->s_dvb_service_id, buf, pos);
+ capmt_queue_msg(t->ct_capmt, s->s_dvb_service_id,
+ buf, pos, CAPMT_MSG_CLEAR);
}
}
mpegts_service_t *s = (mpegts_service_t *)ct->td_service;
int oscam_new = capmt_oscam_new(ct->ct_capmt);
capmt_caid_ecm_t *cce;
+ capmt_t *capmt = ct->ct_capmt;
tvhlog(LOG_INFO, "capmt",
"Removing CAPMT Server from service \"%s\" on adapter %d",
s->s_dvb_svcname, ct->ct_adapter);
+ pthread_mutex_lock(&capmt->capmt_mutex);
+
/* send stop to client */
if (!oscam_new)
capmt_send_stop(ct);
LIST_REMOVE(ct, ct_link);
if (oscam_new)
- capmt_enumerate_services(ct->ct_capmt, 1);
+ capmt_enumerate_services(capmt, 1);
- if (LIST_EMPTY(&ct->ct_capmt->capmt_services)) {
- ct->ct_capmt->capmt_adapters[ct->ct_adapter].ca_tuner = NULL;
- memset(&ct->ct_capmt->capmt_demuxes, 0, sizeof(ct->ct_capmt->capmt_demuxes));
+ if (LIST_EMPTY(&capmt->capmt_services)) {
+ capmt->capmt_adapters[ct->ct_adapter].ca_tuner = NULL;
+ memset(&capmt->capmt_demuxes, 0, sizeof(capmt->capmt_demuxes));
}
+ pthread_mutex_unlock(&capmt->capmt_mutex);
+
tvhcsa_destroy(&ct->ct_csa);
free(ct);
}
static void
capmt_filter_data(capmt_t *capmt, uint8_t adapter, uint8_t demux_index,
- uint8_t filter_index, const uint8_t *data, int len)
+ uint8_t filter_index, const uint8_t *data, int len,
+ int flags)
{
uint8_t *buf = alloca(len + 6);
buf[5] = filter_index;
memcpy(buf + 6, data, len);
if (len - 3 == ((((uint16_t)buf[7] << 8) | buf[8]) & 0xfff))
- capmt_queue_msg(capmt, 0, buf, len + 6);
+ capmt_queue_msg(capmt, 0, buf, len + 6, flags);
}
static void
dmx_filter_params_t *filter;
dmx_filter_params_t *params = (dmx_filter_params_t *)sbuf_peek(sb, offset + 6);
capmt_filters_t *cf;
+ capmt_service_t *ct;
+ mpegts_service_t *t;
+ elementary_stream_t *st;
+ caid_t *c;
tvhtrace("capmt", "setting filter: adapter=%d, demux=%d, filter=%d, pid=%d",
adapter, demux_index, filter_index, pid);
cf = &capmt->capmt_demuxes.filters[demux_index];
if (cf->max && cf->adapter != adapter)
return;
+ pthread_mutex_lock(&capmt->capmt_mutex);
cf->adapter = adapter;
filter = &cf->dmx[filter_index];
filter->pid = pid;
capmt_pid_add(capmt, adapter, pid);
memcpy(&filter->filter, ¶ms->filter, sizeof(params->filter));
- filter->timeout = filter->flags = 0;
+ filter->timeout = 0;
+ filter->flags = 0;
+ /* ECM messages have the higher priority */
+ LIST_FOREACH(ct, &capmt->capmt_services, ct_link) {
+ t = (mpegts_service_t *)ct->td_service;
+ pthread_mutex_lock(&t->s_stream_mutex);
+ TAILQ_FOREACH(st, &t->s_components, es_link) {
+ LIST_FOREACH(c, &st->es_caids, link) {
+ if (c->pid == pid) {
+ filter->flags = CAPMT_MSG_FAST;
+ break;
+ }
+ }
+ if (c) break;
+ }
+ pthread_mutex_unlock(&t->s_stream_mutex);
+ if (st) break;
+ }
+ /* Update the max values */
if (capmt->capmt_demuxes.max <= demux_index)
capmt->capmt_demuxes.max = demux_index + 1;
if (cf->max <= filter_index)
memmove(filter->filter.filter + 3, filter->filter.filter + 1, DMX_FILTER_SIZE - 3);
memmove(filter->filter.mode + 3, filter->filter.mode + 1, DMX_FILTER_SIZE - 3);
filter->filter.mask[1] = filter->filter.mask[2] = 0;
+ pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
filter = &cf->dmx[filter_index];
if (filter->pid != pid)
return;
+ pthread_mutex_lock(&capmt->capmt_mutex);
memset(filter, 0, sizeof(*filter));
capmt_pid_remove(capmt, adapter, pid);
/* short the max values */
while (demux_index != 255 && capmt->capmt_demuxes.filters[demux_index].max == 0)
demux_index--;
capmt->capmt_demuxes.max = demux_index == 255 ? 0 : demux_index + 1;
+ pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
capmt_notify_server(capmt_t *capmt, capmt_service_t *ct)
{
+ pthread_mutex_lock(&capmt->capmt_mutex);
if (capmt_oscam_new(capmt)) {
if (!LIST_EMPTY(&capmt->capmt_services))
capmt_enumerate_services(capmt, 0);
LIST_FOREACH(ct, &capmt->capmt_services, ct_link)
capmt_send_request(ct, CAPMT_LIST_ONLY);
}
+ pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
mpegts_service_t *t;
capmt_service_t *ct;
- pthread_mutex_lock(&global_lock);
+ pthread_mutex_lock(&capmt->capmt_mutex);
LIST_FOREACH(ct, &capmt->capmt_services, ct_link) {
t = (mpegts_service_t *)ct->td_service;
ct->td_keystate = keystate;
}
}
- pthread_mutex_unlock(&global_lock);
+ pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
capmt_service_t *ct;
unsigned int i;
- pthread_mutex_lock(&global_lock);
+ pthread_mutex_lock(&capmt->capmt_mutex);
LIST_FOREACH(ct, &capmt->capmt_services, ct_link) {
t = (mpegts_service_t *)ct->td_service;
ct->td_keystate = DS_RESOLVED;
}
- pthread_mutex_unlock(&global_lock);
+ pthread_mutex_unlock(&capmt->capmt_mutex);
}
static int
for (i = 0; i < MAX_CA; i++)
sbuf_init(&buffer[i]);
- pthread_mutex_lock(&global_lock);
capmt_notify_server(capmt, NULL);
- pthread_mutex_unlock(&global_lock);
capmt->capmt_poll = tvhpoll_create(MAX_CA + 1);
capmt_poll_add(capmt, capmt->capmt_pipe.rd, 0);
reconnect = capmt->capmt_sock_reconnect[0];
sbuf_init(&buffer);
- pthread_mutex_lock(&global_lock);
capmt_notify_server(capmt, NULL);
- pthread_mutex_unlock(&global_lock);
capmt->capmt_poll = tvhpoll_create(2);
capmt_poll_add(capmt, capmt->capmt_pipe.rd, 0);
show_connection(capmt, ".so wrapper");
- pthread_mutex_lock(&global_lock);
capmt_notify_server(capmt, NULL);
- pthread_mutex_unlock(&global_lock);
while (capmt->capmt_running) {
/* Validate */
if (data == NULL || len > 4096) return;
+ pthread_mutex_lock(&capmt->capmt_mutex);
+
for (demux_index = 0; demux_index < capmt->capmt_demuxes.max; demux_index++) {
cf = &capmt->capmt_demuxes.filters[demux_index];
if (cf->adapter != o->adapter)
if (i >= DMX_FILTER_SIZE && i <= len) {
capmt_filter_data(capmt,
o->adapter, demux_index,
- filter_index, data, len);
+ filter_index, data, len,
+ cf->dmx[filter_index].flags);
}
}
}
+ pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
{
capmt_service_t *ct = (capmt_service_t *)td;
capmt_t *capmt = ct->ct_capmt;
- mpegts_service_t *t = (mpegts_service_t*)td->td_service;
+ mpegts_service_t *t;
elementary_stream_t *st;
capmt_caid_ecm_t *cce;
caid_t *c;
int change = 0;
+ pthread_mutex_lock(&capmt->capmt_mutex);
+
+ t = (mpegts_service_t*)td->td_service;
TAILQ_FOREACH(st, &t->s_components, es_link) {
LIST_FOREACH(c, &st->es_caids, link) {
/* search ecmpid in list */
if (change)
capmt_notify_server(capmt, ct);
+
+ pthread_mutex_unlock(&capmt->capmt_mutex);
}
static void
buf[9] = pmtversion;
pmtversion = (pmtversion + 1) & 0x1F;
- capmt_queue_msg(capmt, sid, buf, pos);
+ capmt_queue_msg(capmt, sid, buf, pos, 0);
}
static void
sb->sb_ptr = 0; // clear
}
+static void
+mpegts_input_table_dispatch ( mpegts_mux_t *mm, const uint8_t *tsb )
+{
+ int i = 0;
+ int len = mm->mm_num_tables;
+ uint16_t pid = ((tsb[1] & 0x1f) << 8) | tsb[2];
+ uint8_t cc = (tsb[3] & 0x0f);
+ mpegts_table_t *mt, *vec[len];
+
+ /* Collate - tables may be removed during callbacks */
+ LIST_FOREACH(mt, &mm->mm_tables, mt_link) {
+ mpegts_table_grab(mt);
+ vec[i++] = mt;
+ }
+ assert(i == len);
+
+ /* Process */
+ for (i = 0; i < len; i++) {
+ mt = vec[i];
+ if (!mt->mt_destroyed && mt->mt_pid == pid) {
+ if (tsb[3] & 0x10) {
+ int ccerr = 0;
+ if (mt->mt_cc != -1 && mt->mt_cc != cc) {
+ ccerr = 1;
+ /* Ignore dupes (shouldn't have payload set, but some seem to) */
+ //if (((mt->mt_cc + 15) & 0xf) != cc)
+ tvhdebug("psi", "PID %04X CC error %d != %d", pid, cc, mt->mt_cc);
+ }
+ mt->mt_cc = (cc + 1) & 0xF;
+ mpegts_psi_section_reassemble(&mt->mt_sect, tsb, 0, ccerr,
+ mpegts_table_dispatch, mt);
+ }
+ }
+ mpegts_table_release(mt);
+ }
+}
+
static void
mpegts_input_process
( mpegts_input_t *mi, mpegts_packet_t *mp )
// wrong for a brief period of time if the registrations on
// the PID change
if (mp != last_mp) {
- if (pid == 0)
- stream = table = 1;
- else {
+ if (pid == 0) {
+ stream = MPS_STREAM;
+ table = MPS_TABLE;
+ } else {
stream = table = 0;
/* Determine PID type */
RB_FOREACH(mps, &mp->mp_subs, mps_link) {
- if (mps->mps_type & MPS_STREAM)
- stream = 1;
- if (mps->mps_type & MPS_TABLE)
- table = 1;
- if (table && stream) break;
+ stream |= mps->mps_type & MPS_STREAM;
+ table |= mps->mps_type & (MPS_TABLE | MPS_FTABLE);
+ if (table == (MPS_TABLE|MPS_FTABLE) && stream) break;
}
/* Special case streams */
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
- if (pid == s->s_pmt_pid) stream = 1;
- else if (pid == s->s_pcr_pid) stream = 1;
+ if (pid == s->s_pmt_pid) stream = MPS_STREAM;
+ else if (pid == s->s_pcr_pid) stream = MPS_STREAM;
}
}
}
/* Table data */
if (table) {
if (!(tsb[i+1] & 0x80)) {
- // TODO: might be able to optimise this a bit by having slightly
- // larger buffering and trying to aggregate data (if we get
- // same PID multiple times in the loop)
- mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
- memcpy(mtf->mtf_tsb, tsb+i, 188);
- mtf->mtf_mux = mm;
- TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
- table_wakeup = 1;
+ if (table & MPS_FTABLE)
+ mpegts_input_table_dispatch(mm, tsb+i);
+ if (table & MPS_TABLE) {
+ // TODO: might be able to optimise this a bit by having slightly
+ // larger buffering and trying to aggregate data (if we get
+ // same PID multiple times in the loop)
+ mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
+ memcpy(mtf->mtf_tsb, tsb+i, 188);
+ mtf->mtf_mux = mm;
+ TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
+ table_wakeup = 1;
+ }
} else {
//tvhdebug("tsdemux", "%s - SI packet had errors", name);
}
return NULL;
}
-static void
-mpegts_input_table_dispatch ( mpegts_mux_t *mm, mpegts_table_feed_t *mtf )
-{
- int i = 0;
- int len = mm->mm_num_tables;
- uint16_t pid = ((mtf->mtf_tsb[1] & 0x1f) << 8) | mtf->mtf_tsb[2];
- uint8_t cc = (mtf->mtf_tsb[3] & 0x0f);
- mpegts_table_t *mt, *vec[len];
-
- /* Collate - tables may be removed during callbacks */
- LIST_FOREACH(mt, &mm->mm_tables, mt_link) {
- mpegts_table_grab(mt);
- vec[i++] = mt;
- }
- assert(i == len);
-
- /* Process */
- for (i = 0; i < len; i++) {
- mt = vec[i];
- if (!mt->mt_destroyed && mt->mt_pid == pid) {
- if (mtf->mtf_tsb[3] & 0x10) {
- int ccerr = 0;
- if (mt->mt_cc != -1 && mt->mt_cc != cc) {
- ccerr = 1;
- /* Ignore dupes (shouldn't have payload set, but some seem to) */
- //if (((mt->mt_cc + 15) & 0xf) != cc)
- tvhdebug("psi", "PID %04X CC error %d != %d", pid, cc, mt->mt_cc);
- }
- mt->mt_cc = (cc + 1) & 0xF;
- mpegts_psi_section_reassemble(&mt->mt_sect, mtf->mtf_tsb, 0, ccerr,
- mpegts_table_dispatch, mt);
- }
- }
- mpegts_table_release(mt);
- }
-}
-
static void *
mpegts_input_table_thread ( void *aux )
{
/* Process */
if (mtf->mtf_mux) {
pthread_mutex_lock(&global_lock);
- mpegts_input_table_dispatch(mtf->mtf_mux, mtf);
+ mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb);
pthread_mutex_unlock(&global_lock);
}