From: Jaroslav Kysela Date: Fri, 24 Apr 2015 11:29:25 +0000 (+0200) Subject: added tasklets X-Git-Tag: v4.1~118 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c4d8b7a3fe93e5f89b325d411bd20e937cd1398a;p=thirdparty%2Ftvheadend.git added tasklets --- diff --git a/src/main.c b/src/main.c index 41c747f58..6a82df71b 100644 --- a/src/main.c +++ b/src/main.c @@ -159,6 +159,7 @@ const tvh_caps_t tvheadend_capabilities[] = { }; pthread_mutex_t global_lock; +pthread_mutex_t tasklet_lock; pthread_mutex_t ffmpeg_lock; pthread_mutex_t fork_lock; pthread_mutex_t atomic_lock; @@ -168,6 +169,9 @@ pthread_mutex_t atomic_lock; */ static LIST_HEAD(, gtimer) gtimers; static pthread_cond_t gtimer_cond; +static TAILQ_HEAD(, tasklet) tasklets; +static pthread_cond_t tasklet_cond; +static pthread_t tasklet_tid; static void handle_sigpipe(int x) @@ -248,8 +252,6 @@ gtimer_arm_abs2 LIST_INSERT_SORTED(>imers, gti, gti_link, gtimercmp); - //tvhdebug("gtimer", "%p @ %ld.%09ld", gti, when->tv_sec, when->tv_nsec); - if (LIST_FIRST(>imers) == gti) pthread_cond_signal(>imer_cond); // force timer re-check } @@ -298,12 +300,90 @@ void gtimer_disarm(gtimer_t *gti) { if(gti->gti_callback) { - //tvhdebug("gtimer", "%p disarm", gti); LIST_REMOVE(gti, gti_link); gti->gti_callback = NULL; } } +/** + * + */ +void +tasklet_arm(tasklet_t *tsk, tsk_callback_t *callback, void *opaque) +{ + pthread_mutex_lock(&tasklet_lock); + + if (tsk->tsk_callback != NULL) + TAILQ_REMOVE(&tasklets, tsk, tsk_link); + + tsk->tsk_callback = callback; + tsk->tsk_opaque = opaque; + + TAILQ_INSERT_TAIL(&tasklets, tsk, tsk_link); + + if (TAILQ_FIRST(&tasklets) == tsk) + pthread_cond_signal(&tasklet_cond); + + pthread_mutex_unlock(&tasklet_lock); +} + +/** + * + */ +void +tasklet_disarm(tasklet_t *tsk) +{ + pthread_mutex_lock(&tasklet_lock); + + if(tsk->tsk_callback) { + TAILQ_REMOVE(&tasklets, tsk, tsk_link); + tsk->tsk_callback(tsk->tsk_opaque, 1); + tsk->tsk_callback = NULL; + } + + pthread_mutex_unlock(&tasklet_lock); +} + +static void +tasklet_flush() +{ + tasklet_t *tsk; + + pthread_mutex_lock(&tasklet_lock); + + while ((tsk = TAILQ_FIRST(&tasklets)) != NULL) { + TAILQ_REMOVE(&tasklets, tsk, tsk_link); + tsk->tsk_callback(tsk->tsk_opaque, 1); + tsk->tsk_callback = NULL; + } + + pthread_mutex_unlock(&tasklet_lock); +} + +/** + * + */ +static void * +tasklet_thread ( void *aux ) +{ + tasklet_t *tsk; + + pthread_mutex_lock(&tasklet_lock); + while (tvheadend_running) { + tsk = TAILQ_FIRST(&tasklets); + if (tsk == NULL) { + pthread_cond_wait(&tasklet_cond, &tasklet_lock); + continue; + } + if (tsk->tsk_callback) + tsk->tsk_callback(tsk->tsk_opaque, 0); + TAILQ_REMOVE(&tasklets, tsk, tsk_link); + } + pthread_mutex_unlock(&tasklet_lock); + + return NULL; +} + /** * Show version info */ @@ -454,8 +534,10 @@ main(int argc, char **argv) pthread_mutex_init(&ffmpeg_lock, NULL); pthread_mutex_init(&fork_lock, NULL); pthread_mutex_init(&global_lock, NULL); + pthread_mutex_init(&tasklet_lock, NULL); pthread_mutex_init(&atomic_lock, NULL); pthread_cond_init(>imer_cond, NULL); + pthread_cond_init(&tasklet_cond, NULL); /* Defaults */ tvheadend_webui_port = 9981; @@ -823,6 +905,8 @@ main(int argc, char **argv) * Initialize subsystems */ + tvhthread_create(&tasklet_tid, NULL, tasklet_thread, NULL); + dbus_server_init(opt_dbus, opt_dbus_session); intlconv_init(); @@ -973,6 +1057,12 @@ main(int argc, char **argv) tvhftrace("main", idnode_done); tvhftrace("main", spawn_done); + tvhtrace("main", "tasklet enter"); + pthread_join(tasklet_tid, NULL); + tvhtrace("main", "tasklet thread end"); + tasklet_flush(); + tvhtrace("main", "tasklet leave"); + tvhlog(LOG_NOTICE, "STOP", "Exiting HTS Tvheadend"); tvhlog_end(); diff --git a/src/tvheadend.h b/src/tvheadend.h index 0a01c59cd..67dead208 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -150,7 +150,6 @@ typedef enum { * global timer */ - typedef void (gti_callback_t)(void *opaque); typedef struct gtimer { @@ -175,6 +174,22 @@ void gtimer_arm_abs2(gtimer_t *gti, gti_callback_t *callback, void *opaque, void gtimer_disarm(gtimer_t *gti); +/* + * tasklet + */ + +typedef void (tsk_callback_t)(void *opaque, int disarmed); + +typedef struct tasklet { + TAILQ_ENTRY(tasklet) tsk_link; + tsk_callback_t *tsk_callback; + void *tsk_opaque; +} tasklet_t; + +void tasklet_arm(tasklet_t *tsk, tsk_callback_t *callback, void *opaque); +void tasklet_disarm(tasklet_t *gti); + + /* * List / Queue header declarations */