From: Jaroslav Kysela Date: Fri, 15 Dec 2017 16:32:54 +0000 (+0100) Subject: epggrab: create a new thread to process OTA events X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=0e967b35544c7cced158a0e5c61bdd53ffeffeb3;p=thirdparty%2Ftvheadend.git epggrab: create a new thread to process OTA events --- diff --git a/src/epggrab.c b/src/epggrab.c index 2d57b1ad2..ac99587b3 100644 --- a/src/epggrab.c +++ b/src/epggrab.c @@ -39,17 +39,21 @@ #include "cron.h" /* Thread protection */ -static int epggrab_confver; -pthread_mutex_t epggrab_mutex; -static pthread_cond_t epggrab_cond; -int epggrab_running; +static int epggrab_confver; +pthread_mutex_t epggrab_mutex; +static pthread_cond_t epggrab_cond; +static pthread_mutex_t epggrab_data_mutex; +static pthread_cond_t epggrab_data_cond; +int epggrab_running; + +static TAILQ_HEAD(, epggrab_module) epggrab_data_modules; /* Config */ -epggrab_module_list_t epggrab_modules; +epggrab_module_list_t epggrab_modules; -gtimer_t epggrab_save_timer; +gtimer_t epggrab_save_timer; -static cron_multi_t *epggrab_cron_multi; +static cron_multi_t *epggrab_cron_multi; /* ************************************************************************** * Internal Grab Thread @@ -83,7 +87,7 @@ static void _epggrab_module_grab ( epggrab_module_int_t *mod ) /* * Thread (for internal grabbing) */ -static void* _epggrab_internal_thread ( void* p ) +static void *_epggrab_internal_thread( void *aux ) { epggrab_module_t *mod; int err, confver = -1; // force first run @@ -143,6 +147,74 @@ epggrab_rerun_internal(void) pthread_cond_signal(&epggrab_cond); } +/* + * Thread (for data queue processing) + */ +static void *_epggrab_data_thread( void *aux ) +{ + epggrab_module_t *mod; + epggrab_queued_data_t *eq; + + /* Time for other jobs */ + while (atomic_get(&epggrab_running)) { + pthread_mutex_lock(&epggrab_data_mutex); + do { + eq = NULL; + mod = TAILQ_FIRST(&epggrab_data_modules); + if (mod) { + eq = TAILQ_FIRST(&mod->data_queue); + if (eq) { + TAILQ_REMOVE(&mod->data_queue, eq, eq_link); + if (TAILQ_EMPTY(&mod->data_queue)) + TAILQ_REMOVE(&epggrab_data_modules, mod, qlink); + } + } + if (eq == NULL) { + while (atomic_get(&epggrab_running)) + pthread_cond_wait(&epggrab_cond, &epggrab_data_mutex); + } + } while (eq == NULL && atomic_get(&epggrab_running)); + pthread_mutex_unlock(&epggrab_data_mutex); + mod->process_data(mod, eq->eq_data, eq->eq_len); + free(eq); + } + pthread_mutex_lock(&epggrab_data_mutex); + while ((mod = TAILQ_FIRST(&epggrab_data_modules)) != NULL) { + while ((eq = TAILQ_FIRST(&mod->data_queue)) != NULL) { + TAILQ_REMOVE(&mod->data_queue, eq, eq_link); + free(eq); + } + TAILQ_REMOVE(&epggrab_data_modules, mod, qlink); + } + pthread_mutex_unlock(&epggrab_data_mutex); + return NULL; +} + +void epggrab_queue_data(epggrab_module_t *mod, + const void *data1, uint32_t len1, + const void *data2, uint32_t len2) +{ + epggrab_queued_data_t *eq; + + if (!atomic_get(&epggrab_running)) + return; + eq = malloc(sizeof(*eq) + len1 + len2); + if (eq == NULL) + return; + eq->eq_len = len1 + len2; + if (len1) + memcpy(eq->eq_data, data1, len1); + if (len2) + memcpy(eq->eq_data + len1, data2, len2); + pthread_mutex_lock(&epggrab_data_mutex); + TAILQ_INSERT_TAIL(&mod->data_queue, eq, eq_link); + if (TAILQ_EMPTY(&mod->data_queue)) { + pthread_cond_signal(&epggrab_data_cond); + TAILQ_INSERT_TAIL(&epggrab_data_modules, mod, qlink); + } + pthread_mutex_unlock(&epggrab_data_mutex); +} + /* ************************************************************************** * Configuration * *************************************************************************/ @@ -401,6 +473,7 @@ void epggrab_resched ( void ) * Initialise */ pthread_t epggrab_tid; +pthread_t epggrab_data_tid; void epggrab_init ( void ) { @@ -416,6 +489,8 @@ void epggrab_init ( void ) pthread_mutex_init(&epggrab_mutex, NULL); pthread_cond_init(&epggrab_cond, NULL); + TAILQ_INIT(&epggrab_data_modules); + idclass_register(&epggrab_class); idclass_register(&epggrab_mod_class); idclass_register(&epggrab_mod_int_class); @@ -446,9 +521,10 @@ void epggrab_init ( void ) /* Post-init for OTA subsystem */ epggrab_ota_post(); - /* Start internal grab thread */ + /* Start internal and data queue grab thread */ atomic_set(&epggrab_running, 1); tvhthread_create(&epggrab_tid, NULL, _epggrab_internal_thread, NULL, "epggrabi"); + tvhthread_create(&epggrab_data_tid, NULL, _epggrab_data_thread, NULL, "epggrabd"); } /* @@ -460,7 +536,9 @@ void epggrab_done ( void ) atomic_set(&epggrab_running, 0); pthread_cond_signal(&epggrab_cond); + pthread_cond_signal(&epggrab_data_cond); pthread_join(epggrab_tid, NULL); + pthread_join(epggrab_data_tid, NULL); pthread_mutex_lock(&global_lock); while ((mod = LIST_FIRST(&epggrab_modules)) != NULL) { diff --git a/src/epggrab.h b/src/epggrab.h index 7944794a4..d0a5e9679 100644 --- a/src/epggrab.h +++ b/src/epggrab.h @@ -27,6 +27,7 @@ * Typedefs/Forward decls * *************************************************************************/ +typedef struct epggrab_queued_data epggrab_queued_data_t; typedef struct epggrab_module epggrab_module_t; typedef struct epggrab_module_int epggrab_module_int_t; typedef struct epggrab_module_ext epggrab_module_ext_t; @@ -139,6 +140,16 @@ int epggrab_channel_is_ota ( epggrab_channel_t *ec ); * Grabber Modules * *************************************************************************/ +/* + * Data queue + */ +struct epggrab_queued_data +{ + TAILQ_ENTRY(epggrab_queued_data) eq_link; + uint32_t eq_len; + uint8_t eq_data[0]; ///< Data are allocated at the end of structure +}; + /* * Grabber base class */ @@ -146,6 +157,7 @@ struct epggrab_module { idnode_t idnode; LIST_ENTRY(epggrab_module) link; ///< Global list link + TAILQ_ENTRY(epggrab_module) qlink; ///< Queued data link enum { EPGGRAB_OTA, @@ -161,11 +173,16 @@ struct epggrab_module int priority; ///< Priority of the module epggrab_channel_tree_t channels; ///< Channel list + TAILQ_HEAD(, epggrab_queued_data) data_queue; + /* Activate */ - int (*activate) ( void *m, int activate ); + int (*activate)( void *m, int activate ); /* Free */ - void (*done) ( void *m ); + void (*done)( void *m ); + + /* Process queued data */ + void (*process_data)( void *m, void *data, uint32_t len ); }; /* @@ -309,6 +326,13 @@ epggrab_module_t* epggrab_module_find_by_id ( const char *id ); const char * epggrab_module_type(epggrab_module_t *mod); const char * epggrab_module_get_status(epggrab_module_t *mod); +/* + * Data queue + */ +void epggrab_queue_data(epggrab_module_t *mod, + const void *data1, uint32_t len1, + const void *data2, uint32_t len2); + /* ************************************************************************** * Setup/Configuration * *************************************************************************/ diff --git a/src/epggrab/module.c b/src/epggrab/module.c index d3a5a6df4..978e5e4b7 100644 --- a/src/epggrab/module.c +++ b/src/epggrab/module.c @@ -698,12 +698,13 @@ epggrab_module_ota_t *epggrab_module_ota_create id, subsys, saveid, name, priority); /* Setup */ - skel->type = EPGGRAB_OTA; - skel->activate = ops->activate; - skel->start = ops->start; - skel->done = ops->done; - skel->tune = ops->tune; - skel->opaque = ops->opaque; + skel->type = EPGGRAB_OTA; + skel->activate = ops->activate; + skel->start = ops->start; + skel->done = ops->done; + skel->tune = ops->tune; + skel->process_data = ops->process_data; + skel->opaque = ops->opaque; return skel; } diff --git a/src/epggrab/module/eit.c b/src/epggrab/module/eit.c index cdd3d4da6..570f513ab 100644 --- a/src/epggrab/module/eit.c +++ b/src/epggrab/module/eit.c @@ -46,6 +46,14 @@ typedef struct eit_private #define EIT_SPEC_NZ_FREEVIEW 2 #define EIT_SPEC_UK_CABLE_VIRGIN 3 +/* Queued data structure */ +typedef struct eit_data_t +{ + tvh_uuid_t svc_uuid; + int tableid; + int sect; + int local_time; +} eit_data_t; /* Provider configuration */ typedef struct eit_module_t @@ -73,7 +81,9 @@ typedef struct eit_event const char *default_charset; +#if TODO_ADD_EXTRA htsmsg_t *extra; +#endif epg_genre_list_t *genre; @@ -792,12 +802,41 @@ static int _eit_process_event return 12 + (((ptr[10] & 0x0f) << 8) | ptr[11]); } +static void +_eit_process_data(void *m, void *data, uint32_t len) +{ + int save = 0, resched = 0; + eit_data_t ed; + mpegts_service_t *svc; + + assert(len >= sizeof(ed)); + memcpy(&ed, data, sizeof(ed)); + data += sizeof(ed); + len -= sizeof(ed); + svc = (mpegts_service_t *)service_find0(&ed.svc_uuid); + if (svc == NULL) + return; + pthread_mutex_lock(&global_lock); + while (len) { + int r; + if ((r = _eit_process_event(m, ed.tableid, ed.sect, svc, data, len, + ed.local_time, &resched, &save)) < 0) + break; + assert(r > 0); + len -= r; + data += r; + } + pthread_mutex_unlock(&global_lock); + + if (save) epg_updated(); + if (resched) epggrab_resched(); +} static int _eit_callback (mpegts_table_t *mt, const uint8_t *ptr, int len, int tableid) { - int r, sect, last, ver, save, resched, spec; + int r, sect, last, ver, spec; uint8_t seg; uint16_t onid, tsid, sid; uint32_t extraid; @@ -808,6 +847,7 @@ _eit_callback epggrab_ota_mux_t *ota = NULL; mpegts_psi_table_state_t *st; th_subscription_t *ths; + eit_data_t data; char ubuf[UUID_HEX_SIZE]; if (!epggrab_ota_running) @@ -928,25 +968,17 @@ svc_ok: if (svc->s_dvb_ignore_eit) goto done; - /* Process events */ - save = resched = 0; + /* Queue events */ len -= 11; ptr += 11; - while (len) { - int r; - if ((r = _eit_process_event(mod, tableid, sect, svc, ptr, len, - mm->mm_network->mn_localtime, - &resched, &save)) < 0) - break; - assert(r > 0); - len -= r; - ptr += r; + if (len >= 12) { + data.tableid = tableid; + data.sect = sect; + data.svc_uuid = svc->s_id.in_uuid; + data.local_time = mm->mm_network->mn_localtime; + epggrab_queue_data(mod, &data, sizeof(data), ptr, len); } - /* Update EPG */ - if (resched) epggrab_resched(); - if (save) epg_updated(); - done: r = dvb_table_end((mpegts_psi_table_t *)mt, st, sect); complete: @@ -1172,6 +1204,7 @@ static eit_module_t *eit_module_ota_create .start = _eit_start, \ .done = _eit_done, \ .activate = _eit_activate, \ + .process_data = _eit_process_data, \ .tune = _eit_tune, \ .opaque = &opaque_##name, \ } diff --git a/src/epggrab/private.h b/src/epggrab/private.h index 98eeec038..346dca05f 100644 --- a/src/epggrab/private.h +++ b/src/epggrab/private.h @@ -104,6 +104,7 @@ typedef struct epggrab_ota_module_ops { void (*done) (void *m); int (*tune) (epggrab_ota_map_t *map, epggrab_ota_mux_t *om, struct mpegts_mux *mm); + void (*process_data) (void *m, void *data, uint32_t len); void *opaque; } epggrab_ota_module_ops_t; diff --git a/src/service.h b/src/service.h index ed31eb16e..4dd11008f 100644 --- a/src/service.h +++ b/src/service.h @@ -539,6 +539,8 @@ void service_ref(service_t *t); static inline service_t *service_find(const char *identifier) { return idnode_find(identifier, &service_class, NULL); } +static inline service_t *service_find0(tvh_uuid_t *uuid) + { return idnode_find0(uuid, &service_class, NULL); } #define service_find_by_identifier service_find service_instance_t *service_find_instance(struct service *s,