]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
udp: create multirecv interface
authorJaroslav Kysela <perex@perex.cz>
Fri, 18 Apr 2014 21:41:20 +0000 (23:41 +0200)
committerJaroslav Kysela <perex@perex.cz>
Mon, 5 May 2014 20:00:37 +0000 (22:00 +0200)
src/input/mpegts/satip/satip_frontend.c
src/udp.c
src/udp.h

index 19c53c8e0f2a2eb693fda2122f177f4a0eba90bd..430a0cfcbdbcbe67202a51f65c4875a0fb9c1bcf 100644 (file)
@@ -17,8 +17,6 @@
  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
-#define _GNU_SOURCE
-#include <sys/socket.h>
 #include <fcntl.h>
 #include "tvheadend.h"
 #include "tvhpoll.h"
 #include "http.h"
 #include "satip_private.h"
 
-#define PKTS 64
-
-#ifndef CONFIG_RECVMMSG
-
-#ifdef __linux__
-
-/* define the syscall - works only for linux */
-
-#include <linux/unistd.h>
-
-struct mmsghdr {
-  struct msghdr msg_hdr;
-  unsigned int  msg_len;
-};
-
-int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
-             unsigned int flags, struct timespec *timeout);
-
-#ifdef __NR_recvmmsg
-
-int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
-             unsigned int flags, struct timespec *timeout)
-{
-  return syscall(__NR_recvmmsg, sockfd, msgvec, vlen, flags, timeout);
-}
-
-#else
-
-#undef PKTS
-#define PKTS 1
-/* receive only single packet */
-int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
-             unsigned int flags, struct timespec *timeout)
-{
-  ssize_t r = recvmsg(sockfd, &msgvec->msg_hdr, flags);
-  if (r < 0)
-    return r;
-  msgvec->msg_len = r;
-  return 1;
-}
-
-#endif
-
-#else /* not __linux__ */
-
-#error "Add recvmmsg() support for your platform!!!"
-
-#endif
-
-#endif /* !CONFIG_RECVMMSG */
-
 static int
 satip_frontend_tune1
   ( satip_frontend_t *lfe, mpegts_mux_instance_t *mmi );
@@ -777,18 +724,18 @@ satip_frontend_pid_changed( http_client_t *rtsp,
 static void *
 satip_frontend_input_thread ( void *aux )
 {
+#define RTP_PKTS      64
+#define RTP_PKT_SIZE  1472  /* this is maximum UDP payload (standard ethernet) */
 #define HTTP_CMD_NONE 9874
   satip_frontend_t *lfe = aux;
   mpegts_mux_instance_t *mmi = lfe->sf_mmi;
   http_client_t *rtsp;
   dvb_mux_t *lm;
   char buf[256];
-  uint8_t tsb[PKTS][1356 + 128];
+  struct iovec *iovec;
   uint8_t rtcp[2048];
   uint8_t *p;
   sbuf_t sb;
-  struct iovec   iov[PKTS];
-  struct mmsghdr msg[PKTS];
   int pos, nfds, i, r;
   size_t c;
   int tc;
@@ -796,6 +743,7 @@ satip_frontend_input_thread ( void *aux )
   tvhpoll_t *efd;
   int changing = 0, ms = -1, fatal = 0;
   uint32_t seq = -1, nseq;
+  udp_multirecv_t um;
 
   lfe->mi_display_name((mpegts_input_t*)lfe, buf, sizeof(buf));
 
@@ -827,16 +775,6 @@ satip_frontend_input_thread ( void *aux )
   tvhpoll_add(efd, ev, 4);
   rtsp->hc_efd = efd;
 
-  /* Read */
-  memset(&msg, 0, sizeof(msg));
-  for (i = 0; i < PKTS; i++) {
-    msg[i].msg_hdr.msg_iov    = &iov[i];
-    msg[i].msg_hdr.msg_iovlen = 1;
-    iov[i].iov_base           = tsb[i];
-    iov[i].iov_len            = sizeof(tsb[0]);
-  }
-
-
   r = satip_rtsp_setup(rtsp,
                        lfe->sf_position, lfe->sf_number,
                        lfe->sf_rtp_port, &lm->lm_tuning,
@@ -846,6 +784,7 @@ satip_frontend_input_thread ( void *aux )
     return NULL;
   }
 
+  udp_multirecv_init(&um, RTP_PKTS, RTP_PKT_SIZE);
   sbuf_init_fixed(&sb, 18800);
 
   while (tvheadend_running && !fatal) {
@@ -853,8 +792,8 @@ satip_frontend_input_thread ( void *aux )
     nfds = tvhpoll_wait(efd, ev, 1, ms);
 
     if (nfds > 0 && ev[0].data.ptr == NULL) {
-      c = read(lfe->sf_dvr_pipe.rd, tsb[0], 1);
-      if (c == 1 && tsb[0][0] == 'c') {
+      c = read(lfe->sf_dvr_pipe.rd, rtcp, 1);
+      if (c == 1 && rtcp[0] == 'c') {
         ms = 20;
         changing = 1;
         continue;
@@ -932,7 +871,7 @@ satip_frontend_input_thread ( void *aux )
     if (ev[0].data.ptr != lfe->sf_rtp)
       continue;     
 
-    tc = recvmmsg(lfe->sf_rtp->fd, msg, PKTS, MSG_DONTWAIT, NULL);
+    tc = udp_multirecv_read(&um, lfe->sf_rtp->fd, RTP_PKTS, &iovec);
 
     if (tc < 0) {
       if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)
@@ -941,14 +880,14 @@ satip_frontend_input_thread ( void *aux )
         tvhlog(LOG_WARNING, "satip", "%s - recvmsg() EOVERFLOW", buf);
         continue;
       }
-      tvhlog(LOG_ERR, "satip", "%s - recv() error %d (%s)",
+      tvhlog(LOG_ERR, "satip", "%s - multirecv error %d (%s)",
              buf, errno, strerror(errno));
       break;
     }
 
     for (i = 0; i < tc; i++) {
-      p = tsb[i];
-      c = msg[i].msg_len;
+      p = iovec[i].iov_base;
+      c = iovec[i].iov_len;
 
       /* Strip RTP header */
       if (c < 12)
@@ -980,6 +919,7 @@ satip_frontend_input_thread ( void *aux )
   }
 
   sbuf_free(&sb);
+  udp_multirecv_free(&um);
 
   ev[0].events             = TVHPOLL_IN;
   ev[0].fd                 = lfe->sf_rtp->fd;
index 335e0f25b69c7e8561a573abf5205556fc612e38..a0789f09b57e6cf04f9379f84315dd8c9ff60536 100644 (file)
--- a/src/udp.c
+++ b/src/udp.c
@@ -18,6 +18,7 @@
  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+#define _GNU_SOURCE
 #include "tvheadend.h"
 #include "udp.h"
 
@@ -380,3 +381,102 @@ udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q,
   q->hq_size = 0;
   return r;
 }
+
+/*
+ * UDP multi packet receive support
+ */
+
+#ifndef CONFIG_RECVMMSG
+
+#ifdef __linux__
+
+/* define the syscall - works only for linux */
+
+#include <linux/unistd.h>
+
+struct mmsghdr {
+  struct msghdr msg_hdr;
+  unsigned int  msg_len;
+};
+
+int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
+             unsigned int flags, struct timespec *timeout);
+
+#ifdef __NR_recvmmsg
+
+int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
+             unsigned int flags, struct timespec *timeout)
+{
+  return syscall(__NR_recvmmsg, sockfd, msgvec, vlen, flags, timeout);
+}
+
+#else
+
+#undef PKTS
+#define PKTS 1
+/* receive only single packet */
+int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
+             unsigned int flags, struct timespec *timeout)
+{
+  ssize_t r = recvmsg(sockfd, &msgvec->msg_hdr, flags);
+  if (r < 0)
+    return r;
+  msgvec->msg_len = r;
+  return 1;
+}
+
+#endif
+
+#else /* not __linux__ */
+
+#error "Add recvmmsg() support for your platform!!!"
+
+#endif
+
+#endif /* !CONFIG_RECVMMSG */
+
+void
+udp_multirecv_init( udp_multirecv_t *um, int packets, int psize )
+{
+  int i;
+
+  um->um_psize   = psize;
+  um->um_packets = packets;
+  um->um_data    = malloc(packets * psize);
+  um->um_iovec   = malloc(packets * sizeof(struct iovec));
+  um->um_riovec  = malloc(packets * sizeof(struct iovec));
+  um->um_msg     = calloc(packets,  sizeof(struct mmsghdr));
+  for (i = 0; i < packets; i++) {
+    ((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iov    = &um->um_iovec[i];
+    ((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iovlen = 1;
+    um->um_iovec[i].iov_base  = /* follow thru */
+    um->um_riovec[i].iov_base = um->um_data + i * psize;
+    um->um_iovec[i].iov_len   = psize;
+  }
+}
+
+void
+udp_multirecv_free( udp_multirecv_t *um )
+{
+  free(um->um_msg);   um->um_msg   = NULL;
+  free(um->um_iovec); um->um_iovec = NULL;
+  free(um->um_data);  um->um_data  = NULL;
+  um->um_psize   = 0;
+  um->um_packets = 0;
+}
+
+int
+udp_multirecv_read( udp_multirecv_t *um, int fd, int packets,
+                    struct iovec **iovec )
+{
+  int n, i;
+  if (packets > um->um_packets)
+    packets = um->um_packets;
+  n = recvmmsg(fd, (struct mmsghdr *)um->um_msg, packets, MSG_DONTWAIT, NULL);
+  if (n > 0) {
+    for (i = 0; i < n; i++)
+      um->um_riovec[i].iov_len = ((struct mmsghdr *)um->um_msg)[i].msg_len;
+    *iovec = um->um_riovec;
+  }
+  return n;
+}
index 6f2fd90496875a9890c79120c321ed89d0db7fbb..c5fa2eef9a6cb677be5f9026c051a9f370f6c4b4 100644 (file)
--- a/src/udp.h
+++ b/src/udp.h
@@ -70,5 +70,22 @@ int
 udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q,
                  struct sockaddr_storage *storage );
 
+typedef struct udp_multirecv {
+  int             um_psize;
+  int             um_packets;
+  uint8_t        *um_data;
+  struct iovec   *um_iovec;
+  struct iovec   *um_riovec;
+  struct mmsghdr *um_msg;
+} udp_multirecv_t;
+
+void
+udp_multirecv_init( udp_multirecv_t *um, int packets, int psize );
+void
+udp_multirecv_free( udp_multirecv_t *um );
+int
+udp_multirecv_read( udp_multirecv_t *um, int fd, int packets,
+                    struct iovec **iovec );
+
 
 #endif /* UDP_H_ */