#include <pthread.h>
#include "htsmsg.h"
#include "psi.h"
+#include "tvhpoll.h"
struct service;
struct th_dvb_table;
th_dvb_mux_instance_t *tda_mux_epg;
- int tda_table_epollfd;
+ tvhpoll_t *tda_table_pd;
uint32_t tda_enabled;
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
-#include <sys/epoll.h>
#include <sys/types.h>
#include <dirent.h>
#include <fcntl.h>
#include "epggrab.h"
#include "diseqc.h"
#include "atomic.h"
+#include "tvhpoll.h"
struct th_dvb_adapter_queue dvb_adapters;
struct th_dvb_mux_instance_tree dvb_muxes;
dvb_adapter_input_dvr(void *aux)
{
th_dvb_adapter_t *tda = aux;
- int fd = -1, i, r, c, efd, nfds, dmx = -1;
+ int fd = -1, i, r, c, nfds, dmx = -1;
uint8_t tsb[188 * 10];
service_t *t;
- struct epoll_event ev;
- int delay = 10;
+ tvhpoll_t *pd;
+ tvhpoll_event_t ev[2];
/* Install RAW demux */
if (tda->tda_rawmode) {
return NULL;
}
- /* Create poll */
- efd = epoll_create(2);
- memset(&ev, 0, sizeof(ev));
- ev.events = EPOLLIN;
- ev.data.fd = tda->tda_dvr_pipe.rd;
- epoll_ctl(efd, EPOLL_CTL_ADD, tda->tda_dvr_pipe.rd, &ev);
- ev.data.fd = fd;
- epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev);
+ pd = tvhpoll_create(2);
+ memset(ev, 0, sizeof(ev));
+ ev[0].data.fd = ev[0].fd = tda->tda_dvr_pipe.rd;
+ ev[0].events = TVHPOLL_IN;
+ ev[1].data.fd = ev[1].fd = fd;
+ ev[1].events = TVHPOLL_IN;
+ tvhpoll_add(pd, ev, 2);
r = i = 0;
while(1) {
/* Wait for input */
- nfds = epoll_wait(efd, &ev, 1, delay);
+ nfds = tvhpoll_wait(pd, ev, 1, -1);
/* No data */
if (nfds < 1) continue;
/* Exit */
- if (ev.data.fd != fd) break;
+ if (ev[0].data.fd != fd) break;
/* Read data */
c = read(fd, tsb+r, sizeof(tsb)-r);
if(dmx != -1)
close(dmx);
- close(efd);
+ tvhpoll_destroy(pd);
close(fd);
return NULL;
}
#include <fcntl.h>
#include <linux/dvb/frontend.h>
#include <linux/dvb/dmx.h>
-#include <sys/epoll.h>
#include "tvheadend.h"
#include "dvb.h"
#include "service.h"
+#include "tvhpoll.h"
/**
* Install filters for a service
if(fd == -1) {
st->es_demuxer_fd = -1;
tvhlog(LOG_ERR, "dvb",
- "\"%s\" unable to open demuxer \"%s\" for pid %d -- %s",
- s->s_identifier, tda->tda_demux_path,
- st->es_pid, strerror(errno));
+ "\"%s\" unable to open demuxer \"%s\" for pid %d -- %s",
+ s->s_identifier, tda->tda_demux_path,
+ st->es_pid, strerror(errno));
continue;
}
if(ioctl(fd, DMX_SET_PES_FILTER, &dmx_param)) {
tvhlog(LOG_ERR, "dvb",
- "\"%s\" unable to configure demuxer \"%s\" for pid %d -- %s",
- s->s_identifier, tda->tda_demux_path,
- st->es_pid, strerror(errno));
+ "\"%s\" unable to configure demuxer \"%s\" for pid %d -- %s",
+ s->s_identifier, tda->tda_demux_path,
+ st->es_pid, strerror(errno));
close(fd);
fd = -1;
}
open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
{
th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
- struct epoll_event e;
+ tvhpoll_event_t ev;
static int tdt_id_tally;
tdt->tdt_fd = tvh_open(tda->tda_demux_path, O_RDWR, 0);
tdt->tdt_id = ++tdt_id_tally;
- e.events = EPOLLIN;
- e.data.u64 = ((uint64_t)tdt->tdt_fd << 32) | tdt->tdt_id;
+ ev.fd = tdt->tdt_fd;
+ ev.events = TVHPOLL_IN;
+ ev.data.u64 = ((uint64_t)tdt->tdt_fd << 32) | tdt->tdt_id;
- if(epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_ADD, tdt->tdt_fd, &e)) {
+ if(tvhpoll_add(tda->tda_table_pd, &ev, 1) != 0) {
close(tdt->tdt_fd);
tdt->tdt_fd = -1;
} else {
fp.filter.mask[0] = tdt->tdt_mask;
if(tdt->tdt_flags & TDT_CRC)
- fp.flags |= DMX_CHECK_CRC;
+ fp.flags |= DMX_CHECK_CRC;
fp.flags |= DMX_IMMEDIATE_START;
fp.pid = tdt->tdt_pid;
if(ioctl(tdt->tdt_fd, DMX_SET_FILTER, &fp)) {
- close(tdt->tdt_fd);
- tdt->tdt_fd = -1;
+ close(tdt->tdt_fd);
+ tdt->tdt_fd = -1;
}
}
}
tdt_close_fd(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
{
th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
+ tvhpoll_event_t ev;
assert(tdt->tdt_fd != -1);
- epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_DEL, tdt->tdt_fd, NULL);
+ ev.fd = tdt->tdt_fd;
+ tvhpoll_rem(tda->tda_table_pd, &ev, 1);
close(tdt->tdt_fd);
tdt->tdt_fd = -1;
dvb_table_input(void *aux)
{
th_dvb_adapter_t *tda = aux;
- int r, i, tid, fd, x;
- struct epoll_event ev[1];
+ int r, tid, fd, x;
uint8_t sec[4096];
th_dvb_mux_instance_t *tdmi;
th_dvb_table_t *tdt;
int64_t cycle_barrier = 0;
+ tvhpoll_event_t ev;
while(1) {
- x = epoll_wait(tda->tda_table_epollfd, ev, sizeof(ev) / sizeof(ev[0]), -1);
-
- for(i = 0; i < x; i++) {
-
- tid = ev[i].data.u64 & 0xffffffff;
- fd = ev[i].data.u64 >> 32;
-
- if(!(ev[i].events & EPOLLIN))
- continue;
-
- if((r = read(fd, sec, sizeof(sec))) < 3)
- continue;
-
- pthread_mutex_lock(&global_lock);
- if((tdmi = tda->tda_mux_current) != NULL) {
- LIST_FOREACH(tdt, &tdmi->tdmi_tables, tdt_link)
- if(tdt->tdt_id == tid)
- break;
-
- if(tdt != NULL) {
- dvb_table_dispatch(sec, r, tdt);
-
- /* Any tables pending (that wants a filter/fd), close this one */
- if(TAILQ_FIRST(&tdmi->tdmi_table_queue) != NULL &&
- cycle_barrier < getmonoclock()) {
- tdt_close_fd(tdmi, tdt);
- cycle_barrier = getmonoclock() + 100000;
- tdt = TAILQ_FIRST(&tdmi->tdmi_table_queue);
- assert(tdt != NULL);
- TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link);
-
- open_table(tdmi, tdt);
- }
- }
+ x = tvhpoll_wait(tda->tda_table_pd, &ev, 1, -1);
+ if (x != 1) continue;
+
+ tid = ev.data.u64 & 0xffffffff;
+ fd = ev.data.u64 >> 32;
+
+ if(!(ev.events & TVHPOLL_IN))
+ continue;
+
+ if((r = read(fd, sec, sizeof(sec))) < 3)
+ continue;
+
+ pthread_mutex_lock(&global_lock);
+ if((tdmi = tda->tda_mux_current) != NULL) {
+ LIST_FOREACH(tdt, &tdmi->tdmi_tables, tdt_link)
+ if(tdt->tdt_id == tid)
+ break;
+
+ if(tdt != NULL) {
+ dvb_table_dispatch(sec, r, tdt);
+
+ /* Any tables pending (that wants a filter/fd), close this one */
+ if(TAILQ_FIRST(&tdmi->tdmi_table_queue) != NULL &&
+ cycle_barrier < getmonoclock()) {
+ tdt_close_fd(tdmi, tdt);
+ cycle_barrier = getmonoclock() + 100000;
+ tdt = TAILQ_FIRST(&tdmi->tdmi_table_queue);
+ assert(tdt != NULL);
+ TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link);
+ open_table(tdmi, tdt);
+ }
}
- pthread_mutex_unlock(&global_lock);
}
+ pthread_mutex_unlock(&global_lock);
}
return NULL;
}
close_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
{
th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
+ tvhpoll_event_t ev;
if(tdt->tdt_fd == -1) {
TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link);
} else {
- epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_DEL, tdt->tdt_fd, NULL);
+ ev.fd = tdt->tdt_fd;
+ tvhpoll_rem(tda->tda_table_pd, &ev, 1);
close(tdt->tdt_fd);
}
}
tda->tda_close_table = close_table;
pthread_t ptid;
- tda->tda_table_epollfd = epoll_create(50);
+ tda->tda_table_pd = tvhpoll_create(50);
pthread_create(&ptid, NULL, dvb_table_input, tda);
}
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/ioctl.h>
-#include <sys/epoll.h>
#include <fcntl.h>
#include <assert.h>
#include "tsdemux.h"
#include "psi.h"
#include "settings.h"
+#include "tvhpoll.h"
#if defined(PLATFORM_LINUX)
#include <linux/netdevice.h>
# endif
#endif
-static int iptv_thread_running;
-static int iptv_epollfd;
+static int iptv_thread_running;
+static tvhpoll_t *iptv_poll;
static pthread_mutex_t iptv_recvmutex;
struct service_list iptv_all_services; /* All IPTV services */
{
int nfds, fd, r, j, hlen;
uint8_t tsb[65536], *buf;
- struct epoll_event ev;
+ tvhpoll_event_t ev;
service_t *t;
while(1) {
- nfds = epoll_wait(iptv_epollfd, &ev, 1, -1);
+ nfds = tvhpoll_wait(iptv_poll, &ev, 1, -1);
if(nfds == -1) {
tvhlog(LOG_ERR, "IPTV", "epoll() error -- %s, sleeping 1 second",
strerror(errno));
struct sockaddr_in sin;
struct sockaddr_in6 sin6;
struct ifreq ifr;
- struct epoll_event ev;
+ tvhpoll_event_t ev;
assert(t->s_iptv_fd == -1);
if(iptv_thread_running == 0) {
iptv_thread_running = 1;
- iptv_epollfd = epoll_create(10);
+ iptv_poll = tvhpoll_create(10);
pthread_create(&tid, NULL, iptv_thread, NULL);
}
resize, strerror(errno));
memset(&ev, 0, sizeof(ev));
- ev.events = EPOLLIN;
+ ev.events = TVHPOLL_IN;
+ ev.fd = fd;
ev.data.fd = fd;
- if(epoll_ctl(iptv_epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
+ if(tvhpoll_add(iptv_poll, &ev, 1) == -1) {
tvhlog(LOG_ERR, "IPTV", "\"%s\" cannot add to epoll set -- %s",
t->s_identifier, strerror(errno));
close(fd);
#endif
}
close(t->s_iptv_fd); // Automatically removes fd from epoll set
+ // TODO: this is an issue
t->s_iptv_fd = -1;
}
#include <pthread.h>
#include <netdb.h>
-#include <sys/epoll.h>
#include <poll.h>
#include <assert.h>
#include <stdio.h>
#include "tcp.h"
#include "tvheadend.h"
+#include "tvhpoll.h"
int tcp_preferred_address_family = AF_INET;
r = poll(&pfd, 1, timeout * 1000);
if(r == 0) {
- /* Timeout */
- snprintf(errbuf, errbufsize, "Connection attempt timed out");
- close(fd);
- return -1;
+ /* Timeout */
+ snprintf(errbuf, errbufsize, "Connection attempt timed out");
+ close(fd);
+ return -1;
}
if(r == -1) {
- snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno));
- close(fd);
- return -1;
+ snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno));
+ close(fd);
+ return -1;
}
getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
x = recv(fd, buf + tot, len - tot, MSG_DONTWAIT);
if(x == -1) {
if(errno == EAGAIN)
- continue;
+ continue;
return errno;
}
/**
*
*/
-static int tcp_server_epoll_fd;
+static tvhpoll_t *tcp_server_poll;
typedef struct tcp_server {
tcp_server_callback_t *start;
static void *
tcp_server_loop(void *aux)
{
- int r, i;
- struct epoll_event ev[1];
+ int r;
+ tvhpoll_event_t ev;
tcp_server_t *ts;
tcp_server_launch_t *tsl;
pthread_attr_t attr;
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
while(1) {
- r = epoll_wait(tcp_server_epoll_fd, ev, sizeof(ev) / sizeof(ev[0]), -1);
+ r = tvhpoll_wait(tcp_server_poll, &ev, 1, -1);
if(r == -1) {
- perror("tcp_server: epoll_wait");
+ perror("tcp_server: tchpoll_wait");
continue;
}
- for(i = 0; i < r; i++) {
- ts = ev[i].data.ptr;
+ if (r == 0) continue;
- if(ev[i].events & EPOLLHUP) {
- close(ts->serverfd);
- free(ts);
- continue;
- }
+ ts = ev.data.ptr;
- if(ev[i].events & EPOLLIN) {
- tsl = malloc(sizeof(tcp_server_launch_t));
- tsl->start = ts->start;
- tsl->opaque = ts->opaque;
- slen = sizeof(struct sockaddr_storage);
-
- tsl->fd = accept(ts->serverfd,
- (struct sockaddr *)&tsl->peer, &slen);
- if(tsl->fd == -1) {
- perror("accept");
- free(tsl);
- sleep(1);
- continue;
- }
-
-
- slen = sizeof(struct sockaddr_storage);
- if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) {
- close(tsl->fd);
- free(tsl);
- continue;
- }
-
- pthread_create(&tid, &attr, tcp_server_start, tsl);
- }
+ if(ev.events & TVHPOLL_HUP) {
+ close(ts->serverfd);
+ free(ts);
+ continue;
+ }
+
+ if(ev.events & TVHPOLL_IN) {
+ tsl = malloc(sizeof(tcp_server_launch_t));
+ tsl->start = ts->start;
+ tsl->opaque = ts->opaque;
+ slen = sizeof(struct sockaddr_storage);
+
+ tsl->fd = accept(ts->serverfd,
+ (struct sockaddr *)&tsl->peer, &slen);
+ if(tsl->fd == -1) {
+ perror("accept");
+ free(tsl);
+ sleep(1);
+ continue;
+ }
+
+ slen = sizeof(struct sockaddr_storage);
+ if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) {
+ close(tsl->fd);
+ free(tsl);
+ continue;
+ }
+
+ pthread_create(&tid, &attr, tcp_server_start, tsl);
}
}
return NULL;
tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, void *opaque)
{
int fd, x;
- struct epoll_event e;
+ tvhpoll_event_t ev;
tcp_server_t *ts;
struct addrinfo hints, *res, *ressave, *use = NULL;
char *portBuf = (char*)malloc(6);
int one = 1;
int zero = 0;
- memset(&e, 0, sizeof(e));
+ memset(&ev, 0, sizeof(ev));
snprintf(portBuf, 6, "%d", port);
ts->start = start;
ts->opaque = opaque;
- e.events = EPOLLIN;
- e.data.ptr = ts;
- epoll_ctl(tcp_server_epoll_fd, EPOLL_CTL_ADD, fd, &e);
+ ev.fd = fd;
+ ev.events = TVHPOLL_IN;
+ ev.data.ptr = ts;
+ tvhpoll_add(tcp_server_poll, &ev, 1);
return ts;
}
if(opt_ipv6)
tcp_preferred_address_family = AF_INET6;
- tcp_server_epoll_fd = epoll_create(10);
+ tcp_server_poll = tvhpoll_create(10);
pthread_create(&tid, NULL, tcp_server_loop, NULL);
}
#include "timeshift.h"
#include "timeshift/private.h"
#include "atomic.h"
+#include "tvhpoll.h"
#include <sys/types.h>
#include <sys/stat.h>
void *timeshift_reader ( void *p )
{
timeshift_t *ts = p;
- int efd, nfds, end, fd = -1, run = 1, wait = -1;
+ int nfds, end, fd = -1, run = 1, wait = -1;
timeshift_file_t *cur_file = NULL;
off_t cur_off = 0;
int cur_speed = 100, keyframe_mode = 0;
timeshift_index_iframe_t *tsi = NULL;
streaming_skip_t *skip = NULL;
time_t last_status = 0;
+ tvhpoll_t *pd;
+ tvhpoll_event_t ev = { 0 };
- /* Poll */
- struct epoll_event ev = { 0 };
- efd = epoll_create(1);
- ev.events = EPOLLIN;
- ev.data.fd = ts->rd_pipe.rd;
- epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev);
+ pd = tvhpoll_create(1);
+ ev.fd = ts->rd_pipe.rd;
+ ev.events = TVHPOLL_IN;
+ tvhpoll_add(pd, &ev, 1);
/* Output */
while (run) {
/* Wait for data */
if(wait)
- nfds = epoll_wait(efd, &ev, 1, wait);
+ nfds = tvhpoll_wait(pd, &ev, 1, wait);
else
nfds = 0;
wait = -1;
/* Control */
pthread_mutex_lock(&ts->state_mutex);
if (nfds == 1) {
- if (_read_msg(ev.data.fd, &ctrl) > 0) {
+ if (_read_msg(ts->rd_pipe.rd, &ctrl) > 0) {
/* Exit */
if (ctrl->sm_type == SMT_EXIT) {
}
/* Cleanup */
+ tvhpoll_destroy(pd);
if (fd != -1) close(fd);
if (sm) streaming_msg_free(sm);
if (ctrl) streaming_msg_free(ctrl);