struct timeshift_conf timeshift_conf;
+/*
+ * Packet log
+ */
+void
+timeshift_packet_log0
+ ( const char *source, timeshift_t *ts, streaming_message_t *sm )
+{
+ th_pkt_t *pkt = sm->sm_data;
+ tvhtrace("timeshift",
+ "ts %d pkt %s - stream %d type %c pts %10"PRId64
+ " dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
+ ts->id, source,
+ pkt->pkt_componentindex,
+ pkt_frametype_to_char(pkt->pkt_frametype),
+ ts_rescale(pkt->pkt_pts, 1000000),
+ ts_rescale(pkt->pkt_dts, 1000000),
+ pkt->pkt_duration,
+ pktbuf_len(pkt->pkt_payload),
+ sm->sm_time);
+}
+
/*
* Safe values for RAM configuration
*/
#define MAX_TIME_DELTA (2*1000000) /* 2 seconds */
#define BACKLOG_COUNT ARRAY_SIZE(timeshift_t->backlog)
-static void
-timeshift_packet_deliver ( timeshift_t *ts, streaming_message_t *sm )
-{
- th_pkt_t *pkt = sm->sm_data;
- tvhtrace("timeshift",
- "ts %d pkt buf - stream %d type %c pts %10"PRId64
- " dts %10"PRId64" dur %10d len %6zu time %14"PRId64,
- ts->id,
- pkt->pkt_componentindex,
- pkt_frametype_to_char(pkt->pkt_frametype),
- ts_rescale(pkt->pkt_pts, 1000000),
- ts_rescale(pkt->pkt_dts, 1000000),
- pkt->pkt_duration,
- pktbuf_len(pkt->pkt_payload),
- sm->sm_time);
- streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
-}
static void
timeshift_packet_flush ( timeshift_t *ts, int64_t time )
if (!lowest)
break;
TAILQ_REMOVE(sq, lowest, sm_link);
- timeshift_packet_deliver(ts, lowest);
+ ts->last_wr_time = lowest->sm_time;
+ timeshift_packet_log("wr ", ts, lowest);
+ streaming_target_deliver2(&ts->wr_queue.sq_st, lowest);
}
}
sm->sm_time = time;
if (time + MAX_TIME_DELTA < ts->last_time) {
- timeshift_packet_deliver(ts, sm);
+ ts->last_wr_time = time;
+ timeshift_packet_log("wr2", ts, sm);
+ streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
} else {
if (pkt->pkt_componentindex >= ts->backlog_max)
ts->backlog_max = pkt->pkt_componentindex + 1;
/* Send to the writer thread */
if (ts->packet_mode) {
- sm->sm_time = ts->last_time;
+ sm->sm_time = ts->last_wr_time;
if (type == SMT_PACKET) {
timeshift_packet(ts, pkt);
- goto msg_free;
+ streaming_msg_free(sm);
+ goto _exit;
}
} else {
if (ts->ref_time == 0) {
}
}
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
-msg_free:
- streaming_msg_free(sm);
/* Exit/Stop */
+_exit:
if (ts->exit)
timeshift_write_exit(ts->rd_pipe.wr);
}
ts->dobuf = ts->ondemand ? 0 : 1;
ts->packet_mode= 1;
ts->last_time = 0;
+ ts->last_wr_time = 0;
ts->buf_time = 0;
ts->start_pts = 0;
ts->ref_time = 0;
#ifndef __TVH_TIMESHIFT_PRIVATE_H__
#define __TVH_TIMESHIFT_PRIVATE_H__
-#define TIMESHIFT_PLAY_BUF 500000 //< us to buffer in TX
+#define TIMESHIFT_PLAY_BUF 1000000 //< us to buffer in TX
#define TIMESHIFT_FILE_PERIOD 60 //< number of secs in each buffer file
#define TIMESHIFT_BACKLOG_MAX 16 //< maximum elementary streams
int packet_mode;///< Packet mode (otherwise MPEG-TS data mode)
int dobuf; ///< Buffer packets (store)
int64_t last_time; ///< Last time in us (PTS conversion)
+ int64_t last_wr_time;///< Last write time in us (PTS conversion)
int64_t start_pts; ///< Start time for packets (PTS)
int64_t ref_time; ///< Start time in us (monoclock)
int64_t buf_time; ///< Last buffered time in us (PTS conversion)
extern uint64_t timeshift_total_size;
extern uint64_t timeshift_total_ram_size;
+void timeshift_packet_log0
+ ( const char *prefix, timeshift_t *ts, streaming_message_t *sm );
+
+static inline void timeshift_packet_log
+ ( const char *prefix, timeshift_t *ts, streaming_message_t *sm )
+{
+ if (sm->sm_type == SMT_PACKET && tvhtrace_enabled())
+ timeshift_packet_log0(prefix, ts, sm);
+}
+
/*
* Write functions
*/
ssize_t timeshift_write_exit ( int fd );
ssize_t timeshift_write_eof ( timeshift_file_t *tsf );
-void timeshift_writer_flush ( timeshift_t *ts );
-void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst );
-
/*
* Threads
*/
return 0;
}
-/*
- * Trace packet
- */
-static void timeshift_trace_pkt
- ( timeshift_t *ts, streaming_message_t *sm )
-{
- th_pkt_t *pkt = sm->sm_data;
- tvhtrace("timeshift",
- "ts %d pkt out - stream %d type %c pts %10"PRId64
- " dts %10"PRId64 " dur %10d len %6zu time %14"PRId64,
- ts->id,
- pkt->pkt_componentindex,
- pkt_frametype_to_char(pkt->pkt_frametype),
- ts_rescale(pkt->pkt_pts, 1000000),
- ts_rescale(pkt->pkt_dts, 1000000),
- pkt->pkt_duration,
- pktbuf_len(pkt->pkt_payload), sm->sm_time);
-}
-
/*
* Flush all data to live
*/
if (_timeshift_read(ts, cur_file, &sm, wait) == -1)
return -1;
if (!sm) break;
- if (tvhtrace_enabled() && sm->sm_type == SMT_PACKET)
- timeshift_trace_pkt(ts, sm);
+ timeshift_packet_log("ouf", ts, sm);
streaming_target_deliver2(ts->output, sm);
}
return 0;
void *timeshift_reader ( void *p )
{
timeshift_t *ts = p;
- int nfds, end, run = 1, wait = -1;
+ int nfds, end, run = 1, wait = -1, state;
timeshift_file_t *cur_file = NULL, *tmp_file;
int cur_speed = 100, keyframe_mode = 0;
int64_t mono_now, mono_play_time = 0, mono_last_status = 0;
if (cur_speed != speed) {
/* Live playback */
- if (ts->state == TS_LIVE) {
+ state = ts->state;
+ if (state == TS_LIVE) {
/* Reject */
if (speed >= 100) {
} else {
tvhlog(LOG_DEBUG, "timeshift", "ts %d enter timeshift mode",
ts->id);
- timeshift_writer_flush(ts);
ts->dobuf = 1;
tmp_file = timeshift_filemgr_newest(ts);
if (tmp_file != NULL) {
/* Update */
cur_speed = speed;
- if (speed != 100 || ts->state != TS_LIVE) {
+ if (speed != 100 || state != TS_LIVE) {
ts->state = speed == 0 ? TS_PAUSE : TS_PLAY;
tvhtrace("timeshift", "reader - set %s", speed == 0 ? "TS_PAUSE" : "TS_PLAY");
}
- if (ts->state == TS_PLAY) {
+ if (ts->state == TS_PLAY && state != TS_PLAY) {
mono_play_time = mono_now;
- tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64, mono_now, pause_time);
- } else if (ts->state == TS_PAUSE) {
+ tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64,
+ mono_now, pause_time);
+ } else if (ts->state == TS_PAUSE && state != TS_PAUSE) {
pause_time = last_time;
}
tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d", ts->id, speed);
((cur_speed > 0) && (sm->sm_time <= deliver))))) {
last_time = sm->sm_time;
- if (sm->sm_type == SMT_PACKET && tvhtrace_enabled())
- timeshift_trace_pkt(ts, sm);
+ timeshift_packet_log("out", ts, sm);
streaming_target_deliver2(ts->output, sm);
sm = NULL;
wait = 0;
* *************************************************************************/
static inline ssize_t _process_msg0
- ( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t **smp )
+ ( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t *sm )
{
ssize_t err;
- streaming_message_t *sm = *smp;
if (sm->sm_type == SMT_START) {
err = 0;
- _handle_sstart(ts, tsf, sm);
- *smp = NULL;
+ _handle_sstart(ts, tsf, streaming_msg_clone(sm));
} else if (sm->sm_type == SMT_SIGNAL_STATUS)
err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data);
else if (sm->sm_type == SMT_PACKET) {
/* Terminate */
case SMT_EXIT:
if (run) *run = 0;
- goto live;
+ break;
case SMT_STOP:
if (sm->sm_code != SM_CODE_SOURCE_RECONFIGURED && run)
*run = 0;
case SMT_PACKET:
pthread_mutex_lock(&ts->state_mutex);
ts->buf_time = sm->sm_time;
- if (ts->state == TS_LIVE)
+ if (ts->state == TS_LIVE) {
streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
+ if (sm->sm_type == SMT_PACKET)
+ timeshift_packet_log("liv", ts, sm);
+ }
if (ts->dobuf) {
if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) {
- if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
+ if ((err = _process_msg0(ts, tsf, sm)) < 0) {
timeshift_filemgr_close(tsf);
tsf->bad = 1;
ts->full = 1; ///< Stop any more writing
+ } else {
+ timeshift_packet_log("sav", ts, sm);
}
tsf->refcount--;
}
/* Next */
streaming_msg_free(sm);
+ return;
live:
pthread_mutex_lock(&ts->state_mutex);
pthread_mutex_unlock(&sq->sq_mutex);
return NULL;
}
-
-/* **************************************************************************
- * Utilities
- * *************************************************************************/
-
-void timeshift_writer_flush ( timeshift_t *ts )
-
-{
- streaming_message_t *sm;
- streaming_queue_t *sq = &ts->wr_queue;
-
- pthread_mutex_lock(&sq->sq_mutex);
- while ((sm = TAILQ_FIRST(&sq->sq_queue))) {
- streaming_queue_remove(sq, sm);
- _process_msg(ts, sm, NULL);
- }
- pthread_mutex_unlock(&sq->sq_mutex);
-}
-
-void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst )
-{
- streaming_message_t *sm, *sm2;
- streaming_queue_t *sq = &ts->wr_queue;
-
- pthread_mutex_lock(&sq->sq_mutex);
- TAILQ_FOREACH(sm, &sq->sq_queue, sm_link) {
- sm2 = streaming_msg_clone(sm);
- TAILQ_INSERT_TAIL(dst, sm2, sm_link);
- }
- pthread_mutex_unlock(&sq->sq_mutex);
-}