void mpegts_input_recv_packets
(mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
- int64_t *pcr, uint16_t *pcr_pid);
+ int64_t *pcr_first, int64_t *pcr_last, uint16_t *pcr_pid);
int mpegts_input_get_weight ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags );
int mpegts_input_get_priority ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags );
#include "htsstr.h"
#include "channels.h"
#include "bouquet.h"
+#include "packet.h"
#include <sys/socket.h>
#include <sys/types.h>
pthread_mutex_lock(&iptv_lock);
+ gtimer_disarm(&im->im_pause_timer);
+
/* Stop */
if (im->im_handler->stop)
im->im_handler->stop(im);
snprintf(buf, len, "IPTV");
}
+static void
+iptv_input_unpause ( void *aux )
+{
+ iptv_mux_t *im = aux;
+ pthread_mutex_lock(&iptv_lock);
+ tvhtrace("iptv-pcr", "unpause timer callback");
+ im->im_handler->pause(im, 0);
+ pthread_mutex_unlock(&iptv_lock);
+}
+
static void *
iptv_input_thread ( void *aux )
{
- int nfds;
+ int nfds, r;
ssize_t n;
iptv_mux_t *im;
tvhpoll_event_t ev;
continue;
}
im = ev.data.ptr;
+ r = 0;
pthread_mutex_lock(&iptv_lock);
im->im_handler->stop(im);
break;
}
- iptv_input_recv_packets(im, n);
+ r = iptv_input_recv_packets(im, n);
+ if (r == 1)
+ im->im_handler->pause(im, 1);
}
pthread_mutex_unlock(&iptv_lock);
+
+ if (r == 1) {
+ pthread_mutex_lock(&global_lock);
+ if (im->mm_active)
+ gtimer_arm(&im->im_pause_timer, iptv_input_unpause, im, 1);
+ pthread_mutex_unlock(&global_lock);
+ }
}
return NULL;
}
void
+iptv_input_pause_handler ( iptv_mux_t *im, int pause )
+{
+ tvhpoll_event_t ev = { 0 };
+
+ ev.fd = im->mm_iptv_fd;
+ ev.events = TVHPOLL_IN;
+ ev.data.ptr = im;
+ if (pause)
+ tvhpoll_rem(iptv_poll, &ev, 1);
+ else
+ tvhpoll_add(iptv_poll, &ev, 1);
+}
+
+static inline int
+iptv_input_pause_check ( iptv_mux_t *im )
+{
+ /* queued more than 3 seconds? trigger the pause */
+ return im->im_pcr_end - im->im_pcr_start >= 3000000LL;
+}
+
+int
iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len )
{
static time_t t1 = 0, t2;
iptv_network_t *in = (iptv_network_t*)im->mm_network;
mpegts_mux_instance_t *mmi;
+ int64_t pcr_first = PTS_UNSET, pcr_last = PTS_UNSET, s64;
in->in_bps += len * 8;
time(&t2);
t1 = t2;
}
- /* Pass on */
+ /* Pass on, but with timing */
mmi = im->mm_active;
- if (mmi)
+ if (mmi) {
+ if (im->im_pcr != PTS_UNSET) {
+ s64 = getmonoclock() - im->im_pcr_start;
+ im->im_pcr_start += s64;
+ im->im_pcr += (((s64 / 10LL) * 9LL) + 4LL) / 10LL;
+ im->im_pcr &= PTS_MASK;
+ tvhtrace("iptv-pcr", "pcr: updated %ld, time start %ld", im->im_pcr, im->im_pcr_start);
+ }
+ if (iptv_input_pause_check(im)) {
+ tvhtrace("iptv-pcr", "pcr: paused");
+ return 1;
+ }
mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi,
- &im->mm_iptv_buffer, NULL, NULL);
+ &im->mm_iptv_buffer,
+ &pcr_first, &pcr_last, &im->im_pcr_pid);
+ if (pcr_first != PTS_UNSET && pcr_last != PTS_UNSET) {
+ if (im->im_pcr == PTS_UNSET) {
+ s64 = pts_diff(pcr_first, pcr_last);
+ if (s64 != PTS_UNSET) {
+ im->im_pcr = pcr_first;
+ im->im_pcr_start = getmonoclock();
+ im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL;
+ tvhtrace("iptv-pcr", "pcr: first %ld last %ld, time start %ld, end %ld",
+ pcr_first, pcr_last, im->im_pcr_start, im->im_pcr_end);
+ }
+ } else {
+ s64 = pts_diff(im->im_pcr, pcr_last);
+ if (s64 != PTS_UNSET) {
+ im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL;
+ tvhtrace("iptv-pcr", "pcr: last %ld, time end %ld", pcr_last, im->im_pcr_end);
+ }
+ }
+ if (iptv_input_pause_check(im)) {
+ tvhtrace("iptv-pcr", "pcr: paused");
+ return 1;
+ }
+ }
+ }
+ return 0;
}
+
int
iptv_input_fd_started ( iptv_mux_t *im )
{
/* Allocate input buffer */
sbuf_reset_and_alloc(&im->mm_iptv_buffer, IPTV_BUF_SIZE);
+ im->im_pcr = PTS_UNSET;
+ im->im_pcr_pid = MPEGTS_PID_NONE;
+
if (iptv_input_fd_started(im))
return;
#include <sys/types.h>
#include <fcntl.h>
#include <assert.h>
+#include <signal.h>
+
+typedef struct file_priv {
+ int fd;
+ int shutdown;
+ pthread_t tid;
+ pthread_cond_t cond;
+} file_priv_t;
/*
- * Connect UDP/RTP
+ * Read thread
+ */
+static void *
+iptv_file_thread ( void *aux )
+{
+ iptv_mux_t *im = aux;
+ file_priv_t *fp = im->im_data;
+ struct timespec ts;
+ int r, fd = fp->fd, pause = 0;
+ char buf[32*1024];
+
+ pthread_mutex_lock(&iptv_lock);
+ while (!fp->shutdown && fd > 0) {
+ while (!fp->shutdown && pause) {
+ clock_gettime(CLOCK_REALTIME, &ts);
+ ts.tv_sec += 1;
+ if (pthread_cond_timedwait(&fp->cond, &iptv_lock, &ts) == ETIMEDOUT)
+ break;
+ }
+ if (fp->shutdown)
+ break;
+ pause = 0;
+ pthread_mutex_unlock(&iptv_lock);
+ r = read(fd, buf, sizeof(buf));
+ pthread_mutex_lock(&iptv_lock);
+ if (r == 0)
+ break;
+ if (r < 0) {
+ if (ERRNO_AGAIN(errno))
+ continue;
+ break;
+ }
+ sbuf_append(&im->mm_iptv_buffer, buf, r);
+ if (iptv_input_recv_packets(im, r) == 1)
+ pause = 1;
+ }
+ pthread_mutex_unlock(&iptv_lock);
+ return NULL;
+}
+
+/*
+ * Open file
*/
static int
iptv_file_start ( iptv_mux_t *im, const char *raw, const url_t *url )
{
- int fd = tvh_open(raw + 7, O_RDONLY | O_DIRECT, 0);
+ file_priv_t *fp;
+ int fd = tvh_open(raw + 7, O_RDONLY | O_NONBLOCK, 0);
if (fd < 0) {
tvherror("iptv", "unable to open file '%s'", raw + 7);
return -1;
}
- im->mm_iptv_fd = fd;
-
+ fp = calloc(1, sizeof(*fp));
+ fp->fd = fd;
+ pthread_cond_init(&fp->cond, NULL);
+ im->im_data = fp;
iptv_input_mux_started(im);
+ tvhthread_create(&fp->tid, NULL, iptv_file_thread, im, "iptvfile");
return 0;
}
iptv_file_stop
( iptv_mux_t *im )
{
- int rd = im->mm_iptv_fd;
+ file_priv_t *fp = im->im_data;
+ int rd = fp->fd;
if (rd > 0)
close(rd);
- im->mm_iptv_fd = -1;
-}
-
-static ssize_t
-iptv_file_read ( iptv_mux_t *im )
-{
- int r, fd = im->mm_iptv_fd;
- ssize_t res = 0;
-
- while (fd > 0) {
- sbuf_alloc(&im->mm_iptv_buffer, 32*1024);
- r = sbuf_read(&im->mm_iptv_buffer, fd);
- if (r == 0) {
- close(fd);
- im->mm_iptv_fd = -1;
- break;
- }
- if (r < 0) {
- if (errno == EAGAIN)
- break;
- if (ERRNO_AGAIN(errno))
- continue;
- }
- res += r;
- }
-
- return res;
+ fp->shutdown = 1;
+ pthread_cond_signal(&fp->cond);
+ pthread_mutex_unlock(&iptv_lock);
+ pthread_join(fp->tid, NULL);
+ pthread_cond_destroy(&fp->cond);
+ pthread_mutex_lock(&iptv_lock);
+ free(im->im_data);
+ im->im_data = NULL;
}
/*
- * Initialise pipe handler
+ * Initialise file handler
*/
void
{
.scheme = "file",
.start = iptv_file_start,
- .stop = iptv_file_stop,
- .read = iptv_file_read,
+ .stop = iptv_file_stop
},
};
iptv_handler_register(ih, ARRAY_SIZE(ih));
tsdebug_write((mpegts_mux_t *)im, buf, len);
if (len > 0)
- iptv_input_recv_packets(im, len);
+ if (iptv_input_recv_packets(im, len) == 1)
+ hc->hc_pause = 1;
pthread_mutex_unlock(&iptv_lock);
}
+/*
+ * Pause/Unpause
+ */
+static void
+iptv_http_pause
+ ( iptv_mux_t *im, int pause )
+{
+ http_client_t *hc = im->im_data;
+
+ assert(pause == 0);
+ http_client_unpause(hc);
+}
+
/*
* Initialise HTTP handler
*/
.scheme = "http",
.start = iptv_http_start,
.stop = iptv_http_stop,
+ .pause = iptv_http_pause
},
{
.scheme = "https",
.start = iptv_http_start,
.stop = iptv_http_stop,
+ .pause = iptv_http_pause
}
};
iptv_handler_register(ih, 2);
.start = iptv_pipe_start,
.stop = iptv_pipe_stop,
.read = iptv_pipe_read,
+ .pause = iptv_input_pause_handler
},
};
iptv_handler_register(ih, ARRAY_SIZE(ih));
int (*start) ( iptv_mux_t *im, const char *raw, const url_t *url );
void (*stop) ( iptv_mux_t *im );
ssize_t (*read) ( iptv_mux_t *im );
+ void (*pause) ( iptv_mux_t *im, int pause );
RB_ENTRY(iptv_handler) link;
};
int iptv_input_fd_started ( iptv_mux_t *im );
void iptv_input_mux_started ( iptv_mux_t *im );
-void iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len );
+int iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len );
+void iptv_input_pause_handler ( iptv_mux_t *im, int pause );
struct iptv_network
{
sbuf_t mm_iptv_buffer;
iptv_handler_t *im_handler;
+ gtimer_t im_pause_timer;
+
+ int64_t im_pcr;
+ int64_t im_pcr_start;
+ int64_t im_pcr_end;
+ uint16_t im_pcr_pid;
void *im_data;
.start = iptv_udp_start,
.stop = iptv_udp_stop,
.read = iptv_udp_read,
+ .pause = iptv_input_pause_handler
},
{
.scheme = "rtp",
.start = iptv_udp_start,
.stop = iptv_udp_stop,
.read = iptv_udp_rtp_read,
+ .pause = iptv_input_pause_handler
}
};
iptv_handler_register(ih, 2);
}
/* Process */
- mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, NULL, NULL);
+ mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, NULL, NULL, NULL);
}
sbuf_free(&sb);
void
mpegts_input_recv_packets
( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
- int64_t *pcr, uint16_t *pcr_pid )
+ int64_t *pcr_first, int64_t *pcr_last, uint16_t *pcr_pid )
{
int len2 = 0, off = 0;
mpegts_packet_t *mp;
// require per mmi buffers, where this is generally not required)
/* Extract PCR on demand */
- if (pcr && pcr_pid) {
- uint8_t *tmp;
- for (tmp = tsb + len2 - 188; tmp >= tsb; tmp -= 188) {
- uint16_t pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
+ if (pcr_first && pcr_last && pcr_pid) {
+ uint8_t *tmp, *end;
+ uint16_t pid;
+ for (tmp = tsb, end = tsb + len2; tmp < end; tmp += 188) {
+ pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
if (*pcr_pid == MPEGTS_PID_NONE || *pcr_pid == pid) {
- if (get_pcr(tmp, pcr)) {
- if (*pcr != PTS_UNSET) *pcr_pid = pid;
+ if (get_pcr(tmp, pcr_first)) {
+ *pcr_pid = pid;
break;
}
}
}
+ if (*pcr_pid != MPEGTS_PID_NONE) {
+ for (tmp = tsb + len2 - 188; tmp >= tsb; tmp -= 188) {
+ pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
+ if (*pcr_pid == pid) {
+ if (get_pcr(tmp, pcr_last)) {
+ *pcr_pid = pid;
+ break;
+ }
+ }
+ }
+ }
}
/* Pass */
mmi = lfe->sf_req->sf_mmi;
mmi->tii_stats.unc += unc;
mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi,
- &lfe->sf_sbuf, NULL, NULL);
+ &lfe->sf_sbuf, NULL, NULL, NULL);
}
pthread_mutex_unlock(&lfe->sf_dvr_lock);
lfe->sf_last_data_tstamp = dispatch_clock;
if (lfe->sf_req == lfe->sf_req_thread) {
mmi->tii_stats.unc += unc;
mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi,
- sb, NULL, NULL);
+ sb, NULL, NULL, NULL);
} else
fatal = 1;
pthread_mutex_unlock(&lfe->sf_dvr_lock);
tvhpoll_event_t ev;
struct stat st;
sbuf_t buf;
- int64_t pcr, pcr_last = PTS_UNSET;
+ int64_t pcr, pcr2, pcr_last = PTS_UNSET;
#if PLATFORM_LINUX
int64_t pcr_last_realtime = 0;
#endif
if (c > 0) {
pcr = PTS_UNSET;
mpegts_input_recv_packets((mpegts_input_t*)mi, mmi, &buf,
- &pcr, &tmi->mmi_tsfile_pcr_pid);
+ &pcr, &pcr2, &tmi->mmi_tsfile_pcr_pid);
/* Delay */
if (pcr != PTS_UNSET) {
//tvhdebug("tvhdhomerun", "got r=%d (thats %d)", r, (r == 7*188));
- mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, NULL, NULL);
+ mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, NULL, NULL, NULL);
}
tvhdebug("tvhdhomerun", "setting target to none");
static inline size_t pktbuf_len(pktbuf_t *pb) { return pb ? pb->pb_size : 0; }
static inline uint8_t *pktbuf_ptr(pktbuf_t *pb) { return pb->pb_data; }
+static inline int64_t pts_diff(int64_t a, int64_t b)
+{
+ a &= PTS_MASK;
+ b &= PTS_MASK;
+ if (b < (PTS_MASK / 4) && a > (PTS_MASK / 2))
+ return b + PTS_MASK + 1 - a;
+ else if (b > a)
+ return b - a;
+ else
+ return PTS_UNSET;
+}
+
#endif /* PACKET_H_ */
#include "packet.h"
#include "streaming.h"
-#define PTS_MASK 0x1ffffffffLL
-//#define PTS_MASK 0x7ffffLL
-
/* parser states */
#define PARSER_APPEND 0
#define PARSER_RESET 1
} globalheaders_t;
-#define PTS_MASK 0x1ffffffffLL
#define MAX_SCAN_TIME 3500 // in ms
/**
#include "streaming.h"
#include "tsfix.h"
-#define PTS_MASK 0x1ffffffffLL
-
#define tsfixprintf(fmt...) // printf(fmt)
LIST_HEAD(tfstream_list, tfstream);
} str_list_t;
#define PTS_UNSET INT64_C(0x8000000000000000)
+#define PTS_MASK INT64_C(0x00000001ffffffff)
extern int tvheadend_running;