* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#define _GNU_SOURCE
-#include <sys/socket.h>
#include <fcntl.h>
#include "tvheadend.h"
#include "tvhpoll.h"
#include "http.h"
#include "satip_private.h"
-#define PKTS 64
-
-#ifndef CONFIG_RECVMMSG
-
-#ifdef __linux__
-
-/* define the syscall - works only for linux */
-
-#include <linux/unistd.h>
-
-struct mmsghdr {
- struct msghdr msg_hdr;
- unsigned int msg_len;
-};
-
-int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
- unsigned int flags, struct timespec *timeout);
-
-#ifdef __NR_recvmmsg
-
-int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
- unsigned int flags, struct timespec *timeout)
-{
- return syscall(__NR_recvmmsg, sockfd, msgvec, vlen, flags, timeout);
-}
-
-#else
-
-#undef PKTS
-#define PKTS 1
-/* receive only single packet */
-int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
- unsigned int flags, struct timespec *timeout)
-{
- ssize_t r = recvmsg(sockfd, &msgvec->msg_hdr, flags);
- if (r < 0)
- return r;
- msgvec->msg_len = r;
- return 1;
-}
-
-#endif
-
-#else /* not __linux__ */
-
-#error "Add recvmmsg() support for your platform!!!"
-
-#endif
-
-#endif /* !CONFIG_RECVMMSG */
-
static int
satip_frontend_tune1
( satip_frontend_t *lfe, mpegts_mux_instance_t *mmi );
static void *
satip_frontend_input_thread ( void *aux )
{
+#define RTP_PKTS 64
+#define RTP_PKT_SIZE 1472 /* this is maximum UDP payload (standard ethernet) */
#define HTTP_CMD_NONE 9874
satip_frontend_t *lfe = aux;
mpegts_mux_instance_t *mmi = lfe->sf_mmi;
http_client_t *rtsp;
dvb_mux_t *lm;
char buf[256];
- uint8_t tsb[PKTS][1356 + 128];
+ struct iovec *iovec;
uint8_t rtcp[2048];
uint8_t *p;
sbuf_t sb;
- struct iovec iov[PKTS];
- struct mmsghdr msg[PKTS];
int pos, nfds, i, r;
size_t c;
int tc;
tvhpoll_t *efd;
int changing = 0, ms = -1, fatal = 0;
uint32_t seq = -1, nseq;
+ udp_multirecv_t um;
lfe->mi_display_name((mpegts_input_t*)lfe, buf, sizeof(buf));
tvhpoll_add(efd, ev, 4);
rtsp->hc_efd = efd;
- /* Read */
- memset(&msg, 0, sizeof(msg));
- for (i = 0; i < PKTS; i++) {
- msg[i].msg_hdr.msg_iov = &iov[i];
- msg[i].msg_hdr.msg_iovlen = 1;
- iov[i].iov_base = tsb[i];
- iov[i].iov_len = sizeof(tsb[0]);
- }
-
-
r = satip_rtsp_setup(rtsp,
lfe->sf_position, lfe->sf_number,
lfe->sf_rtp_port, &lm->lm_tuning,
return NULL;
}
+ udp_multirecv_init(&um, RTP_PKTS, RTP_PKT_SIZE);
sbuf_init_fixed(&sb, 18800);
while (tvheadend_running && !fatal) {
nfds = tvhpoll_wait(efd, ev, 1, ms);
if (nfds > 0 && ev[0].data.ptr == NULL) {
- c = read(lfe->sf_dvr_pipe.rd, tsb[0], 1);
- if (c == 1 && tsb[0][0] == 'c') {
+ c = read(lfe->sf_dvr_pipe.rd, rtcp, 1);
+ if (c == 1 && rtcp[0] == 'c') {
ms = 20;
changing = 1;
continue;
if (ev[0].data.ptr != lfe->sf_rtp)
continue;
- tc = recvmmsg(lfe->sf_rtp->fd, msg, PKTS, MSG_DONTWAIT, NULL);
+ tc = udp_multirecv_read(&um, lfe->sf_rtp->fd, RTP_PKTS, &iovec);
if (tc < 0) {
if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)
tvhlog(LOG_WARNING, "satip", "%s - recvmsg() EOVERFLOW", buf);
continue;
}
- tvhlog(LOG_ERR, "satip", "%s - recv() error %d (%s)",
+ tvhlog(LOG_ERR, "satip", "%s - multirecv error %d (%s)",
buf, errno, strerror(errno));
break;
}
for (i = 0; i < tc; i++) {
- p = tsb[i];
- c = msg[i].msg_len;
+ p = iovec[i].iov_base;
+ c = iovec[i].iov_len;
/* Strip RTP header */
if (c < 12)
}
sbuf_free(&sb);
+ udp_multirecv_free(&um);
ev[0].events = TVHPOLL_IN;
ev[0].fd = lfe->sf_rtp->fd;
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#define _GNU_SOURCE
#include "tvheadend.h"
#include "udp.h"
q->hq_size = 0;
return r;
}
+
+/*
+ * UDP multi packet receive support
+ */
+
+#ifndef CONFIG_RECVMMSG
+
+#ifdef __linux__
+
+/* define the syscall - works only for linux */
+
+#include <linux/unistd.h>
+
+struct mmsghdr {
+ struct msghdr msg_hdr;
+ unsigned int msg_len;
+};
+
+int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
+ unsigned int flags, struct timespec *timeout);
+
+#ifdef __NR_recvmmsg
+
+int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
+ unsigned int flags, struct timespec *timeout)
+{
+ return syscall(__NR_recvmmsg, sockfd, msgvec, vlen, flags, timeout);
+}
+
+#else
+
+#undef PKTS
+#define PKTS 1
+/* receive only single packet */
+int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
+ unsigned int flags, struct timespec *timeout)
+{
+ ssize_t r = recvmsg(sockfd, &msgvec->msg_hdr, flags);
+ if (r < 0)
+ return r;
+ msgvec->msg_len = r;
+ return 1;
+}
+
+#endif
+
+#else /* not __linux__ */
+
+#error "Add recvmmsg() support for your platform!!!"
+
+#endif
+
+#endif /* !CONFIG_RECVMMSG */
+
+void
+udp_multirecv_init( udp_multirecv_t *um, int packets, int psize )
+{
+ int i;
+
+ um->um_psize = psize;
+ um->um_packets = packets;
+ um->um_data = malloc(packets * psize);
+ um->um_iovec = malloc(packets * sizeof(struct iovec));
+ um->um_riovec = malloc(packets * sizeof(struct iovec));
+ um->um_msg = calloc(packets, sizeof(struct mmsghdr));
+ for (i = 0; i < packets; i++) {
+ ((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iov = &um->um_iovec[i];
+ ((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iovlen = 1;
+ um->um_iovec[i].iov_base = /* follow thru */
+ um->um_riovec[i].iov_base = um->um_data + i * psize;
+ um->um_iovec[i].iov_len = psize;
+ }
+}
+
+void
+udp_multirecv_free( udp_multirecv_t *um )
+{
+ free(um->um_msg); um->um_msg = NULL;
+ free(um->um_iovec); um->um_iovec = NULL;
+ free(um->um_data); um->um_data = NULL;
+ um->um_psize = 0;
+ um->um_packets = 0;
+}
+
+int
+udp_multirecv_read( udp_multirecv_t *um, int fd, int packets,
+ struct iovec **iovec )
+{
+ int n, i;
+ if (packets > um->um_packets)
+ packets = um->um_packets;
+ n = recvmmsg(fd, (struct mmsghdr *)um->um_msg, packets, MSG_DONTWAIT, NULL);
+ if (n > 0) {
+ for (i = 0; i < n; i++)
+ um->um_riovec[i].iov_len = ((struct mmsghdr *)um->um_msg)[i].msg_len;
+ *iovec = um->um_riovec;
+ }
+ return n;
+}