uint8_t pkt_componentindex;
uint8_t pkt_frametype;
uint8_t pkt_field; // Set if packet is only a half frame (a field)
-#if ENABLE_TIMESHIFT
- uint8_t pkt_delivered;
-#endif
uint8_t pkt_channels;
uint8_t pkt_sri;
}
static void
-timeshift_packet_flush ( timeshift_t *ts, int64_t time, int deliver )
+timeshift_packet_flush ( timeshift_t *ts, int64_t time )
{
streaming_message_t *lowest, *sm;
struct streaming_message_queue *sq;
if (!lowest)
break;
TAILQ_REMOVE(sq, lowest, sm_link);
- if (deliver)
- timeshift_packet_deliver(ts, lowest);
- else
- streaming_msg_free(lowest);
+ timeshift_packet_deliver(ts, lowest);
}
}
static void
-timeshift_packet( timeshift_t *ts, th_pkt_t *pkt, int deliver )
+timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )
{
streaming_message_t *sm;
int64_t time;
time = ts_rescale(pkt->pkt_pts, 1000000);
if (time > ts->last_time) {
ts->last_time = time;
- timeshift_packet_flush(ts, time, deliver);
+ timeshift_packet_flush(ts, time);
}
sm->sm_time = time;
if (time + MAX_TIME_DELTA < ts->last_time) {
- if (deliver)
- timeshift_packet_deliver(ts, sm);
+ timeshift_packet_deliver(ts, sm);
} else {
if (pkt->pkt_componentindex >= ts->backlog_max)
ts->backlog_max = pkt->pkt_componentindex + 1;
TAILQ_INSERT_TAIL(&ts->backlog[pkt->pkt_componentindex], sm, sm_link);
- pkt->pkt_delivered = ts->state <= TS_LIVE;
- }
-}
-
-void
-timeshift_packets_clone
- ( timeshift_t *ts, struct streaming_message_queue *dst, int delivered )
-{
- streaming_message_t *lowest, *sm, *sm2;
- struct streaming_message_queue *sq, *sq2, *backlogs;
- th_pkt_t *pkt;
- int i;
-
- lock_assert(&ts->state_mutex);
-
- /* init temporary queues and copy the backlog data */
- backlogs = alloca(ts->backlog_max * sizeof(*backlogs));
- for (i = 0; i < ts->backlog_max; i++) {
- sq = &backlogs[i];
- sq2 = &ts->backlog[i];
- TAILQ_INIT(sq);
- TAILQ_FOREACH(sm, sq2, sm_link) {
- if (!delivered) {
- pkt = sm->sm_data;
- if (pkt->pkt_delivered)
- continue;
- }
- sm2 = streaming_msg_clone(sm);
- TAILQ_INSERT_TAIL(sq, sm2, sm_link);
- }
- }
- /* push to destination (pts sorted) */
- while (1) {
- lowest = NULL;
- for (i = 0; i < ts->backlog_max; i++) {
- sq = &backlogs[i];
- sm = TAILQ_FIRST(sq);
- if (sm && (lowest == NULL || lowest->sm_time > sm->sm_time))
- lowest = sm;
- }
- if (!lowest)
- break;
- TAILQ_REMOVE(sq, lowest, sm_link);
- TAILQ_INSERT_TAIL(dst, lowest, sm_link);
}
}
static void timeshift_input
( void *opaque, streaming_message_t *sm )
{
- int exit = 0, type = sm->sm_type;
+ int type = sm->sm_type;
timeshift_t *ts = opaque;
th_pkt_t *pkt = sm->sm_data, *pkt2;
- pthread_mutex_lock(&ts->state_mutex);
-
/* Control */
if (type == SMT_SKIP) {
- if (ts->state >= TS_LIVE)
- timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data);
+ timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data);
streaming_msg_free(sm);
} else if (type == SMT_SPEED) {
- if (ts->state >= TS_LIVE)
- timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code);
+ timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code);
streaming_msg_free(sm);
- }
-
- else {
+ } else {
- /* Start */
- if (type == SMT_START && ts->state == TS_INIT)
- ts->state = TS_LIVE;
+ if (type == SMT_START)
+ timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA);
/* Change PTS/DTS offsets */
- if (ts->packet_mode && ts->start_pts && type == SMT_PACKET) {
+ else if (ts->packet_mode && ts->start_pts && type == SMT_PACKET) {
pkt2 = pkt_copy_shallow(pkt);
pkt_ref_dec(pkt);
sm->sm_data = pkt = pkt2;
pkt->pkt_dts += ts->start_pts;
}
- /* Pass-thru */
- if (ts->state <= TS_LIVE) {
- if (type == SMT_START) {
- if (ts->smt_start)
- streaming_start_unref(ts->smt_start);
- ts->smt_start = sm->sm_data;
- atomic_add(&ts->smt_start->ss_refcount, 1);
- if (ts->packet_mode) {
- timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA + 1000, ts->dobuf);
- if (ts->last_time)
- ts->start_pts = ts->last_time + 1000;
- }
- }
- streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
- }
-
/* Check for exit */
- if (type == SMT_EXIT ||
+ else if (type == SMT_EXIT ||
(type == SMT_STOP && sm->sm_code != SM_CODE_SOURCE_RECONFIGURED))
- exit = 1;
+ ts->exit = 1;
- if (type == SMT_MPEGTS)
+ else if (type == SMT_MPEGTS)
ts->packet_mode = 0;
- /* Buffer to disk */
- if ((ts->state > TS_LIVE) || (ts->dobuf && (ts->state == TS_LIVE))) {
- if (ts->packet_mode) {
- sm->sm_time = ts->last_time;
- if (type == SMT_PACKET) {
- timeshift_packet(ts, pkt, 1);
- goto msg_free;
- }
- } else {
- if (ts->ref_time == 0) {
- ts->ref_time = getmonoclock();
- sm->sm_time = 0;
- } else {
- sm->sm_time = getmonoclock() - ts->ref_time;
- }
+ /* Send to the writer thread */
+ if (ts->packet_mode) {
+ sm->sm_time = ts->last_time;
+ if (type == SMT_PACKET) {
+ timeshift_packet(ts, pkt);
+ goto msg_free;
}
- streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
} else {
- if (type == SMT_PACKET) {
- timeshift_packet(ts, pkt, 0);
- tvhtrace("timeshift",
- "ts %d pkt in - stream %d type %c pts %10"PRId64
- " dts %10"PRId64" dur %10d len %6zu",
- ts->id,
- pkt->pkt_componentindex,
- pkt_frametype_to_char(pkt->pkt_frametype),
- ts_rescale(pkt->pkt_pts, 1000000),
- ts_rescale(pkt->pkt_dts, 1000000),
- pkt->pkt_duration,
- pktbuf_len(pkt->pkt_payload));
+ if (ts->ref_time == 0) {
+ ts->ref_time = getmonoclock();
+ sm->sm_time = 0;
+ } else {
+ sm->sm_time = getmonoclock() - ts->ref_time;
}
-msg_free:
- streaming_msg_free(sm);
}
+ streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
+msg_free:
+ streaming_msg_free(sm);
/* Exit/Stop */
- if (exit) {
+ if (ts->exit)
timeshift_write_exit(ts->rd_pipe.wr);
- ts->state = TS_EXIT;
- }
}
-
- pthread_mutex_unlock(&ts->state_mutex);
}
/**
/* Flush files */
timeshift_filemgr_flush(ts, NULL);
- /* Release SMT_START index */
- if (ts->smt_start)
- streaming_start_unref(ts->smt_start);
-
if (ts->path)
free(ts->path);
free(ts);
ts->output = out;
ts->path = NULL;
ts->max_time = max_time;
- ts->state = TS_INIT;
+ ts->state = TS_LIVE;
+ ts->exit = 0;
ts->full = 0;
ts->vididx = -1;
ts->id = timeshift_index;
ts->dobuf = ts->ondemand ? 0 : 1;
ts->packet_mode= 1;
ts->last_time = 0;
+ ts->buf_time = 0;
ts->start_pts = 0;
ts->ref_time = 0;
for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++)
#ifndef __TVH_TIMESHIFT_PRIVATE_H__
#define __TVH_TIMESHIFT_PRIVATE_H__
-#define TIMESHIFT_PLAY_BUF 2000000 //< us to buffer in TX
+#define TIMESHIFT_PLAY_BUF 500000 //< us to buffer in TX
#define TIMESHIFT_FILE_PERIOD 60 //< number of secs in each buffer file
#define TIMESHIFT_BACKLOG_MAX 16 //< maximum elementary streams
int64_t last_time; ///< Last time in us (PTS conversion)
int64_t start_pts; ///< Start time for packets (PTS)
int64_t ref_time; ///< Start time in us (monoclock)
+ int64_t buf_time; ///< Last buffered time in us (PTS conversion)
struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting
int backlog_max;///< Maximum component index in backlog
enum {
- TS_INIT,
TS_EXIT,
TS_LIVE,
TS_PAUSE,
TS_PLAY,
} state; ///< Play state
pthread_mutex_t state_mutex; ///< Protect state changes
+ uint8_t exit; ///< Exit from the main input thread
uint8_t full; ///< Buffer is full
- streaming_start_t *smt_start; ///< Current stream makeup
-
streaming_queue_t wr_queue; ///< Writer queue
pthread_t wr_thread; ///< Writer thread
extern uint64_t timeshift_total_size;
extern uint64_t timeshift_total_ram_size;
-/*
- *
- */
-void timeshift_packets_clone ( timeshift_t *ts, struct streaming_message_queue *dst, int delivered );
-
/*
* Write functions
*/
* Utilities
* *************************************************************************/
-static streaming_message_t *_timeshift_find_sstart
- ( timeshift_file_t *tsf, int64_t time )
-{
- timeshift_index_data_t *ti;
-
- /* Find the SMT_START message that relates (comes before) the given time */
- ti = TAILQ_LAST(&tsf->sstart, timeshift_index_data_list);
- while (ti && ti->data->sm_time > time)
- ti = TAILQ_PREV(ti, timeshift_index_data_list, link);
-
- return ti ? ti->data : NULL;
-}
-
static int64_t _timeshift_first_time
( timeshift_t *ts, int *active )
{
*wait = 0;
tvhtrace("timeshift", "ts %d eof, cur_file %p", ts->id, *cur_file);
- /* Check SMT_START index */
- } else {
- streaming_message_t *ssm = _timeshift_find_sstart(*cur_file, (*sm)->sm_time);
- if (ssm && ssm->sm_data != ts->smt_start) {
- streaming_target_deliver2(ts->output, streaming_msg_clone(ssm));
- if (ts->smt_start)
- streaming_start_unref(ts->smt_start);
- ts->smt_start = ssm->sm_data;
- atomic_add(&ts->smt_start->ss_refcount, 1);
- }
}
}
return 0;
}
+/*
+ * Trace packet
+ */
+static void timeshift_trace_pkt
+ ( timeshift_t *ts, streaming_message_t *sm )
+{
+ th_pkt_t *pkt = sm->sm_data;
+ tvhtrace("timeshift",
+ "ts %d pkt out - stream %d type %c pts %10"PRId64
+ " dts %10"PRId64 " dur %10d len %6zu time %14"PRId64,
+ ts->id,
+ pkt->pkt_componentindex,
+ pkt_frametype_to_char(pkt->pkt_frametype),
+ ts_rescale(pkt->pkt_pts, 1000000),
+ ts_rescale(pkt->pkt_dts, 1000000),
+ pkt->pkt_duration,
+ pktbuf_len(pkt->pkt_payload), sm->sm_time);
+}
/*
* Flush all data to live
{
streaming_message_t *sm;
- time_t pts = 0;
while (*cur_file) {
if (_timeshift_read(ts, cur_file, &sm, wait) == -1)
return -1;
if (!sm) break;
- if (sm->sm_type == SMT_PACKET) {
- pts = ((th_pkt_t*)sm->sm_data)->pkt_pts;
- tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t,
- ts->id, sm->sm_time, pts);
- }
+ if (tvhtrace_enabled() && sm->sm_type == SMT_PACKET)
+ timeshift_trace_pkt(ts, sm);
streaming_target_deliver2(ts->output, sm);
}
return 0;
}
-/*
- * Write packets from temporary queues
- */
-static void _timeshift_write_queues
- ( timeshift_t *ts )
-{
- struct streaming_message_queue sq;
- streaming_message_t *sm;
-
- TAILQ_INIT(&sq);
- timeshift_writer_clone(ts, &sq);
- timeshift_packets_clone(ts, &sq, 0);
- while ((sm = TAILQ_FIRST(&sq)) != NULL) {
- TAILQ_REMOVE(&sq, sm, sm_link);
- streaming_target_deliver2(ts->output, sm);
- }
-}
-
/*
* Send the status message
*/
int64_t start, end;
start = _timeshift_first_time(ts, &active);
- end = ts->last_time;
+ end = ts->buf_time;
if (current_time < 0)
current_time = 0;
if (current_time > end)
streaming_target_deliver2(ts->output, tsm);
}
-/*
- * Trace packet
- */
-static void timeshift_trace_pkt
- ( timeshift_t *ts, streaming_message_t *sm )
-{
- th_pkt_t *pkt = sm->sm_data;
- tvhtrace("timeshift",
- "ts %d pkt out - stream %d type %c pts %10"PRId64
- " dts %10"PRId64 " dur %10d len %6zu time %14"PRId64,
- ts->id,
- pkt->pkt_componentindex,
- pkt_frametype_to_char(pkt->pkt_frametype),
- ts_rescale(pkt->pkt_pts, 1000000),
- ts_rescale(pkt->pkt_dts, 1000000),
- pkt->pkt_duration,
- pktbuf_len(pkt->pkt_payload), sm->sm_time);
-}
-
/* **************************************************************************
* Thread
* *************************************************************************/
void *timeshift_reader ( void *p )
{
timeshift_t *ts = p;
- int nfds, end, run = 1, wait = -1, skip_delivered = 0;
+ int nfds, end, run = 1, wait = -1;
timeshift_file_t *cur_file = NULL, *tmp_file;
int cur_speed = 100, keyframe_mode = 0;
int64_t mono_now, mono_play_time = 0, mono_last_status = 0;
streaming_skip_t *skip = NULL;
tvhpoll_t *pd;
tvhpoll_event_t ev = { 0 };
- th_pkt_t *pkt;
pd = tvhpoll_create(1);
ev.fd = ts->rd_pipe.rd;
ts->id);
timeshift_writer_flush(ts);
ts->dobuf = 1;
- skip_delivered = 1;
tmp_file = timeshift_filemgr_newest(ts);
if (tmp_file != NULL) {
i64 = tmp_file->last;
tmp_file->refcount--;
} else {
- i64 = ts->last_time;
+ i64 = ts->buf_time;
}
cur_file = timeshift_filemgr_get(ts, i64);
if (cur_file != NULL) {
mono_play_time = mono_now;
tvhtrace("timeshift", "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64, mono_now, pause_time);
} else if (ts->state == TS_PAUSE) {
- skip_delivered = 1;
pause_time = last_time;
}
tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d", ts->id, speed);
cur_file->roff = cur_file->size;
last_time = cur_file->last;
} else {
- last_time = ts->last_time;
+ last_time = ts->buf_time;
}
- skip_delivered = 0;
}
/* May have failed */
/* Live (stage2) */
if (ts->state == TS_LIVE) {
- if (skip_time >= ts->last_time - TIMESHIFT_PLAY_BUF) {
+ if (skip_time >= ts->buf_time - TIMESHIFT_PLAY_BUF) {
tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id);
skip = NULL;
} else {
((cur_speed > 0) && (sm->sm_time <= deliver))))) {
last_time = sm->sm_time;
- if (sm->sm_type == SMT_PACKET) {
- pkt = sm->sm_data;
- if (skip_delivered && pkt->pkt_delivered) {
- streaming_msg_free(sm);
- goto skip_pkt;
- }
- if (tvhtrace_enabled())
- timeshift_trace_pkt(ts, sm);
- }
+ if (sm->sm_type == SMT_PACKET && tvhtrace_enabled())
+ timeshift_trace_pkt(ts, sm);
streaming_target_deliver2(ts->output, sm);
-skip_pkt:
sm = NULL;
wait = 0;
if (_timeshift_flush_to_live(ts, &cur_file, &wait) == -1)
break;
- /* Flush write / backlog queues */
- _timeshift_write_queues(ts);
-
ts->state = TS_LIVE;
/* Close file (if open) */
}
}
-static void _copy_last_sstart ( timeshift_t *ts, timeshift_file_t *tsf )
-{
- streaming_message_t *sm;
- streaming_start_t *ss = ts->smt_start;
-
- if (ss) {
- atomic_add(&ss->ss_refcount, 1);
- sm = streaming_msg_create_data(SMT_START, ss);
- _handle_sstart(ts, tsf, sm);
- }
-}
-
/* **************************************************************************
* Thread
* *************************************************************************/
} else if (sm->sm_type == SMT_SIGNAL_STATUS)
err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data);
else if (sm->sm_type == SMT_PACKET) {
- if (TAILQ_EMPTY(&tsf->sstart))
- _copy_last_sstart(ts, tsf);
err = timeshift_write_packet(tsf, sm->sm_time, sm->sm_data);
if (err > 0) {
th_pkt_t *pkt = sm->sm_data;
}
}
} else if (sm->sm_type == SMT_MPEGTS) {
- if (TAILQ_EMPTY(&tsf->sstart))
- _copy_last_sstart(ts, tsf);
err = timeshift_write_mpegts(tsf, sm->sm_time, sm->sm_data);
}
else
/* Terminate */
case SMT_EXIT:
if (run) *run = 0;
- break;
+ goto live;
case SMT_STOP:
if (sm->sm_code != SM_CODE_SOURCE_RECONFIGURED && run)
*run = 0;
- break;
+ goto live;
/* Timeshifting */
case SMT_SKIP:
case SMT_SERVICE_STATUS:
case SMT_TIMESHIFT_STATUS:
case SMT_DESCRAMBLE_INFO:
- break;
+ goto live;
/* Store */
case SMT_SIGNAL_STATUS:
case SMT_MPEGTS:
case SMT_PACKET:
pthread_mutex_lock(&ts->state_mutex);
- if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) {
- if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
- timeshift_filemgr_close(tsf);
- tsf->bad = 1;
- ts->full = 1; ///< Stop any more writing
+ ts->buf_time = sm->sm_time;
+ if (ts->state == TS_LIVE)
+ streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
+ if (ts->dobuf) {
+ if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) {
+ if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
+ timeshift_filemgr_close(tsf);
+ tsf->bad = 1;
+ ts->full = 1; ///< Stop any more writing
+ }
+ tsf->refcount--;
}
- tsf->refcount--;
}
pthread_mutex_unlock(&ts->state_mutex);
break;
}
/* Next */
- if (sm)
- streaming_msg_free(sm);
+ streaming_msg_free(sm);
+
+live:
+ pthread_mutex_lock(&ts->state_mutex);
+ if (ts->state == TS_LIVE)
+ streaming_target_deliver2(ts->output, sm);
+ pthread_mutex_unlock(&ts->state_mutex);
}
void *timeshift_writer ( void *aux )