From a12c166bf38f909876d3b06b5f54174b51aff1ea Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Thu, 10 Aug 2017 15:56:42 +0200 Subject: [PATCH] http/satip server: change RTP/TCP data queuing to avoid dead-locks, fixes #4226 --- src/htsbuf.c | 1 + src/http.c | 141 +++++++++++++++++++++++++++++++++++++++++++-- src/http.h | 33 ++++++++++- src/satip/rtp.c | 23 +++----- src/satip/rtsp.c | 22 +++---- src/satip/server.c | 4 +- src/satip/server.h | 2 +- src/webui/webui.c | 20 +++---- 8 files changed, 198 insertions(+), 48 deletions(-) diff --git a/src/htsbuf.c b/src/htsbuf.c index 446c37791..22f029a6f 100644 --- a/src/htsbuf.c +++ b/src/htsbuf.c @@ -67,6 +67,7 @@ htsbuf_queue_free(htsbuf_queue_t *hq) void htsbuf_data_free(htsbuf_queue_t *hq, htsbuf_data_t *hd) { + hq->hq_size -= hd->hd_data_size - hd->hd_data_off; TAILQ_REMOVE(&hq->hq_q, hd, hd_link); free(hd->hd_data); free(hd); diff --git a/src/http.c b/src/http.c index 65c30387b..c9aa241c1 100644 --- a/src/http.c +++ b/src/http.c @@ -325,7 +325,7 @@ http_send_header(http_connection_t *hc, int rc, const char *content, time_t t; int sess = 0; - lock_assert(&hc->hc_fd_lock); + assert(atomic_get(&hc->hc_extra_insend) > 0); htsbuf_queue_init(&hdrs, 0); @@ -525,7 +525,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content, } #endif - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, rc, content, size, encoding, location, maxage, 0, NULL, NULL); @@ -535,7 +535,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content, else tvh_write(hc->hc_fd, data, size); } - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); free(data); } @@ -690,6 +690,131 @@ http_css_import(http_connection_t *hc, const char *location) http_send_reply(hc, HTTP_STATUS_OK, "text/css", NULL, loc, 0); } +/** + * + */ +void +http_extra_destroy(http_connection_t *hc) +{ + htsbuf_data_t *hd, *hd_next; + + pthread_mutex_lock(&hc->hc_extra_lock); + for (hd = TAILQ_FIRST(&hc->hc_extra.hq_q); hd; hd = hd_next) { + hd_next = TAILQ_NEXT(hd, hd_link); + if (hd->hd_data_off <= 0) { + htsbuf_data_free(&hc->hc_extra, hd); + atomic_dec(&hc->hc_extra_chunks, 1); + } + } + pthread_mutex_unlock(&hc->hc_extra_lock); +} + +/** + * + */ +int +http_extra_flush(http_connection_t *hc) +{ + htsbuf_data_t *hd; + int r = 0, serr; + + if (atomic_add(&hc->hc_extra_insend, 1) != 0) + goto fin; + + while (1) { + pthread_mutex_lock(&hc->hc_extra_lock); + hd = TAILQ_FIRST(&hc->hc_extra.hq_q); + do { + r = send(hc->hc_fd, hd->hd_data + hd->hd_data_off, + hd->hd_data_size - hd->hd_data_off, + MSG_DONTWAIT | (TAILQ_NEXT(hd, hd_link) ? MSG_MORE : 0)); + serr = errno; + } while (r < 0 && serr == EINTR); + if (r + hd->hd_data_off >= hd->hd_data_size) { + atomic_dec(&hc->hc_extra_chunks, 1); + htsbuf_data_free(&hc->hc_extra, hd); + } else if (r > 0) { + hd->hd_data_off += r; + hc->hc_extra.hq_size -= r; + } + pthread_mutex_unlock(&hc->hc_extra_lock); + + if (r < 0) { + if (ERRNO_AGAIN(serr)) + r = 0; + break; + } + } + +fin: + atomic_dec(&hc->hc_extra_insend, 1); + return r; +} + +/** + * + */ +int +http_extra_flush_partial(http_connection_t *hc) +{ + htsbuf_data_t *hd; + int r = 0; + unsigned int off, size; + void *data = NULL; + + atomic_add(&hc->hc_extra_insend, 1); + + pthread_mutex_lock(&hc->hc_extra_lock); + hd = TAILQ_FIRST(&hc->hc_extra.hq_q); + if (hd && hd->hd_data_off > 0) { + data = hd->hd_data; + hd->hd_data = NULL; + off = hd->hd_data_off; + size = hd->hd_data_size; + atomic_dec(&hc->hc_extra_chunks, 1); + htsbuf_data_free(&hc->hc_extra, hd); + } + pthread_mutex_unlock(&hc->hc_extra_lock); + if (data == NULL) + goto finish; + + r = tvh_write(hc->hc_fd, data + off, size - off); + free(data); + +finish: + atomic_dec(&hc->hc_extra_insend, 1); + return r; +} + +/** + * + */ +int +http_extra_send(http_connection_t *hc, const void *data, size_t data_len) +{ + uint8_t *b = malloc(data_len); + memcpy(b, data, data_len); + return http_extra_send_prealloc(hc, b, data_len); +} + +/** + * + */ +int +http_extra_send_prealloc(http_connection_t *hc, const void *data, size_t data_len) +{ + if (data == NULL) return 0; + pthread_mutex_lock(&hc->hc_extra_lock); + if (hc->hc_extra.hq_size <= 1024*1024) { + atomic_add(&hc->hc_extra_chunks, 1); + htsbuf_append_prealloc(&hc->hc_extra, data, data_len); + } else { + free((void *)data); + } + pthread_mutex_unlock(&hc->hc_extra_lock); + return http_extra_flush(hc); +} + /** * */ @@ -979,10 +1104,10 @@ dump_request(http_connection_t *hc) static int http_cmd_options(http_connection_t *hc) { - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, NULL, INT64_MIN, NULL, NULL, -1, 0, NULL, NULL); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); return 0; } @@ -1413,11 +1538,14 @@ http_serve_requests(http_connection_t *hc) char *argv[3], *c, *s, *cmdline = NULL, *hdrline = NULL; int n, r, delim; - pthread_mutex_init(&hc->hc_fd_lock, NULL); + pthread_mutex_init(&hc->hc_extra_lock, NULL); http_arg_init(&hc->hc_args); http_arg_init(&hc->hc_req_args); htsbuf_queue_init(&spill, 0); htsbuf_queue_init(&hc->hc_reply, 0); + htsbuf_queue_init(&hc->hc_extra, 0); + atomic_set(&hc->hc_extra_insend, 0); + atomic_set(&hc->hc_extra_chunks, 0); do { hc->hc_no_output = 0; @@ -1528,6 +1656,7 @@ error: free(hdrline); free(cmdline); htsbuf_queue_flush(&spill); + htsbuf_queue_flush(&hc->hc_extra); free(hc->hc_nonce); hc->hc_nonce = NULL; diff --git a/src/http.h b/src/http.h index 36c8e97bf..4eb7546a8 100644 --- a/src/http.h +++ b/src/http.h @@ -22,7 +22,8 @@ #include "htsbuf.h" #include "url.h" #include "tvhpoll.h" - #include "access.h" +#include "access.h" +#include "atomic.h" struct channel; struct http_path; @@ -122,7 +123,6 @@ typedef enum http_ver { } http_ver_t; typedef struct http_connection { - pthread_mutex_t hc_fd_lock; int hc_fd; struct sockaddr_storage *hc_peer; char *hc_peer_ipstr; @@ -139,6 +139,11 @@ typedef struct http_connection { htsbuf_queue_t hc_reply; + int hc_extra_insend; + pthread_mutex_t hc_extra_lock; + int hc_extra_chunks; + htsbuf_queue_t hc_extra; + http_arg_list_t hc_args; http_arg_list_t hc_req_args; /* Argumets from GET or POST request */ @@ -204,6 +209,30 @@ void http_redirect(http_connection_t *hc, const char *location, void http_css_import(http_connection_t *hc, const char *location); +void http_extra_destroy(http_connection_t *hc); + +int http_extra_flush(http_connection_t *hc); + +int http_extra_flush_partial(http_connection_t *hc); + +int http_extra_send(http_connection_t *hc, const void *data, size_t data_len); + +int http_extra_send_prealloc(http_connection_t *hc, const void *data, size_t data_len); + +static inline void http_send_begin(http_connection_t *hc) +{ + if (atomic_get(&hc->hc_extra_chunks) > 0) + http_extra_flush_partial(hc); + atomic_add(&hc->hc_extra_insend, 1); +} + +static inline void http_send_end(http_connection_t *hc) +{ + atomic_dec(&hc->hc_extra_insend, 1); + if (atomic_get(&hc->hc_extra_chunks) > 0) + http_extra_flush(hc); +} + void http_send_header(http_connection_t *hc, int rc, const char *content, int64_t contentlen, const char *encoding, const char *location, int maxage, const char *range, diff --git a/src/satip/rtp.c b/src/satip/rtp.c index d6f6fb07f..2c4f8f307 100644 --- a/src/satip/rtp.c +++ b/src/satip/rtp.c @@ -68,7 +68,7 @@ typedef struct satip_rtp_session { signal_status_t sig; int sig_lock; pthread_mutex_t lock; - pthread_mutex_t *tcp_lock; + http_connection_t *hc; uint8_t *table_data; int table_data_len; void (*no_data_cb)(void *opaque); @@ -264,18 +264,12 @@ found: static int satip_rtp_tcp_data(satip_rtp_session_t *rtp, uint8_t stream, uint8_t *data, size_t data_len) { - int r = 0; - assert(data_len <= 0xffff); data[0] = '$'; data[1] = stream; data[2] = (data_len - 4) >> 8; data[3] = (data_len - 4) & 0xff; - pthread_mutex_lock(rtp->tcp_lock); - if (!tvh_write(rtp->fd_rtp, data, data_len)) - r = errno; - pthread_mutex_unlock(rtp->tcp_lock); - return r; + return http_extra_send_prealloc(rtp->hc, data, data_len); } static inline int @@ -286,7 +280,8 @@ satip_rtp_flush_tcp_data(satip_rtp_session_t *rtp) if (v->iov_len) r = satip_rtp_tcp_data(rtp, 0, v->iov_base, v->iov_len); - free(v->iov_base); + else + free(v->iov_base); v->iov_base = NULL; v->iov_len = 0; return r; @@ -455,7 +450,7 @@ satip_rtp_thread(void *aux) */ void *satip_rtp_queue(th_subscription_t *subs, streaming_queue_t *sq, - pthread_mutex_t *tcp_lock, + http_connection_t *hc, struct sockaddr_storage *peer, int port, int fd_rtp, int fd_rtcp, int frontend, int source, dvb_mux_conf_t *dmc, @@ -478,7 +473,9 @@ void *satip_rtp_queue(th_subscription_t *subs, rtp->fd_rtcp = fd_rtcp; rtp->subs = subs; rtp->sq = sq; - rtp->tcp_lock = tcp_lock; + rtp->hc = hc; + rtp->tcp_payload = RTP_TCP_PAYLOAD; + rtp->tcp_buffer_size = 16*1024*1024; rtp->no_data_cb = no_data_cb; rtp->no_data_opaque = no_data_opaque; atomic_set(&rtp->allow_data, allow_data); @@ -594,11 +591,9 @@ void satip_rtp_close(void *_rtp) tvh_cond_signal(&sq->sq_cond, 0); pthread_mutex_unlock(&sq->sq_mutex); pthread_mutex_unlock(&satip_rtp_lock); - if (rtp->port == RTSP_TCP_DATA) - pthread_mutex_lock(rtp->tcp_lock); pthread_join(rtp->tid, NULL); if (rtp->port == RTSP_TCP_DATA) { - pthread_mutex_unlock(rtp->tcp_lock); + http_extra_destroy(rtp->hc); free(rtp->tcp_data.iov_base); } else { udp_multisend_free(&rtp->um); diff --git a/src/satip/rtsp.c b/src/satip/rtsp.c index 94fa80e50..03d7ad1f7 100644 --- a/src/satip/rtsp.c +++ b/src/satip/rtsp.c @@ -227,8 +227,6 @@ rtsp_session_timer_cb(void *aux) tvhwarn(LS_SATIPS, "-/%s/%i: session closed (timeout)", rs->session, rs->stream); pthread_mutex_unlock(&global_lock); pthread_mutex_lock(&rtsp_lock); - if (rs->rtp_peer_port == RTSP_TCP_DATA && rs->tcp_data) - shutdown(rs->tcp_data->hc_fd, SHUT_RDWR); rtsp_close_session(rs); rtsp_free_session(rs); pthread_mutex_unlock(&rtsp_lock); @@ -630,7 +628,7 @@ pids: rs->no_data = 0; rs->rtp_handle = satip_rtp_queue(rs->subs, &rs->prch.prch_sq, - &hc->hc_fd_lock, hc->hc_peer, rs->rtp_peer_port, + hc, hc->hc_peer, rs->rtp_peer_port, rs->udp_rtp ? rs->udp_rtp->fd : hc->hc_fd, rs->udp_rtcp ? rs->udp_rtcp->fd : -1, rs->findex, rs->src, &rs->dmc_tuned, @@ -1217,9 +1215,9 @@ rtsp_process_options(http_connection_t *hc) http_arg_set(&args, "Public", "OPTIONS,DESCRIBE,SETUP,PLAY,TEARDOWN"); if (hc->hc_session) http_arg_set(&args, "Session", hc->hc_session); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); http_arg_flush(&args); return 0; @@ -1360,11 +1358,11 @@ rtsp_process_describe(http_connection_t *hc) else snprintf(buf, sizeof(buf), "rtsp://%s", rtsp_ip); http_arg_set(&args, "Content-Base", buf); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, "application/sdp", q.hq_size, NULL, NULL, 0, NULL, NULL, &args); tcp_write_queue(hc->hc_fd, &q); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); http_arg_flush(&args); htsbuf_queue_flush(&q); return 0; @@ -1444,9 +1442,9 @@ rtsp_process_play(http_connection_t *hc, int cmd) pthread_mutex_unlock(&rtsp_lock); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); goto end; @@ -1498,9 +1496,9 @@ rtsp_process_teardown(http_connection_t *hc) pthread_mutex_unlock(&rtsp_lock); http_arg_init(&args); http_arg_set(&args, "Session", session); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, NULL); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); http_arg_flush(&args); } return 0; @@ -1597,8 +1595,6 @@ rtsp_serve(int fd, void **opaque, struct sockaddr_storage *peer, http_serve_requests(&hc); - shutdown(fd, SHUT_RDWR); - rtsp_flush_requests(&hc); close(fd); diff --git a/src/satip/server.c b/src/satip/server.c index feca0a881..840b4b27d 100644 --- a/src/satip/server.c +++ b/src/satip/server.c @@ -216,10 +216,10 @@ satip_server_http_xml(http_connection_t *hc) snprintf(buf2, sizeof(buf2), "%d", srcs); http_arg_set(&args, "X-SATIP-Sources", buf2); } - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, "text/xml", strlen(buf), 0, NULL, 10, 0, NULL, &args); tvh_write(hc->hc_fd, buf, strlen(buf)); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); http_arg_flush(&args); return 0; diff --git a/src/satip/server.h b/src/satip/server.h index d0c381940..d22fa05da 100644 --- a/src/satip/server.h +++ b/src/satip/server.h @@ -68,7 +68,7 @@ extern const idclass_t satip_server_class; void *satip_rtp_queue(th_subscription_t *subs, streaming_queue_t *sq, - pthread_mutex_t *tcp_lock, + http_connection_t *hc, struct sockaddr_storage *peer, int port, int fd_rtp, int fd_rtcp, int frontend, int source, diff --git a/src/webui/webui.c b/src/webui/webui.c index 8e5381664..e9d03c9fc 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -331,7 +331,7 @@ page_static_file(http_connection_t *hc, const char *_remain, void *opaque) if (!gzip && fb_gzipped(fp)) gzip = "gzip"; - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, content, size, gzip, NULL, 10, 0, NULL, NULL); while (!fb_eof(fp)) { ssize_t c = fb_read(fp, buf, sizeof(buf)); @@ -344,7 +344,7 @@ page_static_file(http_connection_t *hc, const char *_remain, void *opaque) break; } } - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); fb_close(fp); return ret; @@ -1438,10 +1438,10 @@ page_xspf(http_connection_t *hc, const char *remain, void *opaque) pthread_mutex_unlock(&global_lock); len = strlen(buf); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, "application/xspf+xml", len, 0, NULL, 10, 0, NULL, NULL); tvh_write(hc->hc_fd, buf, len); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); free(hostpath); return 0; @@ -1481,10 +1481,10 @@ page_m3u(http_connection_t *hc, const char *remain, void *opaque) pthread_mutex_unlock(&global_lock); len = strlen(buf); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, MIME_M3U, len, 0, NULL, 10, 0, NULL, NULL); tvh_write(hc->hc_fd, buf, len); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); free(hostpath); return 0; @@ -1645,7 +1645,7 @@ http_serve_file(http_connection_t *hc, const char *fname, } } - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, range ? HTTP_STATUS_PARTIAL_CONTENT : HTTP_STATUS_OK, content, content_len, NULL, NULL, 10, range ? range_buf : NULL, disposition, NULL); @@ -1671,7 +1671,7 @@ http_serve_file(http_connection_t *hc, const char *fname, stats(hc, r, opaque); } } - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); close(fd); return ret; @@ -1899,10 +1899,10 @@ http_redir(http_connection_t *hc, const char *remain, void *opaque) } } snprintf(buf, sizeof(buf), "tvh_locale={};tvh_locale_lang='';"); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, "text/javascript; charset=UTF-8", strlen(buf), 0, NULL, 10, 0, NULL, NULL); tvh_write(hc->hc_fd, buf, strlen(buf)); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); return 0; } if (!strcmp(components[0], "theme.css")) { -- 2.47.2