]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
subscription: add proper raw service cleanup, fixes instabilities
authorJaroslav Kysela <perex@perex.cz>
Sun, 18 Oct 2015 18:54:49 +0000 (20:54 +0200)
committerJaroslav Kysela <perex@perex.cz>
Sun, 18 Oct 2015 22:20:39 +0000 (00:20 +0200)
src/service.c
src/service.h
src/subscriptions.c

index 2896d80d9252987a9fbaac02b1585c3523d60662..179b1aa0a53fa8b5403761585fd3952b45db0b1f 100644 (file)
@@ -54,6 +54,9 @@ static void service_class_save(struct idnode *self);
 
 struct service_queue service_all;
 struct service_queue service_raw_all;
+struct service_queue service_raw_remove;
+
+static gtimer_t service_raw_remove_timer;
 
 static void
 service_class_notify_enabled ( void *obj )
@@ -394,9 +397,6 @@ service_remove_subscriber(service_t *t, th_subscription_t *s,
   } else {
     subscription_unlink_service(s, reason);
   }
-
-  if(LIST_FIRST(&t->s_subscriptions) == NULL)
-    service_stop(t);
 }
 
 
@@ -842,8 +842,7 @@ service_destroy(service_t *t, int delconf)
 
   idnode_unlink(&t->s_id);
 
-  if(t->s_status != SERVICE_IDLE)
-    service_stop(t);
+  assert(t->s_status == SERVICE_IDLE);
 
   t->s_status = SERVICE_ZOMBIE;
 
@@ -855,14 +854,39 @@ service_destroy(service_t *t, int delconf)
 
   avgstat_flush(&t->s_rate);
 
-  if (t->s_type == STYPE_RAW)
+  switch (t->s_type) {
+  case STYPE_RAW:
     TAILQ_REMOVE(&service_raw_all, t, s_all_link);
-  else
+    break;
+  case STYPE_RAW_REMOVED:
+    TAILQ_REMOVE(&service_raw_remove, t, s_all_link);
+    break;
+  default:
     TAILQ_REMOVE(&service_all, t, s_all_link);
+  }
 
   service_unref(t);
 }
 
+static void
+service_remove_raw_timer_cb(void *aux)
+{
+  service_t *t;
+  while ((t = TAILQ_FIRST(&service_raw_remove)) != NULL)
+    service_destroy(t, 0);
+}
+
+void
+service_remove_raw(service_t *t)
+{
+  if (t == NULL) return;
+  assert(t->s_type == STYPE_RAW);
+  t->s_type = STYPE_RAW_REMOVED;
+  TAILQ_REMOVE(&service_raw_all, t, s_all_link);
+  TAILQ_INSERT_TAIL(&service_raw_remove, t, s_all_link);
+  gtimer_arm(&service_raw_remove_timer, service_remove_raw_timer_cb, NULL, 0);
+}
+
 void
 service_set_enabled(service_t *t, int enabled, int _auto)
 {
@@ -1414,6 +1438,7 @@ service_init(void)
   TAILQ_INIT(&pending_save_queue);
   TAILQ_INIT(&service_all);
   TAILQ_INIT(&service_raw_all);
+  TAILQ_INIT(&service_raw_remove);
   pthread_mutex_init(&pending_save_mutex, NULL);
   pthread_cond_init(&pending_save_cond, NULL);
   tvhthread_create(&service_saver_tid, NULL, service_saver, NULL);
@@ -1422,8 +1447,15 @@ service_init(void)
 void
 service_done(void)
 {
+  service_t *t;
+
   pthread_cond_signal(&pending_save_cond);
   pthread_join(service_saver_tid, NULL);
+
+  pthread_mutex_lock(&global_lock);
+  while ((t = TAILQ_FIRST(&service_raw_remove)) != NULL)
+    service_destroy(t, 0);
+  pthread_mutex_unlock(&global_lock);
 }
 
 /**
index f3a88a59dcb17a1265b959a46988828886cdbcf4..ba10626616e7dd30787e3502b4f95157d355b926 100644 (file)
@@ -28,6 +28,7 @@ extern const idclass_t service_raw_class;
 
 extern struct service_queue service_all;
 extern struct service_queue service_raw_all;
+extern struct service_queue service_raw_remove;
 
 struct channel;
 struct tvh_input;
@@ -241,7 +242,8 @@ typedef struct service {
    */
   enum {
     STYPE_STD,
-    STYPE_RAW
+    STYPE_RAW,
+    STYPE_RAW_REMOVED
   } s_type;
 
   /**
@@ -535,6 +537,8 @@ void service_set_enabled ( service_t *t, int enabled, int _auto );
 
 void service_destroy(service_t *t, int delconf);
 
+void service_remove_raw(service_t *);
+
 void service_remove_subscriber(service_t *t, struct th_subscription *s,
                               int reason);
 
index ac3a7feb4e4291bec238782c15db5ec9e935a10c..627f77b4cd19ddab19a96099a2dc24542645adcc 100644 (file)
@@ -116,16 +116,14 @@ subscription_link_service(th_subscription_t *s, service_t *t)
 /**
  * Called from service code
  */
-static void
+static int
 subscription_unlink_service0(th_subscription_t *s, int reason, int stop)
 {
   streaming_message_t *sm;
-  service_t *t = s->ths_service;
+  service_t *t = s->ths_service, *tr = s->ths_raw_service;
 
   /* Ignore - not actually linked */
-  if (!s->ths_current_instance) return;
-
-  tvhtrace("subscription", "%04X: unlinking sub %p from svc %p", shortid(s), s, t);
+  if (!s->ths_current_instance) goto stop;
 
   pthread_mutex_lock(&t->s_stream_mutex);
 
@@ -142,9 +140,19 @@ subscription_unlink_service0(th_subscription_t *s, int reason, int stop)
 
   LIST_REMOVE(s, ths_service_link);
   s->ths_service = NULL;
+  s->ths_raw_service = NULL;
 
-  if (stop && (s->ths_flags & SUBSCRIPTION_ONESHOT) != 0)
+  if (stop && (s->ths_flags & SUBSCRIPTION_ONESHOT) != 0) {
+    if (tr)
+      LIST_REMOVE(s, ths_mux_link);
     subscription_unsubscribe(s, 0);
+    service_remove_raw(tr);
+  }
+
+stop:
+  if(LIST_FIRST(&t->s_subscriptions) == NULL)
+    service_stop(t);
+  return 1;
 }
 
 void
@@ -329,11 +337,7 @@ subscription_reschedule(void)
 
       subscription_unlink_service0(s, SM_CODE_BAD_SOURCE, 0);
 
-      if(t && LIST_FIRST(&t->s_subscriptions) == NULL)
-        service_stop(t);
-
       si = s->ths_current_instance;
-
       assert(si != NULL);
       si->si_error = s->ths_testing_error;
       time(&si->si_error_time);
@@ -593,7 +597,7 @@ subscription_unsubscribe(th_subscription_t *s, int quiet)
 
 #if ENABLE_MPEGTS
   if (s->ths_raw_service)
-    service_destroy(s->ths_raw_service, 0);
+    service_remove_raw(s->ths_raw_service);
 #endif
 
   streaming_msg_free(s->ths_start_message);