}
}
+/**
+ *
+ */
+static int
+dvr_thread_pkt_stats(dvr_entry_t *de, th_pkt_t *pkt, int payload)
+{
+ th_subscription_t *ts;
+ int ret = 0;
+
+ if ((ts = de->de_s) != NULL) {
+ if (pkt->pkt_err) {
+ de->de_data_errors += pkt->pkt_err;
+ ret = 1;
+ }
+ if (payload && pkt->pkt_payload)
+ subscription_add_bytes_out(ts, pktbuf_len(pkt->pkt_payload));
+ }
+ return ret;
+}
+
+/**
+ *
+ */
+static int
+dvr_thread_mpegts_stats(dvr_entry_t *de, void *sm_data)
+{
+ th_subscription_t *ts;
+ pktbuf_t *pb = sm_data;
+ int ret;
+
+ if (pb == NULL)
+ return 0;
+ if ((ts = de->de_s) != NULL) {
+ if (pb->pb_err) {
+ de->de_data_errors += pb->pb_err;
+ ret = 1;
+ }
+ subscription_add_bytes_out(ts, pktbuf_len(pb));
+ }
+ return ret;
+}
+
+/**
+ *
+ */
+static int
+dvr_thread_rec_start(dvr_entry_t *de, int started,
+ streaming_start_t *ss, int *run,
+ int64_t *dts_offset,
+ int epg_running, const char *postproc)
+{
+ profile_chain_t *prch = de->de_chain;
+ int ret = 0;
+
+ if (started &&
+ muxer_reconfigure(prch->prch_muxer, ss) < 0) {
+ tvhlog(LOG_WARNING,
+ "dvr", "Unable to reconfigure \"%s\"",
+ dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL));
+
+ // Try to restart the recording if the muxer doesn't
+ // support reconfiguration of the streams.
+ dvr_thread_epilog(de, postproc);
+ started = 0;
+ *dts_offset = PTS_UNSET;
+ if (epg_running) {
+ if (!dvr_thread_global_lock(de, run))
+ return 0;
+ if (de->de_config->dvr_clone)
+ de = dvr_entry_clone(de);
+ dvr_thread_global_unlock(de);
+ }
+ }
+
+ if (!started) {
+ if (!dvr_thread_global_lock(de, run))
+ return 0;
+ dvr_rec_set_state(de, DVR_RS_WAIT_PROGRAM_START, 0);
+ if(dvr_rec_start(de, ss) == 0)
+ ret = 1;
+ else
+ dvr_stop_recording(de, SM_CODE_INVALID_TARGET, 1, 0);
+ dvr_thread_global_unlock(de);
+ }
+ return ret;
+}
+
+/**
+ *
+ */
+static void
+dvr_thread_backlog_free(struct streaming_message_queue *backlog)
+{
+ streaming_message_t *sm;
+ while ((sm = TAILQ_FIRST(backlog)) != NULL) {
+ TAILQ_REMOVE(backlog, sm, sm_link);
+ streaming_msg_free(sm);
+ }
+}
+
/**
*
*/
dvr_entry_t *de = aux;
profile_chain_t *prch = de->de_chain;
streaming_queue_t *sq = &prch->prch_sq;
- streaming_message_t *sm;
- th_subscription_t *ts;
- th_pkt_t *pkt;
- int run = 1, started = 0, comm_skip, epg_running, rs;
+ struct streaming_message_queue backlog;
+ streaming_message_t *sm, *sm2;
+ th_pkt_t *pkt, *pkt2, *pkt3;
+ streaming_start_t *ss = NULL;
+ int run = 1, started = 0, muxing = 0, comm_skip, epg_running = 0, rs;
int commercial = COMMERCIAL_UNKNOWN;
- int64_t packets = 0;
+ int64_t packets = 0, dts_offset = PTS_UNSET;
+ time_t start_time = 0;
char *postproc;
if (!dvr_thread_global_lock(de, &run))
postproc = de->de_config->dvr_postproc ? strdup(de->de_config->dvr_postproc) : NULL;
dvr_thread_global_unlock(de);
- pthread_mutex_lock(&sq->sq_mutex);
+ TAILQ_INIT(&backlog);
+ pthread_mutex_lock(&sq->sq_mutex);
while(run) {
sm = TAILQ_FIRST(&sq->sq_queue);
if(sm == NULL) {
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
+ streaming_queue_remove(sq, sm);
- if ((ts = de->de_s) != NULL && started) {
- pktbuf_t *pb = NULL;
- if (sm->sm_type == SMT_PACKET) {
- pb = ((th_pkt_t*)sm->sm_data)->pkt_payload;
- if (((th_pkt_t*)sm->sm_data)->pkt_err) {
- de->de_data_errors += ((th_pkt_t*)sm->sm_data)->pkt_err;
- dvr_notify(de);
- }
- }
- else if (sm->sm_type == SMT_MPEGTS) {
- pb = sm->sm_data;
- if (pb->pb_err) {
- de->de_data_errors += pb->pb_err;
- dvr_notify(de);
+ if (sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS) {
+ if (de->de_running_start > de->de_running_stop) {
+ epg_running = 1;
+ } else if (de->de_running_start == 0 && de->de_running_stop == 0) {
+ if (start_time + 2 >= dispatch_clock) {
+ epg_running = 0;
+ TAILQ_INSERT_TAIL(&backlog, sm, sm_link);
+ continue;
+ } else {
+ if (TAILQ_FIRST(&backlog))
+ dvr_thread_backlog_free(&backlog);
+ epg_running = 1;
}
+ } else {
+ epg_running = 0;
}
- if (pb)
- subscription_add_bytes_out(ts, pktbuf_len(pb));
}
- streaming_queue_remove(sq, sm);
-
- epg_running = de->de_running_start > de->de_running_stop ||
- (de->de_running_start == 0 && de->de_running_stop == 0);
pthread_mutex_unlock(&sq->sq_mutex);
switch(sm->sm_type) {
commercial = pkt->pkt_commercial;
- if (started) {
- muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data);
- sm->sm_data = NULL;
- dvr_notify(de);
- packets++;
+ if (!started)
+ break;
+
+ if (muxing == 0 &&
+ !dvr_thread_rec_start(de, started, ss, &run, &dts_offset,
+ epg_running, postproc))
+ break;
+
+ muxing = 1;
+ while ((sm2 = TAILQ_FIRST(&backlog)) != NULL) {
+ TAILQ_REMOVE(&backlog, sm2, sm_link);
+ if (pkt->pkt_dts != PTS_UNSET) {
+ if (dts_offset == PTS_UNSET) {
+ pkt2 = sm2->sm_data;
+ dts_offset = pkt2->pkt_dts;
+ }
+ pkt3 = (th_pkt_t *)sm2->sm_data;
+ if (dts_offset != PTS_UNSET && pkt->pkt_dts >= dts_offset) {
+ pkt3 = pkt_copy_shallow(pkt3);
+ pkt3->pkt_dts -= dts_offset;
+ if (pkt3->pkt_pts != PTS_UNSET)
+ pkt3->pkt_pts -= dts_offset;
+ dvr_thread_pkt_stats(de, pkt3, 1);
+ muxer_write_pkt(prch->prch_muxer, sm2->sm_type, pkt3);
+ } else {
+ dvr_thread_pkt_stats(de, pkt3, 0);
+ }
+ }
+ streaming_msg_free(sm2);
+ }
+ if (dts_offset == PTS_UNSET && pkt->pkt_dts != PTS_UNSET)
+ dts_offset = pkt->pkt_dts;
+ if (pkt->pkt_dts != PTS_UNSET && dts_offset != PTS_UNSET &&
+ pkt->pkt_dts >= dts_offset) {
+ pkt3 = pkt_copy_shallow(pkt);
+ pkt3->pkt_dts -= dts_offset;
+ if (pkt3->pkt_pts != PTS_UNSET)
+ pkt3->pkt_pts -= dts_offset;
+ dvr_thread_pkt_stats(de, pkt3, 1);
+ muxer_write_pkt(prch->prch_muxer, sm->sm_type, pkt3);
+ } else {
+ dvr_thread_pkt_stats(de, pkt, 0);
}
+ dvr_notify(de);
+ packets++;
break;
case SMT_MPEGTS:
dvr_rec_set_state(de, !epg_running ? DVR_RS_EPG_WAIT : DVR_RS_RUNNING, 0);
- if(started) {
- if (!epg_running) {
- if (packets) {
- dvr_streaming_restart(de, &run);
- packets = 0;
- started = 0;
- }
- break;
+
+ if (!started)
+ break;
+
+ if (!epg_running) {
+ if (packets) {
+ dvr_streaming_restart(de, &run);
+ packets = 0;
+ started = 0;
}
- muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data);
- sm->sm_data = NULL;
- dvr_notify(de);
- packets++;
+ break;
}
- break;
- case SMT_START:
- packets = 0;
- if(started &&
- muxer_reconfigure(prch->prch_muxer, sm->sm_data) < 0) {
- tvhlog(LOG_WARNING,
- "dvr", "Unable to reconfigure \"%s\"",
- dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL));
+ if (muxing == 0 &&
+ !dvr_thread_rec_start(de, started, ss, &run, &dts_offset,
+ epg_running, postproc))
+ break;
- // Try to restart the recording if the muxer doesn't
- // support reconfiguration of the streams.
- dvr_thread_epilog(de, postproc);
- started = 0;
- if (epg_running) {
- if (!dvr_thread_global_lock(de, &run))
- break;
- if (de->de_config->dvr_clone)
- de = dvr_entry_clone(de);
- dvr_thread_global_unlock(de);
- }
+ muxing = 1;
+ while ((sm2 = TAILQ_FIRST(&backlog)) != NULL) {
+ TAILQ_REMOVE(&backlog, sm2, sm_link);
+ dvr_thread_mpegts_stats(de, sm2->sm_data);
+ muxer_write_pkt(prch->prch_muxer, sm2->sm_type, sm2->sm_data);
+ sm2->sm_data = NULL;
+ streaming_msg_free(sm2);
}
+ dvr_thread_mpegts_stats(de, sm->sm_data);
+ muxer_write_pkt(prch->prch_muxer, sm->sm_type, sm->sm_data);
+ sm->sm_data = NULL;
+ dvr_notify(de);
+ packets++;
+ break;
- if(!started) {
- if (!dvr_thread_global_lock(de, &run))
- break;
- dvr_rec_set_state(de, DVR_RS_WAIT_PROGRAM_START, 0);
- if(dvr_rec_start(de, sm->sm_data) == 0)
- started = 1;
- else
- dvr_stop_recording(de, SM_CODE_INVALID_TARGET, 1, 0);
- dvr_thread_global_unlock(de);
- }
+ case SMT_START:
+ start_time = dispatch_clock;
+ packets = 0;
+ started = 1;
+ ss = streaming_start_copy((streaming_start_t *)sm->sm_data);
break;
case SMT_STOP:
- if(sm->sm_code == SM_CODE_SOURCE_RECONFIGURED) {
+ if (sm->sm_code == SM_CODE_SOURCE_RECONFIGURED) {
// Subscription is restarting, wait for SMT_START
} else if(sm->sm_code == 0) {
"dvr", "Recording completed: \"%s\"",
dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL));
- dvr_thread_epilog(de, postproc);
- started = 0;
+ goto fin;
- }else if(de->de_last_error != sm->sm_code) {
+ } else if (de->de_last_error != sm->sm_code) {
// Error during recording
- dvr_rec_set_state(de, DVR_RS_ERROR, sm->sm_code);
- tvhlog(LOG_ERR,
- "dvr", "Recording stopped: \"%s\": %s",
- dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL),
- streaming_code2txt(sm->sm_code));
+ dvr_rec_set_state(de, DVR_RS_ERROR, sm->sm_code);
+ tvhlog(LOG_ERR,
+ "dvr", "Recording stopped: \"%s\": %s",
+ dvr_get_filename(de) ?: lang_str_get(de->de_title, NULL),
+ streaming_code2txt(sm->sm_code));
- dvr_thread_epilog(de, postproc);
- started = 0;
+fin:
+ dvr_thread_backlog_free(&backlog);
+ dvr_thread_epilog(de, postproc);
+ start_time = 0;
+ started = 0;
+ muxing = 0;
+ streaming_start_unref(ss);
+ ss = NULL;
}
break;
case SMT_SERVICE_STATUS:
- if(sm->sm_code & TSS_PACKETS) {
+ if (sm->sm_code & TSS_PACKETS) {
- } else if(sm->sm_code & TSS_ERRORS) {
+ } else if (sm->sm_code & TSS_ERRORS) {
int code = SM_CODE_UNDEFINED_ERROR;
case SMT_NOSTART:
- if(de->de_last_error != sm->sm_code) {
+ if (de->de_last_error != sm->sm_code) {
dvr_rec_set_state(de, DVR_RS_PENDING, sm->sm_code);
tvhlog(LOG_ERR,
}
pthread_mutex_unlock(&sq->sq_mutex);
- if(prch->prch_muxer)
+ dvr_thread_backlog_free(&backlog);
+
+ if (prch->prch_muxer)
dvr_thread_epilog(de, postproc);
free(postproc);