#include "dvb_charset.h"
void
-dvb_init(uint32_t adapter_mask)
+dvb_init(uint32_t adapter_mask, const char *rawfile)
{
dvb_charset_init();
- dvb_adapter_init(adapter_mask);
+ dvb_adapter_init(adapter_mask, rawfile);
}
TAILQ_HEAD(, epggrab_ota_mux) tdmi_epg_grab;
+ struct th_subscription_list tdmi_subscriptions;
+
} th_dvb_mux_instance_t;
int tda_rawmode;
+ // Full mux streaming, protected via the delivery mutex
+
+ streaming_pad_t tda_streaming_pad;
+
struct dvb_table_feed_queue tda_table_feed;
pthread_cond_t tda_table_feed_cond; // Bound to tda_delivery_mutex
extern struct th_dvb_adapter_queue dvb_adapters;
extern struct th_dvb_mux_instance_tree dvb_muxes;
-void dvb_init(uint32_t adapter_mask);
+void dvb_init(uint32_t adapter_mask, const char *rawfile);
/**
* DVB Adapter
*/
-void dvb_adapter_init(uint32_t adapter_mask);
+void dvb_adapter_init(uint32_t adapter_mask, const char *rawfile);
void dvb_adapter_mux_scanner(void *aux);
void dvb_lnb_get_frequencies(const char *id,
int *f_low, int *f_hi, int *f_switch);
+
+/**
+ * Raw demux
+ */
+struct th_subscription;
+struct th_subscription *dvb_subscription_create_from_tdmi(th_dvb_mux_instance_t *tdmi,
+ const char *name,
+ streaming_target_t *st);
+
#endif /* DVB_H_ */
+
TAILQ_INIT(&tda->tda_scan_queues[i]);
TAILQ_INIT(&tda->tda_initial_scan_queue);
TAILQ_INIT(&tda->tda_satconfs);
+ streaming_pad_init(&tda->tda_streaming_pad);
return tda;
}
gtimer_arm(&tda->tda_mux_scanner_timer, dvb_adapter_mux_scanner, tda, 1);
}
+
+/**
+ *
+ */
+static void
+tda_add_from_file(const char *filename)
+{
+ int i, r;
+ th_dvb_adapter_t *tda;
+ char buf[400];
+
+ tda = tda_alloc();
+
+ tda->tda_adapter_num = -1;
+ tda->tda_fe_fd = -1;
+ tda->tda_dvr_pipe[0] = -1;
+
+ tda->tda_type = -1;
+
+ snprintf(buf, sizeof(buf), "%s", filename);
+
+ r = strlen(buf);
+ for(i = 0; i < r; i++)
+ if(!isalnum((int)buf[i]))
+ buf[i] = '_';
+
+ tda->tda_identifier = strdup(buf);
+
+ tda->tda_autodiscovery = 0;
+ tda->tda_idlescan = 0;
+
+ tda->tda_sat = 0;
+
+ /* Come up with an initial displayname, user can change it and it will
+ be overridden by any stored settings later on */
+
+ tda->tda_displayname = strdup(filename);
+
+ TAILQ_INSERT_TAIL(&dvb_adapters, tda, tda_global_link);
+
+ dvb_input_raw_setup(tda);
+}
+
+
+/**
+ *
+ */
void
dvb_adapter_start ( th_dvb_adapter_t *tda )
{
*
*/
void
-dvb_adapter_init(uint32_t adapter_mask)
+dvb_adapter_init(uint32_t adapter_mask, const char *rawfile)
{
htsmsg_t *l, *c;
htsmsg_field_t *f;
if ((1 << i) & adapter_mask)
tda_add(i);
+ if(rawfile)
+ tda_add_from_file(rawfile);
+
+
l = hts_settings_load("dvbadapters");
if(l != NULL) {
HTSMSG_FOREACH(f, l) {
if(service_compute_weight(&tda->tda_transports) > 0)
return;
+ if(tda->tda_mux_current != NULL &&
+ LIST_FIRST(&tda->tda_mux_current->tdmi_subscriptions) != NULL)
+ return; // Someone is doing full mux dump
+
/* Check if we have muxes pending for quickscan, if so, choose them */
if((tdmi = TAILQ_FIRST(&tda->tda_initial_scan_queue)) != NULL) {
dvb_fe_tune(tdmi, "Initial autoscan");
/* sync */
if (tsb[i] == 0x47) {
-
+
+ if(LIST_FIRST(&tda->tda_streaming_pad.sp_targets) != NULL) {
+ streaming_message_t sm;
+ pktbuf_t *pb = pktbuf_alloc(tsb, 188);
+ memset(&sm, 0, sizeof(sm));
+ sm.sm_type = SMT_MPEGTS;
+ sm.sm_data = pb;
+ streaming_pad_deliver(&tda->tda_streaming_pad, &sm);
+ pktbuf_ref_dec(pb);
+ }
+
+
if(!(tsb[i+1] & 0x80)) { // Only dispatch to table parser if not error
int pid = (tsb[i+1] & 0x1f) << 8 | tsb[i+2];
if(tda->tda_table_filter[pid]) {
}
return NULL;
}
+
+
+/**
+ *
+ */
+th_subscription_t *
+dvb_subscription_create_from_tdmi(th_dvb_mux_instance_t *tdmi,
+ const char *name,
+ streaming_target_t *st)
+{
+ th_subscription_t *s;
+ th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
+
+ s = subscription_create(INT32_MAX, name, st, SUBSCRIPTION_RAW_MPEGTS,
+ NULL, NULL, NULL, NULL);
+
+
+ s->ths_tdmi = tdmi;
+ LIST_INSERT_HEAD(&tdmi->tdmi_subscriptions, s, ths_tdmi_link);
+
+ dvb_fe_tune(tdmi, "Full mux subscription");
+
+ pthread_mutex_lock(&tda->tda_delivery_mutex);
+ streaming_target_connect(&tda->tda_streaming_pad, &s->ths_input);
+ pthread_mutex_unlock(&tda->tda_delivery_mutex);
+
+ notify_reload("subscriptions");
+ return s;
+}
if(w && w >= weight && !force_start)
/* We are outranked by weight, cant use it */
return SM_CODE_NOT_FREE;
-
+
+ if(LIST_FIRST(&tdmi->tdmi_subscriptions) != NULL)
+ return SM_CODE_NOT_FREE;
+
dvb_adapter_clean(tda);
}
sigset_t set;
const char *homedir;
const char *rawts_input = NULL;
+ const char *dvb_rawts_input = NULL;
const char *join_transport = NULL;
const char *confpath = NULL;
char *p, *endp;
// make sure the timezone is set
tzset();
- while((c = getopt(argc, argv, "Aa:fp:u:g:c:Chdr:j:sw:e:E:")) != -1) {
+ while((c = getopt(argc, argv, "Aa:fp:u:g:c:Chdr:j:sw:e:E:R:")) != -1) {
switch(c) {
case 'a':
adapter_mask = 0x0;
case 'r':
rawts_input = optarg;
break;
+ case 'R':
+ dvb_rawts_input = optarg;
+ break;
case 'j':
join_transport = optarg;
break;
tcp_server_init();
#if ENABLE_LINUXDVB
- dvb_init(adapter_mask);
+ dvb_init(adapter_mask, dvb_rawts_input);
#endif
iptv_input_init();
#if ENABLE_V4L
#include "htsmsg.h"
#include "notify.h"
#include "atomic.h"
+#include "dvb/dvb.h"
struct th_subscription_list subscriptions;
static gtimer_t subscription_reschedule_timer;
if(t != NULL)
service_remove_subscriber(t, s, SM_CODE_OK);
+ if(s->ths_tdmi != NULL) {
+ LIST_REMOVE(s, ths_tdmi_link);
+ th_dvb_adapter_t *tda = s->ths_tdmi->tdmi_adapter;
+ pthread_mutex_lock(&tda->tda_delivery_mutex);
+ streaming_target_disconnect(&tda->tda_streaming_pad, &s->ths_input);
+ pthread_mutex_unlock(&tda->tda_delivery_mutex);
+ }
+
if(s->ths_start_message != NULL)
streaming_msg_free(s->ths_start_message);
/**
*
*/
-static th_subscription_t *
+th_subscription_t *
subscription_create(int weight, const char *name, streaming_target_t *st,
- int flags, int direct, const char *hostname,
+ int flags, st_callback_t *cb, const char *hostname,
const char *username, const char *client)
{
th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
else
reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts
-
- streaming_target_init(&s->ths_input, direct ? subscription_input_direct :
- subscription_input, s, reject);
+ streaming_target_init(&s->ths_input,
+ cb ?: subscription_input_direct, s, reject);
s->ths_weight = weight;
s->ths_title = strdup(name);
int flags, const char *hostname,
const char *username, const char *client)
{
- th_subscription_t *s = subscription_create(weight, name, st, flags, 0,
- hostname, username, client);
+ th_subscription_t *s;
+
+ s = subscription_create(weight, name, st, flags, subscription_input,
+ hostname, username, client);
s->ths_channel = ch;
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
*/
th_subscription_t *
subscription_create_from_service(service_t *t, const char *name,
- streaming_target_t *st, int flags)
+ streaming_target_t *st, int flags)
{
- th_subscription_t *s = subscription_create(INT32_MAX, name, st, flags, 1,
- NULL, NULL, NULL);
+ th_subscription_t *s;
source_info_t si;
int r;
+ s = subscription_create(INT32_MAX, name, st, flags,
+ subscription_input_direct,
+ NULL, NULL, NULL);
+
if(t->s_status != SERVICE_RUNNING) {
if((r = service_start(t, INT32_MAX, 1)) != 0) {
subscription_unsubscribe(s);
char *ths_username;
char *ths_client;
+ // Ugly ugly ugly to refer DVB code here
+
+ LIST_ENTRY(th_subscription) ths_tdmi_link;
+ struct th_dvb_mux_instance *ths_tdmi;
} th_subscription_t;
streaming_target_t *st,
int flags);
-void subscription_change_weight(th_subscription_t *s, int weight);
+th_subscription_t *subscription_create(int weight, const char *name,
+ streaming_target_t *st,
+ int flags, st_callback_t *cb,
+ const char *hostname,
+ const char *username,
+ const char *client);
-void subscription_stop(th_subscription_t *s);
+void subscription_change_weight(th_subscription_t *s, int weight);
void subscription_unlink_service(th_subscription_t *s, int reason);
#include "plumbing/globalheaders.h"
#include "epg.h"
#include "muxer.h"
+#include "dvb/dvb.h"
+#include "dvb/dvb_support.h"
/**
*
}
+
+/**
+ * HTTP stream loop
+ */
+static void
+http_stream_run2(http_connection_t *hc, streaming_queue_t *sq)
+{
+ streaming_message_t *sm;
+ int run = 1;
+ int timeouts = 0;
+ struct timespec ts;
+ struct timeval tp;
+ int err = 0;
+ socklen_t errlen = sizeof(err);
+
+ /* reduce timeout on write() for streaming */
+ tp.tv_sec = 5;
+ tp.tv_usec = 0;
+ setsockopt(hc->hc_fd, SOL_SOCKET, SO_SNDTIMEO, &tp, sizeof(tp));
+ http_output_content(hc, "application/octet-stream");
+
+ while(run) {
+ pthread_mutex_lock(&sq->sq_mutex);
+ sm = TAILQ_FIRST(&sq->sq_queue);
+ if(sm == NULL) {
+ gettimeofday(&tp, NULL);
+ ts.tv_sec = tp.tv_sec + 1;
+ ts.tv_nsec = tp.tv_usec * 1000;
+
+ if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) {
+ timeouts++;
+
+ //Check socket status
+ getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen);
+ if(err) {
+ tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig);
+ run = 0;
+ }else if(timeouts >= 20) {
+ tvhlog(LOG_WARNING, "webui", "Stop streaming %s, timeout waiting for packets", hc->hc_url_orig);
+ run = 0;
+ }
+ }
+ pthread_mutex_unlock(&sq->sq_mutex);
+ continue;
+ }
+
+ timeouts = 0; //Reset timeout counter
+ TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
+ pthread_mutex_unlock(&sq->sq_mutex);
+
+ pktbuf_t *pb;
+
+ switch(sm->sm_type) {
+ case SMT_MPEGTS:
+ pb = sm->sm_data;
+ if(write(hc->hc_fd, pb->pb_data, pb->pb_size) != pb->pb_size) {
+ tvhlog(LOG_DEBUG, "webui", "Write error %s, stopping", hc->hc_url_orig);
+ run = 0;
+ }
+ break;
+ default:
+ break;
+ }
+
+ streaming_msg_free(sm);
+ }
+}
+
+
+
/**
* Output a playlist containing a single channel
*/
}
+/**
+ * Subscribes to a service and starts the streaming loop
+ */
+static int
+http_stream_tdmi(http_connection_t *hc, th_dvb_mux_instance_t *tdmi)
+{
+ th_subscription_t *s;
+ streaming_queue_t sq;
+
+ streaming_queue_init(&sq, SMT_PACKET);
+
+ pthread_mutex_lock(&global_lock);
+ s = dvb_subscription_create_from_tdmi(tdmi, "HTTP", &sq.sq_st);
+ pthread_mutex_unlock(&global_lock);
+
+ http_stream_run2(hc, &sq);
+
+
+ pthread_mutex_lock(&global_lock);
+ subscription_unsubscribe(s);
+ pthread_mutex_unlock(&global_lock);
+
+ streaming_queue_deinit(&sq);
+
+ return 0;
+}
+
+
/**
* Subscribes to a channel and starts the streaming loop
*/
* Handle the http request. http://tvheadend/stream/channelid/<chid>
* http://tvheadend/stream/channel/<chname>
* http://tvheadend/stream/service/<servicename>
+ * http://tvheadend/stream/mux/<muxid>
*/
static int
http_stream(http_connection_t *hc, const char *remain, void *opaque)
char *components[2];
channel_t *ch = NULL;
service_t *service = NULL;
+ th_dvb_mux_instance_t *tdmi = NULL;
hc->hc_keep_alive = 0;
ch = channel_find_by_name(components[1], 0, 0);
} else if(!strcmp(components[0], "service")) {
service = service_find_by_identifier(components[1]);
+ } else if(!strcmp(components[0], "mux")) {
+ tdmi = dvb_mux_find_by_identifier(components[1]);
}
+ // bug here: We can't retain pointers to channels etc outside global_lock
pthread_mutex_unlock(&global_lock);
if(ch != NULL) {
return http_stream_channel(hc, ch);
} else if(service != NULL) {
return http_stream_service(hc, service);
+ } else if(tdmi != NULL) {
+ return http_stream_tdmi(hc, tdmi);
} else {
http_error(hc, HTTP_STATUS_BAD_REQUEST);
return HTTP_STATUS_BAD_REQUEST;