From: Jaroslav Kysela Date: Fri, 16 Jun 2017 13:33:20 +0000 (+0200) Subject: profile: run transcoder in another thread X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0310309305830caa0deffda07e46d282e3e8c5d5;p=thirdparty%2Ftvheadend.git profile: run transcoder in another thread --- diff --git a/src/profile.c b/src/profile.c index 1c045d7d7..6bf7e40f2 100644 --- a/src/profile.c +++ b/src/profile.c @@ -645,6 +645,11 @@ profile_input(void *opaque, streaming_message_t *sm) profile_chain_t *prch = opaque, *prch2; profile_sharer_t *prsh = prch->prch_sharer; + if (prsh == NULL) { + streaming_msg_free(sm); + return; + } + if (sm->sm_type == SMT_START) { if (!prsh->prsh_master) prsh->prsh_master = prch; @@ -700,6 +705,42 @@ static streaming_ops_t profile_input_ops = { .st_info = profile_input_info }; +/* + * + */ +static void +profile_input_queue(void *opaque, streaming_message_t *sm) +{ + profile_chain_t *prch = opaque; + profile_sharer_t *prsh = prch->prch_sharer; + profile_sharer_message_t *psm = malloc(sizeof(*psm)); + psm->psm_prch = prch; + psm->psm_sm = sm; + pthread_mutex_lock(&prsh->prsh_queue_mutex); + if (prsh->prsh_queue_run) { + if (TAILQ_FIRST(&prsh->prsh_queue)) + tvh_cond_signal(&prsh->prsh_queue_cond, 0); + TAILQ_INSERT_TAIL(&prsh->prsh_queue, psm, psm_link); + } else { + streaming_msg_free(sm); + free(psm); + } + pthread_mutex_unlock(&prsh->prsh_queue_mutex); +} + +static htsmsg_t * +profile_input_queue_info(void *opaque, htsmsg_t *list) +{ + htsmsg_add_str(list, NULL, "profile queue input"); + profile_input_info(opaque, list); + return list; +} + +static streaming_ops_t profile_input_queue_ops = { + .st_cb = profile_input_queue, + .st_info = profile_input_queue_info +}; + /* * */ @@ -798,6 +839,7 @@ profile_sharer_find(profile_chain_t *prch) { profile_sharer_t *prsh = NULL; profile_chain_t *prch2; + int do_queue = prch->prch_can_share != NULL; LIST_FOREACH(prch2, &profile_chains, prch_link) { if (prch2->prch_id != prch->prch_id) @@ -811,12 +853,70 @@ profile_sharer_find(profile_chain_t *prch) } if (!prsh) { prsh = calloc(1, sizeof(*prsh)); + prsh->prsh_do_queue = do_queue; + if (do_queue) { + pthread_mutex_init(&prsh->prsh_queue_mutex, NULL); + tvh_cond_init(&prsh->prsh_queue_cond); + TAILQ_INIT(&prsh->prsh_queue); + } streaming_target_init(&prsh->prsh_input, &profile_sharer_input_ops, prsh, 0); LIST_INIT(&prsh->prsh_chains); } return prsh; } +/* + * + */ +static void * +profile_sharer_thread(void *aux) +{ + profile_sharer_t *prsh = aux; + profile_sharer_message_t *psm; + int run = 1; + + while (run) { + pthread_mutex_lock(&prsh->prsh_queue_mutex); + run = prsh->prsh_queue_run; + psm = TAILQ_FIRST(&prsh->prsh_queue); + if (run && psm == NULL) { + tvh_cond_wait(&prsh->prsh_queue_cond, &prsh->prsh_queue_mutex); + run = prsh->prsh_queue_run; + psm = TAILQ_FIRST(&prsh->prsh_queue); + } + if (run && psm) { + if (psm) { + profile_input(psm->psm_prch, psm->psm_sm); + TAILQ_REMOVE(&prsh->prsh_queue, psm, psm_link); + free(psm); + } + } + pthread_mutex_unlock(&prsh->prsh_queue_mutex); + } + return NULL; +} + +/* + * + */ +static int +profile_sharer_postinit(profile_sharer_t *prsh) +{ + int r; + + if (!prsh->prsh_do_queue) + return 0; + if (prsh->prsh_queue_run) + return 0; + r = tvhthread_create(&prsh->prsh_queue_thread, NULL, + profile_sharer_thread, prsh, "sharer"); + if (!r) + prsh->prsh_queue_run = 1; + else + tvherror(LS_PROFILE, "unable to create sharer thread"); + return r; +} + /* * */ @@ -841,13 +941,24 @@ static void profile_sharer_destroy(profile_chain_t *prch) { profile_sharer_t *prsh = prch->prch_sharer; + profile_sharer_message_t *psm, *psm2; if (prsh == NULL) return; LIST_REMOVE(prch, prch_sharer_link); - prch->prch_sharer = NULL; - prch->prch_post_share = NULL; if (LIST_EMPTY(&prsh->prsh_chains)) { + if (prsh->prsh_queue_run) { + pthread_mutex_lock(&prsh->prsh_queue_mutex); + prsh->prsh_queue_run = 0; + tvh_cond_signal(&prsh->prsh_queue_cond, 0); + pthread_mutex_unlock(&prsh->prsh_queue_mutex); + pthread_join(prsh->prsh_queue_thread, NULL); + while ((psm = TAILQ_FIRST(&prsh->prsh_queue)) != NULL) { + streaming_msg_free(psm->psm_sm); + TAILQ_REMOVE(&prsh->prsh_queue, psm, psm_link); + free(psm); + } + } if (prsh->prsh_tsfix) tsfix_destroy(prsh->prsh_tsfix); #if ENABLE_LIBAV @@ -857,6 +968,29 @@ profile_sharer_destroy(profile_chain_t *prch) if (prsh->prsh_start_msg) streaming_start_unref(prsh->prsh_start_msg); free(prsh); + prch->prch_sharer = NULL; + prch->prch_post_share = NULL; + } else { + if (prsh->prsh_queue_run) { + pthread_mutex_lock(&prsh->prsh_queue_mutex); + for (psm = TAILQ_FIRST(&prsh->prsh_queue); psm; psm = psm2) { + psm2 = TAILQ_NEXT(psm, psm_link); + if (psm->psm_prch != prch) continue; + if (psm->psm_sm->sm_type == SMT_PACKET || + psm->psm_sm->sm_type == SMT_MPEGTS) + streaming_msg_free(psm->psm_sm); + else + profile_input(psm->psm_prch, psm->psm_sm); + TAILQ_REMOVE(&prsh->prsh_queue, psm, psm_link); + free(psm); + } + } + prch->prch_sharer = NULL; + prch->prch_post_share = NULL; + if (prsh->prsh_master == prch) + prsh->prsh_master = NULL; + if (prsh->prsh_queue_run) + pthread_mutex_unlock(&prsh->prsh_queue_mutex); } } @@ -1064,8 +1198,13 @@ profile_htsp_work(profile_chain_t *prch, prch->prch_share = prsh->prsh_tsfix; prch->prch_flags = SUBSCRIPTION_PACKET; - streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0); + streaming_target_init(&prch->prch_input, + prsh->prsh_do_queue ? + &profile_input_queue_ops : &profile_input_ops, prch, + 0); prch->prch_st = &prch->prch_input; + if (profile_sharer_postinit(prsh)) + goto fail; return 0; fail: @@ -2178,8 +2317,6 @@ profile_transcode_work(profile_chain_t *prch, if (!prsh) goto fail; - prch->prch_can_share = profile_transcode_can_share; - memset(&props, 0, sizeof(props)); strncpy(props.tp_vcodec, pro->pro_vcodec ?: "", sizeof(props.tp_vcodec)-1); strncpy(props.tp_vcodec_preset, pro->pro_vcodec_preset ?: "", sizeof(props.tp_vcodec_preset)-1); @@ -2216,8 +2353,13 @@ profile_transcode_work(profile_chain_t *prch, prsh->prsh_tsfix = tsfix_create(dst); } prch->prch_share = prsh->prsh_tsfix; - streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0); + streaming_target_init(&prch->prch_input, + prsh->prsh_do_queue ? + &profile_input_queue_ops : &profile_input_ops, prch, + 0); prch->prch_st = &prch->prch_input; + if (profile_sharer_postinit(prsh)) + goto fail; return 0; fail: profile_chain_close(prch); @@ -2272,6 +2414,8 @@ profile_transcode_open(profile_chain_t *prch, { int r; + prch->prch_can_share = profile_transcode_can_share; + prch->prch_flags = SUBSCRIPTION_PACKET; prch->prch_sq.sq_maxsize = qsize; diff --git a/src/profile.h b/src/profile.h index cd518c52e..a5c192507 100644 --- a/src/profile.h +++ b/src/profile.h @@ -69,17 +69,6 @@ typedef LIST_HEAD(, profile_build) profile_builders_queue; extern profile_builders_queue profile_builders; -typedef struct profile_sharer { - streaming_target_t prsh_input; - LIST_HEAD(,profile_chain) prsh_chains; - struct profile_chain *prsh_master; - struct streaming_start *prsh_start_msg; - struct streaming_target *prsh_tsfix; -#if ENABLE_LIBAV - struct streaming_target *prsh_transcoder; -#endif -} profile_sharer_t; - typedef struct profile_chain { LIST_ENTRY(profile_chain) prch_link; int prch_linked; @@ -147,6 +136,29 @@ typedef struct profile { muxer_config_t *m_cfg, int flags, size_t qsize); } profile_t; +typedef struct profile_sharer_message { + TAILQ_ENTRY(profile_sharer_message) psm_link; + profile_chain_t *psm_prch; + streaming_message_t *psm_sm; +} profile_sharer_message_t; + +typedef struct profile_sharer { + uint32_t prsh_do_queue: 1; + uint32_t prsh_queue_run: 1; + pthread_t prsh_queue_thread; + pthread_mutex_t prsh_queue_mutex; + tvh_cond_t prsh_queue_cond; + TAILQ_HEAD(,profile_sharer_message) prsh_queue; + streaming_target_t prsh_input; + LIST_HEAD(,profile_chain) prsh_chains; + struct profile_chain *prsh_master; + struct streaming_start *prsh_start_msg; + struct streaming_target *prsh_tsfix; +#if ENABLE_LIBAV + struct streaming_target *prsh_transcoder; +#endif +} profile_sharer_t; + void profile_register(const idclass_t *clazz, profile_builder_t builder); profile_t *profile_create