From: Jaroslav Kysela Date: Mon, 4 Jan 2016 15:32:48 +0000 (+0100) Subject: timeshift: fix file refcounting, create timeshift_seek_t to maintain seek state correctly X-Git-Tag: v4.2.1~1232 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=80a50ce2ee93ea657ab00b57b34d2b5eba439acd;p=thirdparty%2Ftvheadend.git timeshift: fix file refcounting, create timeshift_seek_t to maintain seek state correctly --- diff --git a/src/timeshift.c b/src/timeshift.c index bdb0673ee..37fac3bd5 100644 --- a/src/timeshift.c +++ b/src/timeshift.c @@ -429,6 +429,8 @@ streaming_target_t *timeshift_create ts->buf_time = 0; ts->start_pts = 0; ts->ref_time = 0; + ts->seek.file = NULL; + ts->seek.frame = NULL; for (i = 0; i < TIMESHIFT_BACKLOG_MAX; i++) TAILQ_INIT(&ts->backlog[i]); pthread_mutex_init(&ts->state_mutex, NULL); diff --git a/src/timeshift/private.h b/src/timeshift/private.h index e24ca74b2..bcd301213 100644 --- a/src/timeshift/private.h +++ b/src/timeshift/private.h @@ -19,7 +19,7 @@ #ifndef __TVH_TIMESHIFT_PRIVATE_H__ #define __TVH_TIMESHIFT_PRIVATE_H__ -#define TIMESHIFT_PLAY_BUF 1000000 //< us to buffer in TX +#define TIMESHIFT_PLAY_BUF 1000000 //< us to buffer in TX #define TIMESHIFT_FILE_PERIOD 60 //< number of secs in each buffer file #define TIMESHIFT_BACKLOG_MAX 16 //< maximum elementary streams @@ -79,6 +79,14 @@ typedef struct timeshift_file typedef TAILQ_HEAD(timeshift_file_list,timeshift_file) timeshift_file_list_t; +/** + * + */ +typedef struct timeshift_seek { + timeshift_file_t *file; + timeshift_index_iframe_t *frame; +} timeshift_seek_t; + /** * */ @@ -110,6 +118,8 @@ typedef struct timeshift { pthread_mutex_t state_mutex; ///< Protect state changes uint8_t exit; ///< Exit from the main input thread uint8_t full; ///< Buffer is full + + timeshift_seek_t seek; ///< Seek into buffered data streaming_queue_t wr_queue; ///< Writer queue pthread_t wr_thread; ///< Writer thread @@ -165,6 +175,35 @@ void timeshift_filemgr_init ( void ); void timeshift_filemgr_term ( void ); int timeshift_filemgr_makedirs ( int ts_index, char *buf, size_t len ); +static inline void timeshift_file_get0 ( timeshift_file_t *tsf ) +{ + if (tsf) + tsf->refcount++; +} + +#define timeshift_file_get(tsf) ({ \ + timeshift_file_get0(tsf); \ + /* tvhtrace("timeshift", "file get %p refcount %d fcn %s:%d", \ + tsf, tsf ? tsf->refcount : -1, __FUNCTION__, __LINE__); */ \ + tsf; \ +}) + +static inline void timeshift_file_put0 ( timeshift_file_t *tsf ) +{ + if (tsf) { + if (!tsf->refcount) + abort(); + tsf->refcount--; + } +} + +#define timeshift_file_put(tsf) ({ \ + /* tvhtrace("timeshift", "file put %p refcount %d fcn %s:%d", \ + tsf, tsf ? (tsf->refcount - 1) : -1, __FUNCTION__, __LINE__); */ \ + timeshift_file_put0(tsf); \ + tsf; \ +}) + timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int64_t start_time ); timeshift_file_t *timeshift_filemgr_oldest diff --git a/src/timeshift/timeshift_filemgr.c b/src/timeshift/timeshift_filemgr.c index 0e0e4f0ae..a65cda768 100644 --- a/src/timeshift/timeshift_filemgr.c +++ b/src/timeshift/timeshift_filemgr.c @@ -129,8 +129,8 @@ timeshift_filemgr_dump0 ( timeshift_t *ts ) return; } TAILQ_FOREACH(tsf, &ts->files, link) { - tvhtrace("timeshift", "ts %d file dump tsf %p time %"PRId64" last %"PRId64, - ts->id, tsf, tsf->time, tsf->last); + tvhtrace("timeshift", "ts %d (full=%d) file dump tsf %p time %4"PRId64" last %10"PRId64" bad %d refcnt %d", + ts->id, ts->full, tsf, tsf->time, tsf->last, tsf->bad, tsf->refcount); } } @@ -358,9 +358,7 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int64_t start_time ) tsf_tl = tsf_tmp; } - if (tsf_tl) - tsf_tl->refcount++; - return tsf_tl; + return timeshift_file_get(tsf_tl); } timeshift_file_t *timeshift_filemgr_next @@ -369,22 +367,18 @@ timeshift_file_t *timeshift_filemgr_next timeshift_file_t *nxt = TAILQ_NEXT(tsf, link); if (!nxt && end) *end = 1; if (!nxt && keep) return tsf; - tsf->refcount--; - if (nxt) - nxt->refcount++; - return nxt; + timeshift_file_put(tsf); + return timeshift_file_get(nxt); } timeshift_file_t *timeshift_filemgr_prev ( timeshift_file_t *tsf, int *end, int keep ) { - timeshift_file_t *nxt = TAILQ_PREV(tsf, timeshift_file_list, link); - if (!nxt && end) *end = 1; - if (!nxt && keep) return tsf; - tsf->refcount--; - if (nxt) - nxt->refcount++; - return nxt; + timeshift_file_t *prev = TAILQ_PREV(tsf, timeshift_file_list, link); + if (!prev && end) *end = 1; + if (!prev && keep) return tsf; + timeshift_file_put(tsf); + return timeshift_file_get(prev); } /* @@ -393,9 +387,7 @@ timeshift_file_t *timeshift_filemgr_prev timeshift_file_t *timeshift_filemgr_oldest ( timeshift_t *ts ) { timeshift_file_t *tsf = TAILQ_FIRST(&ts->files); - if (tsf) - tsf->refcount++; - return tsf; + return timeshift_file_get(tsf); } /* @@ -404,9 +396,7 @@ timeshift_file_t *timeshift_filemgr_oldest ( timeshift_t *ts ) timeshift_file_t *timeshift_filemgr_newest ( timeshift_t *ts ) { timeshift_file_t *tsf = TAILQ_LAST(&ts->files, timeshift_file_list); - if (tsf) - tsf->refcount++; - return tsf; + return timeshift_file_get(tsf); } /* ************************************************************************** diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index f4e2260b1..1f71a72f7 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -37,6 +37,38 @@ #include #endif +/* ************************************************************************** + * Buffered position handling + * *************************************************************************/ + +static timeshift_seek_t *_seek_reset ( timeshift_seek_t *seek ) +{ + timeshift_file_t *tsf = seek->file; + seek->file = NULL; + seek->frame = NULL; + timeshift_file_put(tsf); + return seek; +} + +static timeshift_seek_t *_seek_set_file + ( timeshift_seek_t *seek, timeshift_file_t *tsf, off_t roff ) +{ + seek->file = tsf; + seek->frame = NULL; + if (tsf) + tsf->roff = roff; + return seek; +} + +static timeshift_seek_t *_read_close ( timeshift_seek_t *seek ) +{ + if (seek->file && seek->file->rfd >= 0) { + close(seek->file->rfd); + seek->file->rfd = -1; + } + return _seek_reset(seek); +} + /* ************************************************************************** * File Reading * *************************************************************************/ @@ -57,6 +89,7 @@ static ssize_t _read_buf ( timeshift_file_t *tsf, int fd, void *buf, size_t size ret = 0; while (size > 0) { r = read(tsf ? tsf->rfd : fd, buf, size); + tvhtrace("timeshift", "read %ld size %ld", (long)r, (long)size); if (r < 0) { if (ERRNO_AGAIN(errno)) continue; @@ -72,7 +105,7 @@ static ssize_t _read_buf ( timeshift_file_t *tsf, int fd, void *buf, size_t size return 0; } if (ret > 0 && tsf) - tsf->roff += r; + tsf->roff += ret; return ret; } } @@ -130,7 +163,10 @@ static ssize_t _read_msg ( timeshift_file_t *tsf, int fd, streaming_message_t ** if (sz == 0) return cnt; /* Wrong data size */ - if (sz > 1024 * 1024) return -1; + if (sz > 1024 * 1024) { + tvhtrace("timeshift", "wrong msg size (%lld/0x%llx)", (long long)sz, (long long)sz); + return -1; + } /* Type */ r = _read_buf(tsf, fd, &type, sizeof(type)); @@ -235,25 +271,19 @@ static int64_t _timeshift_first_time *active = 1; ret = tsi->time; } - if (tsf) - tsf->refcount--; + timeshift_file_put(tsf); return ret; } static int _timeshift_skip ( timeshift_t *ts, int64_t req_time, int64_t cur_time, - timeshift_file_t *cur_file, timeshift_file_t **new_file, - timeshift_index_iframe_t **iframe ) + timeshift_seek_t *seek, timeshift_seek_t *nseek ) { - timeshift_index_iframe_t *tsi = *iframe; - timeshift_file_t *tsf = cur_file, *tsf_last; + timeshift_index_iframe_t *tsi = seek->frame; + timeshift_file_t *tsf = seek->file, *tsf_last; int64_t sec = req_time / (1000000 * TIMESHIFT_FILE_PERIOD); int back = (req_time < cur_time) ? 1 : 0; int end = 0; - - /* Hold local ref */ - if (cur_file) - cur_file->refcount++; /* Coarse search */ if (!tsi) { @@ -298,6 +328,7 @@ static int _timeshift_skip /* Find start/end of buffer */ if (end) { + timeshift_file_put(tsf); if (back) { tsf = tsf_last = timeshift_filemgr_oldest(ts); tsi = NULL; @@ -323,12 +354,9 @@ static int _timeshift_skip } } - if (cur_file) - cur_file->refcount--; - /* Done */ - *new_file = tsf; - *iframe = tsi; + nseek->file = tsf; + nseek->frame = tsi; return end; } @@ -337,39 +365,37 @@ static int _timeshift_skip */ static int _timeshift_do_skip ( timeshift_t *ts, int64_t req_time, int64_t last_time, - timeshift_file_t **_cur_file, timeshift_index_iframe_t **_tsi ) + timeshift_seek_t *seek ) { - timeshift_file_t *tsf = NULL, *cur_file; - timeshift_index_iframe_t *tsi; + timeshift_seek_t nseek; int end; tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to %"PRId64" from %"PRId64, ts->id, req_time, last_time); + timeshift_file_get(seek->file); + /* Find */ - cur_file = *_cur_file; - end = _timeshift_skip(ts, req_time, last_time, - cur_file, &tsf, _tsi); - tsi = *_tsi; - if (tsi) + end = _timeshift_skip(ts, req_time, last_time, seek, &nseek); + if (nseek.frame) tvhlog(LOG_DEBUG, "timeshift", "ts %d skip found pkt @ %"PRId64, - ts->id, tsi->time); + ts->id, nseek.frame->time); /* File changed (close) */ - if ((tsf != cur_file) && cur_file && cur_file->rfd >= 0) { - close(cur_file->rfd); - cur_file->rfd = -1; - } + if (nseek.file != seek->file) + _read_close(seek); + + timeshift_file_put(seek->file); /* Position */ - if (cur_file) - cur_file->refcount--; - if ((cur_file = *_cur_file = tsf) != NULL) { - if (tsi) - cur_file->roff = tsi->pos; + *seek = nseek; + if (nseek.file != NULL) { + if (nseek.frame) + nseek.file->roff = nseek.frame->pos; else - cur_file->roff = req_time > last_time ? cur_file->size : 0; - tvhtrace("timeshift", "do skip cur_file %p roff %"PRId64, cur_file, (int64_t)cur_file->roff); + nseek.file->roff = req_time > last_time ? nseek.file->size : 0; + tvhtrace("timeshift", "do skip seek->file %p roff %"PRId64, + nseek.file, (int64_t)nseek.file->roff); } return end; @@ -380,10 +406,10 @@ static int _timeshift_do_skip * Output packet */ static int _timeshift_read - ( timeshift_t *ts, timeshift_file_t **cur_file, + ( timeshift_t *ts, timeshift_seek_t *seek, streaming_message_t **sm, int *wait ) { - timeshift_file_t *tsf = *cur_file; + timeshift_file_t *tsf = seek->file; ssize_t r; off_t off = 0; @@ -408,7 +434,7 @@ static int _timeshift_read if (r < 0) { streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR); streaming_target_deliver2(ts->output, e); - tvhtrace("timeshift", "ts %d seek to %jd (fd %i)", ts->id, (intmax_t)tsf->roff, tsf->rfd); + tvhtrace("timeshift", "ts %d seek to %jd (woff %jd) (fd %i)", ts->id, (intmax_t)off, (intmax_t)tsf->woff, tsf->rfd); tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id); return -1; } @@ -417,14 +443,11 @@ static int _timeshift_read /* Special case - EOF */ if (r <= sizeof(size_t) || tsf->roff > tsf->size || *sm == NULL) { - if (tsf->rfd >= 0) - close(tsf->rfd); - tsf->rfd = -1; - *cur_file = timeshift_filemgr_next(tsf, NULL, 0); - if (*cur_file) - (*cur_file)->roff = 0; // reset + timeshift_file_get(seek->file); /* _read_close decreases file reference */ + _read_close(seek); + _seek_set_file(seek, timeshift_filemgr_next(tsf, NULL, 0), 0); *wait = 0; - tvhtrace("timeshift", "ts %d eof, cur_file %p (prev %p)", ts->id, *cur_file, tsf); + tvhtrace("timeshift", "ts %d eof, seek->file %p (prev %p)", ts->id, seek->file, tsf); timeshift_filemgr_dump(ts); } } @@ -435,12 +458,12 @@ static int _timeshift_read * Flush all data to live */ static int _timeshift_flush_to_live - ( timeshift_t *ts, timeshift_file_t **cur_file, int *wait ) + ( timeshift_t *ts, timeshift_seek_t *seek, int *wait ) { streaming_message_t *sm; - while (*cur_file) { - if (_timeshift_read(ts, cur_file, &sm, wait) == -1) + while (seek->file) { + if (_timeshift_read(ts, seek, &sm, wait) == -1) return -1; if (!sm) break; timeshift_packet_log("ouf", ts, sm); @@ -502,13 +525,13 @@ void *timeshift_reader ( void *p ) { timeshift_t *ts = p; int nfds, end, run = 1, wait = -1, state; - timeshift_file_t *cur_file = NULL, *tmp_file; + timeshift_seek_t *seek = &ts->seek; + timeshift_file_t *tmp_file; int cur_speed = 100, keyframe_mode = 0; int64_t mono_now, mono_play_time = 0, mono_last_status = 0; int64_t deliver, deliver0, pause_time = 0, last_time = 0, skip_time = 0; int64_t i64; streaming_message_t *sm = NULL, *ctrl = NULL; - timeshift_index_iframe_t *tsi = NULL; streaming_skip_t *skip = NULL; tvhpoll_t *pd; tvhpoll_event_t ev = { 0 }; @@ -559,7 +582,7 @@ void *timeshift_reader ( void *p ) /* Ignore negative */ if (!ts->dobuf && (speed < 0)) - speed = cur_file ? speed : 0; + speed = seek->file ? speed : 0; /* Process */ if (cur_speed != speed) { @@ -579,21 +602,22 @@ void *timeshift_reader ( void *p ) tvhlog(LOG_DEBUG, "timeshift", "ts %d enter timeshift mode", ts->id); ts->dobuf = 1; + _seek_reset(seek); tmp_file = timeshift_filemgr_newest(ts); if (tmp_file != NULL) { i64 = tmp_file->last; - tmp_file->refcount--; + timeshift_file_put(tmp_file); } else { i64 = ts->buf_time; } - cur_file = timeshift_filemgr_get(ts, i64); - if (cur_file != NULL) { - cur_file->roff = cur_file->size; - pause_time = cur_file->last; - last_time = pause_time; + seek->file = timeshift_filemgr_get(ts, i64); + if (seek->file != NULL) { + seek->file->roff = seek->file->size; + pause_time = seek->file->last; + last_time = pause_time; } else { - pause_time = i64; - last_time = pause_time; + pause_time = i64; + last_time = pause_time; } } } @@ -604,9 +628,8 @@ void *timeshift_reader ( void *p ) tvhlog(LOG_DEBUG, "timeshift", "using keyframe mode? %s", keyframe ? "yes" : "no"); keyframe_mode = keyframe; - if (keyframe) { - tsi = NULL; - } + if (keyframe) + seek->frame = NULL; } /* Update */ @@ -640,6 +663,7 @@ void *timeshift_reader ( void *p ) /* Reset */ if (ts->full) { timeshift_filemgr_flush(ts, NULL); + _seek_reset(seek); ts->full = 0; } @@ -663,45 +687,44 @@ void *timeshift_reader ( void *p ) /* Live playback (stage1) */ if (ts->state == TS_LIVE) { + _seek_reset(seek); tmp_file = timeshift_filemgr_newest(ts); if (tmp_file) { i64 = tmp_file->last; - tmp_file->refcount--; + timeshift_file_put(tmp_file); } - if (tmp_file && (cur_file = timeshift_filemgr_get(ts, i64)) != NULL) { - cur_file->roff = cur_file->size; - last_time = cur_file->last; + if (tmp_file && (seek->file = timeshift_filemgr_get(ts, i64)) != NULL) { + seek->file->roff = seek->file->size; + last_time = seek->file->last; } else { - last_time = ts->buf_time; + last_time = ts->buf_time; } } /* May have failed */ - if (skip) { - if (skip->type == SMT_SKIP_REL_TIME) - skip_time += last_time; - tvhlog(LOG_DEBUG, "timeshift", "ts %d skip time %"PRId64, ts->id, skip_time); - - /* Live (stage2) */ - if (ts->state == TS_LIVE) { - if (skip_time >= ts->buf_time - TIMESHIFT_PLAY_BUF) { - tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id); - skip = NULL; - } else { - ts->state = TS_PLAY; - ts->dobuf = 1; - tvhtrace("timeshift", "reader - set TS_PLAY"); - } + if (skip->type == SMT_SKIP_REL_TIME) + skip_time += last_time; + tvhlog(LOG_DEBUG, "timeshift", "ts %d skip time %"PRId64, ts->id, skip_time); + + /* Live (stage2) */ + if (ts->state == TS_LIVE) { + if (skip_time >= ts->buf_time - TIMESHIFT_PLAY_BUF) { + tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id); + skip = NULL; + } else { + ts->state = TS_PLAY; + ts->dobuf = 1; + tvhtrace("timeshift", "reader - set TS_PLAY"); } } /* OK */ if (skip) { /* seek */ - tsi = NULL; - end = _timeshift_do_skip(ts, skip_time, last_time, &cur_file, &tsi); - if (tsi) { - pause_time = tsi->time; + seek->frame = NULL; + end = _timeshift_do_skip(ts, skip_time, last_time, seek); + if (seek->frame) { + pause_time = seek->frame->time; tvhtrace("timeshift", "ts %d skip - play buffer from %"PRId64" last_time %"PRId64, ts->id, pause_time, last_time); @@ -743,7 +766,7 @@ void *timeshift_reader ( void *p ) /* Done */ - if (!run || !cur_file || ((ts->state != TS_PLAY && !skip))) { + if (!run || !seek->file || ((ts->state != TS_PLAY && !skip))) { if (mono_now >= (mono_last_status + 1000000)) { timeshift_status(ts, last_time); mono_last_status = mono_now; @@ -772,7 +795,7 @@ void *timeshift_reader ( void *p ) else req_time = skip_time; - end = _timeshift_do_skip(ts, req_time, last_time, &cur_file, &tsi); + end = _timeshift_do_skip(ts, req_time, last_time, seek); } /* Clear old message */ @@ -782,7 +805,7 @@ void *timeshift_reader ( void *p ) } /* Find packet */ - if (_timeshift_read(ts, &cur_file, &sm, &wait) == -1) { + if (_timeshift_read(ts, seek, &sm, &wait) == -1) { pthread_mutex_unlock(&ts->state_mutex); break; } @@ -838,10 +861,10 @@ void *timeshift_reader ( void *p ) } /* Terminate */ - if (!cur_file || end != 0) { + if (!seek->file || end != 0) { /* Back to live (unless buffer is full) */ - if ((end == 1 && !ts->full) || !cur_file) { + if ((end == 1 && !ts->full) || !seek->file) { tvhlog(LOG_DEBUG, "timeshift", "ts %d eob revert to live mode", ts->id); cur_speed = 100; ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed); @@ -850,7 +873,7 @@ void *timeshift_reader ( void *p ) tvhtrace("timeshift", "reader - set TS_LIVE"); /* Flush timeshift buffer to live */ - if (_timeshift_flush_to_live(ts, &cur_file, &wait) == -1) { + if (_timeshift_flush_to_live(ts, seek, &wait) == -1) { pthread_mutex_unlock(&ts->state_mutex); break; } @@ -858,10 +881,7 @@ void *timeshift_reader ( void *p ) ts->state = TS_LIVE; /* Close file (if open) */ - if (cur_file && cur_file->rfd >= 0) { - close(cur_file->rfd); - cur_file->rfd = -1; - } + _read_close(seek); /* Pause */ } else { @@ -894,10 +914,7 @@ void *timeshift_reader ( void *p ) /* Cleanup */ tvhpoll_destroy(pd); - if (cur_file && cur_file->rfd >= 0) { - close(cur_file->rfd); - cur_file->rfd = -1; - } + _read_close(seek); if (sm) streaming_msg_free(sm); if (ctrl) streaming_msg_free(ctrl); tvhtrace("timeshift", "ts %d exit reader thread", ts->id); diff --git a/src/timeshift/timeshift_writer.c b/src/timeshift/timeshift_writer.c index 0d1682c7d..06558ac49 100644 --- a/src/timeshift/timeshift_writer.c +++ b/src/timeshift/timeshift_writer.c @@ -342,15 +342,17 @@ static void _process_msg timeshift_packet_log("liv", ts, sm); } if (ts->dobuf) { - if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) && (tsf->wfd >= 0 || tsf->ram)) { - if ((err = _process_msg0(ts, tsf, sm)) < 0) { - timeshift_filemgr_close(tsf); - tsf->bad = 1; - ts->full = 1; ///< Stop any more writing - } else { - timeshift_packet_log("sav", ts, sm); + if ((tsf = timeshift_filemgr_get(ts, sm->sm_time)) != NULL) { + if (tsf->wfd >= 0 || tsf->ram) { + if ((err = _process_msg0(ts, tsf, sm)) < 0) { + timeshift_filemgr_close(tsf); + tsf->bad = 1; + ts->full = 1; ///< Stop any more writing + } else { + timeshift_packet_log("sav", ts, sm); + } } - tsf->refcount--; + timeshift_file_put(tsf); } } pthread_mutex_unlock(&ts->state_mutex);