src/epggrab/support/freesat_huffman.c \
SRCS += src/plumbing/tsfix.c \
- src/plumbing/globalheaders.c \
+ src/plumbing/globalheaders.c
SRCS += src/dvr/dvr_db.c \
src/dvr/dvr_rec.c \
# Optional code
#
+# Timeshift
+SRCS-${CONFIG_TIMESHIFT} += \
+ src/timeshift.c \
+ src/timeshift/timeshift_filemgr.c \
+ src/timeshift/timeshift_writer.c \
+ src/timeshift/timeshift_reader.c \
+
# DVB
SRCS-${CONFIG_LINUXDVB} += \
src/dvb/dvb.c \
"avahi:auto"
"zlib:auto"
"libav:auto"
+ "timeshift:no"
"bundle:no"
"dvbcsa:no"
)
void config_init ( void )
{
+ int save = 0;
+ uint32_t u32;
+
config = hts_settings_load("config");
if (!config) {
tvhlog(LOG_DEBUG, "config", "no configuration, loading defaults");
config = htsmsg_create_map();
}
+
+ /* Defaults */
+ if (htsmsg_get_u32(config, "timeshiftperiod", &u32))
+ save |= config_set_timeshift_period(0);
+ if (htsmsg_get_u32(config, "timeshiftsize", &u32))
+ save |= config_set_timeshift_size(0);
+
+ /* Save defaults */
+ if (save)
+ config_save();
}
void config_save ( void )
return htsmsg_copy(config);
}
-const char *config_get_language ( void )
+static int _config_set_str ( const char *fld, const char *val )
{
- return htsmsg_get_str(config, "language");
+ const char *c = htsmsg_get_str(config, fld);
+ if (!c || strcmp(c, val)) {
+ if (c) htsmsg_delete_field(config, fld);
+ htsmsg_add_str(config, fld, val);
+ return 1;
+ }
+ return 0;
}
-int config_set_language ( const char *lang )
+static int _config_set_u32 ( const char *fld, uint32_t val )
{
- const char *c = config_get_language();
- if (!c || strcmp(c, lang)) {
- if (c) htsmsg_delete_field(config, "language");
- htsmsg_add_str(config, "language", lang);
+ uint32_t u32;
+ int ret = htsmsg_get_u32(config, fld, &u32);
+ if (ret || (u32 != val)) {
+ if (!ret) htsmsg_delete_field(config, fld);
+ htsmsg_add_u32(config, fld, val);
return 1;
}
return 0;
}
+const char *config_get_language ( void )
+{
+ return htsmsg_get_str(config, "language");
+}
+
+int config_set_language ( const char *lang )
+{
+ return _config_set_str("language", lang);
+}
+
const char *config_get_muxconfpath ( void )
{
return htsmsg_get_str(config, "muxconfpath");
int config_set_muxconfpath ( const char *path )
{
- const char *c = config_get_muxconfpath();
- if (!c || strcmp(c, path)) {
- if (c) htsmsg_delete_field(config, "muxconfpath");
- htsmsg_add_str(config, "muxconfpath", path);
- return 1;
- }
- return 0;
+ return _config_set_str("muxconfpath", path);
+}
+
+const char *config_get_timeshift_path ( void )
+{
+ return htsmsg_get_str(config, "timeshiftpath");
+}
+
+int config_set_timeshift_path ( const char *path )
+{
+ return _config_set_str("timeshiftpath", path);
+}
+
+uint32_t config_get_timeshift_period ( void )
+{
+ return htsmsg_get_u32_or_default(config, "timeshiftperiod", 0);
+}
+
+int config_set_timeshift_period ( uint32_t period )
+{
+ return _config_set_u32("timeshiftperiod", period);
+}
+
+uint32_t config_get_timeshift_size ( void )
+{
+ return htsmsg_get_u32_or_default(config, "timeshiftsize", 0);
+}
+
+int config_set_timeshift_size ( uint32_t size )
+{
+ return _config_set_u32("timeshiftsize", size);
}
int config_set_language ( const char *str )
__attribute__((warn_unused_result));
+const char *config_get_timeshift_path ( void );
+int config_set_timeshift_path ( const char *str )
+ __attribute__((warn_unused_result));
+
+uint32_t config_get_timeshift_period ( void );
+int config_set_timeshift_period ( uint32_t val )
+ __attribute__((warn_unused_result));
+
+uint32_t config_get_timeshift_size ( void );
+int config_set_timeshift_size ( uint32_t val )
+ __attribute__((warn_unused_result));
+
#endif /* __TVH_CONFIG__H__ */
}
break;
+ case SMT_SPEED:
+ case SMT_SKIP:
case SMT_SIGNAL_STATUS:
break;
#include "epg.h"
#include "plumbing/tsfix.h"
#include "imagecache.h"
+#if ENABLE_TIMESHIFT
+#include "timeshift.h"
+#endif
#include <sys/statvfs.h>
#include "settings.h"
streaming_target_t hs_input;
streaming_target_t *hs_tsfix;
+#if ENABLE_TIMESHIFT
+ streaming_target_t *hs_tshift;
+#endif
+
htsp_msg_q_t hs_q;
time_t hs_last_report; /* Last queue status report sent */
if(hs->hs_tsfix != NULL)
tsfix_destroy(hs->hs_tsfix);
htsp_flush_queue(htsp, &hs->hs_q);
+#if ENABLE_TIMESHIFT
+ if(hs->hs_tshift) timeshift_destroy(hs->hs_tshift);
+#endif
free(hs);
}
htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
{
uint32_t chid, sid, weight, req90khz, normts;
+#if ENABLE_TIMESHIFT
+ uint32_t timeshiftPeriod;
+#endif
channel_t *ch;
htsp_subscription_t *hs;
const char *str;
req90khz = htsmsg_get_u32_or_default(in, "90khz", 0);
normts = htsmsg_get_u32_or_default(in, "normts", 0);
+#if ENABLE_TIMESHIFT
+ timeshiftPeriod = htsmsg_get_u32_or_default(in, "timeshiftPeriod", 0);
+ timeshiftPeriod = MIN(timeshiftPeriod, config_get_timeshift_period());
+#endif
+
/*
* We send the reply now to avoid the user getting the 'subscriptionStart'
* async message before the reply to 'subscribe'.
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0);
- streaming_target_t *st;
+ streaming_target_t *st = &hs->hs_input;
- if(normts) {
- hs->hs_tsfix = tsfix_create(&hs->hs_input);
- st = hs->hs_tsfix;
- } else {
- st = &hs->hs_input;
+ if(normts)
+ st = hs->hs_tsfix = tsfix_create(st);
+#if ENABLE_TIMESHIFT
+ if (timeshiftPeriod != 0) {
+ if (timeshiftPeriod == ~0)
+ tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (unlimited)");
+ else
+ tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (%u mins)", timeshiftPeriod / 60);
+ st = hs->hs_tshift = timeshift_create(st, timeshiftPeriod);
}
+#endif
hs->hs_s = subscription_create_from_channel(ch, weight,
htsp->htsp_logname,
return NULL;
}
+/**
+ * Skip stream
+ */
+static htsmsg_t *
+htsp_method_skip(htsp_connection_t *htsp, htsmsg_t *in)
+{
+ htsp_subscription_t *hs;
+ uint32_t sid, abs;
+ int64_t s64;
+ streaming_skip_t skip;
+
+ if(htsmsg_get_u32(in, "subscriptionId", &sid))
+ return htsp_error("Missing argument 'subscriptionId'");
+
+ abs = htsmsg_get_u32_or_default(in, "absolute", 0);
+
+ if(!htsmsg_get_s64(in, "time", &s64)) {
+ skip.type = abs ? SMT_SKIP_ABS_TIME : SMT_SKIP_REL_TIME;
+ skip.time = s64;
+ } else if (!htsmsg_get_s64(in, "size", &s64)) {
+ skip.type = abs ? SMT_SKIP_ABS_SIZE : SMT_SKIP_REL_SIZE;
+ skip.size = s64;
+ } else {
+ return htsp_error("Missing argument 'time' or 'size'");
+ }
+
+ LIST_FOREACH(hs, &htsp->htsp_subscriptions, hs_link)
+ if(hs->hs_sid == sid)
+ break;
+
+ if(hs == NULL)
+ return htsp_error("Requested subscription does not exist");
+
+ subscription_set_skip(hs->hs_s, &skip);
+
+ htsp_reply(htsp, in, htsmsg_create_map());
+ return NULL;
+}
+
+/*
+ * Set stream speed
+ */
+static htsmsg_t *
+htsp_method_speed(htsp_connection_t *htsp, htsmsg_t *in)
+{
+ htsp_subscription_t *hs;
+ uint32_t sid;
+ int32_t speed;
+
+ if(htsmsg_get_u32(in, "subscriptionId", &sid))
+ return htsp_error("Missing argument 'subscriptionId'");
+ if(htsmsg_get_s32(in, "speed", &speed))
+ return htsp_error("Missing argument 'speed'");
+
+ LIST_FOREACH(hs, &htsp->htsp_subscriptions, hs_link)
+ if(hs->hs_sid == sid)
+ break;
+
+ if(hs == NULL)
+ return htsp_error("Requested subscription does not exist");
+
+ subscription_set_speed(hs->hs_s, speed);
+
+ htsp_reply(htsp, in, htsmsg_create_map());
+ return NULL;
+}
/**
* Open file
{ "subscribe", htsp_method_subscribe, ACCESS_STREAMING},
{ "unsubscribe", htsp_method_unsubscribe, ACCESS_STREAMING},
{ "subscriptionChangeWeight", htsp_method_change_weight, ACCESS_STREAMING},
+ { "subscriptionSkip", htsp_method_skip, ACCESS_STREAMING},
+ { "subscriptionSpeed", htsp_method_speed, ACCESS_STREAMING},
{ "fileOpen", htsp_method_file_open, ACCESS_RECORDER},
{ "fileRead", htsp_method_file_read, ACCESS_RECORDER},
{ "fileClose", htsp_method_file_close, ACCESS_RECORDER},
* Build a htsmsg from a th_pkt and enqueue it on our HTSP service
*/
static void
+#if ENABLE_TIMESHIFT
+htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt, uint64_t timeshift)
+#else
htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
+#endif
{
htsmsg_t *m;
htsp_msg_t *hm;
htsmsg_add_u32(m, "stream", pkt->pkt_componentindex);
htsmsg_add_u32(m, "com", pkt->pkt_commercial);
+#if ENABLE_TIMESHIFT
+ if (timeshift)
+ htsmsg_add_s64(m, "timeshift", timeshift);
+#endif
+
if(pkt->pkt_pts != PTS_UNSET) {
int64_t pts = hs->hs_90khz ? pkt->pkt_pts : ts_rescale(pkt->pkt_pts, 1000000);
htsp_send_message(hs->hs_htsp, m, &hs->hs_htsp->htsp_hmq_qstatus);
}
+/**
+ *
+ */
+static void
+htsp_subscription_speed(htsp_subscription_t *hs, int speed)
+{
+ htsmsg_t *m = htsmsg_create_map();
+ htsmsg_add_str(m, "method", "subscriptionSpeed");
+ htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
+ htsmsg_add_u32(m, "speed", speed);
+ htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
+}
+
/**
*
*/
switch(sm->sm_type) {
case SMT_PACKET:
- htsp_stream_deliver(hs, sm->sm_data); // reference is transfered
+#if ENABLE_TIMESHIFT
+ htsp_stream_deliver(hs, sm->sm_data, sm->sm_timeshift);
+#else
+ htsp_stream_deliver(hs, sm->sm_data);
+#endif
+ // reference is transfered
sm->sm_data = NULL;
break;
case SMT_EXIT:
abort();
+
+ case SMT_SKIP:
+ break;
+
+ case SMT_SPEED:
+ htsp_subscription_speed(hs, sm->sm_code);
+ break;
}
streaming_msg_free(sm);
}
#include "muxes.h"
#include "config2.h"
#include "imagecache.h"
+#include "timeshift.h"
#if ENABLE_LIBAV
#include "libav.h"
#endif
v4l_init();
#endif
+#if ENABLE_TIMESHIFT
+ timeshift_init();
+#endif
+
tcp_server_init();
http_server_init();
webui_init();
epg_save();
+#if ENABLE_TIMESHIFT
+ timeshift_term();
+#endif
+
tvhlog(LOG_NOTICE, "STOP", "Exiting HTS Tvheadend");
if(opt_fork)
#define PKT_B_FRAME 3
#define PKT_NTYPES 4
+static inline char pkt_frametype_to_char ( int frametype )
+{
+ if (frametype == PKT_I_FRAME) return 'I';
+ if (frametype == PKT_P_FRAME) return 'P';
+ if (frametype == PKT_B_FRAME) return 'B';
+ return ' ';
+}
+
typedef struct th_pkt {
int64_t pkt_dts;
int64_t pkt_pts;
case SMT_SIGNAL_STATUS:
case SMT_NOSTART:
case SMT_MPEGTS:
+ case SMT_SPEED:
+ case SMT_SKIP:
streaming_target_deliver2(gh->gh_output, sm);
break;
}
case SMT_SIGNAL_STATUS:
case SMT_NOSTART:
case SMT_MPEGTS:
+ case SMT_SKIP:
+ case SMT_SPEED:
streaming_target_deliver2(gh->gh_output, sm);
break;
case SMT_SIGNAL_STATUS:
case SMT_NOSTART:
case SMT_MPEGTS:
+ case SMT_SPEED:
+ case SMT_SKIP:
break;
}
{
streaming_message_t *sm = malloc(sizeof(streaming_message_t));
sm->sm_type = type;
+#if ENABLE_TIMESHIFT
+ sm->sm_time = 0;
+ sm->sm_timeshift = 0;
+#endif
return sm;
}
streaming_message_t *dst = malloc(sizeof(streaming_message_t));
streaming_start_t *ss;
- dst->sm_type = src->sm_type;
+ dst->sm_type = src->sm_type;
+#if ENABLE_TIMESHIFT
+ dst->sm_time = src->sm_time;
+ dst->sm_timeshift = src->sm_timeshift;
+#endif
switch(src->sm_type) {
atomic_add(&ss->ss_refcount, 1);
break;
+ case SMT_SKIP:
+ dst->sm_data = malloc(sizeof(streaming_skip_t));
+ memcpy(dst->sm_data, src->sm_data, sizeof(streaming_skip_t));
+ break;
+
case SMT_SIGNAL_STATUS:
dst->sm_data = malloc(sizeof(signal_status_t));
memcpy(dst->sm_data, src->sm_data, sizeof(signal_status_t));
break;
+ case SMT_SPEED:
case SMT_STOP:
case SMT_SERVICE_STATUS:
case SMT_NOSTART:
break;
case SMT_STOP:
- break;
-
case SMT_EXIT:
- break;
-
case SMT_SERVICE_STATUS:
- break;
-
case SMT_NOSTART:
+ case SMT_SPEED:
break;
+ case SMT_SKIP:
case SMT_SIGNAL_STATUS:
free(sm->sm_data);
break;
"Dummy join %s ok", id);
}
-
-
/**
*
*/
{
gtimer_arm(&every_sec, every_sec_cb, NULL, 1);
}
+
+/**
+ * Set speed
+ */
+void
+subscription_set_speed ( th_subscription_t *s, int speed )
+{
+ streaming_message_t *sm;
+ service_t *t = s->ths_service;
+
+ pthread_mutex_lock(&t->s_stream_mutex);
+
+ sm = streaming_msg_create_code(SMT_SPEED, speed);
+
+ streaming_target_deliver(s->ths_output, sm);
+
+ pthread_mutex_unlock(&t->s_stream_mutex);
+}
+
+/**
+ * Set skip
+ */
+void
+subscription_set_skip ( th_subscription_t *s, const streaming_skip_t *skip )
+{
+ streaming_message_t *sm;
+ service_t *t = s->ths_service;
+
+ pthread_mutex_lock(&t->s_stream_mutex);
+
+ sm = streaming_msg_create(SMT_SKIP);
+ sm->sm_data = malloc(sizeof(streaming_skip_t));
+ memcpy(sm->sm_data, skip, sizeof(streaming_skip_t));
+
+ streaming_target_deliver(s->ths_output, sm);
+
+ pthread_mutex_unlock(&t->s_stream_mutex);
+}
void subscription_change_weight(th_subscription_t *s, int weight);
+void subscription_set_speed
+ (th_subscription_t *s, int32_t speed );
+
+void subscription_set_skip
+ (th_subscription_t *s, const streaming_skip_t *skip);
+
+void subscription_stop(th_subscription_t *s);
+
void subscription_unlink_service(th_subscription_t *s, int reason);
void subscription_dummy_join(const char *id, int first);
--- /dev/null
+/**
+ * TV headend - Timeshift
+ * Copyright (C) 2012 Adam Sutton
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tvheadend.h"
+#include "streaming.h"
+#include "timeshift.h"
+#include "timeshift/private.h"
+#include "config2.h"
+#include "settings.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <assert.h>
+#include <stdio.h>
+
+static int timeshift_index = 0;
+
+/*
+ * Intialise global file manager
+ */
+void timeshift_init ( void )
+{
+ timeshift_filemgr_init();
+}
+
+/*
+ * Terminate global file manager
+ */
+void timeshift_term ( void )
+{
+ timeshift_filemgr_term();
+}
+
+/*
+ * Receive data
+ */
+static void timeshift_input
+ ( void *opaque, streaming_message_t *sm )
+{
+ timeshift_t *ts = opaque;
+
+ pthread_mutex_lock(&ts->state_mutex);
+
+ /* Control */
+ if (sm->sm_type == SMT_SKIP) {
+ if (ts->state >= TS_LIVE)
+ timeshift_write_skip(ts->rd_pipe.wr, sm->sm_data);
+ } else if (sm->sm_type == SMT_SPEED) {
+ if (ts->state >= TS_LIVE)
+ timeshift_write_speed(ts->rd_pipe.wr, sm->sm_code);
+ }
+
+ else {
+
+ /* Start */
+ if (sm->sm_type == SMT_START && ts->state == TS_INIT) {
+ ts->state = TS_LIVE;
+ }
+
+ /* Pass-thru */
+ if (ts->state <= TS_LIVE) {
+ streaming_target_deliver2(ts->output, streaming_msg_clone(sm));
+ }
+
+ /* Buffer to disk */
+ if (ts->state >= TS_LIVE) {
+ sm->sm_time = getmonoclock();
+ streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
+ } else
+ streaming_msg_free(sm);
+
+ /* Exit/Stop */
+ if (sm->sm_type == SMT_EXIT ||
+ (sm->sm_type == SMT_STOP && sm->sm_code == 0)) {
+ timeshift_write_exit(ts->rd_pipe.wr);
+ ts->state = TS_EXIT;
+ }
+ }
+
+ pthread_mutex_unlock(&ts->state_mutex);
+}
+
+/**
+ *
+ */
+void
+timeshift_destroy(streaming_target_t *pad)
+{
+ timeshift_t *ts = (timeshift_t*)pad;
+ timeshift_file_t *tsf;
+ streaming_message_t *sm;
+
+ /* Must hold global lock */
+ lock_assert(&global_lock);
+
+ /* Ensure the thread exits */
+ // Note: this is a workaround for the fact the Q might have been flushed
+ // in reader thread (VERY unlikely)
+ sm = streaming_msg_create(SMT_EXIT);
+ streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
+
+ /* Wait for all threads */
+ pthread_join(ts->rd_thread, NULL);
+ pthread_join(ts->wr_thread, NULL);
+ pthread_join(ts->rm_thread, NULL);
+
+ /* Shut stuff down */
+ streaming_queue_deinit(&ts->wr_queue);
+
+ close(ts->rd_pipe.rd);
+ close(ts->rd_pipe.wr);
+
+ /* Flush files */
+ while ((tsf = TAILQ_FIRST(&ts->files)))
+ timeshift_filemgr_remove(ts, tsf, 1);
+
+ free(ts->path);
+ free(ts);
+}
+
+/**
+ * Create timeshift buffer
+ *
+ * max_period of buffer in seconds (0 = unlimited)
+ * max_size of buffer in bytes (0 = unlimited)
+ */
+streaming_target_t *timeshift_create
+ (streaming_target_t *out, time_t max_time)
+{
+ char buf[512];
+ timeshift_t *ts = calloc(1, sizeof(timeshift_t));
+
+ /* Must hold global lock */
+ lock_assert(&global_lock);
+
+ /* Create directories */
+ if (timeshift_filemgr_makedirs(timeshift_index, buf, sizeof(buf)))
+ return NULL;
+
+ /* Setup structure */
+ TAILQ_INIT(&ts->files);
+ ts->output = out;
+ ts->path = strdup(buf);
+ ts->max_time = max_time;
+ ts->state = TS_INIT;
+ ts->full = 0;
+ ts->vididx = -1;
+ ts->id = timeshift_index;
+ pthread_mutex_init(&ts->rdwr_mutex, NULL);
+ pthread_mutex_init(&ts->state_mutex, NULL);
+
+ /* Initialise output */
+ tvh_pipe(O_NONBLOCK, &ts->rd_pipe);
+
+ /* Initialise input */
+ streaming_queue_init(&ts->wr_queue, 0);
+ streaming_target_init(&ts->input, timeshift_input, ts, 0);
+ pthread_create(&ts->wr_thread, NULL, timeshift_writer, ts);
+ pthread_create(&ts->rd_thread, NULL, timeshift_reader, ts);
+
+ /* Update index */
+ timeshift_index++;
+
+ return &ts->input;
+}
--- /dev/null
+/*
+ * TV headend - Timeshift
+ * Copyright (C) 2012 Adam Sutton
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __TVH_TIMESHIFT_H__
+#define __TVH_TIMESHIFT_H__
+
+void timeshift_init ( void );
+void timeshift_term ( void );
+
+streaming_target_t *timeshift_create
+ (streaming_target_t *out, time_t max_period);
+
+void timeshift_destroy(streaming_target_t *pad);
+
+#endif /* __TVH_TIMESHIFT_H__ */
--- /dev/null
+/*
+ * TV headend - Timeshift
+ * Copyright (C) 2012 Adam Sutton
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __TVH_TIMESHIFT_PRIVATE_H__
+#define __TVH_TIMESHIFT_PRIVATE_H__
+
+#define TS_PLAY_BUF 100000 // us to buffer in TX
+
+/**
+ * Indexes of import data in the stream
+ */
+typedef struct timeshift_index
+{
+ off_t pos; ///< Position in the file
+ union {
+ int64_t time; ///< Packet time
+ void *data; ///< Associated data
+ };
+ TAILQ_ENTRY(timeshift_index) link; ///< List entry
+} timeshift_index_t;
+
+typedef TAILQ_HEAD(timeshift_index_list,timeshift_index) timeshift_index_list_t;
+
+/**
+ * Timeshift file
+ */
+typedef struct timeshift_file
+{
+ int fd; ///< Write descriptor
+ char *path; ///< Full path to file
+
+ time_t time; ///< Files coarse timestamp
+ size_t size; ///< Current file size;
+ int64_t last; ///< Latest timestamp
+
+ uint8_t bad; ///< File is broken
+
+ int refcount; ///< Reader ref count
+
+ timeshift_index_list_t iframes; ///< I-frame indexing
+ timeshift_index_list_t sstart; ///< Stream start messages
+
+ TAILQ_ENTRY(timeshift_file) link; ///< List entry
+} timeshift_file_t;
+
+typedef TAILQ_HEAD(timeshift_file_list,timeshift_file) timeshift_file_list_t;
+
+/**
+ *
+ */
+typedef struct timeshift {
+ // Note: input MUST BE FIRST in struct
+ streaming_target_t input; ///< Input source
+ streaming_target_t *output; ///< Output dest
+
+ int id; ///< Reference number
+ char *path; ///< Directory containing buffer
+ time_t max_time; ///< Maximum period to shift
+
+ enum {
+ TS_INIT,
+ TS_EXIT,
+ TS_LIVE,
+ TS_PAUSE,
+ TS_PLAY,
+ } state; ///< Play state
+ pthread_mutex_t state_mutex; ///< Protect state changes
+ uint8_t full; ///< Buffer is full
+
+ streaming_queue_t wr_queue; ///< Writer queue
+ pthread_t wr_thread; ///< Writer thread
+
+ pthread_t rd_thread; ///< Reader thread
+ th_pipe_t rd_pipe; ///< Message passing to reader
+
+ pthread_t rm_thread; ///< Reaper thread
+ timeshift_file_list_t rm_list; ///< Remove files
+
+ pthread_mutex_t rdwr_mutex; ///< Buffer protection
+ timeshift_file_list_t files; ///< List of files
+
+ int vididx; ///< Index of (current) video stream
+
+} timeshift_t;
+
+/*
+ * Write functions
+ */
+ssize_t timeshift_write_start ( int fd, int64_t time, streaming_start_t *ss );
+ssize_t timeshift_write_sigstat ( int fd, int64_t time, signal_status_t *ss );
+ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt );
+ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data );
+ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip );
+ssize_t timeshift_write_speed ( int fd, int speed );
+ssize_t timeshift_write_stop ( int fd, int code );
+ssize_t timeshift_write_exit ( int fd );
+ssize_t timeshift_write_eof ( int fd );
+
+void timeshift_writer_flush ( timeshift_t *ts );
+
+/*
+ * Threads
+ */
+void *timeshift_reader ( void *p );
+void *timeshift_writer ( void *p );
+
+/*
+ * File management
+ */
+void timeshift_filemgr_init ( void );
+void timeshift_filemgr_term ( void );
+int timeshift_filemgr_makedirs ( int ts_index, char *buf, size_t len );
+
+timeshift_file_t *timeshift_filemgr_get
+ ( timeshift_t *ts, int create );
+timeshift_file_t *timeshift_filemgr_prev
+ ( timeshift_file_t *ts, int *end, int keep );
+timeshift_file_t *timeshift_filemgr_next
+ ( timeshift_file_t *ts, int *end, int keep );
+void timeshift_filemgr_remove
+ ( timeshift_t *ts, timeshift_file_t *tsf, int force );
+void timeshift_filemgr_close ( timeshift_file_t *tsf );
+
+#endif /* __TVH_TIMESHIFT_PRIVATE_H__ */
--- /dev/null
+/**
+ * TV headend - Timeshift File Manager
+ * Copyright (C) 2012 Adam Sutton
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "tvheadend.h"
+#include "streaming.h"
+#include "timeshift.h"
+#include "timeshift/private.h"
+#include "config2.h"
+#include "settings.h"
+
+static int timeshift_reaper_run;
+static timeshift_file_list_t timeshift_reaper_list;
+static pthread_t timeshift_reaper_thread;
+static pthread_mutex_t timeshift_reaper_lock;
+static pthread_cond_t timeshift_reaper_cond;
+
+/* **************************************************************************
+ * File reaper thread
+ * *************************************************************************/
+
+static void* timeshift_reaper_callback ( void *p )
+{
+ char *dpath;
+ timeshift_file_t *tsf;
+ timeshift_index_t *ti;
+ streaming_message_t *sm;
+ pthread_mutex_lock(×hift_reaper_lock);
+ while (timeshift_reaper_run) {
+
+ /* Get next */
+ tsf = TAILQ_FIRST(×hift_reaper_list);
+ if (!tsf) {
+ pthread_cond_wait(×hift_reaper_cond, ×hift_reaper_lock);
+ continue;
+ }
+ TAILQ_REMOVE(×hift_reaper_list, tsf, link);
+ pthread_mutex_unlock(×hift_reaper_lock);
+
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "remove file %s", tsf->path);
+#endif
+
+ /* Remove */
+ unlink(tsf->path);
+ dpath = dirname(tsf->path);
+ if (rmdir(dpath) == -1)
+ if (errno != ENOTEMPTY)
+ tvhlog(LOG_ERR, "timeshift", "failed to remove %s [e=%s]",
+ dpath, strerror(errno));
+
+ /* Free memory */
+ while ((ti = TAILQ_FIRST(&tsf->iframes))) {
+ TAILQ_REMOVE(&tsf->iframes, ti, link);
+ free(ti);
+ }
+ while ((ti = TAILQ_FIRST(&tsf->sstart))) {
+ TAILQ_REMOVE(&tsf->sstart, ti, link);
+ sm = ti->data;
+ streaming_msg_free(sm);
+ free(ti);
+ }
+ free(tsf->path);
+ free(tsf);
+ }
+ pthread_mutex_unlock(×hift_reaper_lock);
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "reaper thread exit");
+#endif
+ return NULL;
+}
+
+static void timeshift_reaper_remove ( timeshift_file_t *tsf )
+{
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "queue file for removal %s", tsf->path);
+#endif
+ pthread_mutex_lock(×hift_reaper_lock);
+ TAILQ_INSERT_TAIL(×hift_reaper_list, tsf, link);
+ pthread_cond_signal(×hift_reaper_cond);
+ pthread_mutex_unlock(×hift_reaper_lock);
+}
+
+/* **************************************************************************
+ * File Handling
+ * *************************************************************************/
+
+/*
+ * Get root directory
+ */
+static void timeshift_filemgr_get_root ( char *buf, size_t len )
+{
+ const char *path = config_get_timeshift_path();
+ if (!path || !*path)
+ path = hts_settings_get_root();
+ snprintf(buf, len, "%s/timeshift", path);
+}
+
+/*
+ * Create timeshift directories (for a given instance)
+ */
+int timeshift_filemgr_makedirs ( int index, char *buf, size_t len )
+{
+ timeshift_filemgr_get_root(buf, len);
+ snprintf(buf+strlen(buf), len-strlen(buf), "/%d", index);
+ return makedirs(buf, 0700);
+}
+
+/*
+ * Close file
+ */
+void timeshift_filemgr_close ( timeshift_file_t *tsf )
+{
+ ssize_t r = timeshift_write_eof(tsf->fd);
+ if (r > 0)
+ tsf->size += r;
+ close(tsf->fd);
+ tsf->fd = -1;
+}
+
+/*
+ * Remove file
+ */
+void timeshift_filemgr_remove
+ ( timeshift_t *ts, timeshift_file_t *tsf, int force )
+{
+ if (tsf->fd != -1)
+ close(tsf->fd);
+ TAILQ_REMOVE(&ts->files, tsf, link);
+ timeshift_reaper_remove(tsf);
+}
+
+/*
+ * Get current / new file
+ */
+timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int create )
+{
+ int fd;
+ struct timespec tp;
+ timeshift_file_t *tsf_tl, *tsf_hd, *tsf_tmp;
+ timeshift_index_t *ti;
+ char path[512];
+
+ /* Return last file */
+ if (!create)
+ return TAILQ_LAST(&ts->files, timeshift_file_list);
+
+ /* No space */
+ if (ts->full)
+ return NULL;
+
+ /* Store to file */
+ clock_gettime(CLOCK_MONOTONIC_COARSE, &tp);
+ tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list);
+ if (!tsf_tl || tsf_tl->time != tp.tv_sec) {
+ tsf_hd = TAILQ_FIRST(&ts->files);
+
+ /* Close existing */
+ if (tsf_tl && tsf_tl->fd != -1)
+ timeshift_filemgr_close(tsf_tl);
+
+ /* Check period */
+ if (ts->max_time && tsf_hd && tsf_tl) {
+ time_t d = tsf_tl->time - tsf_hd->time;
+ if (d > (ts->max_time+5)) {
+ if (!tsf_hd->refcount) {
+ timeshift_filemgr_remove(ts, tsf_hd, 0);
+ } else {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d buffer full", ts->id);
+#endif
+ ts->full = 1;
+ }
+ }
+ }
+
+ /* Check size */
+ // TODO: need to implement this
+
+ /* Create new file */
+ tsf_tmp = NULL;
+ if (!ts->full) {
+ snprintf(path, sizeof(path), "%s/tvh-%"PRItime_t, ts->path, tp.tv_sec);
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d create file %s", ts->id, path);
+#endif
+ if ((fd = open(path, O_WRONLY | O_CREAT, 0600)) > 0) {
+ tsf_tmp = calloc(1, sizeof(timeshift_file_t));
+ tsf_tmp->time = tp.tv_sec;
+ tsf_tmp->fd = fd;
+ tsf_tmp->path = strdup(path);
+ tsf_tmp->refcount = 0;
+ TAILQ_INIT(&tsf_tmp->iframes);
+ TAILQ_INIT(&tsf_tmp->sstart);
+ TAILQ_INSERT_TAIL(&ts->files, tsf_tmp, link);
+
+ /* Copy across last start message */
+ if (tsf_tl && (ti = TAILQ_LAST(&tsf_tl->sstart, timeshift_index_list))) {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d copy smt_start to new file",
+ ts->id);
+#endif
+ timeshift_index_t *ti2 = calloc(1, sizeof(timeshift_index_t));
+ ti2->pos = ti->pos;
+ ti2->data = streaming_msg_clone(ti->data);
+ TAILQ_INSERT_TAIL(&tsf_tmp->sstart, ti2, link);
+ }
+ }
+ }
+ tsf_tl = tsf_tmp;
+ }
+
+ return tsf_tl;
+}
+
+timeshift_file_t *timeshift_filemgr_next
+ ( timeshift_file_t *tsf, int *end, int keep )
+{
+ timeshift_file_t *nxt = TAILQ_NEXT(tsf, link);
+ if (!nxt && end) *end = 1;
+ if (!nxt && keep) return tsf;
+ tsf->refcount--;
+ if (nxt)
+ nxt->refcount++;
+ return nxt;
+}
+
+timeshift_file_t *timeshift_filemgr_prev
+ ( timeshift_file_t *tsf, int *end, int keep )
+{
+ timeshift_file_t *nxt = TAILQ_PREV(tsf, timeshift_file_list, link);
+ if (!nxt && end) *end = 1;
+ if (!nxt && keep) return tsf;
+ tsf->refcount--;
+ if (nxt)
+ nxt->refcount++;
+ return nxt;
+}
+
+/* **************************************************************************
+ * Setup / Teardown
+ * *************************************************************************/
+
+/*
+ * Initialise global structures
+ */
+void timeshift_filemgr_init ( void )
+{
+ char path[512];
+
+ /* Try to remove any rubbish left from last run */
+ timeshift_filemgr_get_root(path, sizeof(path));
+ rmtree(path);
+
+ /* Start the reaper thread */
+ timeshift_reaper_run = 1;
+ pthread_mutex_init(×hift_reaper_lock, NULL);
+ pthread_cond_init(×hift_reaper_cond, NULL);
+ TAILQ_INIT(×hift_reaper_list);
+ pthread_create(×hift_reaper_thread, NULL,
+ timeshift_reaper_callback, NULL);
+}
+
+/*
+ * Terminate
+ */
+void timeshift_filemgr_term ( void )
+{
+ char path[512];
+
+ /* Wait for thread */
+ pthread_mutex_lock(×hift_reaper_lock);
+ timeshift_reaper_run = 0;
+ pthread_cond_signal(×hift_reaper_cond);
+ pthread_mutex_unlock(×hift_reaper_lock);
+ pthread_join(timeshift_reaper_thread, NULL);
+
+ /* Remove the lot */
+ timeshift_filemgr_get_root(path, sizeof(path));
+ rmtree(path);
+}
+
+
--- /dev/null
+/**
+ * TV headend - Timeshift Reader
+ * Copyright (C) 2012 Adam Sutton
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tvheadend.h"
+#include "streaming.h"
+#include "timeshift.h"
+#include "timeshift/private.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <assert.h>
+
+/* **************************************************************************
+ * File Reading
+ * *************************************************************************/
+
+static ssize_t _read_pktbuf ( int fd, pktbuf_t **pktbuf )
+{
+ ssize_t r, cnt = 0;
+ size_t sz;
+
+ /* Size */
+ r = read(fd, &sz, sizeof(sz));
+ if (r < 0) return -1;
+ if (r != sizeof(sz)) return 0;
+ cnt += r;
+
+ /* Empty */
+ if (!sz) {
+ *pktbuf = NULL;
+ return cnt;
+ }
+
+ /* Data */
+ *pktbuf = pktbuf_alloc(NULL, sz);
+ r = read(fd, (*pktbuf)->pb_data, sz);
+ if (r != sz) {
+ free((*pktbuf)->pb_data);
+ free(*pktbuf);
+ return r < 0 ? -1 : 0;
+ }
+ cnt += r;
+
+ return cnt;
+}
+
+
+static ssize_t _read_msg ( int fd, streaming_message_t **sm )
+{
+ ssize_t r, cnt = 0;
+ size_t sz;
+ streaming_message_type_t type;
+ int64_t time;
+ void *data;
+ int code;
+
+ /* Clear */
+ *sm = NULL;
+
+ /* Size */
+ r = read(fd, &sz, sizeof(sz));
+ if (r < 0) return -1;
+ if (r != sizeof(sz)) return 0;
+ cnt += r;
+
+ /* EOF */
+ if (sz == 0) return cnt;
+
+ /* Type */
+ r = read(fd, &type, sizeof(type));
+ if (r < 0) return -1;
+ if (r != sizeof(type)) return 0;
+ cnt += r;
+
+ /* Time */
+ r = read(fd, &time, sizeof(time));
+ if (r < 0) return -1;
+ if (r != sizeof(time)) return 0;
+ cnt += r;
+
+ /* Adjust size */
+ sz -= sizeof(type) + sizeof(time);
+ cnt += sz;
+
+ /* Standard messages */
+ switch (type) {
+
+ /* Unhandled */
+ case SMT_START:
+ case SMT_NOSTART:
+ case SMT_SERVICE_STATUS:
+ break;
+
+ /* Code */
+ case SMT_STOP:
+ case SMT_EXIT:
+ case SMT_SPEED:
+ if (sz != sizeof(code)) return -1;
+ r = read(fd, &code, sz);
+ if (r != sz) {
+ if (r < 0) return -1;
+ return 0;
+ }
+ *sm = streaming_msg_create_code(type, code);
+ break;
+
+ /* Data */
+ case SMT_SKIP:
+ case SMT_SIGNAL_STATUS:
+ case SMT_MPEGTS:
+ case SMT_PACKET:
+ data = malloc(sz);
+ r = read(fd, data, sz);
+ if (r != sz) {
+ free(data);
+ if (r < 0) return -1;
+ return 0;
+ }
+ if (type == SMT_PACKET) {
+ th_pkt_t *pkt = data;
+ pkt->pkt_payload = pkt->pkt_header = NULL;
+ *sm = streaming_msg_create_pkt(pkt);
+ r = _read_pktbuf(fd, &pkt->pkt_header);
+ if (r < 0) {
+ streaming_msg_free(*sm);
+ return r;
+ }
+ cnt += r;
+ r = _read_pktbuf(fd, &pkt->pkt_payload);
+ if (r < 0) {
+ streaming_msg_free(*sm);
+ return r;
+ }
+ cnt += r;
+ } else {
+ *sm = streaming_msg_create_data(type, data);
+ }
+ (*sm)->sm_time = time;
+ break;
+ }
+
+ /* OK */
+ return cnt;
+}
+
+/* **************************************************************************
+ * Thread
+ * *************************************************************************/
+
+/*
+ * Timeshift thread
+ */
+void *timeshift_reader ( void *p )
+{
+ timeshift_t *ts = p;
+ int efd, nfds, end, fd = -1, run = 1, wait = -1;
+ off_t cur_off = 0;
+ int cur_speed = 100, keyframe_mode = 0;
+ int64_t pause_time = 0, play_time = 0, last_time = 0, tx_time = 0;
+ int64_t now, deliver;
+ streaming_message_t *sm = NULL, *ctrl;
+ timeshift_file_t *cur_file = NULL, *tsi_file = NULL;
+ timeshift_index_t *tsi = NULL;
+
+ /* Poll */
+ struct epoll_event ev;
+ efd = epoll_create(1);
+ ev.events = EPOLLIN;
+ ev.data.fd = ts->rd_pipe.rd;
+ epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev);
+
+ /* Output */
+ while (run) {
+
+ /* Wait for data */
+ if(wait)
+ nfds = epoll_wait(efd, &ev, 1, wait);
+ else
+ nfds = 0;
+ wait = -1;
+ end = 0;
+
+ /* Control */
+ pthread_mutex_lock(&ts->state_mutex);
+ if (nfds == 1) {
+ if (_read_msg(ev.data.fd, &ctrl) > 0) {
+
+ /* Exit */
+ if (ctrl->sm_type == SMT_EXIT) {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d read exit request", ts->id);
+#endif
+ run = 0;
+ streaming_msg_free(ctrl);
+
+ /* Speed */
+ // TODO: currently just pause
+ } else if (ctrl->sm_type == SMT_SPEED) {
+ int speed = ctrl->sm_code;
+ int keyframe;
+
+ /* Bound it */
+ if (speed > 3200) speed = 3200;
+ if (speed < -3200) speed = -3200;
+
+ /* Process */
+ if (cur_speed != speed) {
+
+ /* Live playback */
+ if (ts->state == TS_LIVE) {
+
+ /* Reject */
+ if (speed >= 100) {
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d reject 1x+ in live mode",
+ ts->id);
+ speed = 100;
+
+ /* Set position */
+ } else {
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d enter timeshift mode",
+ ts->id);
+ timeshift_writer_flush(ts);
+ pthread_mutex_lock(&ts->rdwr_mutex);
+ if ((cur_file = timeshift_filemgr_get(ts, 0))) {
+ cur_off = cur_file->size;
+ pause_time = cur_file->last;
+ last_time = pause_time;
+ }
+ pthread_mutex_unlock(&ts->rdwr_mutex);
+ }
+
+ /* Buffer playback */
+ } else if (ts->state == TS_PLAY) {
+ pause_time = last_time;
+
+ /* Paused */
+ } else {
+ }
+
+ /* Check keyframe mode */
+ keyframe = (speed < 0) || (speed > 400);
+ if (keyframe != keyframe_mode) {
+ tvhlog(LOG_DEBUG, "timeshift", "using keyframe mode? %s",
+ keyframe ? "yes" : "no");
+ keyframe_mode = keyframe;
+ if (keyframe) {
+ tsi = NULL;
+ tsi_file = cur_file;
+ }
+ }
+
+ /* Update */
+ play_time = getmonoclock();
+ cur_speed = speed;
+ if (speed != 100 || ts->state != TS_LIVE)
+ ts->state = speed == 0 ? TS_PAUSE : TS_PLAY;
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d change speed %d",
+ ts->id, speed);
+ }
+
+ /* Send on the message */
+ ctrl->sm_code = speed;
+ streaming_target_deliver2(ts->output, ctrl);
+
+ /* Skip */
+ } else {
+ streaming_msg_free(ctrl);
+ }
+
+ ctrl = NULL;
+ }
+ }
+
+ /* Done */
+ if (!run || ts->state != TS_PLAY || !cur_file) {
+ pthread_mutex_unlock(&ts->state_mutex);
+ continue;
+ }
+
+ /* Calculate delivery time */
+ now = getmonoclock();
+ deliver = (now - play_time) + TS_PLAY_BUF;
+ deliver = (deliver * cur_speed) / 100;
+ deliver = (deliver + pause_time);
+
+ /* Rewind or Fast forward (i-frame only) */
+ if (keyframe_mode) {
+ wait = 0;
+
+ /* Find next index */
+ if (cur_speed < 0) {
+ if (!tsi) {
+ TAILQ_FOREACH_REVERSE(tsi, &tsi_file->iframes,
+ timeshift_index_list, link) {
+ if (tsi->time < last_time) break;
+ }
+ }
+ } else {
+ if (!tsi) {
+ TAILQ_FOREACH(tsi, &tsi_file->iframes, link) {
+ if (tsi->time > last_time) break;
+ }
+ }
+ }
+
+ /* Next file */
+ if (!tsi) {
+ if (fd != -1)
+ close(fd);
+ wait = 0; // immediately cycle around
+ fd = -1;
+ if (cur_speed < 0)
+ tsi_file = timeshift_filemgr_prev(tsi_file, &end, 1);
+ else
+ tsi_file = timeshift_filemgr_next(tsi_file, &end, 0);
+ }
+
+ /* Deliver */
+ if (tsi && (((cur_speed < 0) && (tsi->time >= deliver)) ||
+ ((cur_speed > 0) && (tsi->time <= deliver)))) {
+
+ /* Keep delivery to 5fps max */
+ if ((now - tx_time) >= 200000) {
+
+ /* Open */
+ if (fd == -1) {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
+ ts->id, tsi_file->path);
+#endif
+ fd = open(tsi_file->path, O_RDONLY);
+ }
+
+ /* Read */
+ off_t ret = lseek(fd, tsi->pos, SEEK_SET);
+ assert(ret == tsi->pos);
+ ssize_t r = _read_msg(fd, &sm);
+
+ /* Send */
+ if (r > 0) {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
+ ts->id, sm->sm_time);
+#endif
+ sm->sm_timeshift = now - sm->sm_time;
+ streaming_target_deliver2(ts->output, sm);
+ cur_file = tsi_file;
+ cur_off = tsi->pos + r;
+ last_time = sm->sm_time;
+ tx_time = now;
+ sm = NULL;
+ } else {
+ wait = -1;
+ close(fd);
+ fd = -1;
+ }
+ }
+
+ /* Next index */
+ if (cur_speed < 0)
+ tsi = TAILQ_PREV(tsi, timeshift_index_list, link);
+ else
+ tsi = TAILQ_NEXT(tsi, link);
+
+ /* Not yet! */
+ } else if (tsi) {
+ if (cur_speed > 0)
+ wait = (tsi->time - deliver) / 1000;
+ else
+ wait = (deliver - tsi->time) / 1000;
+ if (wait == 0) wait = 1;
+ }
+
+ /* Full frame delivery */
+ } else {
+
+ /* Open file */
+ if (fd == -1) {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d open file %s",
+ ts->id, cur_file->path);
+#endif
+ fd = open(cur_file->path, O_RDONLY);
+ if (cur_off) lseek(fd, cur_off, SEEK_SET);
+ }
+
+ /* Process */
+ pthread_mutex_lock(&ts->rdwr_mutex);
+ end = 1;
+ while (cur_file && cur_off < cur_file->size) {
+
+ /* Read msg */
+ if (!sm) {
+ ssize_t r = _read_msg(fd, &sm);
+ assert(r != -1);
+
+ /* Incomplete */
+ if (r == 0) {
+ lseek(fd, cur_off, SEEK_SET);
+ break;
+ }
+
+ cur_off += r;
+
+ /* Special case - EOF */
+ if (r == sizeof(size_t) || cur_off > cur_file->size) {
+ close(fd);
+ wait = 0; // immediately cycle around
+ cur_off = 0; // reset
+ fd = -1;
+ cur_file = timeshift_filemgr_next(cur_file, NULL, 0);
+ break;
+ }
+ }
+
+ assert(sm);
+ end = 0;
+
+ /* Deliver */
+ if (sm->sm_time <= deliver) {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
+ ts->id, sm->sm_time);
+#endif
+ sm->sm_timeshift = now - sm->sm_time;
+ streaming_target_deliver2(ts->output, sm);
+ tx_time = now;
+ last_time = sm->sm_time;
+ sm = NULL;
+ wait = 0;
+ } else {
+ wait = (sm->sm_time - deliver) / 1000;
+ if (wait == 0) wait = 1;
+ break;
+ }
+ }
+ }
+
+ /* Terminate */
+ if (!cur_file || end) {
+
+ /* Back to live */
+ if (cur_speed > 0) {
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d eob revert to live mode", ts->id);
+ ts->state = TS_LIVE;
+ cur_speed = 100;
+ ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
+ streaming_target_deliver2(ts->output, ctrl);
+
+ /* Pause */
+ } else if (cur_speed < 0) {
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d sob pause stream", ts->id);
+ cur_speed = 0;
+ ts->state = TS_PAUSE;
+ ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
+ streaming_target_deliver2(ts->output, ctrl);
+ }
+ }
+
+ pthread_mutex_unlock(&ts->rdwr_mutex);
+ pthread_mutex_unlock(&ts->state_mutex);
+ }
+
+ /* Cleanup */
+ if (sm) streaming_msg_free(sm);
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d exit reader thread", ts->id);
+#endif
+
+ return NULL;
+}
--- /dev/null
+/**
+ * TV headend - Timeshift Write Handler
+ * Copyright (C) 2012 Adam Sutton
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tvheadend.h"
+#include "streaming.h"
+#include "timeshift.h"
+#include "timeshift/private.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <assert.h>
+
+/* **************************************************************************
+ * File Writing
+ * *************************************************************************/
+
+/*
+ * Write data (retry on EAGAIN)
+ */
+static ssize_t _write
+ ( int fd, const void *buf, size_t count )
+{
+ ssize_t r;
+ size_t n = 0;
+ while ( n < count ) {
+ r = write(fd, buf+n, count-n);
+ if (r == -1) {
+ if (errno == EAGAIN)
+ continue;
+ else
+ return -1;
+ }
+ n += r;
+ }
+ return count == n ? n : -1;
+}
+
+/*
+ * Write message
+ */
+static ssize_t _write_msg
+ ( int fd, streaming_message_type_t type, int64_t time,
+ const void *buf, size_t len )
+{
+ size_t len2 = len + sizeof(type) + sizeof(time);
+ ssize_t err, ret;
+ ret = err = _write(fd, &len2, sizeof(len2));
+ if (err < 0) return err;
+ err = _write(fd, &type, sizeof(type));
+ if (err < 0) return err;
+ ret += err;
+ err = _write(fd, &time, sizeof(time));
+ if (err < 0) return err;
+ ret += err;
+ if (len) {
+ err = _write(fd, buf, len);
+ if (err < 0) return err;
+ ret += err;
+ }
+ return ret;
+}
+
+/*
+ * Write packet buffer
+ */
+static int _write_pktbuf ( int fd, pktbuf_t *pktbuf )
+{
+ ssize_t ret, err;
+ if (pktbuf) {
+ ret = err = _write(fd, &pktbuf->pb_size, sizeof(pktbuf->pb_size));
+ if (err < 0) return err;
+ err = _write(fd, pktbuf->pb_data, pktbuf->pb_size);
+ if (err < 0) return err;
+ ret += err;
+ } else {
+ size_t sz = 0;
+ ret = _write(fd, &sz, sizeof(sz));
+ }
+ return ret;
+}
+
+/*
+ * Write signal status
+ */
+ssize_t timeshift_write_sigstat
+ ( int fd, int64_t time, signal_status_t *sigstat )
+{
+ return _write_msg(fd, SMT_SIGNAL_STATUS, time, sigstat,
+ sizeof(signal_status_t));
+}
+
+/*
+ * Write packet
+ */
+ssize_t timeshift_write_packet ( int fd, int64_t time, th_pkt_t *pkt )
+{
+ ssize_t ret = 0, err;
+ ret = err = _write_msg(fd, SMT_PACKET, time, pkt, sizeof(th_pkt_t));
+ if (err <= 0) return err;
+ err = _write_pktbuf(fd, pkt->pkt_header);
+ if (err <= 0) return err;
+ ret += err;
+ err = _write_pktbuf(fd, pkt->pkt_payload);
+ if (err <= 0) return err;
+ ret += err;
+ return ret;
+}
+
+/*
+ * Write MPEGTS data
+ */
+ssize_t timeshift_write_mpegts ( int fd, int64_t time, void *data )
+{
+ return _write_msg(fd, SMT_MPEGTS, time, data, 188);
+}
+
+/*
+ * Write skip message
+ */
+ssize_t timeshift_write_skip ( int fd, streaming_skip_t *skip )
+{
+ return _write_msg(fd, SMT_SKIP, 0, skip, sizeof(streaming_skip_t));
+}
+
+/*
+ * Write speed message
+ */
+ssize_t timeshift_write_speed ( int fd, int speed )
+{
+ return _write_msg(fd, SMT_SPEED, 0, &speed, sizeof(speed));
+}
+
+/*
+ * Stop
+ */
+ssize_t timeshift_write_stop ( int fd, int code )
+{
+ return _write_msg(fd, SMT_STOP, 0, &code, sizeof(code));
+}
+
+/*
+ * Exit
+ */
+ssize_t timeshift_write_exit ( int fd )
+{
+ int code = 0;
+ return _write_msg(fd, SMT_EXIT, 0, &code, sizeof(code));
+}
+
+/*
+ * Write end of file (special internal message)
+ */
+ssize_t timeshift_write_eof ( int fd )
+{
+ size_t sz = 0;
+ return _write(fd, &sz, sizeof(sz));
+}
+
+/* **************************************************************************
+ * Thread
+ * *************************************************************************/
+
+static inline ssize_t _process_msg0
+ ( timeshift_t *ts, timeshift_file_t *tsf, streaming_message_t **smp )
+{
+ int i;
+ ssize_t err;
+ streaming_start_t *ss;
+ streaming_message_t *sm = *smp;
+ if (sm->sm_type == SMT_START) {
+ err = 0;
+ timeshift_index_t *ti = calloc(1, sizeof(timeshift_index_t));
+ ti->pos = tsf->size;
+ ti->data = sm;
+ *smp = NULL;
+ TAILQ_INSERT_TAIL(&tsf->sstart, ti, link);
+
+ /* Update video index */
+ ss = sm->sm_data;
+ for (i = 0; i < ss->ss_num_components; i++)
+ if (SCT_ISVIDEO(ss->ss_components[i].ssc_type))
+ ts->vididx = ss->ss_components[i].ssc_index;
+ } else if (sm->sm_type == SMT_SIGNAL_STATUS)
+ err = timeshift_write_sigstat(tsf->fd, sm->sm_time, sm->sm_data);
+ else if (sm->sm_type == SMT_PACKET) {
+ err = timeshift_write_packet(tsf->fd, sm->sm_time, sm->sm_data);
+ if (err > 0) {
+ th_pkt_t *pkt = sm->sm_data;
+
+ /* Index video iframes */
+ if (pkt->pkt_componentindex == ts->vididx &&
+ pkt->pkt_frametype == PKT_I_FRAME) {
+ timeshift_index_t *ti = calloc(1, sizeof(timeshift_index_t));
+ ti->pos = tsf->size;
+ ti->time = sm->sm_time;
+ TAILQ_INSERT_TAIL(&tsf->iframes, ti, link);
+ }
+ }
+ } else if (sm->sm_type == SMT_MPEGTS)
+ err = timeshift_write_mpegts(tsf->fd, sm->sm_time, sm->sm_data);
+ else
+ err = 0;
+
+ /* OK */
+ if (err > 0) {
+ tsf->last = sm->sm_time;
+ tsf->size += err;
+ }
+ return err;
+}
+
+static void _process_msg
+ ( timeshift_t *ts, streaming_message_t *sm, int *run )
+{
+ int err;
+ timeshift_file_t *tsf;
+
+ /* Process */
+ switch (sm->sm_type) {
+
+ /* Terminate */
+ case SMT_EXIT:
+ if (run) *run = 0;
+ break;
+ case SMT_STOP:
+ if (sm->sm_code == 0 && run)
+ *run = 0;
+ break;
+
+ /* Timeshifting */
+ case SMT_SKIP:
+ case SMT_SPEED:
+ break;
+
+ /* Status */
+ case SMT_NOSTART:
+ case SMT_SERVICE_STATUS:
+ break;
+
+ /* Store */
+ case SMT_SIGNAL_STATUS:
+ case SMT_START:
+ case SMT_MPEGTS:
+ case SMT_PACKET:
+ pthread_mutex_lock(&ts->rdwr_mutex);
+ if ((tsf = timeshift_filemgr_get(ts, 1)) && (tsf->fd != -1)) {
+ if ((err = _process_msg0(ts, tsf, &sm)) < 0) {
+ timeshift_filemgr_close(tsf);
+ tsf->bad = 1;
+ ts->full = 1; ///< Stop any more writing
+ }
+ }
+ pthread_mutex_unlock(&ts->rdwr_mutex);
+ break;
+ }
+
+ /* Next */
+ if (sm)
+ streaming_msg_free(sm);
+}
+
+void *timeshift_writer ( void *aux )
+{
+ int run = 1;
+ timeshift_t *ts = aux;
+ streaming_queue_t *sq = &ts->wr_queue;
+ streaming_message_t *sm;
+
+ pthread_mutex_lock(&sq->sq_mutex);
+
+ while (run) {
+
+ /* Get message */
+ sm = TAILQ_FIRST(&sq->sq_queue);
+ if (sm == NULL) {
+ pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
+ continue;
+ }
+ TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
+ pthread_mutex_unlock(&sq->sq_mutex);
+
+ _process_msg(ts, sm, &run);
+
+ pthread_mutex_lock(&sq->sq_mutex);
+ }
+
+ pthread_mutex_unlock(&sq->sq_mutex);
+ return NULL;
+}
+
+/* **************************************************************************
+ * Utilities
+ * *************************************************************************/
+
+void timeshift_writer_flush ( timeshift_t *ts )
+
+{
+ streaming_message_t *sm;
+ streaming_queue_t *sq = &ts->wr_queue;
+
+ pthread_mutex_lock(&sq->sq_mutex);
+ while ((sm = TAILQ_FIRST(&sq->sq_queue))) {
+ TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
+ _process_msg(ts, sm, NULL);
+ }
+ pthread_mutex_unlock(&sq->sq_mutex);
+}
+
int unc; /* uncorrected blocks */
} signal_status_t;
+/**
+ * Streaming skip
+ */
+typedef struct streaming_skip
+{
+ enum {
+ SMT_SKIP_REL_TIME,
+ SMT_SKIP_ABS_TIME,
+ SMT_SKIP_REL_SIZE,
+ SMT_SKIP_ABS_SIZE
+ } type;
+ union {
+ off_t size;
+ time_t time;
+ };
+} streaming_skip_t;
+
+
/**
* A streaming pad generates data.
* It has one or more streaming targets attached to it.
* Streaming messages types
*/
typedef enum {
+
/**
* Packet with data.
*
* Internal message to exit receiver
*/
SMT_EXIT,
+
+ /**
+ * Set stream speed
+ */
+ SMT_SPEED,
+
+ /**
+ * Skip the stream
+ */
+ SMT_SKIP,
+
} streaming_message_type_t;
#define SMT_TO_MASK(x) (1 << ((unsigned int)x))
typedef struct streaming_message {
TAILQ_ENTRY(streaming_message) sm_link;
streaming_message_type_t sm_type;
+#if ENABLE_TIMESHIFT
+ int64_t sm_time;
+ uint64_t sm_timeshift;
+#endif
union {
void *sm_data;
int sm_code;
if (tvheadend.capabilities.indexOf('imagecache') == -1)
imagecachePanel.hide();
+ /* ****************************************************************
+ * Timeshift
+ * ***************************************************************/
+
+ var timeshiftPath = new Ext.form.TextField({
+ fieldLabel : 'Temp. storage path',
+ name : 'timeshiftpath',
+ allowBlank : true,
+ width : 400
+ });
+
+ var timeshiftPeriod = new Ext.form.NumberField({
+ fieldLabel : 'Max period (minutes, per stream)',
+ name : 'timeshiftperiod',
+ allowBlank : false,
+ width : 400
+ });
+
+ var timeshiftPeriodU = new Ext.form.Checkbox({
+ fieldLabel : '(unlimited)',
+ name : 'timeshiftperiod_unlimited',
+ allowBlank : false,
+ width : 400
+ });
+ timeshiftPeriodU.on('check', function(e, c) {
+ timeshiftPeriod.setDisabled(c);
+ });
+
+ var timeshiftSize = new Ext.form.NumberField({
+ fieldLabel : 'Max size (MB, global)',
+ name : 'timeshiftsize',
+ allowBlank : false,
+ width : 400
+ });
+
+ var timeshiftFields = new Ext.form.FieldSet({
+ title : 'Timeshift',
+ width : 700,
+ autoHeight : true,
+ collapsible : true,
+ items : [ timeshiftPath, timeshiftPeriod, timeshiftPeriodU ]//, timeshiftSize ]
+ });
+
/* ****************************************************************
* Form
* ***************************************************************/
border : false,
bodyStyle : 'padding:15px',
labelAlign : 'left',
- labelWidth : 150,
+ labelWidth : 200,
waitMsgTarget : true,
reader : confreader,
layout : 'form',
op : 'loadSettings'
},
success : function(form, action) {
+ v = parseInt(timeshiftPeriod.getValue());
+ if (v == 4294967295) {
+ timeshiftPeriodU.setValue(true);
+ timeshiftPeriod.setValue("");
+ timeshiftPeriod.setDisabled(true); // TODO: this isn't working
+ } else {
+ timeshiftPeriod.setValue(v / 60);
+ }
confpanel.enable();
}
});
}
break;
+ case SMT_SKIP:
+ case SMT_SPEED:
case SMT_SIGNAL_STATUS:
break;