]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
epggrab: create a new thread to process OTA events
authorJaroslav Kysela <perex@perex.cz>
Fri, 15 Dec 2017 16:32:54 +0000 (17:32 +0100)
committerJaroslav Kysela <perex@perex.cz>
Mon, 18 Dec 2017 08:39:10 +0000 (09:39 +0100)
src/epggrab.c
src/epggrab.h
src/epggrab/module.c
src/epggrab/module/eit.c
src/epggrab/private.h
src/service.h

index 2d57b1ad2a4036a32a47bdb080dc2b519773bb58..ac99587b3ff74748d050d80958991d2c3a369da0 100644 (file)
 #include "cron.h"
 
 /* Thread protection */
-static int            epggrab_confver;
-pthread_mutex_t       epggrab_mutex;
-static pthread_cond_t epggrab_cond;
-int                   epggrab_running;
+static int             epggrab_confver;
+pthread_mutex_t        epggrab_mutex;
+static pthread_cond_t  epggrab_cond;
+static pthread_mutex_t epggrab_data_mutex;
+static pthread_cond_t  epggrab_data_cond;
+int                    epggrab_running;
+
+static TAILQ_HEAD(, epggrab_module) epggrab_data_modules;
 
 /* Config */
-epggrab_module_list_t epggrab_modules;
+epggrab_module_list_t  epggrab_modules;
 
-gtimer_t              epggrab_save_timer;
+gtimer_t               epggrab_save_timer;
 
-static cron_multi_t  *epggrab_cron_multi;
+static cron_multi_t   *epggrab_cron_multi;
 
 /* **************************************************************************
  * Internal Grab Thread
@@ -83,7 +87,7 @@ static void _epggrab_module_grab ( epggrab_module_int_t *mod )
 /*
  * Thread (for internal grabbing)
  */
-static void* _epggrab_internal_thread ( void* p )
+static void *_epggrab_internal_thread( void *aux )
 {
   epggrab_module_t *mod;
   int err, confver = -1; // force first run
@@ -143,6 +147,74 @@ epggrab_rerun_internal(void)
   pthread_cond_signal(&epggrab_cond);
 }
 
+/*
+ * Thread (for data queue processing)
+ */
+static void *_epggrab_data_thread( void *aux )
+{
+  epggrab_module_t *mod;
+  epggrab_queued_data_t *eq;
+
+  /* Time for other jobs */
+  while (atomic_get(&epggrab_running)) {
+    pthread_mutex_lock(&epggrab_data_mutex);
+    do {
+      eq = NULL;
+      mod = TAILQ_FIRST(&epggrab_data_modules);
+      if (mod) {
+        eq = TAILQ_FIRST(&mod->data_queue);
+        if (eq) {
+          TAILQ_REMOVE(&mod->data_queue, eq, eq_link);
+          if (TAILQ_EMPTY(&mod->data_queue))
+            TAILQ_REMOVE(&epggrab_data_modules, mod, qlink);
+        }
+      }
+      if (eq == NULL) {
+        while (atomic_get(&epggrab_running))
+          pthread_cond_wait(&epggrab_cond, &epggrab_data_mutex);
+      }
+    } while (eq == NULL && atomic_get(&epggrab_running));
+    pthread_mutex_unlock(&epggrab_data_mutex);
+    mod->process_data(mod, eq->eq_data, eq->eq_len);
+    free(eq);
+  }
+  pthread_mutex_lock(&epggrab_data_mutex);
+  while ((mod = TAILQ_FIRST(&epggrab_data_modules)) != NULL) {
+    while ((eq = TAILQ_FIRST(&mod->data_queue)) != NULL) {
+      TAILQ_REMOVE(&mod->data_queue, eq, eq_link);
+      free(eq);
+    }
+    TAILQ_REMOVE(&epggrab_data_modules, mod, qlink);
+  }
+  pthread_mutex_unlock(&epggrab_data_mutex);
+  return NULL;
+}
+
+void epggrab_queue_data(epggrab_module_t *mod,
+                        const void *data1, uint32_t len1,
+                        const void *data2, uint32_t len2)
+{
+  epggrab_queued_data_t *eq;
+
+  if (!atomic_get(&epggrab_running))
+    return;
+  eq = malloc(sizeof(*eq) + len1 + len2);
+  if (eq == NULL)
+    return;
+  eq->eq_len = len1 + len2;
+  if (len1)
+    memcpy(eq->eq_data, data1, len1);
+  if (len2)
+    memcpy(eq->eq_data + len1, data2, len2);
+  pthread_mutex_lock(&epggrab_data_mutex);
+  TAILQ_INSERT_TAIL(&mod->data_queue, eq, eq_link);
+  if (TAILQ_EMPTY(&mod->data_queue)) {
+    pthread_cond_signal(&epggrab_data_cond);
+    TAILQ_INSERT_TAIL(&epggrab_data_modules, mod, qlink);
+  }
+  pthread_mutex_unlock(&epggrab_data_mutex);
+}
+
 /* **************************************************************************
  * Configuration
  * *************************************************************************/
@@ -401,6 +473,7 @@ void epggrab_resched ( void )
  * Initialise
  */
 pthread_t      epggrab_tid;
+pthread_t      epggrab_data_tid;
 
 void epggrab_init ( void )
 {
@@ -416,6 +489,8 @@ void epggrab_init ( void )
   pthread_mutex_init(&epggrab_mutex, NULL);
   pthread_cond_init(&epggrab_cond, NULL);
 
+  TAILQ_INIT(&epggrab_data_modules);
+
   idclass_register(&epggrab_class);
   idclass_register(&epggrab_mod_class);
   idclass_register(&epggrab_mod_int_class);
@@ -446,9 +521,10 @@ void epggrab_init ( void )
   /* Post-init for OTA subsystem */
   epggrab_ota_post();
 
-  /* Start internal grab thread */
+  /* Start internal and data queue grab thread */
   atomic_set(&epggrab_running, 1);
   tvhthread_create(&epggrab_tid, NULL, _epggrab_internal_thread, NULL, "epggrabi");
+  tvhthread_create(&epggrab_data_tid, NULL, _epggrab_data_thread, NULL, "epggrabd");
 }
 
 /*
@@ -460,7 +536,9 @@ void epggrab_done ( void )
 
   atomic_set(&epggrab_running, 0);
   pthread_cond_signal(&epggrab_cond);
+  pthread_cond_signal(&epggrab_data_cond);
   pthread_join(epggrab_tid, NULL);
+  pthread_join(epggrab_data_tid, NULL);
 
   pthread_mutex_lock(&global_lock);
   while ((mod = LIST_FIRST(&epggrab_modules)) != NULL) {
index 7944794a479327d629835b85d1eceb3054eb5fd9..d0a5e96796409a651fc354cc7b379ab7ce06018e 100644 (file)
@@ -27,6 +27,7 @@
  * Typedefs/Forward decls
  * *************************************************************************/
 
+typedef struct epggrab_queued_data  epggrab_queued_data_t;
 typedef struct epggrab_module       epggrab_module_t;
 typedef struct epggrab_module_int   epggrab_module_int_t;
 typedef struct epggrab_module_ext   epggrab_module_ext_t;
@@ -139,6 +140,16 @@ int epggrab_channel_is_ota ( epggrab_channel_t *ec );
  * Grabber Modules
  * *************************************************************************/
 
+/*
+ * Data queue
+ */
+struct epggrab_queued_data
+{
+  TAILQ_ENTRY(epggrab_queued_data) eq_link;
+  uint32_t                         eq_len;
+  uint8_t                          eq_data[0]; ///< Data are allocated at the end of structure
+};
+
 /*
  * Grabber base class
  */
@@ -146,6 +157,7 @@ struct epggrab_module
 {
   idnode_t                     idnode;
   LIST_ENTRY(epggrab_module)   link;      ///< Global list link
+  TAILQ_ENTRY(epggrab_module)  qlink;     ///< Queued data link
 
   enum {
     EPGGRAB_OTA,
@@ -161,11 +173,16 @@ struct epggrab_module
   int                          priority;  ///< Priority of the module
   epggrab_channel_tree_t       channels;  ///< Channel list
 
+  TAILQ_HEAD(, epggrab_queued_data) data_queue;
+
   /* Activate */
-  int       (*activate) ( void *m, int activate );
+  int       (*activate)( void *m, int activate );
 
   /* Free */
-  void      (*done)    ( void *m );
+  void      (*done)( void *m );
+
+  /* Process queued data */
+  void      (*process_data)( void *m, void *data, uint32_t len );
 };
 
 /*
@@ -309,6 +326,13 @@ epggrab_module_t* epggrab_module_find_by_id ( const char *id );
 const char * epggrab_module_type(epggrab_module_t *mod);
 const char * epggrab_module_get_status(epggrab_module_t *mod);
 
+/*
+ * Data queue
+ */
+void epggrab_queue_data(epggrab_module_t *mod,
+                        const void *data1, uint32_t len1,
+                        const void *data2, uint32_t len2);
+
 /* **************************************************************************
  * Setup/Configuration
  * *************************************************************************/
index d3a5a6df422a22055f9117962de25a16180ddc3c..978e5e4b79f7ba49d7394842c25d2df3bfe92ea0 100644 (file)
@@ -698,12 +698,13 @@ epggrab_module_ota_t *epggrab_module_ota_create
                         id, subsys, saveid, name, priority);
 
   /* Setup */
-  skel->type     = EPGGRAB_OTA;
-  skel->activate = ops->activate;
-  skel->start    = ops->start;
-  skel->done     = ops->done;
-  skel->tune     = ops->tune;
-  skel->opaque   = ops->opaque;
+  skel->type         = EPGGRAB_OTA;
+  skel->activate     = ops->activate;
+  skel->start        = ops->start;
+  skel->done         = ops->done;
+  skel->tune         = ops->tune;
+  skel->process_data = ops->process_data;
+  skel->opaque       = ops->opaque;
 
   return skel;
 }
index cdd3d4da61e09e35d058d8e8cc4a34ad261af2f0..570f513ab778ac477ac5e0ad12816c04a36b157a 100644 (file)
@@ -46,6 +46,14 @@ typedef struct eit_private
 #define EIT_SPEC_NZ_FREEVIEW        2
 #define EIT_SPEC_UK_CABLE_VIRGIN    3
 
+/* Queued data structure */
+typedef struct eit_data_t
+{
+  tvh_uuid_t svc_uuid;
+  int        tableid;
+  int        sect;
+  int        local_time;
+} eit_data_t;
 
 /* Provider configuration */
 typedef struct eit_module_t
@@ -73,7 +81,9 @@ typedef struct eit_event
 
   const char       *default_charset;
 
+#if TODO_ADD_EXTRA
   htsmsg_t         *extra;
+#endif
 
   epg_genre_list_t *genre;
 
@@ -792,12 +802,41 @@ static int _eit_process_event
   return 12 + (((ptr[10] & 0x0f) << 8) | ptr[11]);
 }
 
+static void
+_eit_process_data(void *m, void *data, uint32_t len)
+{
+  int save = 0, resched = 0;
+  eit_data_t ed;
+  mpegts_service_t *svc;
+
+  assert(len >= sizeof(ed));
+  memcpy(&ed, data, sizeof(ed));
+  data += sizeof(ed);
+  len -= sizeof(ed);
+  svc = (mpegts_service_t *)service_find0(&ed.svc_uuid);
+  if (svc == NULL)
+    return;
+  pthread_mutex_lock(&global_lock);
+  while (len) {
+    int r;
+    if ((r = _eit_process_event(m, ed.tableid, ed.sect, svc, data, len,
+                                ed.local_time, &resched, &save)) < 0)
+      break;
+    assert(r > 0);
+    len -= r;
+    data += r;
+  }
+  pthread_mutex_unlock(&global_lock);
+
+  if (save)    epg_updated();
+  if (resched) epggrab_resched();
+}
 
 static int
 _eit_callback
   (mpegts_table_t *mt, const uint8_t *ptr, int len, int tableid)
 {
-  int r, sect, last, ver, save, resched, spec;
+  int r, sect, last, ver, spec;
   uint8_t  seg;
   uint16_t onid, tsid, sid;
   uint32_t extraid;
@@ -808,6 +847,7 @@ _eit_callback
   epggrab_ota_mux_t    *ota = NULL;
   mpegts_psi_table_state_t *st;
   th_subscription_t    *ths;
+  eit_data_t            data;
   char ubuf[UUID_HEX_SIZE];
 
   if (!epggrab_ota_running)
@@ -928,25 +968,17 @@ svc_ok:
   if (svc->s_dvb_ignore_eit)
     goto done;
 
-  /* Process events */
-  save = resched = 0;
+  /* Queue events */
   len -= 11;
   ptr += 11;
-  while (len) {
-    int r;
-    if ((r = _eit_process_event(mod, tableid, sect, svc, ptr, len,
-                                mm->mm_network->mn_localtime,
-                                &resched, &save)) < 0)
-      break;
-    assert(r > 0);
-    len -= r;
-    ptr += r;
+  if (len >= 12) {
+    data.tableid = tableid;
+    data.sect = sect;
+    data.svc_uuid = svc->s_id.in_uuid;
+    data.local_time = mm->mm_network->mn_localtime;
+    epggrab_queue_data(mod, &data, sizeof(data), ptr, len);
   }
 
-  /* Update EPG */
-  if (resched) epggrab_resched();
-  if (save)    epg_updated();
-  
 done:
   r = dvb_table_end((mpegts_psi_table_t *)mt, st, sect);
 complete:
@@ -1172,6 +1204,7 @@ static eit_module_t *eit_module_ota_create
     .start  = _eit_start, \
     .done = _eit_done, \
     .activate = _eit_activate, \
+    .process_data = _eit_process_data, \
     .tune   = _eit_tune, \
     .opaque = &opaque_##name, \
   }
index 98eeec038ead0fa2dbd617b4de299c71de1a9f48..346dca05f29cdf07da565010ea29513ce34590f8 100644 (file)
@@ -104,6 +104,7 @@ typedef struct epggrab_ota_module_ops {
     void (*done)     (void *m);
     int  (*tune)     (epggrab_ota_map_t *map, epggrab_ota_mux_t *om,
                       struct mpegts_mux *mm);
+    void (*process_data) (void *m, void *data, uint32_t len);
     void  *opaque;
 } epggrab_ota_module_ops_t;
 
index ed31eb16e6d52fbfe0ea9c1192a412788f0ecbfc..4dd11008f5bca8ebf9d102df19ec6eacd3f36183 100644 (file)
@@ -539,6 +539,8 @@ void service_ref(service_t *t);
 
 static inline service_t *service_find(const char *identifier)
   { return idnode_find(identifier, &service_class, NULL); }
+static inline service_t *service_find0(tvh_uuid_t *uuid)
+  { return idnode_find0(uuid, &service_class, NULL); }
 #define service_find_by_identifier service_find
 
 service_instance_t *service_find_instance(struct service *s,