}
}
+void
+timeshift_packets_clone( timeshift_t *ts, struct streaming_message_queue *dst )
+{
+ streaming_message_t *lowest, *sm;
+ struct streaming_message_queue *sq, *backlogs;
+ int i;
+
+ lock_assert(&ts->buffering_mutex);
+
+ /* init temporary queues */
+ backlogs = alloca(ts->backlog_max * sizeof(*backlogs));
+ for (i = 0; i < ts->backlog_max; i++)
+ TAILQ_INIT(&backlogs[i]);
+ /* push to destination (pts sorted) */
+ while (1) {
+ lowest = NULL;
+ for (i = 0; i < ts->backlog_max; i++) {
+ sq = &backlogs[i];
+ sm = TAILQ_FIRST(sq);
+ if (sm && (lowest == NULL || lowest->sm_time > sm->sm_time))
+ lowest = sm;
+ }
+ if (!lowest)
+ break;
+ TAILQ_REMOVE(sq, lowest, sm_link);
+ TAILQ_INSERT_TAIL(dst, lowest, sm_link);
+ }
+}
+
/*
* Receive data
*/
ts->state = TS_LIVE;
}
+ pthread_mutex_lock(&ts->buffering_mutex);
+
/* Pass-thru */
if (ts->state <= TS_LIVE) {
if (sm->sm_type == SMT_START) {
}
pktcont:
+ pthread_mutex_unlock(&ts->buffering_mutex);
+
/* Exit/Stop */
if (exit) {
timeshift_write_exit(ts->rd_pipe.wr);
TAILQ_INIT(&ts->backlog[i]);
pthread_mutex_init(&ts->rdwr_mutex, NULL);
pthread_mutex_init(&ts->state_mutex, NULL);
+ pthread_mutex_init(&ts->buffering_mutex, NULL);
/* Initialise output */
tvh_pipe(O_NONBLOCK, &ts->rd_pipe);
int64_t ref_time; ///< Start time in us (monoclock)
struct streaming_message_queue backlog[TIMESHIFT_BACKLOG_MAX]; ///< Queued packets for time sorting
int backlog_max;///< Maximum component index in backlog
+ pthread_mutex_t buffering_mutex;///< Protect backlog / write queues
enum {
TS_INIT,
extern uint64_t timeshift_total_size;
extern uint64_t timeshift_total_ram_size;
+/*
+ *
+ */
+void timeshift_packets_clone ( timeshift_t *ts, struct streaming_message_queue *dst );
+
/*
* Write functions
*/
ssize_t timeshift_write_eof ( timeshift_file_t *tsf );
void timeshift_writer_flush ( timeshift_t *ts );
+void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst );
/*
* Threads
return 0;
}
+/*
+ * Write packets from temporary queues
+ */
+static void _timeshift_write_queues
+ ( timeshift_t *ts )
+{
+ struct streaming_message_queue sq;
+ streaming_message_t *sm;
+
+ TAILQ_INIT(&sq);
+ timeshift_writer_clone(ts, &sq);
+ timeshift_packets_clone(ts, &sq);
+ while ((sm = TAILQ_FIRST(&sq)) != NULL) {
+ TAILQ_REMOVE(&sq, sm, sm_link);
+ streaming_target_deliver2(ts->output, sm);
+ }
+}
+
/*
* Send the status message
*/
streaming_target_deliver2(ts->output, ctrl);
ctrl = NULL;
tvhtrace("timeshift", "reader - set TS_LIVE");
- ts->state = TS_LIVE;
+
+ /* Critical section - protect write / backlog queues */
+ pthread_mutex_lock(&ts->buffering_mutex);
/* Flush timeshift buffer to live */
- if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1)
+ if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1) {
+ pthread_mutex_unlock(&ts->buffering_mutex);
break;
+ }
+
+ /* Flush write / backlog queues */
+ _timeshift_write_queues(ts);
+
+ ts->state = TS_LIVE;
+ pthread_mutex_unlock(&ts->buffering_mutex);
/* Close file (if open) */
if (cur_file && cur_file->rfd >= 0) {
pthread_mutex_unlock(&sq->sq_mutex);
}
+void timeshift_writer_clone ( timeshift_t *ts, struct streaming_message_queue *dst )
+{
+ streaming_message_t *sm, *sm2;
+ streaming_queue_t *sq = &ts->wr_queue;
+
+ pthread_mutex_lock(&sq->sq_mutex);
+ TAILQ_FOREACH(sm, &sq->sq_queue, sm_link) {
+ sm2 = streaming_msg_clone(sm);
+ TAILQ_INSERT_TAIL(dst, sm2, sm_link);
+ }
+ pthread_mutex_unlock(&sq->sq_mutex);
+}