From: Jaroslav Kysela Date: Sun, 3 Jan 2016 12:54:52 +0000 (+0100) Subject: timeshift: merge packet log code to one fcn, many fixes X-Git-Tag: v4.2.1~1238 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=385b39549f23878789788b54cb3ec8dacec28d12;p=thirdparty%2Ftvheadend.git timeshift: merge packet log code to one fcn, many fixes --- diff --git a/src/timeshift.c b/src/timeshift.c index 1ad81716c..bdb0673ee 100644 --- a/src/timeshift.c +++ b/src/timeshift.c @@ -38,6 +38,27 @@ static int timeshift_index = 0; struct timeshift_conf timeshift_conf; +/* + * Packet log + */ +void +timeshift_packet_log0 + ( const char *source, timeshift_t *ts, streaming_message_t *sm ) +{ + th_pkt_t *pkt = sm->sm_data; + tvhtrace("timeshift", + "ts %d pkt %s - stream %d type %c pts %10"PRId64 + " dts %10"PRId64" dur %10d len %6zu time %14"PRId64, + ts->id, source, + pkt->pkt_componentindex, + pkt_frametype_to_char(pkt->pkt_frametype), + ts_rescale(pkt->pkt_pts, 1000000), + ts_rescale(pkt->pkt_dts, 1000000), + pkt->pkt_duration, + pktbuf_len(pkt->pkt_payload), + sm->sm_time); +} + /* * Safe values for RAM configuration */ @@ -214,23 +235,6 @@ const idclass_t timeshift_conf_class = { #define MAX_TIME_DELTA (2*1000000) /* 2 seconds */ #define BACKLOG_COUNT ARRAY_SIZE(timeshift_t->backlog) -static void -timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm ) -{ - th_pkt_t *pkt = sm->sm_data; - tvhtrace("timeshift", - "ts %d pkt buf - stream %d type %c pts %10"PRId64 - " dts %10"PRId64" dur %10d len %6zu time %14"PRId64, - ts->id, - pkt->pkt_componentindex, - pkt_frametype_to_char(pkt->pkt_frametype), - ts_rescale(pkt->pkt_pts, 1000000), - ts_rescale(pkt->pkt_dts, 1000000), - pkt->pkt_duration, - pktbuf_len(pkt->pkt_payload), - sm->sm_time); - streaming_target_deliver2(&ts->wr_queue.sq_st, sm); -} static void timeshift_packet_flush ( timeshift_t *ts, int64_t time ) @@ -251,7 +255,9 @@ timeshift_packet_flush ( timeshift_t *ts, int64_t time ) if (!lowest) break; TAILQ_REMOVE(sq, lowest, sm_link); - timeshift_packet_deliver(ts, lowest); + ts->last_wr_time = lowest->sm_time; + timeshift_packet_log("wr ", ts, lowest); + streaming_target_deliver2(&ts->wr_queue.sq_st, lowest); } } @@ -274,7 +280,9 @@ timeshift_packet( timeshift_t *ts, th_pkt_t *pkt ) sm->sm_time = time; if (time + MAX_TIME_DELTA < ts->last_time) { - timeshift_packet_deliver(ts, sm); + ts->last_wr_time = time; + timeshift_packet_log("wr2", ts, sm); + streaming_target_deliver2(&ts->wr_queue.sq_st, sm); } else { if (pkt->pkt_componentindex >= ts->backlog_max) ts->backlog_max = pkt->pkt_componentindex + 1; @@ -323,10 +331,11 @@ static void timeshift_input /* Send to the writer thread */ if (ts->packet_mode) { - sm->sm_time = ts->last_time; + sm->sm_time = ts->last_wr_time; if (type == SMT_PACKET) { timeshift_packet(ts, pkt); - goto msg_free; + streaming_msg_free(sm); + goto _exit; } } else { if (ts->ref_time == 0) { @@ -337,10 +346,9 @@ static void timeshift_input } } streaming_target_deliver2(&ts->wr_queue.sq_st, sm); -msg_free: - streaming_msg_free(sm); /* Exit/Stop */ +_exit: if (ts->exit) timeshift_write_exit(ts->rd_pipe.wr); } @@ -417,6 +425,7 @@ streaming_target_t *timeshift_create ts->dobuf = ts->ondemand ? 0 : 1; ts->packet_mode= 1; ts->last_time = 0; + ts->last_wr_time = 0; ts->buf_time = 0; ts->start_pts = 0; ts->ref_time = 0; diff --git a/src/timeshift/private.h b/src/timeshift/private.h index b094289cd..f668060a1 100644 --- a/src/timeshift/private.h +++ b/src/timeshift/private.h @@ -19,7 +19,7 @@ #ifndef __TVH_TIMESHIFT_PRIVATE_H__ #define __TVH_TIMESHIFT_PRIVATE_H__ -#define TIMESHIFT_PLAY_BUF 500000 //< us to buffer in TX +#define TIMESHIFT_PLAY_BUF 1000000 //< us to buffer in TX #define TIMESHIFT_FILE_PERIOD 60 //< number of secs in each buffer file #define TIMESHIFT_BACKLOG_MAX 16 //< maximum elementary streams @@ -94,6 +94,7 @@ typedef struct timeshift { int packet_mode;///< Packet mode (otherwise MPEG-TS data mode) int dobuf; ///< Buffer packets (store) int64_t last_time; ///< Last time in us (PTS conversion) + int64_t last_wr_time;///< Last write time in us (PTS conversion) int64_t start_pts; ///< Start time for packets (PTS) int64_t ref_time; ///< Start time in us (monoclock) int64_t buf_time; ///< Last buffered time in us (PTS conversion) @@ -128,6 +129,16 @@ typedef struct timeshift { extern uint64_t timeshift_total_size; extern uint64_t timeshift_total_ram_size; +void timeshift_packet_log0 + ( const char *prefix, timeshift_t *ts, streaming_message_t *sm ); + +static inline void timeshift_packet_log + ( const char *prefix, timeshift_t *ts, streaming_message_t *sm ) +{ + if (sm->sm_type == SMT_PACKET && tvhtrace_enabled()) + timeshift_packet_log0(prefix, ts, sm); +} + /* * Write functions */ @@ -141,9 +152,6 @@ ssize_t timeshift_write_stop ( int fd, int code ); ssize_t timeshift_write_exit ( int fd ); ssize_t timeshift_write_eof ( timeshift_file_t *tsf ); -void timeshift_writer_flush ( timeshift_t *ts ); -void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst ); - /* * Threads */ diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index 1b0a19e84..0668e5dd5 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -413,25 +413,6 @@ static int _timeshift_read return 0; } -/* - * Trace packet - */ -static void timeshift_trace_pkt - ( timeshift_t *ts, streaming_message_t *sm ) -{ - th_pkt_t *pkt = sm->sm_data; - tvhtrace("timeshift", - "ts %d pkt out - stream %d type %c pts %10"PRId64 - " dts %10"PRId64 " dur %10d len %6zu time %14"PRId64, - ts->id, - pkt->pkt_componentindex, - pkt_frametype_to_char(pkt->pkt_frametype), - ts_rescale(pkt->pkt_pts, 1000000), - ts_rescale(pkt->pkt_dts, 1000000), - pkt->pkt_duration, - pktbuf_len(pkt->pkt_payload), sm->sm_time); -} - /* * Flush all data to live */ @@ -444,8 +425,7 @@ static int _timeshift_flush_to_live if (_timeshift_read(ts, cur_file, &sm, wait) == -1) return -1; if (!sm) break; - if (tvhtrace_enabled() && sm->sm_type == SMT_PACKET) - timeshift_trace_pkt(ts, sm); + timeshift_packet_log("ouf", ts, sm); streaming_target_deliver2(ts->output, sm); } return 0; @@ -503,7 +483,7 @@ static void timeshift_status void *timeshift_reader ( void *p ) { timeshift_t *ts = p; - int nfds, end, run = 1, wait = -1; + int nfds, end, run = 1, wait = -1, state; timeshift_file_t *cur_file = NULL, *tmp_file; int cur_speed = 100, keyframe_mode = 0; int64_t mono_now, mono_play_time = 0, mono_last_status = 0; @@ -567,7 +547,8 @@ void *timeshift_reader ( void *p ) if (cur_speed != speed) { /* Live playback */ - if (ts->state == TS_LIVE) { + state = ts->state; + if (state == TS_LIVE) { /* Reject */ if (speed >= 100) { @@ -579,7 +560,6 @@ void *timeshift_reader ( void *p ) } else { tvhlog(LOG_DEBUG, "timeshift", "ts %d enter timeshift mode", ts->id); - timeshift_writer_flush(ts); ts->dobuf = 1; tmp_file = timeshift_filemgr_newest(ts); if (tmp_file != NULL) { @@ -613,14 +593,15 @@ void *timeshift_reader ( void *p ) /* Update */ cur_speed = speed; - if (speed != 100 || ts->state != TS_LIVE) { + if (speed != 100 || state != TS_LIVE) { ts->state = speed == 0 ? TS_PAUSE : TS_PLAY; tvhtrace("timeshift", "reader - set %s", speed == 0 ? "TS_PAUSE" : "TS_PLAY"); } - if (ts->state == TS_PLAY) { + if (ts->state == TS_PLAY && state != TS_PLAY) { mono_play_time = mono_now; - tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64, mono_now, pause_time); - } else if (ts->state == TS_PAUSE) { + tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64, + mono_now, pause_time); + } else if (ts->state == TS_PAUSE && state != TS_PAUSE) { pause_time = last_time; } tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d", ts->id, speed); @@ -815,8 +796,7 @@ void *timeshift_reader ( void *p ) ((cur_speed > 0) && (sm->sm_time <= deliver))))) { last_time = sm->sm_time; - if (sm->sm_type == SMT_PACKET && tvhtrace_enabled()) - timeshift_trace_pkt(ts, sm); + timeshift_packet_log("out", ts, sm); streaming_target_deliver2(ts->output, sm); sm = NULL; wait = 0; diff --git a/src/timeshift/timeshift_writer.c b/src/timeshift/timeshift_writer.c index ac9dc298a..89e30d2b0 100644 --- a/src/timeshift/timeshift_writer.c +++ b/src/timeshift/timeshift_writer.c @@ -253,15 +253,13 @@ static void _handle_sstart ( timeshift_t *ts, timeshift_file_t *tsf, streaming_m * *************************************************************************/ static inline ssize_t _process_msg0 - ( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t **smp ) + ( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t *sm ) { ssize_t err; - streaming_message_t *sm = *smp; if (sm->sm_type == SMT_START) { err = 0; - _handle_sstart(ts, tsf, sm); - *smp = NULL; + _handle_sstart(ts, tsf, streaming_msg_clone(sm)); } else if (sm->sm_type == SMT_SIGNAL_STATUS) err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data); else if (sm->sm_type == SMT_PACKET) { @@ -307,7 +305,7 @@ static void _process_msg /* Terminate */ case SMT_EXIT: if (run) *run = 0; - goto live; + break; case SMT_STOP: if (sm->sm_code != SM_CODE_SOURCE_RECONFIGURED && run) *run = 0; @@ -334,14 +332,19 @@ static void _process_msg case SMT_PACKET: pthread_mutex_lock(&ts->state_mutex); ts->buf_time = sm->sm_time; - if (ts->state == TS_LIVE) + if (ts->state == TS_LIVE) { streaming_target_deliver2(ts->output, streaming_msg_clone(sm)); + if (sm->sm_type == SMT_PACKET) + timeshift_packet_log("liv", ts, sm); + } if (ts->dobuf) { if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) { - if ((err = _process_msg0(ts, tsf, &sm)) < 0) { + if ((err = _process_msg0(ts, tsf, sm)) < 0) { timeshift_filemgr_close(tsf); tsf->bad = 1; ts->full = 1; ///< Stop any more writing + } else { + timeshift_packet_log("sav", ts, sm); } tsf->refcount--; } @@ -352,6 +355,7 @@ static void _process_msg /* Next */ streaming_msg_free(sm); + return; live: pthread_mutex_lock(&ts->state_mutex); @@ -388,34 +392,3 @@ void *timeshift_writer ( void *aux ) pthread_mutex_unlock(&sq->sq_mutex); return NULL; } - -/* ************************************************************************** - * Utilities - * *************************************************************************/ - -void timeshift_writer_flush ( timeshift_t *ts ) - -{ - streaming_message_t *sm; - streaming_queue_t *sq = &ts->wr_queue; - - pthread_mutex_lock(&sq->sq_mutex); - while ((sm = TAILQ_FIRST(&sq->sq_queue))) { - streaming_queue_remove(sq, sm); - _process_msg(ts, sm, NULL); - } - pthread_mutex_unlock(&sq->sq_mutex); -} - -void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst ) -{ - streaming_message_t *sm, *sm2; - streaming_queue_t *sq = &ts->wr_queue; - - pthread_mutex_lock(&sq->sq_mutex); - TAILQ_FOREACH(sm, &sq->sq_queue, sm_link) { - sm2 = streaming_msg_clone(sm); - TAILQ_INSERT_TAIL(dst, sm2, sm_link); - } - pthread_mutex_unlock(&sq->sq_mutex); -}