From: Jaroslav Kysela Date: Mon, 19 Jan 2015 20:08:15 +0000 (+0100) Subject: timeshift: implement timeshift to RAM, fixes #2626 X-Git-Tag: v4.1~452 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=bc9874cc2670515c3f31c2592758cef2a24557b7;p=thirdparty%2Ftvheadend.git timeshift: implement timeshift to RAM, fixes #2626 --- diff --git a/docs/html/config_timeshift.html b/docs/html/config_timeshift.html index 6f20cbdc6..848219500 100644 --- a/docs/html/config_timeshift.html +++ b/docs/html/config_timeshift.html @@ -36,6 +36,11 @@ specify an unlimited period its highly recommended you specifying a value here. +
Max. RAM Size (MegaBytes) +
Specifies the maximum RAM (system memory) size for timeshift buffers. + When free RAM buffers are available, they are used instead storage to + save the timeshift data. +
Unlimited:
If checked, this allows the combined size of all timeshift buffers to potentially grow unbounded until your storage media runs out of space diff --git a/src/timeshift.c b/src/timeshift.c index c67753124..70de729e3 100644 --- a/src/timeshift.c +++ b/src/timeshift.c @@ -41,6 +41,8 @@ int timeshift_unlimited_period; uint32_t timeshift_max_period; int timeshift_unlimited_size; uint64_t timeshift_max_size; +uint64_t timeshift_ram_size; +uint64_t timeshift_ram_segment_size; /* * Intialise global file manager @@ -61,6 +63,8 @@ void timeshift_init ( void ) timeshift_max_period = 3600; // 1Hr timeshift_unlimited_size = 0; timeshift_max_size = 10000 * (size_t)1048576; // 10G + timeshift_ram_size = 0; + timeshift_ram_segment_size = 0; /* Load settings */ if ((m = hts_settings_load("timeshift/config"))) { @@ -77,6 +81,10 @@ void timeshift_init ( void ) timeshift_unlimited_size = u32 ? 1 : 0; if (!htsmsg_get_u32(m, "max_size", &u32)) timeshift_max_size = 1048576LL * u32; + if (!htsmsg_get_u32(m, "ram_size", &u32)) { + timeshift_ram_size = 1048576LL * u32; + timeshift_ram_segment_size = timeshift_ram_size / 10; + } htsmsg_destroy(m); } } @@ -107,6 +115,7 @@ void timeshift_save ( void ) htsmsg_add_u32(m, "max_period", timeshift_max_period); htsmsg_add_u32(m, "unlimited_size", timeshift_unlimited_size); htsmsg_add_u32(m, "max_size", timeshift_max_size / 1048576); + htsmsg_add_u32(m, "ram_size", timeshift_ram_size / 1048576); hts_settings_save(m, "timeshift/config"); } diff --git a/src/timeshift.h b/src/timeshift.h index db1736256..908fceaee 100644 --- a/src/timeshift.h +++ b/src/timeshift.h @@ -27,6 +27,9 @@ extern uint32_t timeshift_max_period; extern int timeshift_unlimited_size; extern uint64_t timeshift_max_size; extern uint64_t timeshift_total_size; +extern uint64_t timeshift_ram_size; +extern uint64_t timeshift_ram_segment_size; +extern uint64_t timeshift_total_ram_size; typedef struct timeshift_status { diff --git a/src/timeshift/private.h b/src/timeshift/private.h index db72d4027..5f193fbc0 100644 --- a/src/timeshift/private.h +++ b/src/timeshift/private.h @@ -51,12 +51,18 @@ typedef TAILQ_HEAD(timeshift_index_data_list,timeshift_index_data) timeshift_ind */ typedef struct timeshift_file { - int fd; ///< Write descriptor + int wfd; ///< Write descriptor + int rfd; ///< Read descriptor char *path; ///< Full path to file time_t time; ///< Files coarse timestamp size_t size; ///< Current file size; int64_t last; ///< Latest timestamp + off_t woff; ///< Write offset + off_t roff; ///< Read offset + + uint8_t *ram; ///< RAM area + int64_t ram_size; ///< RAM area size in bytes uint8_t bad; ///< File is broken @@ -66,6 +72,8 @@ typedef struct timeshift_file timeshift_index_data_list_t sstart; ///< Stream start messages TAILQ_ENTRY(timeshift_file) link; ///< List entry + + pthread_mutex_t ram_lock; ///< Mutex for the ram array access } timeshift_file_t; typedef TAILQ_HEAD(timeshift_file_list,timeshift_file) timeshift_file_list_t; @@ -113,15 +121,15 @@ typedef struct timeshift { /* * Write functions */ -ssize_t timeshift_write_start ( int fd, int64_t time, streaming_start_t *ss ); -ssize_t timeshift_write_sigstat ( int fd, int64_t time, signal_status_t *ss ); -ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt ); -ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data ); +ssize_t timeshift_write_start ( timeshift_file_t *tsf, int64_t time, streaming_start_t *ss ); +ssize_t timeshift_write_sigstat ( timeshift_file_t *tsf, int64_t time, signal_status_t *ss ); +ssize_t timeshift_write_packet ( timeshift_file_t *tsf, int64_t time, th_pkt_t *pkt ); +ssize_t timeshift_write_mpegts ( timeshift_file_t *tsf, int64_t time, void *data ); ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip ); ssize_t timeshift_write_speed ( int fd, int speed ); ssize_t timeshift_write_stop ( int fd, int code ); ssize_t timeshift_write_exit ( int fd ); -ssize_t timeshift_write_eof ( int fd ); +ssize_t timeshift_write_eof ( timeshift_file_t *tsf ); void timeshift_writer_flush ( timeshift_t *ts ); diff --git a/src/timeshift/timeshift_filemgr.c b/src/timeshift/timeshift_filemgr.c index 2cf8e00ca..33cf39b63 100644 --- a/src/timeshift/timeshift_filemgr.c +++ b/src/timeshift/timeshift_filemgr.c @@ -39,6 +39,7 @@ static pthread_mutex_t timeshift_reaper_lock; static pthread_cond_t timeshift_reaper_cond; uint64_t timeshift_total_size; +uint64_t timeshift_total_ram_size; /* ************************************************************************** * File reaper thread @@ -63,15 +64,19 @@ static void* timeshift_reaper_callback ( void *p ) TAILQ_REMOVE(×hift_reaper_list, tsf, link); pthread_mutex_unlock(×hift_reaper_lock); - tvhtrace("timeshift", "remove file %s", tsf->path); - - /* Remove */ - unlink(tsf->path); - dpath = dirname(tsf->path); - if (rmdir(dpath) == -1) - if (errno != ENOTEMPTY) - tvhlog(LOG_ERR, "timeshift", "failed to remove %s [e=%s]", - dpath, strerror(errno)); + if (tsf->path) { + tvhtrace("timeshift", "remove file %s", tsf->path); + + /* Remove */ + unlink(tsf->path); + dpath = dirname(tsf->path); + if (rmdir(dpath) == -1) + if (errno != ENOTEMPTY) + tvhlog(LOG_ERR, "timeshift", "failed to remove %s [e=%s]", + dpath, strerror(errno)); + } else { + tvhtrace("timeshift", "remove RAM segment (time %li)", (long)tsf->time); + } /* Free memory */ while ((ti = TAILQ_FIRST(&tsf->iframes))) { @@ -85,6 +90,7 @@ static void* timeshift_reaper_callback ( void *p ) free(tid); } free(tsf->path); + free(tsf->ram); free(tsf); pthread_mutex_lock(×hift_reaper_lock); @@ -96,7 +102,12 @@ static void* timeshift_reaper_callback ( void *p ) static void timeshift_reaper_remove ( timeshift_file_t *tsf ) { - tvhtrace("timeshift", "queue file for removal %s", tsf->path); +#if ENABLE_TRACE + if (tsf->path) + tvhtrace("timeshift", "queue file for removal %s", tsf->path); + else + tvhtrace("timeshift", "queue file for removal - RAM segment time %li", (long)tsf->time); +#endif pthread_mutex_lock(×hift_reaper_lock); TAILQ_INSERT_TAIL(×hift_reaper_list, tsf, link); pthread_cond_signal(×hift_reaper_cond); @@ -140,14 +151,17 @@ int timeshift_filemgr_makedirs ( int index, char *buf, size_t len ) */ void timeshift_filemgr_close ( timeshift_file_t *tsf ) { - ssize_t r = timeshift_write_eof(tsf->fd); + ssize_t r = timeshift_write_eof(tsf); if (r > 0) { tsf->size += r; atomic_add_u64(×hift_total_size, r); + if (tsf->ram) + atomic_add_u64(×hift_total_ram_size, r); } - close(tsf->fd); - tsf->fd = -1; + if (tsf->wfd >= 0) + close(tsf->wfd); + tsf->wfd = -1; } /* @@ -156,11 +170,19 @@ void timeshift_filemgr_close ( timeshift_file_t *tsf ) void timeshift_filemgr_remove ( timeshift_t *ts, timeshift_file_t *tsf, int force ) { - if (tsf->fd != -1) - close(tsf->fd); - tvhlog(LOG_DEBUG, "timeshift", "ts %d remove %s", ts->id, tsf->path); + if (tsf->wfd >= 0) + close(tsf->wfd); + assert(tsf->rfd < 0); +#if ENABLE_TRACE + if (tsf->path) + tvhdebug("timeshift", "ts %d remove %s", ts->id, tsf->path); + else + tvhdebug("timeshift", "ts %d RAM segment remove time %li", ts->id, (long)tsf->time); +#endif TAILQ_REMOVE(&ts->files, tsf, link); atomic_add_u64(×hift_total_size, -tsf->size); + if (tsf->ram) + atomic_add_u64(×hift_total_ram_size, -tsf->size); timeshift_reaper_remove(tsf); } @@ -176,6 +198,26 @@ void timeshift_filemgr_flush ( timeshift_t *ts, timeshift_file_t *end ) } } +/* + * + */ +static timeshift_file_t * timeshift_filemgr_file_init + ( timeshift_t *ts, time_t time ) +{ + timeshift_file_t *tsf; + + tsf = calloc(1, sizeof(timeshift_file_t)); + tsf->time = time; + tsf->last = getmonoclock(); + tsf->wfd = -1; + tsf->rfd = -1; + TAILQ_INIT(&tsf->iframes); + TAILQ_INIT(&tsf->sstart); + TAILQ_INSERT_TAIL(&ts->files, tsf, link); + pthread_mutex_init(&tsf->ram_lock, NULL); + return tsf; +} + /* * Get current / new file */ @@ -185,7 +227,7 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create ) struct timespec tp; timeshift_file_t *tsf_tl, *tsf_hd, *tsf_tmp; timeshift_index_data_t *ti; - char path[512]; + char path[PATH_MAX]; time_t time; /* Return last file */ @@ -200,11 +242,12 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create ) clock_gettime(CLOCK_MONOTONIC_COARSE, &tp); time = tp.tv_sec / TIMESHIFT_FILE_PERIOD; tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list); - if (!tsf_tl || tsf_tl->time != time) { + if (!tsf_tl || tsf_tl->time != time || + (tsf_tl->ram && tsf_tl->woff >= timeshift_ram_segment_size)) { tsf_hd = TAILQ_FIRST(&ts->files); /* Close existing */ - if (tsf_tl && tsf_tl->fd != -1) + if (tsf_tl) timeshift_filemgr_close(tsf_tl); /* Check period */ @@ -236,32 +279,48 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create ) ts->full = 1; } } - + /* Create new file */ tsf_tmp = NULL; if (!ts->full) { - /* Create directories */ - if (!ts->path) { - if (timeshift_filemgr_makedirs(ts->id, path, sizeof(path))) - return NULL; - ts->path = strdup(path); + tvhtrace("timeshift", "ts %d RAM total %"PRId64" requested %"PRId64" segment %"PRId64, + ts->id, atomic_pre_add_u64(×hift_total_ram_size, 0), + timeshift_ram_size, timeshift_ram_segment_size); + if (timeshift_ram_size >= 8*1024*1024 && + atomic_pre_add_u64(×hift_total_ram_size, 0) < + timeshift_ram_size + (timeshift_ram_segment_size / 2)) { + tsf_tmp = timeshift_filemgr_file_init(ts, time); + tsf_tmp->ram_size = MIN(16*1024*1024, timeshift_ram_segment_size); + tsf_tmp->ram = malloc(tsf_tmp->ram_size); + if (!tsf_tmp->ram) { + free(tsf_tmp); + tsf_tmp = NULL; + } else { + tvhtrace("timeshift", "ts %d create RAM segment with %"PRId64" bytes (time %li)", + ts->id, tsf_tmp->ram_size, (long)time); + } } + + if (!tsf_tmp) { + /* Create directories */ + if (!ts->path) { + if (timeshift_filemgr_makedirs(ts->id, path, sizeof(path))) + return NULL; + ts->path = strdup(path); + } - /* Create File */ - snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, time); - tvhtrace("timeshift", "ts %d create file %s", ts->id, path); - if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) { - tsf_tmp = calloc(1, sizeof(timeshift_file_t)); - tsf_tmp->time = time; - tsf_tmp->fd = fd; - tsf_tmp->path = strdup(path); - tsf_tmp->refcount = 0; - tsf_tmp->last = getmonoclock(); - TAILQ_INIT(&tsf_tmp->iframes); - TAILQ_INIT(&tsf_tmp->sstart); - TAILQ_INSERT_TAIL(&ts->files, tsf_tmp, link); + /* Create File */ + snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, time); + tvhtrace("timeshift", "ts %d create file %s", ts->id, path); + if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) { + tsf_tmp = timeshift_filemgr_file_init(ts, time); + tsf_tmp->wfd = fd; + tsf_tmp->path = strdup(path); + } + } + if (tsf_tmp) { /* Copy across last start message */ if (tsf_tl && (ti = TAILQ_LAST(&tsf_tl->sstart, timeshift_index_data_list))) { tvhtrace("timeshift", "ts %d copy smt_start to new file", @@ -343,6 +402,7 @@ void timeshift_filemgr_init ( void ) /* Size processing */ timeshift_total_size = 0; + timeshift_ram_size = 0; /* Start the reaper thread */ timeshift_reaper_run = 1; @@ -371,5 +431,3 @@ void timeshift_filemgr_term ( void ) if (!timeshift_filemgr_get_root(path, sizeof(path))) rmtree(path); } - - diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index 58d744eba..d83ec7fb6 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -41,13 +41,30 @@ * File Reading * *************************************************************************/ -static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf ) +static ssize_t _read_buf ( timeshift_file_t *tsf, int fd, void *buf, size_t size ) +{ + if (tsf && tsf->ram) { + if (tsf->roff + size > tsf->woff) return -1; + pthread_mutex_lock(&tsf->ram_lock); + memcpy(buf, tsf->ram + tsf->roff, size); + tsf->roff += size; + pthread_mutex_unlock(&tsf->ram_lock); + return size; + } else { + size = read(tsf ? tsf->rfd : fd, buf, size); + if (size > 0 && tsf) + tsf->roff += size; + return size; + } +} + +static ssize_t _read_pktbuf ( timeshift_file_t *tsf, int fd, pktbuf_t **pktbuf ) { ssize_t r, cnt = 0; size_t sz; /* Size */ - r = read(fd, &sz, sizeof(sz)); + r = _read_buf(tsf, fd, &sz, sizeof(sz)); if (r < 0) return -1; if (r != sizeof(sz)) return 0; cnt += r; @@ -60,7 +77,7 @@ static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf ) /* Data */ *pktbuf = pktbuf_alloc(NULL, sz); - r = read(fd, (*pktbuf)->pb_data, sz); + r = _read_buf(tsf, fd, (*pktbuf)->pb_data, sz); if (r != sz) { free((*pktbuf)->pb_data); free(*pktbuf); @@ -72,7 +89,7 @@ static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf ) } -static ssize_t _read_msg ( int fd, streaming_message_t **sm ) +static ssize_t _read_msg ( timeshift_file_t *tsf, int fd, streaming_message_t **sm ) { ssize_t r, cnt = 0; size_t sz; @@ -85,7 +102,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm ) *sm = NULL; /* Size */ - r = read(fd, &sz, sizeof(sz)); + r = _read_buf(tsf, fd, &sz, sizeof(sz)); if (r < 0) return -1; if (r != sizeof(sz)) return 0; cnt += r; @@ -97,13 +114,13 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm ) if (sz > 1024 * 1024) return -1; /* Type */ - r = read(fd, &type, sizeof(type)); + r = _read_buf(tsf, fd, &type, sizeof(type)); if (r < 0) return -1; if (r != sizeof(type)) return 0; cnt += r; /* Time */ - r = read(fd, &time, sizeof(time)); + r = _read_buf(tsf, fd, &time, sizeof(time)); if (r < 0) return -1; if (r != sizeof(time)) return 0; cnt += r; @@ -127,7 +144,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm ) case SMT_EXIT: case SMT_SPEED: if (sz != sizeof(code)) return -1; - r = read(fd, &code, sz); + r = _read_buf(tsf, fd, &code, sz); if (r != sz) { if (r < 0) return -1; return 0; @@ -141,7 +158,7 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm ) case SMT_MPEGTS: case SMT_PACKET: data = malloc(sz); - r = read(fd, data, sz); + r = _read_buf(tsf, fd, data, sz); if (r != sz) { free(data); if (r < 0) return -1; @@ -152,13 +169,13 @@ static ssize_t _read_msg ( int fd, streaming_message_t **sm ) pkt->pkt_payload = pkt->pkt_meta = NULL; pkt->pkt_refcount = 0; *sm = streaming_msg_create_pkt(pkt); - r = _read_pktbuf(fd, &pkt->pkt_meta); + r = _read_pktbuf(tsf, fd, &pkt->pkt_meta); if (r < 0) { streaming_msg_free(*sm); return r; } cnt += r; - r = _read_pktbuf(fd, &pkt->pkt_payload); + r = _read_pktbuf(tsf, fd, &pkt->pkt_payload); if (r < 0) { streaming_msg_free(*sm); return r; @@ -317,24 +334,31 @@ static int _timeshift_skip * Output packet */ static int _timeshift_read - ( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd, + ( timeshift_t *ts, timeshift_file_t **cur_file, streaming_message_t **sm, int *wait ) { - if (*cur_file) { + timeshift_file_t *tsf = *cur_file; + ssize_t r; + off_t off, ooff; + + if (tsf) { /* Open file */ - if (*fd < 0) { - tvhtrace("timeshift", "ts %d open file %s", - ts->id, (*cur_file)->path); - *fd = open((*cur_file)->path, O_RDONLY); - if (*fd < 0) + if (tsf->rfd < 0 && !tsf->ram) { + tsf->rfd = open(tsf->path, O_RDONLY); + tvhtrace("timeshift", "ts %d open file %s (fd %i)", ts->id, tsf->path, tsf->rfd); + if (tsf->rfd < 0) return -1; } - tvhtrace("timeshift", "ts %d seek to %jd", ts->id, (intmax_t)*cur_off); - lseek(*fd, *cur_off, SEEK_SET); + tvhtrace("timeshift", "ts %d seek to %jd (fd %i)", ts->id, tsf->roff, tsf->rfd); + if (tsf->rfd >= 0) + if ((off = lseek(tsf->rfd, tsf->roff, SEEK_SET)) != tsf->roff) + tvherror("timeshift", "seek to %s failed (off %"PRId64" != %"PRId64"): %s", + tsf->path, (int64_t)tsf->roff, (int64_t)off, strerror(errno)); /* Read msg */ - ssize_t r = _read_msg(*fd, sm); + ooff = tsf->roff; + r = _read_msg(tsf, -1, sm); if (r < 0) { streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR); streaming_target_deliver2(ts->output, e); @@ -349,21 +373,25 @@ static int _timeshift_read /* Incomplete */ if (r == 0) { - lseek(*fd, *cur_off, SEEK_SET); + if (tsf->rfd >= 0) { + tvhtrace("timeshift", "ts %d seek to %jd (fd %i) (incomplete)", ts->id, tsf->roff, tsf->rfd); + if ((off = lseek(tsf->rfd, ooff, SEEK_SET)) != ooff) + tvherror("timeshift", "seek to %s failed (off %"PRId64" != %"PRId64"): %s", + tsf->path, (int64_t)ooff, (int64_t)off, strerror(errno)); + } + tsf->roff = ooff; return 0; } - /* Update */ - *cur_off += r; - /* Special case - EOF */ - if (r == sizeof(size_t) || *cur_off > (*cur_file)->size) { - close(*fd); - *fd = -1; + if (r == sizeof(size_t) || tsf->roff > tsf->size) { + if (tsf->rfd >= 0) + close(tsf->rfd); + tsf->rfd = -1; pthread_mutex_lock(&ts->rdwr_mutex); - *cur_file = timeshift_filemgr_next(*cur_file, NULL, 0); + *cur_file = timeshift_filemgr_next(tsf, NULL, 0); pthread_mutex_unlock(&ts->rdwr_mutex); - *cur_off = 0; // reset + tsf->roff = 0; // reset *wait = 0; /* Check SMT_START index */ @@ -386,12 +414,12 @@ static int _timeshift_read * Flush all data to live */ static int _timeshift_flush_to_live - ( timeshift_t *ts, timeshift_file_t **cur_file, off_t *cur_off, int *fd, + ( timeshift_t *ts, timeshift_file_t **cur_file, streaming_message_t **sm, int *wait ) { time_t pts = 0; while (*cur_file) { - if (_timeshift_read(ts, cur_file, cur_off, fd, sm, wait) == -1) + if (_timeshift_read(ts, cur_file, sm, wait) == -1) return -1; if (!*sm) break; if ((*sm)->sm_type == SMT_PACKET) { @@ -409,15 +437,15 @@ static int _timeshift_flush_to_live * Thread * *************************************************************************/ + /* * Timeshift thread */ void *timeshift_reader ( void *p ) { timeshift_t *ts = p; - int nfds, end, fd = -1, run = 1, wait = -1; + int nfds, end, run = 1, wait = -1; timeshift_file_t *cur_file = NULL; - off_t cur_off = 0; int cur_speed = 100, keyframe_mode = 0; int64_t pause_time = 0, play_time = 0, last_time = 0; int64_t now, deliver, skip_time = 0; @@ -454,7 +482,7 @@ void *timeshift_reader ( void *p ) /* Control */ pthread_mutex_lock(&ts->state_mutex); if (nfds == 1) { - if (_read_msg(ts->rd_pipe.rd, &ctrl) > 0) { + if (_read_msg(NULL, ts->rd_pipe.rd, &ctrl) > 0) { /* Exit */ if (ctrl->sm_type == SMT_EXIT) { @@ -494,10 +522,10 @@ void *timeshift_reader ( void *p ) ts->id); timeshift_writer_flush(ts); pthread_mutex_lock(&ts->rdwr_mutex); - if ((cur_file = timeshift_filemgr_get(ts, 1))) { - cur_off = cur_file->size; - pause_time = cur_file->last; - last_time = pause_time; + if ((cur_file = timeshift_filemgr_get(ts, 1))) { + cur_file->roff = cur_file->size; + pause_time = cur_file->last; + last_time = pause_time; } pthread_mutex_unlock(&ts->rdwr_mutex); } @@ -576,9 +604,9 @@ void *timeshift_reader ( void *p ) /* Live playback (stage1) */ if (ts->state == TS_LIVE) { pthread_mutex_lock(&ts->rdwr_mutex); - if ((cur_file = timeshift_filemgr_get(ts, !ts->ondemand))) { - cur_off = cur_file->size; - last_time = cur_file->last; + if ((cur_file = timeshift_filemgr_get(ts, !ts->ondemand))) { + cur_file->roff = cur_file->size; + last_time = cur_file->last; } else { tvhlog(LOG_ERR, "timeshift", "ts %d failed to get current file", ts->id); skip = NULL; @@ -695,23 +723,24 @@ void *timeshift_reader ( void *p ) tvhlog(LOG_DEBUG, "timeshift", "ts %d skip found pkt @ %"PRId64, ts->id, tsi->time); /* File changed (close) */ - if ((tsf != cur_file) && (fd != -1)) { - close(fd); - fd = -1; + if ((tsf != cur_file) && cur_file && cur_file->rfd >= 0) { + close(cur_file->rfd); + cur_file->rfd = -1; } /* Position */ if (cur_file) cur_file->refcount--; - cur_file = tsf; - if (tsi) - cur_off = tsi->pos; - else - cur_off = 0; + if ((cur_file = tsf) != NULL) { + if (tsi) + cur_file->roff = tsi->pos; + else + cur_file->roff = 0; + } } /* Find packet */ - if (_timeshift_read(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1) { + if (_timeshift_read(ts, &cur_file, &sm, &wait) == -1) { pthread_mutex_unlock(&ts->state_mutex); break; } @@ -782,13 +811,13 @@ void *timeshift_reader ( void *p ) streaming_target_deliver2(ts->output, ctrl); /* Flush timeshift buffer to live */ - if (_timeshift_flush_to_live(ts, &cur_file, &cur_off, &fd, &sm, &wait) == -1) + if (_timeshift_flush_to_live(ts, &cur_file, &sm, &wait) == -1) break; /* Close file (if open) */ - if (fd != -1) { - close(fd); - fd = -1; + if (cur_file && cur_file->rfd >= 0) { + close(cur_file->rfd); + cur_file->rfd = -1; } /* Flush ALL files */ @@ -823,7 +852,10 @@ void *timeshift_reader ( void *p ) /* Cleanup */ tvhpoll_destroy(pd); - if (fd != -1) close(fd); + if (cur_file && cur_file->rfd >= 0) { + close(cur_file->rfd); + cur_file->rfd = -1; + } if (sm) streaming_msg_free(sm); if (ctrl) streaming_msg_free(ctrl); tvhtrace("timeshift", "ts %d exit reader thread", ts->id); diff --git a/src/timeshift/timeshift_writer.c b/src/timeshift/timeshift_writer.c index 40a0a6970..2714344e8 100644 --- a/src/timeshift/timeshift_writer.c +++ b/src/timeshift/timeshift_writer.c @@ -36,7 +36,7 @@ /* * Write data (retry on EAGAIN) */ -static ssize_t _write +static ssize_t _write_fd ( int fd, const void *buf, size_t count ) { ssize_t r; @@ -54,25 +54,76 @@ static ssize_t _write return count == n ? n : -1; } +static ssize_t _write + ( timeshift_file_t *tsf, const void *buf, size_t count ) +{ + uint8_t *ram; + size_t alloc; + if (tsf->ram) { + pthread_mutex_lock(&tsf->ram_lock); + if (tsf->ram_size < tsf->woff + count) { + if (tsf->ram_size >= timeshift_ram_segment_size) + alloc = MAX(count, 64*1024); + else + alloc = MAX(count, 4*1024*1024); + ram = realloc(tsf->ram, tsf->ram_size + alloc); + if (ram == NULL) { + tvhwarn("timeshift", "RAM timeshift memalloc failed"); + pthread_mutex_unlock(&tsf->ram_lock); + return -1; + } + tsf->ram = ram; + tsf->ram_size += alloc; + } + memcpy(tsf->ram + tsf->woff, buf, count); + tsf->woff += count; + pthread_mutex_unlock(&tsf->ram_lock); + return count; + } + return _write_fd(tsf->wfd, buf, count); +} + /* * Write message */ static ssize_t _write_msg + ( timeshift_file_t *tsf, streaming_message_type_t type, int64_t time, + const void *buf, size_t len ) +{ + size_t len2 = len + sizeof(type) + sizeof(time); + ssize_t err, ret; + ret = err = _write(tsf, &len2, sizeof(len2)); + if (err < 0) return err; + err = _write(tsf, &type, sizeof(type)); + if (err < 0) return err; + ret += err; + err = _write(tsf, &time, sizeof(time)); + if (err < 0) return err; + ret += err; + if (len) { + err = _write(tsf, buf, len); + if (err < 0) return err; + ret += err; + } + return ret; +} + +static ssize_t _write_msg_fd ( int fd, streaming_message_type_t type, int64_t time, const void *buf, size_t len ) { size_t len2 = len + sizeof(type) + sizeof(time); ssize_t err, ret; - ret = err = _write(fd, &len2, sizeof(len2)); + ret = err = _write_fd(fd, &len2, sizeof(len2)); if (err < 0) return err; - err = _write(fd, &type, sizeof(type)); + err = _write_fd(fd, &type, sizeof(type)); if (err < 0) return err; ret += err; - err = _write(fd, &time, sizeof(time)); + err = _write_fd(fd, &time, sizeof(time)); if (err < 0) return err; ret += err; if (len) { - err = _write(fd, buf, len); + err = _write_fd(fd, buf, len); if (err < 0) return err; ret += err; } @@ -82,18 +133,18 @@ static ssize_t _write_msg /* * Write packet buffer */ -static int _write_pktbuf ( int fd, pktbuf_t *pktbuf ) +static int _write_pktbuf ( timeshift_file_t *tsf, pktbuf_t *pktbuf ) { ssize_t ret, err; if (pktbuf) { - ret = err = _write(fd, &pktbuf->pb_size, sizeof(pktbuf->pb_size)); + ret = err = _write(tsf, &pktbuf->pb_size, sizeof(pktbuf->pb_size)); if (err < 0) return err; - err = _write(fd, pktbuf->pb_data, pktbuf->pb_size); + err = _write(tsf, pktbuf->pb_data, pktbuf->pb_size); if (err < 0) return err; ret += err; } else { size_t sz = 0; - ret = _write(fd, &sz, sizeof(sz)); + ret = _write(tsf, &sz, sizeof(sz)); } return ret; } @@ -102,24 +153,24 @@ static int _write_pktbuf ( int fd, pktbuf_t *pktbuf ) * Write signal status */ ssize_t timeshift_write_sigstat - ( int fd, int64_t time, signal_status_t *sigstat ) + ( timeshift_file_t *tsf, int64_t time, signal_status_t *sigstat ) { - return _write_msg(fd, SMT_SIGNAL_STATUS, time, sigstat, + return _write_msg(tsf, SMT_SIGNAL_STATUS, time, sigstat, sizeof(signal_status_t)); } /* * Write packet */ -ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt ) +ssize_t timeshift_write_packet ( timeshift_file_t *tsf, int64_t time, th_pkt_t *pkt ) { ssize_t ret = 0, err; - ret = err = _write_msg(fd, SMT_PACKET, time, pkt, sizeof(th_pkt_t)); + ret = err = _write_msg(tsf, SMT_PACKET, time, pkt, sizeof(th_pkt_t)); if (err <= 0) return err; - err = _write_pktbuf(fd, pkt->pkt_meta); + err = _write_pktbuf(tsf, pkt->pkt_meta); if (err <= 0) return err; ret += err; - err = _write_pktbuf(fd, pkt->pkt_payload); + err = _write_pktbuf(tsf, pkt->pkt_payload); if (err <= 0) return err; ret += err; return ret; @@ -128,9 +179,9 @@ ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt ) /* * Write MPEGTS data */ -ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data ) +ssize_t timeshift_write_mpegts ( timeshift_file_t *tsf, int64_t time, void *data ) { - return _write_msg(fd, SMT_MPEGTS, time, data, 188); + return _write_msg(tsf, SMT_MPEGTS, time, data, 188); } /* @@ -138,7 +189,7 @@ ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data ) */ ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip ) { - return _write_msg(fd, SMT_SKIP, 0, skip, sizeof(streaming_skip_t)); + return _write_msg_fd(fd, SMT_SKIP, 0, skip, sizeof(streaming_skip_t)); } /* @@ -146,7 +197,7 @@ ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip ) */ ssize_t timeshift_write_speed ( int fd, int speed ) { - return _write_msg(fd, SMT_SPEED, 0, &speed, sizeof(speed)); + return _write_msg_fd(fd, SMT_SPEED, 0, &speed, sizeof(speed)); } /* @@ -154,7 +205,7 @@ ssize_t timeshift_write_speed ( int fd, int speed ) */ ssize_t timeshift_write_stop ( int fd, int code ) { - return _write_msg(fd, SMT_STOP, 0, &code, sizeof(code)); + return _write_msg_fd(fd, SMT_STOP, 0, &code, sizeof(code)); } /* @@ -163,16 +214,16 @@ ssize_t timeshift_write_stop ( int fd, int code ) ssize_t timeshift_write_exit ( int fd ) { int code = 0; - return _write_msg(fd, SMT_EXIT, 0, &code, sizeof(code)); + return _write_msg_fd(fd, SMT_EXIT, 0, &code, sizeof(code)); } /* * Write end of file (special internal message) */ -ssize_t timeshift_write_eof ( int fd ) +ssize_t timeshift_write_eof ( timeshift_file_t *tsf ) { size_t sz = 0; - return _write(fd, &sz, sizeof(sz)); + return _write(tsf, &sz, sizeof(sz)); } /* ************************************************************************** @@ -200,9 +251,9 @@ static inline ssize_t _process_msg0 if (SCT_ISVIDEO(ss->ss_components[i].ssc_type)) ts->vididx = ss->ss_components[i].ssc_index; } else if (sm->sm_type == SMT_SIGNAL_STATUS) - err = timeshift_write_sigstat(tsf->fd, sm->sm_time, sm->sm_data); + err = timeshift_write_sigstat(tsf, sm->sm_time, sm->sm_data); else if (sm->sm_type == SMT_PACKET) { - err = timeshift_write_packet(tsf->fd, sm->sm_time, sm->sm_data); + err = timeshift_write_packet(tsf, sm->sm_time, sm->sm_data); if (err > 0) { th_pkt_t *pkt = sm->sm_data; @@ -216,7 +267,7 @@ static inline ssize_t _process_msg0 } } } else if (sm->sm_type == SMT_MPEGTS) - err = timeshift_write_mpegts(tsf->fd, sm->sm_time, sm->sm_data); + err = timeshift_write_mpegts(tsf, sm->sm_time, sm->sm_data); else err = 0; @@ -225,6 +276,8 @@ static inline ssize_t _process_msg0 tsf->last = sm->sm_time; tsf->size += err; atomic_add_u64(×hift_total_size, err); + if (tsf->ram) + atomic_add_u64(×hift_total_ram_size, err); } return err; } @@ -265,7 +318,7 @@ static void _process_msg case SMT_MPEGTS: case SMT_PACKET: pthread_mutex_lock(&ts->rdwr_mutex); - if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->fd != -1)) { + if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->wfd >= 0 || tsf->ram)) { if ((err = _process_msg0(ts, tsf, &sm)) < 0) { timeshift_filemgr_close(tsf); tsf->bad = 1; diff --git a/src/webui/extjs.c b/src/webui/extjs.c index 248f6c1aa..0053eaaee 100755 --- a/src/webui/extjs.c +++ b/src/webui/extjs.c @@ -677,6 +677,7 @@ extjs_timeshift(http_connection_t *hc, const char *remain, void *opaque) htsmsg_add_u32(m, "timeshift_max_period", timeshift_max_period / 60); htsmsg_add_u32(m, "timeshift_unlimited_size", timeshift_unlimited_size); htsmsg_add_u32(m, "timeshift_max_size", timeshift_max_size / 1048576); + htsmsg_add_u32(m, "timeshift_ram_size", timeshift_ram_size / 1048576); pthread_mutex_unlock(&global_lock); out = json_single_record(m, "config"); @@ -696,6 +697,10 @@ extjs_timeshift(http_connection_t *hc, const char *remain, void *opaque) timeshift_unlimited_size = http_arg_get(&hc->hc_req_args, "timeshift_unlimited_size") ? 1 : 0; if ((str = http_arg_get(&hc->hc_req_args, "timeshift_max_size"))) timeshift_max_size = atol(str) * 1048576LL; + if ((str = http_arg_get(&hc->hc_req_args, "timeshift_ram_size"))) { + timeshift_ram_size = atol(str) * 1048576LL; + timeshift_ram_segment_size = timeshift_ram_size / 10; + } timeshift_save(); pthread_mutex_unlock(&global_lock); diff --git a/src/webui/static/app/timeshift.js b/src/webui/static/app/timeshift.js index 017512ce3..ee7ada92e 100644 --- a/src/webui/static/app/timeshift.js +++ b/src/webui/static/app/timeshift.js @@ -12,7 +12,8 @@ tvheadend.timeshift = function(panel, index) { 'timeshift_enabled', 'timeshift_ondemand', 'timeshift_path', 'timeshift_unlimited_period', 'timeshift_max_period', - 'timeshift_unlimited_size', 'timeshift_max_size' + 'timeshift_unlimited_size', 'timeshift_max_size', + 'timeshift_ram_size' ] ); @@ -59,6 +60,13 @@ tvheadend.timeshift = function(panel, index) { width: 300 }); + var timeshiftRamSize = new Ext.form.NumberField({ + fieldLabel: 'Max. RAM Size (MB)', + name: 'timeshift_ram_size', + allowBlank: false, + width: 300 + }); + var timeshiftUnlSize = new Ext.form.Checkbox({ fieldLabel: 'Unlimited size', name: 'timeshift_unlimited_size', @@ -100,7 +108,7 @@ tvheadend.timeshift = function(panel, index) { width: 500, autoHeight: true, border: false, - items : [timeshiftMaxPeriod, timeshiftMaxSize] + items : [timeshiftMaxPeriod, timeshiftMaxSize, timeshiftRamSize] }); var timeshiftPanelB = new Ext.form.FieldSet({