From: Jaroslav Kysela Date: Sat, 31 Oct 2015 19:28:12 +0000 (+0100) Subject: DVR: rewrite DVR thread to handle better EPG running flag X-Git-Tag: v4.2.1~1718 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=4400bb263748b8b3dd33d85e107097680c6f3be7;p=thirdparty%2Ftvheadend.git DVR: rewrite DVR thread to handle better EPG running flag --- diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index 2216b0c06..4ecd417fe 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -1075,6 +1075,106 @@ dvr_streaming_restart(dvr_entry_t *de, int *run) } } +/** + * + */ +static int +dvr_thread_pkt_stats(dvr_entry_t *de, th_pkt_t *pkt, int payload) +{ + th_subscription_t *ts; + int ret = 0; + + if ((ts = de->de_s) != NULL) { + if (pkt->pkt_err) { + de->de_data_errors += pkt->pkt_err; + ret = 1; + } + if (payload && pkt->pkt_payload) + subscription_add_bytes_out(ts, pktbuf_len(pkt->pkt_payload)); + } + return ret; +} + +/** + * + */ +static int +dvr_thread_mpegts_stats(dvr_entry_t *de, void *sm_data) +{ + th_subscription_t *ts; + pktbuf_t *pb = sm_data; + int ret; + + if (pb == NULL) + return 0; + if ((ts = de->de_s) != NULL) { + if (pb->pb_err) { + de->de_data_errors += pb->pb_err; + ret = 1; + } + subscription_add_bytes_out(ts, pktbuf_len(pb)); + } + return ret; +} + +/** + * + */ +static int +dvr_thread_rec_start(dvr_entry_t *de, int started, + streaming_start_t *ss, int *run, + int64_t *dts_offset, + int epg_running, const char *postproc) +{ + profile_chain_t *prch = de->de_chain; + int ret = 0; + + if (started && + muxer_reconfigure(prch->prch_muxer, ss) < 0) { + tvhlog(LOG_WARNING, + "dvr", "Unable to reconfigure \"%s\"", + dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL)); + + // Try to restart the recording if the muxer doesn't + // support reconfiguration of the streams. + dvr_thread_epilog(de, postproc); + started = 0; + *dts_offset = PTS_UNSET; + if (epg_running) { + if (!dvr_thread_global_lock(de, run)) + return 0; + if (de->de_config->dvr_clone) + de = dvr_entry_clone(de); + dvr_thread_global_unlock(de); + } + } + + if (!started) { + if (!dvr_thread_global_lock(de, run)) + return 0; + dvr_rec_set_state(de, DVR_RS_WAIT_PROGRAM_START, 0); + if(dvr_rec_start(de, ss) == 0) + ret = 1; + else + dvr_stop_recording(de, SM_CODE_INVALID_TARGET, 1, 0); + dvr_thread_global_unlock(de); + } + return ret; +} + +/** + * + */ +static void +dvr_thread_backlog_free(struct streaming_message_queue *backlog) +{ + streaming_message_t *sm; + while ((sm = TAILQ_FIRST(backlog)) != NULL) { + TAILQ_REMOVE(backlog, sm, sm_link); + streaming_msg_free(sm); + } +} + /** * */ @@ -1084,12 +1184,14 @@ dvr_thread(void *aux) dvr_entry_t *de = aux; profile_chain_t *prch = de->de_chain; streaming_queue_t *sq = &prch->prch_sq; - streaming_message_t *sm; - th_subscription_t *ts; - th_pkt_t *pkt; - int run = 1, started = 0, comm_skip, epg_running, rs; + struct streaming_message_queue backlog; + streaming_message_t *sm, *sm2; + th_pkt_t *pkt, *pkt2, *pkt3; + streaming_start_t *ss = NULL; + int run = 1, started = 0, muxing = 0, comm_skip, epg_running = 0, rs; int commercial = COMMERCIAL_UNKNOWN; - int64_t packets = 0; + int64_t packets = 0, dts_offset = PTS_UNSET; + time_t start_time = 0; char *postproc; if (!dvr_thread_global_lock(de, &run)) @@ -1098,39 +1200,35 @@ dvr_thread(void *aux) postproc = de->de_config->dvr_postproc ? strdup(de->de_config->dvr_postproc) : NULL; dvr_thread_global_unlock(de); - pthread_mutex_lock(&sq->sq_mutex); + TAILQ_INIT(&backlog); + pthread_mutex_lock(&sq->sq_mutex); while(run) { sm = TAILQ_FIRST(&sq->sq_queue); if(sm == NULL) { pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); continue; } + streaming_queue_remove(sq, sm); - if ((ts = de->de_s) != NULL && started) { - pktbuf_t *pb = NULL; - if (sm->sm_type == SMT_PACKET) { - pb = ((th_pkt_t*)sm->sm_data)->pkt_payload; - if (((th_pkt_t*)sm->sm_data)->pkt_err) { - de->de_data_errors += ((th_pkt_t*)sm->sm_data)->pkt_err; - dvr_notify(de); - } - } - else if (sm->sm_type == SMT_MPEGTS) { - pb = sm->sm_data; - if (pb->pb_err) { - de->de_data_errors += pb->pb_err; - dvr_notify(de); + if (sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS) { + if (de->de_running_start > de->de_running_stop) { + epg_running = 1; + } else if (de->de_running_start == 0 && de->de_running_stop == 0) { + if (start_time + 2 >= dispatch_clock) { + epg_running = 0; + TAILQ_INSERT_TAIL(&backlog, sm, sm_link); + continue; + } else { + if (TAILQ_FIRST(&backlog)) + dvr_thread_backlog_free(&backlog); + epg_running = 1; } + } else { + epg_running = 0; } - if (pb) - subscription_add_bytes_out(ts, pktbuf_len(pb)); } - streaming_queue_remove(sq, sm); - - epg_running = de->de_running_start > de->de_running_stop || - (de->de_running_start == 0 && de->de_running_stop == 0); pthread_mutex_unlock(&sq->sq_mutex); switch(sm->sm_type) { @@ -1161,67 +1259,97 @@ dvr_thread(void *aux) commercial = pkt->pkt_commercial; - if (started) { - muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data); - sm->sm_data = NULL; - dvr_notify(de); - packets++; + if (!started) + break; + + if (muxing == 0 && + !dvr_thread_rec_start(de, started, ss, &run, &dts_offset, + epg_running, postproc)) + break; + + muxing = 1; + while ((sm2 = TAILQ_FIRST(&backlog)) != NULL) { + TAILQ_REMOVE(&backlog, sm2, sm_link); + if (pkt->pkt_dts != PTS_UNSET) { + if (dts_offset == PTS_UNSET) { + pkt2 = sm2->sm_data; + dts_offset = pkt2->pkt_dts; + } + pkt3 = (th_pkt_t *)sm2->sm_data; + if (dts_offset != PTS_UNSET && pkt->pkt_dts >= dts_offset) { + pkt3 = pkt_copy_shallow(pkt3); + pkt3->pkt_dts -= dts_offset; + if (pkt3->pkt_pts != PTS_UNSET) + pkt3->pkt_pts -= dts_offset; + dvr_thread_pkt_stats(de, pkt3, 1); + muxer_write_pkt(prch->prch_muxer, sm2->sm_type, pkt3); + } else { + dvr_thread_pkt_stats(de, pkt3, 0); + } + } + streaming_msg_free(sm2); + } + if (dts_offset == PTS_UNSET && pkt->pkt_dts != PTS_UNSET) + dts_offset = pkt->pkt_dts; + if (pkt->pkt_dts != PTS_UNSET && dts_offset != PTS_UNSET && + pkt->pkt_dts >= dts_offset) { + pkt3 = pkt_copy_shallow(pkt); + pkt3->pkt_dts -= dts_offset; + if (pkt3->pkt_pts != PTS_UNSET) + pkt3->pkt_pts -= dts_offset; + dvr_thread_pkt_stats(de, pkt3, 1); + muxer_write_pkt(prch->prch_muxer, sm->sm_type, pkt3); + } else { + dvr_thread_pkt_stats(de, pkt, 0); } + dvr_notify(de); + packets++; break; case SMT_MPEGTS: dvr_rec_set_state(de, !epg_running ? DVR_RS_EPG_WAIT : DVR_RS_RUNNING, 0); - if(started) { - if (!epg_running) { - if (packets) { - dvr_streaming_restart(de, &run); - packets = 0; - started = 0; - } - break; + + if (!started) + break; + + if (!epg_running) { + if (packets) { + dvr_streaming_restart(de, &run); + packets = 0; + started = 0; } - muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data); - sm->sm_data = NULL; - dvr_notify(de); - packets++; + break; } - break; - case SMT_START: - packets = 0; - if(started && - muxer_reconfigure(prch->prch_muxer, sm->sm_data) < 0) { - tvhlog(LOG_WARNING, - "dvr", "Unable to reconfigure \"%s\"", - dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL)); + if (muxing == 0 && + !dvr_thread_rec_start(de, started, ss, &run, &dts_offset, + epg_running, postproc)) + break; - // Try to restart the recording if the muxer doesn't - // support reconfiguration of the streams. - dvr_thread_epilog(de, postproc); - started = 0; - if (epg_running) { - if (!dvr_thread_global_lock(de, &run)) - break; - if (de->de_config->dvr_clone) - de = dvr_entry_clone(de); - dvr_thread_global_unlock(de); - } + muxing = 1; + while ((sm2 = TAILQ_FIRST(&backlog)) != NULL) { + TAILQ_REMOVE(&backlog, sm2, sm_link); + dvr_thread_mpegts_stats(de, sm2->sm_data); + muxer_write_pkt(prch->prch_muxer, sm2->sm_type, sm2->sm_data); + sm2->sm_data = NULL; + streaming_msg_free(sm2); } + dvr_thread_mpegts_stats(de, sm->sm_data); + muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data); + sm->sm_data = NULL; + dvr_notify(de); + packets++; + break; - if(!started) { - if (!dvr_thread_global_lock(de, &run)) - break; - dvr_rec_set_state(de, DVR_RS_WAIT_PROGRAM_START, 0); - if(dvr_rec_start(de, sm->sm_data) == 0) - started = 1; - else - dvr_stop_recording(de, SM_CODE_INVALID_TARGET, 1, 0); - dvr_thread_global_unlock(de); - } + case SMT_START: + start_time = dispatch_clock; + packets = 0; + started = 1; + ss = streaming_start_copy((streaming_start_t *)sm->sm_data); break; case SMT_STOP: - if(sm->sm_code == SM_CODE_SOURCE_RECONFIGURED) { + if (sm->sm_code == SM_CODE_SOURCE_RECONFIGURED) { // Subscription is restarting, wait for SMT_START } else if(sm->sm_code == 0) { @@ -1232,27 +1360,32 @@ dvr_thread(void *aux) "dvr", "Recording completed: \"%s\"", dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL)); - dvr_thread_epilog(de, postproc); - started = 0; + goto fin; - }else if(de->de_last_error != sm->sm_code) { + } else if (de->de_last_error != sm->sm_code) { // Error during recording - dvr_rec_set_state(de, DVR_RS_ERROR, sm->sm_code); - tvhlog(LOG_ERR, - "dvr", "Recording stopped: \"%s\": %s", - dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL), - streaming_code2txt(sm->sm_code)); + dvr_rec_set_state(de, DVR_RS_ERROR, sm->sm_code); + tvhlog(LOG_ERR, + "dvr", "Recording stopped: \"%s\": %s", + dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL), + streaming_code2txt(sm->sm_code)); - dvr_thread_epilog(de, postproc); - started = 0; +fin: + dvr_thread_backlog_free(&backlog); + dvr_thread_epilog(de, postproc); + start_time = 0; + started = 0; + muxing = 0; + streaming_start_unref(ss); + ss = NULL; } break; case SMT_SERVICE_STATUS: - if(sm->sm_code & TSS_PACKETS) { + if (sm->sm_code & TSS_PACKETS) { - } else if(sm->sm_code & TSS_ERRORS) { + } else if (sm->sm_code & TSS_ERRORS) { int code = SM_CODE_UNDEFINED_ERROR; @@ -1274,7 +1407,7 @@ dvr_thread(void *aux) case SMT_NOSTART: - if(de->de_last_error != sm->sm_code) { + if (de->de_last_error != sm->sm_code) { dvr_rec_set_state(de, DVR_RS_PENDING, sm->sm_code); tvhlog(LOG_ERR, @@ -1303,7 +1436,9 @@ dvr_thread(void *aux) } pthread_mutex_unlock(&sq->sq_mutex); - if(prch->prch_muxer) + dvr_thread_backlog_free(&backlog); + + if (prch->prch_muxer) dvr_thread_epilog(de, postproc); free(postproc);