streaming_target_t *st = &hs->hs_input;
- if(normts)
- st = hs->hs_tsfix = tsfix_create(st);
#if ENABLE_TIMESHIFT
if (timeshiftPeriod != 0) {
if (timeshiftPeriod == ~0)
else
tvhlog(LOG_DEBUG, "htsp", "using timeshift buffer (%u mins)", timeshiftPeriod / 60);
st = hs->hs_tshift = timeshift_create(st, timeshiftPeriod);
+ normts = 1;
}
#endif
+ if(normts)
+ st = hs->hs_tsfix = tsfix_create(st);
hs->hs_s = subscription_create_from_channel(ch, weight,
htsp->htsp_logname,
{ "subscribe", htsp_method_subscribe, ACCESS_STREAMING},
{ "unsubscribe", htsp_method_unsubscribe, ACCESS_STREAMING},
{ "subscriptionChangeWeight", htsp_method_change_weight, ACCESS_STREAMING},
+ { "subscriptionSeek", htsp_method_skip, ACCESS_STREAMING},
{ "subscriptionSkip", htsp_method_skip, ACCESS_STREAMING},
{ "subscriptionSpeed", htsp_method_speed, ACCESS_STREAMING},
{ "fileOpen", htsp_method_file_open, ACCESS_RECORDER},
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
}
+/**
+ *
+ */
+static void
+htsp_subscription_skip(htsp_subscription_t *hs, streaming_skip_t *skip)
+{
+ htsmsg_t *m = htsmsg_create_map();
+ htsmsg_add_str(m, "method", "subscriptionSkip");
+ htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
+ if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_ABS_SIZE)
+ htsmsg_add_u32(m, "absolute", 1);
+ if (skip->type == SMT_SKIP_ERROR)
+ htsmsg_add_u32(m, "error", 1);
+ else if (skip->type == SMT_SKIP_ABS_TIME || skip->type == SMT_SKIP_REL_TIME)
+ htsmsg_add_s64(m, "time", skip->time);
+ else if (skip->type == SMT_SKIP_ABS_SIZE || skip->type == SMT_SKIP_REL_SIZE)
+ htsmsg_add_s64(m, "size", skip->size);
+ htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
+}
+
/**
*
*/
abort();
case SMT_SKIP:
+ htsp_subscription_skip(hs, sm->sm_data);
break;
case SMT_SPEED:
case SMT_START:
case SMT_NOSTART:
case SMT_SERVICE_STATUS:
+ return -1;
break;
/* Code */
}
(*sm)->sm_time = time;
break;
+
+ default:
+ return -1;
}
/* OK */
if (!tsf || !tsi)
end = 1;
+ /* Find start/end of buffer */
+ if (end) {
+ if (back) {
+ tsf = timeshift_filemgr_last(ts);
+ tsi = NULL;
+ while (tsf && !tsi) {
+ if (!(tsi = TAILQ_FIRST(&tsf->iframes)))
+ tsf = timeshift_filemgr_next(tsf, &end, 0);
+ }
+ end = -1;
+ } else {
+ tsf = timeshift_filemgr_get(ts, ts->ondemand);
+ tsi = NULL;
+ while (tsf && !tsi) {
+ if (!(tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list)))
+ tsf = timeshift_filemgr_prev(tsf, &end, 0);
+ }
+ end = 1;
+ }
+ }
+
/* Done */
*new_file = tsf;
- *iframe = end ? NULL : tsi;
+ *iframe = tsi;
return end;
}
off_t cur_off = 0;
int cur_speed = 100, keyframe_mode = 0;
int64_t pause_time = 0, play_time = 0, last_time = 0;
- int64_t now, deliver, skip_time;
- streaming_message_t *sm = NULL, *ctrl;
+ int64_t now, deliver, skip_time = 0;
+ streaming_message_t *sm = NULL, *ctrl = NULL;
timeshift_index_iframe_t *tsi = NULL;
- streaming_skip_t *skip;
+ streaming_skip_t *skip = NULL;
/* Poll */
- struct epoll_event ev;
- efd = epoll_create(1);
+ struct epoll_event ev = { 0 };
+ efd = epoll_create(1);
ev.events = EPOLLIN;
ev.data.fd = ts->rd_pipe.rd;
epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev);
nfds = 0;
wait = -1;
end = 0;
- skip_time = 0;
skip = NULL;
now = getmonoclock();
#endif
run = 0;
streaming_msg_free(ctrl);
+ ctrl = NULL;
/* Speed */
} else if (ctrl->sm_type == SMT_SPEED) {
/* Send on the message */
ctrl->sm_code = speed;
streaming_target_deliver2(ts->output, ctrl);
+ ctrl = NULL;
/* Skip/Seek */
} else if (ctrl->sm_type == SMT_SKIP) {
- skip = (streaming_skip_t *) ctrl->sm_data;
+ skip = ctrl->sm_data;
switch (skip->type) {
case SMT_SKIP_REL_TIME:
- skip_time = last_time + skip->time;
- break;
- case SMT_SKIP_ABS_TIME:
- skip_time = skip->time; // Wrong - need to use starttime of video too
- break;
- case SMT_SKIP_REL_SIZE:
- case SMT_SKIP_ABS_SIZE:
- tvhlog(LOG_DEBUG, "timeshift", "unsupported skip type: %d", skip->type);
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d skip %"PRId64" requested", ts->id, skip->time);
+
+ /* Must handle live playback case */
+ if (ts->state == TS_LIVE) {
+ if (skip->time < 0) {
+ pthread_mutex_lock(&ts->rdwr_mutex);
+ if ((cur_file = timeshift_filemgr_get(ts, ts->ondemand))) {
+ ts->state = TS_PLAY;
+ cur_off = cur_file->size;
+ last_time = cur_file->last;
+ } else {
+ tvhlog(LOG_ERR, "timeshift", "ts %d failed to get current file", ts->id);
+ skip = NULL;
+ break;
+ }
+ pthread_mutex_unlock(&ts->rdwr_mutex);
+ } else {
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d skip ignored, already live", ts->id);
+ skip = NULL;
+ }
+ }
+
+ /* OK */
+ if (skip) {
+ /* Adjust time */
+ play_time = now;
+ pause_time = skip_time = last_time + skip->time;
+ tsi = NULL;
+
+ /* Clear existing packet */
+ if (sm)
+ streaming_msg_free(sm);
+ sm = NULL;
+ }
break;
default:
- tvhlog(LOG_ERR, "timeshift", "invalid skip type: %d", skip->type);
+ tvhlog(LOG_ERR, "timeshift", "ts %d invalid/unsupported skip type: %d", ts->id, skip->type);
+ skip = NULL;
+ break;
}
- if (!skip_time)
- streaming_msg_free(ctrl);
+
+ /* Error */
+ if (!skip) {
+ ((streaming_skip_t*)ctrl->sm_data)->type = SMT_SKIP_ERROR;
+ streaming_target_deliver2(ts->output, ctrl);
+ ctrl = NULL;
+ }
+
/* Ignore */
} else {
streaming_msg_free(ctrl);
+ ctrl = NULL;
}
}
}
/* Done */
- if (!run || ts->state != TS_PLAY || !cur_file) {
+ if (!run || !cur_file || ((ts->state != TS_PLAY && !skip))) {
pthread_mutex_unlock(&ts->state_mutex);
continue;
}
if (!sm) {
/* Rewind or Fast forward (i-frame only) */
- if (skip_time || keyframe_mode) {
+ if (skip || keyframe_mode) {
timeshift_file_t *tsf = NULL;
time_t req_time;
/* Time */
- if (!skip_time)
+ if (!skip)
req_time = last_time + ((cur_speed < 0) ? -1 : 1);
else
req_time = skip_time;
end = _timeshift_skip(ts, req_time, last_time,
cur_file, &tsf, &tsi);
- /* Adjust skip time to actual */
- if (skip_time) {
- skip->time += (tsi->time - skip_time);
- streaming_target_deliver2(ts->output, ctrl);
- }
-
/* File changed (close) */
if ((tsf != cur_file) && (fd != -1)) {
close(fd);
cur_file = tsf;
if (tsi)
cur_off = tsi->pos;
+ else
+ cur_off = 0;
}
/* Find packet */
- if (cur_file && !end) {
+ if (cur_file) {
/* Open file */
if (fd == -1) {
#endif
fd = open(cur_file->path, O_RDONLY);
}
- if (cur_off) lseek(fd, cur_off, SEEK_SET);
+ if (cur_off) {
+#ifdef TSHFT_TRACE
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d seek to %lu", ts->id, cur_off);
+#endif
+ lseek(fd, cur_off, SEEK_SET);
+ }
/* Read msg */
ssize_t r = _read_msg(fd, &sm);
- assert(r != -1);
+ if (r < 0) {
+ streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
+ streaming_target_deliver2(ts->output, e);
+ tvhlog(LOG_ERR, "timeshift", "ts %d could not read buffer", ts->id);
+ pthread_mutex_unlock(&ts->rdwr_mutex);
+ pthread_mutex_unlock(&ts->state_mutex);
+ break;
+ }
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d read msg %p (%ld)",
ts->id, sm, r);
}
}
+ /* Send skip response */
+ if (skip) {
+ if (sm && sm->sm_type == SMT_PACKET) {
+ th_pkt_t *pkt = sm->sm_data;
+ skip->time = pkt->pkt_pts;
+ skip->type = SMT_SKIP_ABS_TIME;
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d skip to %"PRId64" ok", ts->id, skip->time);
+ } else {
+ /* Report error */
+ skip->type = SMT_SKIP_ERROR;
+ skip = NULL;
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d skip failed", ts->id);
+ }
+ streaming_target_deliver2(ts->output, ctrl);
+ ctrl = NULL;
+ }
+
/* Deliver */
- if (sm && (skip_time ||
+ if (sm && (skip ||
(((cur_speed < 0) && (sm->sm_time >= deliver)) ||
((cur_speed > 0) && (sm->sm_time <= deliver))))) {
+ sm->sm_timeshift = now - sm->sm_time;
#ifdef TSHFT_TRACE
- tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t,
- ts->id, sm->sm_time);
+ {
+ time_t pts = 0;
+ if (sm->sm_type == SMT_PACKET)
+ pts = ((th_pkt_t*)sm->sm_data)->pkt_pts;
+ tvhlog(LOG_DEBUG, "timeshift", "ts %d deliver %"PRItime_t" pts=%"PRItime_t " shift=%"PRIu64,
+ ts->id, sm->sm_time, pts, sm->sm_timeshift );
+ }
#endif
- sm->sm_timeshift = now - sm->sm_time;
streaming_target_deliver2(ts->output, sm);
last_time = sm->sm_time;
sm = NULL;
}
/* Terminate */
- if (!cur_file || end) {
+ if (!cur_file || end != 0) {
+ if (!end)
+ end = (cur_file > 0) ? 1 : -1;
/* Back to live */
- if (cur_speed > 0) {
+ if (end == 1) {
tvhlog(LOG_DEBUG, "timeshift", "ts %d eob revert to live mode", ts->id);
ts->state = TS_LIVE;
cur_speed = 100;
timeshift_filemgr_flush(ts, NULL);
/* Pause */
- } else if (cur_speed < 0) {
+ } else {
tvhlog(LOG_DEBUG, "timeshift", "ts %d sob pause stream", ts->id);
- cur_speed = 0;
- ts->state = TS_PAUSE;
- ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
+ cur_speed = 0;
+ ts->state = TS_PAUSE;
+ pause_time = now;
+ ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
streaming_target_deliver2(ts->output, ctrl);
}
+ ctrl = NULL;
/* Flush unwanted */
} else if (ts->ondemand && cur_file) {
}
/* Cleanup */
- if (sm) streaming_msg_free(sm);
+ if (sm) streaming_msg_free(sm);
+ if (ctrl) streaming_msg_free(ctrl);
#ifdef TSHFT_TRACE
tvhlog(LOG_DEBUG, "timeshift", "ts %d exit reader thread", ts->id);
#endif