return end;
}
+/*
+ * Output packet
+ */
+static int _timeshift_read
+ ( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd,
+ streaming_message_t **sm, int *wait )
+{
+ if (*cur_file) {
+
+ /* Open file */
+ if (*fd == -1) {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
+ ts->id, (*cur_file)->path);
+#endif
+ *fd = open((*cur_file)->path, O_RDONLY);
+ }
+ if (*cur_off) {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d seek to %lu", ts->id, *cur_off);
+#endif
+ lseek(*fd, *cur_off, SEEK_SET);
+ }
+
+ /* Read msg */
+ ssize_t r = _read_msg(*fd, sm);
+ if (r < 0) {
+ streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
+ streaming_target_deliver2(ts->output, e);
+ tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id);
+ return -1;
+ }
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)",
+ ts->id, *sm, r);
+#endif
+
+ /* Incomplete */
+ if (r == 0)
+ lseek(*fd, *cur_off, SEEK_SET);
+ else
+ *cur_off += r;
+
+ /* Special case - EOF */
+ if (r == sizeof(size_t) || *cur_off > (*cur_file)->size) {
+ close(*fd);
+ *fd = -1;
+ *cur_file = timeshift_filemgr_next(*cur_file, NULL, 0);
+ *cur_off = 0; // reset
+ *wait = 0;
+ }
+ }
+ return 0;
+}
+
+
+/*
+ * Flush all data to live
+ */
+static int _timeshift_flush_to_live
+ ( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd,
+ streaming_message_t **sm, int *wait )
+{
+ time_t pts = 0;
+ while (*cur_file) {
+ if (_timeshift_read(ts, cur_file, cur_off, fd, sm, wait) == -1)
+ return -1;
+ if (!*sm) break;
+ if ((*sm)->sm_type == SMT_PACKET) {
+ pts = ((th_pkt_t*)(*sm)->sm_data)->pkt_pts;
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRId64" pts=%"PRItime_t " shift=%"PRIu64,
+ ts->id, (*sm)->sm_time, pts, (*sm)->sm_timeshift );
+ }
+ streaming_target_deliver2(ts->output, *sm);
+ *sm = NULL;
+ }
+ return 0;
+}
+
+
/* **************************************************************************
* Thread
* *************************************************************************/
}
/* Find packet */
- if (cur_file) {
-
- /* Open file */
- if (fd == -1) {
-#ifdef TSHFT_TRACE
- tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
- ts->id, cur_file->path);
-#endif
- fd = open(cur_file->path, O_RDONLY);
- }
- if (cur_off) {
-#ifdef TSHFT_TRACE
- tvhlog(LOG_DEBUG, "timeshift", "ts %d seek to %lu", ts->id, cur_off);
-#endif
- lseek(fd, cur_off, SEEK_SET);
- }
-
- /* Read msg */
- ssize_t r = _read_msg(fd, &sm);
- if (r < 0) {
- streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
- streaming_target_deliver2(ts->output, e);
- tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id);
- pthread_mutex_unlock(&ts->rdwr_mutex);
- pthread_mutex_unlock(&ts->state_mutex);
- break;
- }
-#ifdef TSHFT_TRACE
- tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)",
- ts->id, sm, r);
-#endif
-
- /* Incomplete */
- if (r == 0)
- lseek(fd, cur_off, SEEK_SET);
- else
- cur_off += r;
-
- /* Special case - EOF */
- if (r == sizeof(size_t) || cur_off > cur_file->size) {
- close(fd);
- fd = -1;
- cur_file = timeshift_filemgr_next(cur_file, NULL, 0);
- cur_off = 0; // reset
- wait = 0;
- }
+ if (_timeshift_read(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1) {
+ pthread_mutex_unlock(&ts->rdwr_mutex);
+ pthread_mutex_unlock(&ts->state_mutex);
+ break;
}
}
/* Terminate */
if (!cur_file || end != 0) {
if (!end)
- end = (cur_file > 0) ? 1 : -1;
+ end = (cur_speed > 0) ? 1 : -1;
/* Back to live */
if (end == 1) {
ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
streaming_target_deliver2(ts->output, ctrl);
+ /* Flush timeshift buffer to live */
+ if (_timeshift_flush_to_live(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1)
+ break;
+
+ /* Close file (if open) */
+ if (fd != -1) {
+ close(fd);
+ fd = -1;
+ }
+
/* Flush ALL files */
if (ts->ondemand)
timeshift_filemgr_flush(ts, NULL);
/* Pause */
} else {
- tvhlog(LOG_DEBUG, "timeshift", "ts %d sob pause stream", ts->id);
- cur_speed = 0;
- ts->state = TS_PAUSE;
+ if (cur_speed <= 0) {
+ cur_speed = 0;
+ ts->state = TS_PAUSE;
+ } else {
+ ts->state = TS_PLAY;
+ play_time = now;
+ }
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d sob speed %d", ts->id, cur_speed);
pause_time = last_time;
ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
streaming_target_deliver2(ts->output, ctrl);
}
/* Cleanup */
- if (sm) streaming_msg_free(sm);
- if (ctrl) streaming_msg_free(ctrl);
+ if (fd != -1) close(fd);
+ if (sm) streaming_msg_free(sm);
+ if (ctrl) streaming_msg_free(ctrl);
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d exit reader thread", ts->id);
#endif