From: Jaroslav Kysela Date: Thu, 10 Aug 2017 13:56:42 +0000 (+0200) Subject: http/satip server: change RTP/TCP data queuing to avoid dead-locks, fixes #4226 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=ebbbc3b1e9c954a19c4aa518d83ccd28d67e6d23;p=thirdparty%2Ftvheadend.git http/satip server: change RTP/TCP data queuing to avoid dead-locks, fixes #4226 --- diff --git a/src/htsbuf.c b/src/htsbuf.c index 446c37791..22f029a6f 100644 --- a/src/htsbuf.c +++ b/src/htsbuf.c @@ -67,6 +67,7 @@ htsbuf_queue_free(htsbuf_queue_t *hq) void htsbuf_data_free(htsbuf_queue_t *hq, htsbuf_data_t *hd) { + hq->hq_size -= hd->hd_data_size - hd->hd_data_off; TAILQ_REMOVE(&hq->hq_q, hd, hd_link); free(hd->hd_data); free(hd); diff --git a/src/http.c b/src/http.c index bc8e13053..3d835d43f 100644 --- a/src/http.c +++ b/src/http.c @@ -337,7 +337,7 @@ http_send_header(http_connection_t *hc, int rc, const char *content, time_t t; int sess = 0; - lock_assert(&hc->hc_fd_lock); + assert(atomic_get(&hc->hc_extra_insend) > 0); htsbuf_queue_init(&hdrs, 0); @@ -482,9 +482,9 @@ http_send_header_websocket(http_connection_t *hc, const char *protocol) "\r\n", encoded, protocol); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); tcp_write_queue(hc->hc_fd, &hdrs); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); return 0; } @@ -610,7 +610,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content, } #endif - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, rc, content, size, encoding, location, maxage, 0, NULL, NULL); @@ -620,7 +620,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content, else tvh_write(hc->hc_fd, data, size); } - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); free(data); } @@ -775,6 +775,131 @@ http_css_import(http_connection_t *hc, const char *location) http_send_reply(hc, HTTP_STATUS_OK, "text/css", NULL, loc, 0); } +/** + * + */ +void +http_extra_destroy(http_connection_t *hc) +{ + htsbuf_data_t *hd, *hd_next; + + pthread_mutex_lock(&hc->hc_extra_lock); + for (hd = TAILQ_FIRST(&hc->hc_extra.hq_q); hd; hd = hd_next) { + hd_next = TAILQ_NEXT(hd, hd_link); + if (hd->hd_data_off <= 0) { + htsbuf_data_free(&hc->hc_extra, hd); + atomic_dec(&hc->hc_extra_chunks, 1); + } + } + pthread_mutex_unlock(&hc->hc_extra_lock); +} + +/** + * + */ +int +http_extra_flush(http_connection_t *hc) +{ + htsbuf_data_t *hd; + int r = 0, serr; + + if (atomic_add(&hc->hc_extra_insend, 1) != 0) + goto fin; + + while (1) { + pthread_mutex_lock(&hc->hc_extra_lock); + hd = TAILQ_FIRST(&hc->hc_extra.hq_q); + do { + r = send(hc->hc_fd, hd->hd_data + hd->hd_data_off, + hd->hd_data_size - hd->hd_data_off, + MSG_DONTWAIT | (TAILQ_NEXT(hd, hd_link) ? MSG_MORE : 0)); + serr = errno; + } while (r < 0 && serr == EINTR); + if (r + hd->hd_data_off >= hd->hd_data_size) { + atomic_dec(&hc->hc_extra_chunks, 1); + htsbuf_data_free(&hc->hc_extra, hd); + } else if (r > 0) { + hd->hd_data_off += r; + hc->hc_extra.hq_size -= r; + } + pthread_mutex_unlock(&hc->hc_extra_lock); + + if (r < 0) { + if (ERRNO_AGAIN(serr)) + r = 0; + break; + } + } + +fin: + atomic_dec(&hc->hc_extra_insend, 1); + return r; +} + +/** + * + */ +int +http_extra_flush_partial(http_connection_t *hc) +{ + htsbuf_data_t *hd; + int r = 0; + unsigned int off, size; + void *data = NULL; + + atomic_add(&hc->hc_extra_insend, 1); + + pthread_mutex_lock(&hc->hc_extra_lock); + hd = TAILQ_FIRST(&hc->hc_extra.hq_q); + if (hd && hd->hd_data_off > 0) { + data = hd->hd_data; + hd->hd_data = NULL; + off = hd->hd_data_off; + size = hd->hd_data_size; + atomic_dec(&hc->hc_extra_chunks, 1); + htsbuf_data_free(&hc->hc_extra, hd); + } + pthread_mutex_unlock(&hc->hc_extra_lock); + if (data == NULL) + goto finish; + + r = tvh_write(hc->hc_fd, data + off, size - off); + free(data); + +finish: + atomic_dec(&hc->hc_extra_insend, 1); + return r; +} + +/** + * + */ +int +http_extra_send(http_connection_t *hc, const void *data, size_t data_len) +{ + uint8_t *b = malloc(data_len); + memcpy(b, data, data_len); + return http_extra_send_prealloc(hc, b, data_len); +} + +/** + * + */ +int +http_extra_send_prealloc(http_connection_t *hc, const void *data, size_t data_len) +{ + if (data == NULL) return 0; + pthread_mutex_lock(&hc->hc_extra_lock); + if (hc->hc_extra.hq_size <= 1024*1024) { + atomic_add(&hc->hc_extra_chunks, 1); + htsbuf_append_prealloc(&hc->hc_extra, data, data_len); + } else { + free((void *)data); + } + pthread_mutex_unlock(&hc->hc_extra_lock); + return http_extra_flush(hc); +} + /** * */ @@ -1082,10 +1207,10 @@ dump_request(http_connection_t *hc) static int http_cmd_options(http_connection_t *hc) { - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, NULL, INT64_MIN, NULL, NULL, -1, 0, NULL, NULL); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); return 0; } @@ -1510,12 +1635,12 @@ http_websocket_send(http_connection_t *hc, uint8_t *buf, uint64_t buflen, b[9] = (buflen >> 0) & 0xff; bsize = 10; } - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); if (tvh_write(hc->hc_fd, b, bsize)) r = -1; if (r == 0 && tvh_write(hc->hc_fd, buf, buflen)) r = -1; - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); return r; } @@ -1670,11 +1795,14 @@ http_serve_requests(http_connection_t *hc) char *argv[3], *c, *s, *cmdline = NULL, *hdrline = NULL; int n, r, delim; - pthread_mutex_init(&hc->hc_fd_lock, NULL); + pthread_mutex_init(&hc->hc_extra_lock, NULL); http_arg_init(&hc->hc_args); http_arg_init(&hc->hc_req_args); htsbuf_queue_init(&spill, 0); htsbuf_queue_init(&hc->hc_reply, 0); + htsbuf_queue_init(&hc->hc_extra, 0); + atomic_set(&hc->hc_extra_insend, 0); + atomic_set(&hc->hc_extra_chunks, 0); do { hc->hc_no_output = 0; @@ -1785,6 +1913,7 @@ error: free(hdrline); free(cmdline); htsbuf_queue_flush(&spill); + htsbuf_queue_flush(&hc->hc_extra); free(hc->hc_nonce); hc->hc_nonce = NULL; diff --git a/src/http.h b/src/http.h index 5a65d5cc9..c64ec9d1d 100644 --- a/src/http.h +++ b/src/http.h @@ -24,6 +24,7 @@ #include "url.h" #include "tvhpoll.h" #include "access.h" +#include "atomic.h" struct channel; struct http_path; @@ -130,7 +131,6 @@ typedef enum http_wsop { } http_wsop_t; typedef struct http_connection { - pthread_mutex_t hc_fd_lock; int hc_fd; struct sockaddr_storage *hc_peer; char *hc_peer_ipstr; @@ -147,6 +147,11 @@ typedef struct http_connection { htsbuf_queue_t hc_reply; + int hc_extra_insend; + pthread_mutex_t hc_extra_lock; + int hc_extra_chunks; + htsbuf_queue_t hc_extra; + http_arg_list_t hc_args; http_arg_list_t hc_req_args; /* Argumets from GET or POST request */ @@ -216,6 +221,30 @@ void http_redirect(http_connection_t *hc, const char *location, void http_css_import(http_connection_t *hc, const char *location); +void http_extra_destroy(http_connection_t *hc); + +int http_extra_flush(http_connection_t *hc); + +int http_extra_flush_partial(http_connection_t *hc); + +int http_extra_send(http_connection_t *hc, const void *data, size_t data_len); + +int http_extra_send_prealloc(http_connection_t *hc, const void *data, size_t data_len); + +static inline void http_send_begin(http_connection_t *hc) +{ + if (atomic_get(&hc->hc_extra_chunks) > 0) + http_extra_flush_partial(hc); + atomic_add(&hc->hc_extra_insend, 1); +} + +static inline void http_send_end(http_connection_t *hc) +{ + atomic_dec(&hc->hc_extra_insend, 1); + if (atomic_get(&hc->hc_extra_chunks) > 0) + http_extra_flush(hc); +} + void http_send_header(http_connection_t *hc, int rc, const char *content, int64_t contentlen, const char *encoding, const char *location, int maxage, const char *range, diff --git a/src/satip/rtp.c b/src/satip/rtp.c index 6bd0347de..040eeb46b 100644 --- a/src/satip/rtp.c +++ b/src/satip/rtp.c @@ -73,7 +73,7 @@ typedef struct satip_rtp_session { signal_status_t sig; int sig_lock; pthread_mutex_t lock; - pthread_mutex_t *tcp_lock; + http_connection_t *hc; uint8_t *table_data; int table_data_len; void (*no_data_cb)(void *opaque); @@ -269,18 +269,12 @@ found: static int satip_rtp_tcp_data(satip_rtp_session_t *rtp, uint8_t stream, uint8_t *data, size_t data_len) { - int r = 0; - assert(data_len <= 0xffff); data[0] = '$'; data[1] = stream; data[2] = (data_len - 4) >> 8; data[3] = (data_len - 4) & 0xff; - pthread_mutex_lock(rtp->tcp_lock); - if (!tvh_write(rtp->fd_rtp, data, data_len)) - r = errno; - pthread_mutex_unlock(rtp->tcp_lock); - return r; + return http_extra_send_prealloc(rtp->hc, data, data_len); } static inline int @@ -291,7 +285,8 @@ satip_rtp_flush_tcp_data(satip_rtp_session_t *rtp) if (v->iov_len) r = satip_rtp_tcp_data(rtp, 0, v->iov_base, v->iov_len); - free(v->iov_base); + else + free(v->iov_base); v->iov_base = NULL; v->iov_len = 0; return r; @@ -459,7 +454,7 @@ satip_rtp_thread(void *aux) */ void *satip_rtp_queue(th_subscription_t *subs, streaming_queue_t *sq, - pthread_mutex_t *tcp_lock, + http_connection_t *hc, struct sockaddr_storage *peer, int port, int fd_rtp, int fd_rtcp, int frontend, int source, dvb_mux_conf_t *dmc, @@ -484,7 +479,7 @@ void *satip_rtp_queue(th_subscription_t *subs, rtp->fd_rtcp = fd_rtcp; rtp->subs = subs; rtp->sq = sq; - rtp->tcp_lock = tcp_lock; + rtp->hc = hc; rtp->tcp_payload = RTP_TCP_PAYLOAD; rtp->tcp_buffer_size = 16*1024*1024; rtp->no_data_cb = no_data_cb; @@ -618,11 +613,9 @@ void satip_rtp_close(void *_rtp) tvh_cond_signal(&sq->sq_cond, 0); pthread_mutex_unlock(&sq->sq_mutex); pthread_mutex_unlock(&satip_rtp_lock); - if (rtp->port == RTSP_TCP_DATA) - pthread_mutex_lock(rtp->tcp_lock); pthread_join(rtp->tid, NULL); if (rtp->port == RTSP_TCP_DATA) { - pthread_mutex_unlock(rtp->tcp_lock); + http_extra_destroy(rtp->hc); free(rtp->tcp_data.iov_base); } else { udp_multisend_free(&rtp->um); diff --git a/src/satip/rtsp.c b/src/satip/rtsp.c index e66582aa1..31fa4335d 100644 --- a/src/satip/rtsp.c +++ b/src/satip/rtsp.c @@ -227,8 +227,6 @@ rtsp_session_timer_cb(void *aux) tvhwarn(LS_SATIPS, "-/%s/%i: session closed (timeout)", rs->session, rs->stream); pthread_mutex_unlock(&global_lock); pthread_mutex_lock(&rtsp_lock); - if (rs->rtp_peer_port == RTSP_TCP_DATA && rs->tcp_data) - shutdown(rs->tcp_data->hc_fd, SHUT_RDWR); rtsp_close_session(rs); rtsp_free_session(rs); pthread_mutex_unlock(&rtsp_lock); @@ -630,7 +628,7 @@ pids: rs->no_data = 0; rs->rtp_handle = satip_rtp_queue(rs->subs, &rs->prch.prch_sq, - &hc->hc_fd_lock, hc->hc_peer, rs->rtp_peer_port, + hc, hc->hc_peer, rs->rtp_peer_port, rs->udp_rtp ? rs->udp_rtp->fd : hc->hc_fd, rs->udp_rtcp ? rs->udp_rtcp->fd : -1, rs->findex, rs->src, &rs->dmc_tuned, @@ -1217,9 +1215,9 @@ rtsp_process_options(http_connection_t *hc) http_arg_set(&args, "Public", "OPTIONS,DESCRIBE,SETUP,PLAY,TEARDOWN"); if (hc->hc_session) http_arg_set(&args, "Session", hc->hc_session); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); http_arg_flush(&args); return 0; @@ -1360,11 +1358,11 @@ rtsp_process_describe(http_connection_t *hc) else snprintf(buf, sizeof(buf), "rtsp://%s", rtsp_ip); http_arg_set(&args, "Content-Base", buf); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, "application/sdp", q.hq_size, NULL, NULL, 0, NULL, NULL, &args); tcp_write_queue(hc->hc_fd, &q); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); http_arg_flush(&args); htsbuf_queue_flush(&q); return 0; @@ -1444,9 +1442,9 @@ rtsp_process_play(http_connection_t *hc, int cmd) pthread_mutex_unlock(&rtsp_lock); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); goto end; @@ -1498,9 +1496,9 @@ rtsp_process_teardown(http_connection_t *hc) pthread_mutex_unlock(&rtsp_lock); http_arg_init(&args); http_arg_set(&args, "Session", session); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, NULL); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); http_arg_flush(&args); } return 0; @@ -1597,8 +1595,6 @@ rtsp_serve(int fd, void **opaque, struct sockaddr_storage *peer, http_serve_requests(&hc); - shutdown(fd, SHUT_RDWR); - rtsp_flush_requests(&hc); close(fd); diff --git a/src/satip/server.c b/src/satip/server.c index 39052a08a..1b6a3413d 100644 --- a/src/satip/server.c +++ b/src/satip/server.c @@ -216,10 +216,10 @@ satip_server_http_xml(http_connection_t *hc) snprintf(buf2, sizeof(buf2), "%d", srcs); http_arg_set(&args, "X-SATIP-Sources", buf2); } - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, "text/xml", strlen(buf), 0, NULL, 10, 0, NULL, &args); tvh_write(hc->hc_fd, buf, strlen(buf)); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); http_arg_flush(&args); return 0; diff --git a/src/satip/server.h b/src/satip/server.h index 8e9817662..1965cfc99 100644 --- a/src/satip/server.h +++ b/src/satip/server.h @@ -69,7 +69,7 @@ extern const idclass_t satip_server_class; void *satip_rtp_queue(th_subscription_t *subs, streaming_queue_t *sq, - pthread_mutex_t *tcp_lock, + http_connection_t *hc, struct sockaddr_storage *peer, int port, int fd_rtp, int fd_rtcp, int frontend, int source, diff --git a/src/webui/webui.c b/src/webui/webui.c index 7728d4712..03b23bacf 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -331,7 +331,7 @@ page_static_file(http_connection_t *hc, const char *_remain, void *opaque) if (!gzip && fb_gzipped(fp)) gzip = "gzip"; - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, content, size, gzip, NULL, 10, 0, NULL, NULL); while (!fb_eof(fp)) { ssize_t c = fb_read(fp, buf, sizeof(buf)); @@ -344,7 +344,7 @@ page_static_file(http_connection_t *hc, const char *_remain, void *opaque) break; } } - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); fb_close(fp); return ret; @@ -1453,10 +1453,10 @@ page_xspf(http_connection_t *hc, const char *remain, void *opaque) pthread_mutex_unlock(&global_lock); len = strlen(buf); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, "application/xspf+xml", len, 0, NULL, 10, 0, NULL, NULL); tvh_write(hc->hc_fd, buf, len); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); free(hostpath); return 0; @@ -1496,10 +1496,10 @@ page_m3u(http_connection_t *hc, const char *remain, void *opaque) pthread_mutex_unlock(&global_lock); len = strlen(buf); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, MIME_M3U, len, 0, NULL, 10, 0, NULL, NULL); tvh_write(hc->hc_fd, buf, len); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); free(hostpath); return 0; @@ -1660,7 +1660,7 @@ http_serve_file(http_connection_t *hc, const char *fname, } } - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, range ? HTTP_STATUS_PARTIAL_CONTENT : HTTP_STATUS_OK, content, content_len, NULL, NULL, 10, range ? range_buf : NULL, disposition, NULL); @@ -1686,7 +1686,7 @@ http_serve_file(http_connection_t *hc, const char *fname, stats(hc, r, opaque); } } - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); close(fd); return ret; @@ -1914,10 +1914,10 @@ http_redir(http_connection_t *hc, const char *remain, void *opaque) } } snprintf(buf, sizeof(buf), "tvh_locale={};tvh_locale_lang='';"); - pthread_mutex_lock(&hc->hc_fd_lock); + http_send_begin(hc); http_send_header(hc, 200, "text/javascript; charset=UTF-8", strlen(buf), 0, NULL, 10, 0, NULL, NULL); tvh_write(hc->hc_fd, buf, strlen(buf)); - pthread_mutex_unlock(&hc->hc_fd_lock); + http_send_end(hc); return 0; } if (!strcmp(components[0], "theme.css")) {