]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
subscription: add proper raw service cleanup, fixes instabilities
authorJaroslav Kysela <perex@perex.cz>
Sun, 18 Oct 2015 18:54:49 +0000 (20:54 +0200)
committerJaroslav Kysela <perex@perex.cz>
Sun, 18 Oct 2015 18:54:49 +0000 (20:54 +0200)
src/service.c
src/service.h
src/subscriptions.c

index f6c6be36813092260b48a68a89e60c788967fedb..6e39c80f24d8687eb20948742e3ab5934a4a449c 100644 (file)
@@ -55,6 +55,9 @@ static void service_class_save(struct idnode *self);
 
 struct service_queue service_all;
 struct service_queue service_raw_all;
+struct service_queue service_raw_remove;
+
+static gtimer_t service_raw_remove_timer;
 
 static void
 service_class_notify_enabled ( void *obj, const char *lang )
@@ -349,9 +352,6 @@ service_remove_subscriber(service_t *t, th_subscription_t *s,
   } else {
     subscription_unlink_service(s, reason);
   }
-
-  if(LIST_FIRST(&t->s_subscriptions) == NULL)
-    service_stop(t);
 }
 
 
@@ -811,8 +811,7 @@ service_destroy(service_t *t, int delconf)
 
   idnode_unlink(&t->s_id);
 
-  if(t->s_status != SERVICE_IDLE)
-    service_stop(t);
+  assert(t->s_status == SERVICE_IDLE);
 
   t->s_status = SERVICE_ZOMBIE;
 
@@ -824,14 +823,39 @@ service_destroy(service_t *t, int delconf)
 
   avgstat_flush(&t->s_rate);
 
-  if (t->s_type == STYPE_RAW)
+  switch (t->s_type) {
+  case STYPE_RAW:
     TAILQ_REMOVE(&service_raw_all, t, s_all_link);
-  else
+    break;
+  case STYPE_RAW_REMOVED:
+    TAILQ_REMOVE(&service_raw_remove, t, s_all_link);
+    break;
+  default:
     TAILQ_REMOVE(&service_all, t, s_all_link);
+  }
 
   service_unref(t);
 }
 
+static void
+service_remove_raw_timer_cb(void *aux)
+{
+  service_t *t;
+  while ((t = TAILQ_FIRST(&service_raw_remove)) != NULL)
+    service_destroy(t, 0);
+}
+
+void
+service_remove_raw(service_t *t)
+{
+  if (t == NULL) return;
+  assert(t->s_type == STYPE_RAW);
+  t->s_type = STYPE_RAW_REMOVED;
+  TAILQ_REMOVE(&service_raw_all, t, s_all_link);
+  TAILQ_INSERT_TAIL(&service_raw_remove, t, s_all_link);
+  gtimer_arm(&service_raw_remove_timer, service_remove_raw_timer_cb, NULL, 0);
+}
+
 void
 service_set_enabled(service_t *t, int enabled, int _auto)
 {
@@ -1393,6 +1417,7 @@ service_init(void)
   TAILQ_INIT(&pending_save_queue);
   TAILQ_INIT(&service_all);
   TAILQ_INIT(&service_raw_all);
+  TAILQ_INIT(&service_raw_remove);
   pthread_mutex_init(&pending_save_mutex, NULL);
   pthread_cond_init(&pending_save_cond, NULL);
   tvhthread_create(&service_saver_tid, NULL, service_saver, NULL, "service");
@@ -1401,8 +1426,15 @@ service_init(void)
 void
 service_done(void)
 {
+  service_t *t;
+
   pthread_cond_signal(&pending_save_cond);
   pthread_join(service_saver_tid, NULL);
+
+  pthread_mutex_lock(&global_lock);
+  while ((t = TAILQ_FIRST(&service_raw_remove)) != NULL)
+    service_destroy(t, 0);
+  pthread_mutex_unlock(&global_lock);
 }
 
 /**
index 3a96e611cbf2b7f8e78d103479fc8108c09e93dc..468e5749e4de17e14aa634b6dbf8d9d0bc968824 100644 (file)
@@ -29,6 +29,7 @@ extern const idclass_t service_raw_class;
 
 extern struct service_queue service_all;
 extern struct service_queue service_raw_all;
+extern struct service_queue service_raw_remove;
 
 struct channel;
 struct tvh_input;
@@ -240,7 +241,8 @@ typedef struct service {
    */
   enum {
     STYPE_STD,
-    STYPE_RAW
+    STYPE_RAW,
+    STYPE_RAW_REMOVED
   } s_type;
 
   /**
@@ -543,6 +545,8 @@ void service_set_enabled ( service_t *t, int enabled, int _auto );
 
 void service_destroy(service_t *t, int delconf);
 
+void service_remove_raw(service_t *);
+
 void service_remove_subscriber(service_t *t, struct th_subscription *s,
                               int reason);
 
index 8b36cc5f2688c1a17508cafaffcb4266521c4111..123eeb24081a7f5e49cb2f7ebfe0943dc4632f9d 100644 (file)
@@ -116,16 +116,14 @@ subscription_link_service(th_subscription_t *s, service_t *t)
 /**
  * Called from service code
  */
-static void
+static int
 subscription_unlink_service0(th_subscription_t *s, int reason, int stop)
 {
   streaming_message_t *sm;
-  service_t *t = s->ths_service;
+  service_t *t = s->ths_service, *tr = s->ths_raw_service;
 
   /* Ignore - not actually linked */
-  if (!s->ths_current_instance) return;
-
-  tvhtrace("subscription", "%04X: unlinking sub %p from svc %p", shortid(s), s, t);
+  if (!s->ths_current_instance) goto stop;
 
   pthread_mutex_lock(&t->s_stream_mutex);
 
@@ -142,9 +140,19 @@ subscription_unlink_service0(th_subscription_t *s, int reason, int stop)
 
   LIST_REMOVE(s, ths_service_link);
   s->ths_service = NULL;
+  s->ths_raw_service = NULL;
 
-  if (stop && (s->ths_flags & SUBSCRIPTION_ONESHOT) != 0)
+  if (stop && (s->ths_flags & SUBSCRIPTION_ONESHOT) != 0) {
+    if (tr)
+      LIST_REMOVE(s, ths_mux_link);
     subscription_unsubscribe(s, 0);
+    service_remove_raw(tr);
+  }
+
+stop:
+  if(LIST_FIRST(&t->s_subscriptions) == NULL)
+    service_stop(t);
+  return 1;
 }
 
 void
@@ -329,11 +337,7 @@ subscription_reschedule(void)
 
       subscription_unlink_service0(s, SM_CODE_BAD_SOURCE, 0);
 
-      if(t && LIST_FIRST(&t->s_subscriptions) == NULL)
-        service_stop(t);
-
       si = s->ths_current_instance;
-
       assert(si != NULL);
       si->si_error = s->ths_testing_error;
       time(&si->si_error_time);
@@ -597,7 +601,7 @@ subscription_unsubscribe(th_subscription_t *s, int quiet)
 
 #if ENABLE_MPEGTS
   if (s->ths_raw_service)
-    service_destroy(s->ths_raw_service, 0);
+    service_remove_raw(s->ths_raw_service);
 #endif
 
   streaming_msg_free(s->ths_start_message);