#include "cron.h"
/* Thread protection */
-static int epggrab_confver;
-pthread_mutex_t epggrab_mutex;
-static pthread_cond_t epggrab_cond;
-int epggrab_running;
+static int epggrab_confver;
+pthread_mutex_t epggrab_mutex;
+static pthread_cond_t epggrab_cond;
+static pthread_mutex_t epggrab_data_mutex;
+static pthread_cond_t epggrab_data_cond;
+int epggrab_running;
+
+static TAILQ_HEAD(, epggrab_module) epggrab_data_modules;
/* Config */
-epggrab_module_list_t epggrab_modules;
+epggrab_module_list_t epggrab_modules;
-gtimer_t epggrab_save_timer;
+gtimer_t epggrab_save_timer;
-static cron_multi_t *epggrab_cron_multi;
+static cron_multi_t *epggrab_cron_multi;
/* **************************************************************************
* Internal Grab Thread
/*
* Thread (for internal grabbing)
*/
-static void* _epggrab_internal_thread ( void* p )
+static void *_epggrab_internal_thread( void *aux )
{
epggrab_module_t *mod;
int err, confver = -1; // force first run
pthread_cond_signal(&epggrab_cond);
}
+/*
+ * Thread (for data queue processing)
+ */
+static void *_epggrab_data_thread( void *aux )
+{
+ epggrab_module_t *mod;
+ epggrab_queued_data_t *eq;
+
+ /* Time for other jobs */
+ while (atomic_get(&epggrab_running)) {
+ pthread_mutex_lock(&epggrab_data_mutex);
+ do {
+ eq = NULL;
+ mod = TAILQ_FIRST(&epggrab_data_modules);
+ if (mod) {
+ eq = TAILQ_FIRST(&mod->data_queue);
+ if (eq) {
+ TAILQ_REMOVE(&mod->data_queue, eq, eq_link);
+ if (TAILQ_EMPTY(&mod->data_queue))
+ TAILQ_REMOVE(&epggrab_data_modules, mod, qlink);
+ }
+ }
+ if (eq == NULL) {
+ while (atomic_get(&epggrab_running))
+ pthread_cond_wait(&epggrab_cond, &epggrab_data_mutex);
+ }
+ } while (eq == NULL && atomic_get(&epggrab_running));
+ pthread_mutex_unlock(&epggrab_data_mutex);
+ mod->process_data(mod, eq->eq_data, eq->eq_len);
+ free(eq);
+ }
+ pthread_mutex_lock(&epggrab_data_mutex);
+ while ((mod = TAILQ_FIRST(&epggrab_data_modules)) != NULL) {
+ while ((eq = TAILQ_FIRST(&mod->data_queue)) != NULL) {
+ TAILQ_REMOVE(&mod->data_queue, eq, eq_link);
+ free(eq);
+ }
+ TAILQ_REMOVE(&epggrab_data_modules, mod, qlink);
+ }
+ pthread_mutex_unlock(&epggrab_data_mutex);
+ return NULL;
+}
+
+void epggrab_queue_data(epggrab_module_t *mod,
+ const void *data1, uint32_t len1,
+ const void *data2, uint32_t len2)
+{
+ epggrab_queued_data_t *eq;
+
+ if (!atomic_get(&epggrab_running))
+ return;
+ eq = malloc(sizeof(*eq) + len1 + len2);
+ if (eq == NULL)
+ return;
+ eq->eq_len = len1 + len2;
+ if (len1)
+ memcpy(eq->eq_data, data1, len1);
+ if (len2)
+ memcpy(eq->eq_data + len1, data2, len2);
+ pthread_mutex_lock(&epggrab_data_mutex);
+ TAILQ_INSERT_TAIL(&mod->data_queue, eq, eq_link);
+ if (TAILQ_EMPTY(&mod->data_queue)) {
+ pthread_cond_signal(&epggrab_data_cond);
+ TAILQ_INSERT_TAIL(&epggrab_data_modules, mod, qlink);
+ }
+ pthread_mutex_unlock(&epggrab_data_mutex);
+}
+
/* **************************************************************************
* Configuration
* *************************************************************************/
* Initialise
*/
pthread_t epggrab_tid;
+pthread_t epggrab_data_tid;
void epggrab_init ( void )
{
pthread_mutex_init(&epggrab_mutex, NULL);
pthread_cond_init(&epggrab_cond, NULL);
+ TAILQ_INIT(&epggrab_data_modules);
+
idclass_register(&epggrab_class);
idclass_register(&epggrab_mod_class);
idclass_register(&epggrab_mod_int_class);
/* Post-init for OTA subsystem */
epggrab_ota_post();
- /* Start internal grab thread */
+ /* Start internal and data queue grab thread */
atomic_set(&epggrab_running, 1);
tvhthread_create(&epggrab_tid, NULL, _epggrab_internal_thread, NULL, "epggrabi");
+ tvhthread_create(&epggrab_data_tid, NULL, _epggrab_data_thread, NULL, "epggrabd");
}
/*
atomic_set(&epggrab_running, 0);
pthread_cond_signal(&epggrab_cond);
+ pthread_cond_signal(&epggrab_data_cond);
pthread_join(epggrab_tid, NULL);
+ pthread_join(epggrab_data_tid, NULL);
pthread_mutex_lock(&global_lock);
while ((mod = LIST_FIRST(&epggrab_modules)) != NULL) {
* Typedefs/Forward decls
* *************************************************************************/
+typedef struct epggrab_queued_data epggrab_queued_data_t;
typedef struct epggrab_module epggrab_module_t;
typedef struct epggrab_module_int epggrab_module_int_t;
typedef struct epggrab_module_ext epggrab_module_ext_t;
* Grabber Modules
* *************************************************************************/
+/*
+ * Data queue
+ */
+struct epggrab_queued_data
+{
+ TAILQ_ENTRY(epggrab_queued_data) eq_link;
+ uint32_t eq_len;
+ uint8_t eq_data[0]; ///< Data are allocated at the end of structure
+};
+
/*
* Grabber base class
*/
{
idnode_t idnode;
LIST_ENTRY(epggrab_module) link; ///< Global list link
+ TAILQ_ENTRY(epggrab_module) qlink; ///< Queued data link
enum {
EPGGRAB_OTA,
int priority; ///< Priority of the module
epggrab_channel_tree_t channels; ///< Channel list
+ TAILQ_HEAD(, epggrab_queued_data) data_queue;
+
/* Activate */
- int (*activate) ( void *m, int activate );
+ int (*activate)( void *m, int activate );
/* Free */
- void (*done) ( void *m );
+ void (*done)( void *m );
+
+ /* Process queued data */
+ void (*process_data)( void *m, void *data, uint32_t len );
};
/*
const char * epggrab_module_type(epggrab_module_t *mod);
const char * epggrab_module_get_status(epggrab_module_t *mod);
+/*
+ * Data queue
+ */
+void epggrab_queue_data(epggrab_module_t *mod,
+ const void *data1, uint32_t len1,
+ const void *data2, uint32_t len2);
+
/* **************************************************************************
* Setup/Configuration
* *************************************************************************/
id, subsys, saveid, name, priority);
/* Setup */
- skel->type = EPGGRAB_OTA;
- skel->activate = ops->activate;
- skel->start = ops->start;
- skel->done = ops->done;
- skel->tune = ops->tune;
- skel->opaque = ops->opaque;
+ skel->type = EPGGRAB_OTA;
+ skel->activate = ops->activate;
+ skel->start = ops->start;
+ skel->done = ops->done;
+ skel->tune = ops->tune;
+ skel->process_data = ops->process_data;
+ skel->opaque = ops->opaque;
return skel;
}
#define EIT_SPEC_NZ_FREEVIEW 2
#define EIT_SPEC_UK_CABLE_VIRGIN 3
+/* Queued data structure */
+typedef struct eit_data_t
+{
+ tvh_uuid_t svc_uuid;
+ int tableid;
+ int sect;
+ int local_time;
+} eit_data_t;
/* Provider configuration */
typedef struct eit_module_t
const char *default_charset;
+#if TODO_ADD_EXTRA
htsmsg_t *extra;
+#endif
epg_genre_list_t *genre;
return 12 + (((ptr[10] & 0x0f) << 8) | ptr[11]);
}
+static void
+_eit_process_data(void *m, void *data, uint32_t len)
+{
+ int save = 0, resched = 0;
+ eit_data_t ed;
+ mpegts_service_t *svc;
+
+ assert(len >= sizeof(ed));
+ memcpy(&ed, data, sizeof(ed));
+ data += sizeof(ed);
+ len -= sizeof(ed);
+ svc = (mpegts_service_t *)service_find0(&ed.svc_uuid);
+ if (svc == NULL)
+ return;
+ pthread_mutex_lock(&global_lock);
+ while (len) {
+ int r;
+ if ((r = _eit_process_event(m, ed.tableid, ed.sect, svc, data, len,
+ ed.local_time, &resched, &save)) < 0)
+ break;
+ assert(r > 0);
+ len -= r;
+ data += r;
+ }
+ pthread_mutex_unlock(&global_lock);
+
+ if (save) epg_updated();
+ if (resched) epggrab_resched();
+}
static int
_eit_callback
(mpegts_table_t *mt, const uint8_t *ptr, int len, int tableid)
{
- int r, sect, last, ver, save, resched, spec;
+ int r, sect, last, ver, spec;
uint8_t seg;
uint16_t onid, tsid, sid;
uint32_t extraid;
epggrab_ota_mux_t *ota = NULL;
mpegts_psi_table_state_t *st;
th_subscription_t *ths;
+ eit_data_t data;
char ubuf[UUID_HEX_SIZE];
if (!epggrab_ota_running)
if (svc->s_dvb_ignore_eit)
goto done;
- /* Process events */
- save = resched = 0;
+ /* Queue events */
len -= 11;
ptr += 11;
- while (len) {
- int r;
- if ((r = _eit_process_event(mod, tableid, sect, svc, ptr, len,
- mm->mm_network->mn_localtime,
- &resched, &save)) < 0)
- break;
- assert(r > 0);
- len -= r;
- ptr += r;
+ if (len >= 12) {
+ data.tableid = tableid;
+ data.sect = sect;
+ data.svc_uuid = svc->s_id.in_uuid;
+ data.local_time = mm->mm_network->mn_localtime;
+ epggrab_queue_data(mod, &data, sizeof(data), ptr, len);
}
- /* Update EPG */
- if (resched) epggrab_resched();
- if (save) epg_updated();
-
done:
r = dvb_table_end((mpegts_psi_table_t *)mt, st, sect);
complete:
.start = _eit_start, \
.done = _eit_done, \
.activate = _eit_activate, \
+ .process_data = _eit_process_data, \
.tune = _eit_tune, \
.opaque = &opaque_##name, \
}
void (*done) (void *m);
int (*tune) (epggrab_ota_map_t *map, epggrab_ota_mux_t *om,
struct mpegts_mux *mm);
+ void (*process_data) (void *m, void *data, uint32_t len);
void *opaque;
} epggrab_ota_module_ops_t;
static inline service_t *service_find(const char *identifier)
{ return idnode_find(identifier, &service_class, NULL); }
+static inline service_t *service_find0(tvh_uuid_t *uuid)
+ { return idnode_find0(uuid, &service_class, NULL); }
#define service_find_by_identifier service_find
service_instance_t *service_find_instance(struct service *s,