profile_chain_t *prch = opaque, *prch2;
profile_sharer_t *prsh = prch->prch_sharer;
+ if (prsh == NULL) {
+ streaming_msg_free(sm);
+ return;
+ }
+
if (sm->sm_type == SMT_START) {
if (!prsh->prsh_master)
prsh->prsh_master = prch;
.st_info = profile_input_info
};
+/*
+ *
+ */
+static void
+profile_input_queue(void *opaque, streaming_message_t *sm)
+{
+ profile_chain_t *prch = opaque;
+ profile_sharer_t *prsh = prch->prch_sharer;
+ profile_sharer_message_t *psm = malloc(sizeof(*psm));
+ psm->psm_prch = prch;
+ psm->psm_sm = sm;
+ pthread_mutex_lock(&prsh->prsh_queue_mutex);
+ if (prsh->prsh_queue_run) {
+ if (TAILQ_FIRST(&prsh->prsh_queue))
+ tvh_cond_signal(&prsh->prsh_queue_cond, 0);
+ TAILQ_INSERT_TAIL(&prsh->prsh_queue, psm, psm_link);
+ } else {
+ streaming_msg_free(sm);
+ free(psm);
+ }
+ pthread_mutex_unlock(&prsh->prsh_queue_mutex);
+}
+
+static htsmsg_t *
+profile_input_queue_info(void *opaque, htsmsg_t *list)
+{
+ htsmsg_add_str(list, NULL, "profile queue input");
+ profile_input_info(opaque, list);
+ return list;
+}
+
+static streaming_ops_t profile_input_queue_ops = {
+ .st_cb = profile_input_queue,
+ .st_info = profile_input_queue_info
+};
+
/*
*
*/
{
profile_sharer_t *prsh = NULL;
profile_chain_t *prch2;
+ int do_queue = prch->prch_can_share != NULL;
LIST_FOREACH(prch2, &profile_chains, prch_link) {
if (prch2->prch_id != prch->prch_id)
}
if (!prsh) {
prsh = calloc(1, sizeof(*prsh));
+ prsh->prsh_do_queue = do_queue;
+ if (do_queue) {
+ pthread_mutex_init(&prsh->prsh_queue_mutex, NULL);
+ tvh_cond_init(&prsh->prsh_queue_cond);
+ TAILQ_INIT(&prsh->prsh_queue);
+ }
streaming_target_init(&prsh->prsh_input, &profile_sharer_input_ops, prsh, 0);
LIST_INIT(&prsh->prsh_chains);
}
return prsh;
}
+/*
+ *
+ */
+static void *
+profile_sharer_thread(void *aux)
+{
+ profile_sharer_t *prsh = aux;
+ profile_sharer_message_t *psm;
+ int run = 1;
+
+ while (run) {
+ pthread_mutex_lock(&prsh->prsh_queue_mutex);
+ run = prsh->prsh_queue_run;
+ psm = TAILQ_FIRST(&prsh->prsh_queue);
+ if (run && psm == NULL) {
+ tvh_cond_wait(&prsh->prsh_queue_cond, &prsh->prsh_queue_mutex);
+ run = prsh->prsh_queue_run;
+ psm = TAILQ_FIRST(&prsh->prsh_queue);
+ }
+ if (run && psm) {
+ if (psm) {
+ profile_input(psm->psm_prch, psm->psm_sm);
+ TAILQ_REMOVE(&prsh->prsh_queue, psm, psm_link);
+ free(psm);
+ }
+ }
+ pthread_mutex_unlock(&prsh->prsh_queue_mutex);
+ }
+ return NULL;
+}
+
+/*
+ *
+ */
+static int
+profile_sharer_postinit(profile_sharer_t *prsh)
+{
+ int r;
+
+ if (!prsh->prsh_do_queue)
+ return 0;
+ if (prsh->prsh_queue_run)
+ return 0;
+ r = tvhthread_create(&prsh->prsh_queue_thread, NULL,
+ profile_sharer_thread, prsh, "sharer");
+ if (!r)
+ prsh->prsh_queue_run = 1;
+ else
+ tvherror(LS_PROFILE, "unable to create sharer thread");
+ return r;
+}
+
/*
*
*/
profile_sharer_destroy(profile_chain_t *prch)
{
profile_sharer_t *prsh = prch->prch_sharer;
+ profile_sharer_message_t *psm, *psm2;
if (prsh == NULL)
return;
LIST_REMOVE(prch, prch_sharer_link);
- prch->prch_sharer = NULL;
- prch->prch_post_share = NULL;
if (LIST_EMPTY(&prsh->prsh_chains)) {
+ if (prsh->prsh_queue_run) {
+ pthread_mutex_lock(&prsh->prsh_queue_mutex);
+ prsh->prsh_queue_run = 0;
+ tvh_cond_signal(&prsh->prsh_queue_cond, 0);
+ pthread_mutex_unlock(&prsh->prsh_queue_mutex);
+ pthread_join(prsh->prsh_queue_thread, NULL);
+ while ((psm = TAILQ_FIRST(&prsh->prsh_queue)) != NULL) {
+ streaming_msg_free(psm->psm_sm);
+ TAILQ_REMOVE(&prsh->prsh_queue, psm, psm_link);
+ free(psm);
+ }
+ }
if (prsh->prsh_tsfix)
tsfix_destroy(prsh->prsh_tsfix);
#if ENABLE_LIBAV
if (prsh->prsh_start_msg)
streaming_start_unref(prsh->prsh_start_msg);
free(prsh);
+ prch->prch_sharer = NULL;
+ prch->prch_post_share = NULL;
+ } else {
+ if (prsh->prsh_queue_run) {
+ pthread_mutex_lock(&prsh->prsh_queue_mutex);
+ for (psm = TAILQ_FIRST(&prsh->prsh_queue); psm; psm = psm2) {
+ psm2 = TAILQ_NEXT(psm, psm_link);
+ if (psm->psm_prch != prch) continue;
+ if (psm->psm_sm->sm_type == SMT_PACKET ||
+ psm->psm_sm->sm_type == SMT_MPEGTS)
+ streaming_msg_free(psm->psm_sm);
+ else
+ profile_input(psm->psm_prch, psm->psm_sm);
+ TAILQ_REMOVE(&prsh->prsh_queue, psm, psm_link);
+ free(psm);
+ }
+ }
+ prch->prch_sharer = NULL;
+ prch->prch_post_share = NULL;
+ if (prsh->prsh_master == prch)
+ prsh->prsh_master = NULL;
+ if (prsh->prsh_queue_run)
+ pthread_mutex_unlock(&prsh->prsh_queue_mutex);
}
}
prch->prch_share = prsh->prsh_tsfix;
prch->prch_flags = SUBSCRIPTION_PACKET;
- streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
+ streaming_target_init(&prch->prch_input,
+ prsh->prsh_do_queue ?
+ &profile_input_queue_ops : &profile_input_ops, prch,
+ 0);
prch->prch_st = &prch->prch_input;
+ if (profile_sharer_postinit(prsh))
+ goto fail;
return 0;
fail:
if (!prsh)
goto fail;
- prch->prch_can_share = profile_transcode_can_share;
-
memset(&props, 0, sizeof(props));
strncpy(props.tp_vcodec, pro->pro_vcodec ?: "", sizeof(props.tp_vcodec)-1);
strncpy(props.tp_vcodec_preset, pro->pro_vcodec_preset ?: "", sizeof(props.tp_vcodec_preset)-1);
prsh->prsh_tsfix = tsfix_create(dst);
}
prch->prch_share = prsh->prsh_tsfix;
- streaming_target_init(&prch->prch_input, &profile_input_ops, prch, 0);
+ streaming_target_init(&prch->prch_input,
+ prsh->prsh_do_queue ?
+ &profile_input_queue_ops : &profile_input_ops, prch,
+ 0);
prch->prch_st = &prch->prch_input;
+ if (profile_sharer_postinit(prsh))
+ goto fail;
return 0;
fail:
profile_chain_close(prch);
{
int r;
+ prch->prch_can_share = profile_transcode_can_share;
+
prch->prch_flags = SUBSCRIPTION_PACKET;
prch->prch_sq.sq_maxsize = qsize;
extern profile_builders_queue profile_builders;
-typedef struct profile_sharer {
- streaming_target_t prsh_input;
- LIST_HEAD(,profile_chain) prsh_chains;
- struct profile_chain *prsh_master;
- struct streaming_start *prsh_start_msg;
- struct streaming_target *prsh_tsfix;
-#if ENABLE_LIBAV
- struct streaming_target *prsh_transcoder;
-#endif
-} profile_sharer_t;
-
typedef struct profile_chain {
LIST_ENTRY(profile_chain) prch_link;
int prch_linked;
muxer_config_t *m_cfg, int flags, size_t qsize);
} profile_t;
+typedef struct profile_sharer_message {
+ TAILQ_ENTRY(profile_sharer_message) psm_link;
+ profile_chain_t *psm_prch;
+ streaming_message_t *psm_sm;
+} profile_sharer_message_t;
+
+typedef struct profile_sharer {
+ uint32_t prsh_do_queue: 1;
+ uint32_t prsh_queue_run: 1;
+ pthread_t prsh_queue_thread;
+ pthread_mutex_t prsh_queue_mutex;
+ tvh_cond_t prsh_queue_cond;
+ TAILQ_HEAD(,profile_sharer_message) prsh_queue;
+ streaming_target_t prsh_input;
+ LIST_HEAD(,profile_chain) prsh_chains;
+ struct profile_chain *prsh_master;
+ struct streaming_start *prsh_start_msg;
+ struct streaming_target *prsh_tsfix;
+#if ENABLE_LIBAV
+ struct streaming_target *prsh_transcoder;
+#endif
+} profile_sharer_t;
+
void profile_register(const idclass_t *clazz, profile_builder_t builder);
profile_t *profile_create