]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
http/satip server: change RTP/TCP data queuing to avoid dead-locks, fixes #4226
authorJaroslav Kysela <perex@perex.cz>
Thu, 10 Aug 2017 13:56:42 +0000 (15:56 +0200)
committerJaroslav Kysela <perex@perex.cz>
Mon, 18 Sep 2017 13:11:49 +0000 (15:11 +0200)
src/htsbuf.c
src/http.c
src/http.h
src/satip/rtp.c
src/satip/rtsp.c
src/satip/server.c
src/satip/server.h
src/webui/webui.c

index 446c37791f6e6a95741bd1e8f0c29326859c8ff2..22f029a6f3027606991ddb99539bed1c81c5e95b 100644 (file)
@@ -67,6 +67,7 @@ htsbuf_queue_free(htsbuf_queue_t *hq)
 void
 htsbuf_data_free(htsbuf_queue_t *hq, htsbuf_data_t *hd)
 {
+  hq->hq_size -= hd->hd_data_size - hd->hd_data_off;
   TAILQ_REMOVE(&hq->hq_q, hd, hd_link);
   free(hd->hd_data);
   free(hd);
index 65c30387b4c126811649cc203678907f69e43ca9..c9aa241c14d97503c40dc6d6eef1aabdf5d161e2 100644 (file)
@@ -325,7 +325,7 @@ http_send_header(http_connection_t *hc, int rc, const char *content,
   time_t t;
   int sess = 0;
 
-  lock_assert(&hc->hc_fd_lock);
+  assert(atomic_get(&hc->hc_extra_insend) > 0);
 
   htsbuf_queue_init(&hdrs, 0);
 
@@ -525,7 +525,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content,
   }
 #endif
 
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, rc, content, size,
                   encoding, location, maxage, 0, NULL, NULL);
   
@@ -535,7 +535,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content,
     else
       tvh_write(hc->hc_fd, data, size);
   }
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
 
   free(data);
 }
@@ -690,6 +690,131 @@ http_css_import(http_connection_t *hc, const char *location)
   http_send_reply(hc, HTTP_STATUS_OK, "text/css", NULL, loc, 0);
 }
 
+/**
+ *
+ */
+void
+http_extra_destroy(http_connection_t *hc)
+{
+  htsbuf_data_t *hd, *hd_next;
+
+  pthread_mutex_lock(&hc->hc_extra_lock);
+  for (hd = TAILQ_FIRST(&hc->hc_extra.hq_q); hd; hd = hd_next) {
+    hd_next = TAILQ_NEXT(hd, hd_link);
+    if (hd->hd_data_off <= 0) {
+      htsbuf_data_free(&hc->hc_extra, hd);
+      atomic_dec(&hc->hc_extra_chunks, 1);
+    }
+  }
+  pthread_mutex_unlock(&hc->hc_extra_lock);
+}
+
+/**
+ *
+ */
+int
+http_extra_flush(http_connection_t *hc)
+{
+  htsbuf_data_t *hd;
+  int r = 0, serr;
+
+  if (atomic_add(&hc->hc_extra_insend, 1) != 0)
+    goto fin;
+
+  while (1) {
+    pthread_mutex_lock(&hc->hc_extra_lock);
+    hd = TAILQ_FIRST(&hc->hc_extra.hq_q);
+    do {
+      r = send(hc->hc_fd, hd->hd_data + hd->hd_data_off,
+               hd->hd_data_size - hd->hd_data_off,
+               MSG_DONTWAIT | (TAILQ_NEXT(hd, hd_link) ? MSG_MORE : 0));
+      serr = errno;
+    } while (r < 0 && serr == EINTR);
+    if (r + hd->hd_data_off >= hd->hd_data_size) {
+      atomic_dec(&hc->hc_extra_chunks, 1);
+      htsbuf_data_free(&hc->hc_extra, hd);
+    } else if (r > 0) {
+      hd->hd_data_off += r;
+      hc->hc_extra.hq_size -= r;
+    }
+    pthread_mutex_unlock(&hc->hc_extra_lock);
+
+    if (r < 0) {
+      if (ERRNO_AGAIN(serr))
+        r = 0;
+      break;
+    }
+  }
+
+fin:
+  atomic_dec(&hc->hc_extra_insend, 1);
+  return r;
+}
+
+/**
+ *
+ */
+int
+http_extra_flush_partial(http_connection_t *hc)
+{
+  htsbuf_data_t *hd;
+  int r = 0;
+  unsigned int off, size;
+  void *data = NULL;
+
+  atomic_add(&hc->hc_extra_insend, 1);
+
+  pthread_mutex_lock(&hc->hc_extra_lock);
+  hd = TAILQ_FIRST(&hc->hc_extra.hq_q);
+  if (hd && hd->hd_data_off > 0) {
+    data = hd->hd_data;
+    hd->hd_data = NULL;
+    off = hd->hd_data_off;
+    size = hd->hd_data_size;
+    atomic_dec(&hc->hc_extra_chunks, 1);
+    htsbuf_data_free(&hc->hc_extra, hd);
+  }
+  pthread_mutex_unlock(&hc->hc_extra_lock);
+  if (data == NULL)
+    goto finish;
+
+  r = tvh_write(hc->hc_fd, data + off, size - off);
+  free(data);
+
+finish:
+  atomic_dec(&hc->hc_extra_insend, 1);
+  return r;
+}
+
+/**
+ *
+ */
+int
+http_extra_send(http_connection_t *hc, const void *data, size_t data_len)
+{
+  uint8_t *b = malloc(data_len);
+  memcpy(b, data, data_len);
+  return http_extra_send_prealloc(hc, b, data_len);
+}
+
+/**
+ *
+ */
+int
+http_extra_send_prealloc(http_connection_t *hc, const void *data, size_t data_len)
+{
+  if (data == NULL) return 0;
+  pthread_mutex_lock(&hc->hc_extra_lock);
+  if (hc->hc_extra.hq_size <= 1024*1024) {
+    atomic_add(&hc->hc_extra_chunks, 1);
+    htsbuf_append_prealloc(&hc->hc_extra, data, data_len);
+  } else {
+    free((void *)data);
+  }
+  pthread_mutex_unlock(&hc->hc_extra_lock);
+  return http_extra_flush(hc);
+}
+
 /**
  *
  */
@@ -979,10 +1104,10 @@ dump_request(http_connection_t *hc)
 static int
 http_cmd_options(http_connection_t *hc)
 {
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, HTTP_STATUS_OK, NULL, INT64_MIN,
                   NULL, NULL, -1, 0, NULL, NULL);
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
   return 0;
 }
 
@@ -1413,11 +1538,14 @@ http_serve_requests(http_connection_t *hc)
   char *argv[3], *c, *s, *cmdline = NULL, *hdrline = NULL;
   int n, r, delim;
 
-  pthread_mutex_init(&hc->hc_fd_lock, NULL);
+  pthread_mutex_init(&hc->hc_extra_lock, NULL);
   http_arg_init(&hc->hc_args);
   http_arg_init(&hc->hc_req_args);
   htsbuf_queue_init(&spill, 0);
   htsbuf_queue_init(&hc->hc_reply, 0);
+  htsbuf_queue_init(&hc->hc_extra, 0);
+  atomic_set(&hc->hc_extra_insend, 0);
+  atomic_set(&hc->hc_extra_chunks, 0);
 
   do {
     hc->hc_no_output  = 0;
@@ -1528,6 +1656,7 @@ error:
   free(hdrline);
   free(cmdline);
   htsbuf_queue_flush(&spill);
+  htsbuf_queue_flush(&hc->hc_extra);
 
   free(hc->hc_nonce);
   hc->hc_nonce = NULL;
index 36c8e97bfd1c8c4285a6de8082a91f709a8cbe3d..4eb7546a89cb7886bc2f61c82c2e8e74fb7420f5 100644 (file)
@@ -22,7 +22,8 @@
 #include "htsbuf.h"
 #include "url.h"
 #include "tvhpoll.h"
-  #include "access.h"
+#include "access.h"
+#include "atomic.h"
 
 struct channel;
 struct http_path;
@@ -122,7 +123,6 @@ typedef enum http_ver {
 } http_ver_t;
 
 typedef struct http_connection {
-  pthread_mutex_t hc_fd_lock;
   int hc_fd;
   struct sockaddr_storage *hc_peer;
   char *hc_peer_ipstr;
@@ -139,6 +139,11 @@ typedef struct http_connection {
 
   htsbuf_queue_t  hc_reply;
 
+  int             hc_extra_insend;
+  pthread_mutex_t hc_extra_lock;
+  int             hc_extra_chunks;
+  htsbuf_queue_t  hc_extra;
+
   http_arg_list_t hc_args;
 
   http_arg_list_t hc_req_args; /* Argumets from GET or POST request */
@@ -204,6 +209,30 @@ void http_redirect(http_connection_t *hc, const char *location,
 
 void http_css_import(http_connection_t *hc, const char *location);
 
+void http_extra_destroy(http_connection_t *hc);
+
+int http_extra_flush(http_connection_t *hc);
+
+int http_extra_flush_partial(http_connection_t *hc);
+
+int http_extra_send(http_connection_t *hc, const void *data, size_t data_len);
+
+int http_extra_send_prealloc(http_connection_t *hc, const void *data, size_t data_len);
+
+static inline void http_send_begin(http_connection_t *hc)
+{
+  if (atomic_get(&hc->hc_extra_chunks) > 0)
+    http_extra_flush_partial(hc);
+  atomic_add(&hc->hc_extra_insend, 1);
+}
+
+static inline void http_send_end(http_connection_t *hc)
+{
+  atomic_dec(&hc->hc_extra_insend, 1);
+  if (atomic_get(&hc->hc_extra_chunks) > 0)
+    http_extra_flush(hc);
+}
+
 void http_send_header(http_connection_t *hc, int rc, const char *content, 
                      int64_t contentlen, const char *encoding,
                      const char *location, int maxage, const char *range,
index d6f6fb07fc562f8beaa271b2a5bb90b3161556df..2c4f8f3078b8be3c1e6357d107ac88d96fc27c14 100644 (file)
@@ -68,7 +68,7 @@ typedef struct satip_rtp_session {
   signal_status_t sig;
   int sig_lock;
   pthread_mutex_t lock;
-  pthread_mutex_t *tcp_lock;
+  http_connection_t *hc;
   uint8_t *table_data;
   int table_data_len;
   void (*no_data_cb)(void *opaque);
@@ -264,18 +264,12 @@ found:
 static int
 satip_rtp_tcp_data(satip_rtp_session_t *rtp, uint8_t stream, uint8_t *data, size_t data_len)
 {
-  int r = 0;
-
   assert(data_len <= 0xffff);
   data[0] = '$';
   data[1] = stream;
   data[2] = (data_len - 4) >> 8;
   data[3] = (data_len - 4) & 0xff;
-  pthread_mutex_lock(rtp->tcp_lock);
-  if (!tvh_write(rtp->fd_rtp, data, data_len))
-    r = errno;
-  pthread_mutex_unlock(rtp->tcp_lock);
-  return r;
+  return http_extra_send_prealloc(rtp->hc, data, data_len);
 }
 
 static inline int
@@ -286,7 +280,8 @@ satip_rtp_flush_tcp_data(satip_rtp_session_t *rtp)
 
   if (v->iov_len)
     r = satip_rtp_tcp_data(rtp, 0, v->iov_base, v->iov_len);
-  free(v->iov_base);
+  else
+    free(v->iov_base);
   v->iov_base = NULL;
   v->iov_len = 0;
   return r;
@@ -455,7 +450,7 @@ satip_rtp_thread(void *aux)
  */
 void *satip_rtp_queue(th_subscription_t *subs,
                       streaming_queue_t *sq,
-                      pthread_mutex_t *tcp_lock,
+                      http_connection_t *hc,
                       struct sockaddr_storage *peer, int port,
                       int fd_rtp, int fd_rtcp,
                       int frontend, int source, dvb_mux_conf_t *dmc,
@@ -478,7 +473,9 @@ void *satip_rtp_queue(th_subscription_t *subs,
   rtp->fd_rtcp = fd_rtcp;
   rtp->subs = subs;
   rtp->sq = sq;
-  rtp->tcp_lock = tcp_lock;
+  rtp->hc = hc;
+  rtp->tcp_payload = RTP_TCP_PAYLOAD;
+  rtp->tcp_buffer_size = 16*1024*1024;
   rtp->no_data_cb = no_data_cb;
   rtp->no_data_opaque = no_data_opaque;
   atomic_set(&rtp->allow_data, allow_data);
@@ -594,11 +591,9 @@ void satip_rtp_close(void *_rtp)
   tvh_cond_signal(&sq->sq_cond, 0);
   pthread_mutex_unlock(&sq->sq_mutex);
   pthread_mutex_unlock(&satip_rtp_lock);
-  if (rtp->port == RTSP_TCP_DATA)
-    pthread_mutex_lock(rtp->tcp_lock);
   pthread_join(rtp->tid, NULL);
   if (rtp->port == RTSP_TCP_DATA) {
-    pthread_mutex_unlock(rtp->tcp_lock);
+    http_extra_destroy(rtp->hc);
     free(rtp->tcp_data.iov_base);
   } else {
     udp_multisend_free(&rtp->um);
index 94fa80e50a9417817fcd357b7ad89ca7d4238a57..03d7ad1f77f1782855a1fe9a726b8e6c40e69755 100644 (file)
@@ -227,8 +227,6 @@ rtsp_session_timer_cb(void *aux)
   tvhwarn(LS_SATIPS, "-/%s/%i: session closed (timeout)", rs->session, rs->stream);
   pthread_mutex_unlock(&global_lock);
   pthread_mutex_lock(&rtsp_lock);
-  if (rs->rtp_peer_port == RTSP_TCP_DATA && rs->tcp_data)
-    shutdown(rs->tcp_data->hc_fd, SHUT_RDWR);
   rtsp_close_session(rs);
   rtsp_free_session(rs);
   pthread_mutex_unlock(&rtsp_lock);
@@ -630,7 +628,7 @@ pids:
     rs->no_data = 0;
     rs->rtp_handle =
       satip_rtp_queue(rs->subs, &rs->prch.prch_sq,
-                      &hc->hc_fd_lock, hc->hc_peer, rs->rtp_peer_port,
+                      hc, hc->hc_peer, rs->rtp_peer_port,
                       rs->udp_rtp ? rs->udp_rtp->fd : hc->hc_fd,
                       rs->udp_rtcp ? rs->udp_rtcp->fd : -1,
                       rs->findex, rs->src, &rs->dmc_tuned,
@@ -1217,9 +1215,9 @@ rtsp_process_options(http_connection_t *hc)
   http_arg_set(&args, "Public", "OPTIONS,DESCRIBE,SETUP,PLAY,TEARDOWN");
   if (hc->hc_session)
     http_arg_set(&args, "Session", hc->hc_session);
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args);
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
   http_arg_flush(&args);
   return 0;
 
@@ -1360,11 +1358,11 @@ rtsp_process_describe(http_connection_t *hc)
   else
     snprintf(buf, sizeof(buf), "rtsp://%s", rtsp_ip);
   http_arg_set(&args, "Content-Base", buf);
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, HTTP_STATUS_OK, "application/sdp", q.hq_size,
                    NULL, NULL, 0, NULL, NULL, &args);
   tcp_write_queue(hc->hc_fd, &q);
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
   http_arg_flush(&args);
   htsbuf_queue_flush(&q);
   return 0;
@@ -1444,9 +1442,9 @@ rtsp_process_play(http_connection_t *hc, int cmd)
 
   pthread_mutex_unlock(&rtsp_lock);
 
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args);
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
 
   goto end;
 
@@ -1498,9 +1496,9 @@ rtsp_process_teardown(http_connection_t *hc)
     pthread_mutex_unlock(&rtsp_lock);
     http_arg_init(&args);
     http_arg_set(&args, "Session", session);
-    pthread_mutex_lock(&hc->hc_fd_lock);
+    http_send_begin(hc);
     http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, NULL);
-    pthread_mutex_unlock(&hc->hc_fd_lock);
+    http_send_end(hc);
     http_arg_flush(&args);
   }
   return 0;
@@ -1597,8 +1595,6 @@ rtsp_serve(int fd, void **opaque, struct sockaddr_storage *peer,
 
   http_serve_requests(&hc);
 
-  shutdown(fd, SHUT_RDWR);
-
   rtsp_flush_requests(&hc);
 
   close(fd);
index feca0a8819613823d3467777ad91a2ccc6560995..840b4b27dc4eed7732c8e42d3c316dd2b9f90224 100644 (file)
@@ -216,10 +216,10 @@ satip_server_http_xml(http_connection_t *hc)
     snprintf(buf2, sizeof(buf2), "%d", srcs);
     http_arg_set(&args, "X-SATIP-Sources", buf2);
   }
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, 200, "text/xml", strlen(buf), 0, NULL, 10, 0, NULL, &args);
   tvh_write(hc->hc_fd, buf, strlen(buf));
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
   http_arg_flush(&args);
 
   return 0;
index d0c381940ffa2b28ba276e632f3e8f81a8320a02..d22fa05da79444a56cc27b893f0fdc898271e66d 100644 (file)
@@ -68,7 +68,7 @@ extern const idclass_t satip_server_class;
 
 void *satip_rtp_queue(th_subscription_t *subs,
                       streaming_queue_t *sq,
-                      pthread_mutex_t *tcp_lock,
+                      http_connection_t *hc,
                       struct sockaddr_storage *peer, int port,
                       int fd_rtp, int fd_rtcp,
                       int frontend, int source,
index 8e53816640d4574fd2b712fa291f346f5f2db3c6..e9d03c9fc8b2ced3bc4d6d4a807807ebf59d56aa 100644 (file)
@@ -331,7 +331,7 @@ page_static_file(http_connection_t *hc, const char *_remain, void *opaque)
   if (!gzip && fb_gzipped(fp))
     gzip = "gzip";
 
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, 200, content, size, gzip, NULL, 10, 0, NULL, NULL);
   while (!fb_eof(fp)) {
     ssize_t c = fb_read(fp, buf, sizeof(buf));
@@ -344,7 +344,7 @@ page_static_file(http_connection_t *hc, const char *_remain, void *opaque)
       break;
     }
   }
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
   fb_close(fp);
 
   return ret;
@@ -1438,10 +1438,10 @@ page_xspf(http_connection_t *hc, const char *remain, void *opaque)
   pthread_mutex_unlock(&global_lock);
 
   len = strlen(buf);
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, 200, "application/xspf+xml", len, 0, NULL, 10, 0, NULL, NULL);
   tvh_write(hc->hc_fd, buf, len);
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
 
   free(hostpath);
   return 0;
@@ -1481,10 +1481,10 @@ page_m3u(http_connection_t *hc, const char *remain, void *opaque)
   pthread_mutex_unlock(&global_lock);
 
   len = strlen(buf);
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, 200, MIME_M3U, len, 0, NULL, 10, 0, NULL, NULL);
   tvh_write(hc->hc_fd, buf, len);
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
 
   free(hostpath);
   return 0;
@@ -1645,7 +1645,7 @@ http_serve_file(http_connection_t *hc, const char *fname,
     }
   }
 
-  pthread_mutex_lock(&hc->hc_fd_lock);
+  http_send_begin(hc);
   http_send_header(hc, range ? HTTP_STATUS_PARTIAL_CONTENT : HTTP_STATUS_OK,
        content, content_len, NULL, NULL, 10, 
        range ? range_buf : NULL, disposition, NULL);
@@ -1671,7 +1671,7 @@ http_serve_file(http_connection_t *hc, const char *fname,
         stats(hc, r, opaque);
     }
   }
-  pthread_mutex_unlock(&hc->hc_fd_lock);
+  http_send_end(hc);
   close(fd);
 
   return ret;
@@ -1899,10 +1899,10 @@ http_redir(http_connection_t *hc, const char *remain, void *opaque)
         }
       }
       snprintf(buf, sizeof(buf), "tvh_locale={};tvh_locale_lang='';");
-      pthread_mutex_lock(&hc->hc_fd_lock);
+      http_send_begin(hc);
       http_send_header(hc, 200, "text/javascript; charset=UTF-8", strlen(buf), 0, NULL, 10, 0, NULL, NULL);
       tvh_write(hc->hc_fd, buf, strlen(buf));
-      pthread_mutex_unlock(&hc->hc_fd_lock);
+      http_send_end(hc);
       return 0;
     }
     if (!strcmp(components[0], "theme.css")) {