From: Jaroslav Kysela Date: Tue, 20 Oct 2015 21:16:43 +0000 (+0200) Subject: streaming: improve the client shutdown detection X-Git-Tag: v4.2.1~1852 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=d7cf55869b11f8866c90d2c4b73ee9bae3b8e462;p=thirdparty%2Ftvheadend.git streaming: improve the client shutdown detection --- diff --git a/src/input/mpegts/tsdemux.c b/src/input/mpegts/tsdemux.c index be679d28e..05dca2bb9 100644 --- a/src/input/mpegts/tsdemux.c +++ b/src/input/mpegts/tsdemux.c @@ -395,6 +395,7 @@ ts_skip(mpegts_service_t *t, const uint8_t *src, int len) return; ts_flush(t, sb); + service_send_streaming_status((service_t *)t); } /* diff --git a/src/parsers/parsers.c b/src/parsers/parsers.c index c85e482e8..4f1223b01 100644 --- a/src/parsers/parsers.c +++ b/src/parsers/parsers.c @@ -178,8 +178,10 @@ skip_mpeg_ts(service_t *t, elementary_stream_t *st, const uint8_t *data, int len { if(len >= 188) sbuf_err(&st->es_buf, len / 188); - if(st->es_buf.sb_err > 1000) + if(st->es_buf.sb_err > 1000) { parser_deliver_error(t, st); + service_send_streaming_status((service_t *)t); + } } /** diff --git a/src/service.c b/src/service.c index 9b9f0ee44..1631005ab 100644 --- a/src/service.c +++ b/src/service.c @@ -1183,6 +1183,21 @@ service_servicetype_txt ( service_t *s ) } +/** + * + */ +void +service_send_streaming_status(service_t *t) +{ + lock_assert(&t->s_stream_mutex); + + streaming_pad_deliver(&t->s_streaming_pad, + streaming_msg_create_code(SMT_SERVICE_STATUS, + t->s_streaming_status)); + + pthread_cond_broadcast(&t->s_tss_cond); +} + /** * */ @@ -1208,14 +1223,9 @@ service_set_streaming_status_flags_(service_t *t, int set) set & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "", set & TSS_TIMEOUT ? "[Data timeout] " : ""); - streaming_pad_deliver(&t->s_streaming_pad, - streaming_msg_create_code(SMT_SERVICE_STATUS, - t->s_streaming_status)); - - pthread_cond_broadcast(&t->s_tss_cond); + service_send_streaming_status(t); } - /** * Restart output on a service. * Happens if the stream composition changes. diff --git a/src/service.h b/src/service.h index 468e5749e..738af0a55 100644 --- a/src/service.h +++ b/src/service.h @@ -550,6 +550,9 @@ void service_remove_raw(service_t *); void service_remove_subscriber(service_t *t, struct th_subscription *s, int reason); + +void service_send_streaming_status(service_t *t); + void service_set_streaming_status_flags_(service_t *t, int flag); static inline void @@ -568,6 +571,7 @@ service_reset_streaming_status_flags(service_t *t, int flag) service_set_streaming_status_flags_(t, n & ~flag); } + struct streaming_start; struct streaming_start *service_build_stream_start(service_t *t); diff --git a/src/tcp.c b/src/tcp.c index 83666886f..9d65a35c7 100644 --- a/src/tcp.c +++ b/src/tcp.c @@ -374,6 +374,24 @@ tcp_read_timeout(int fd, void *buf, size_t len, int timeout) } +/** + * + */ +int +tcp_socket_dead(int fd) +{ + int err = 0; + socklen_t errlen = sizeof(err); + + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen)) + return -errno; + if (err) + return -err; + if (recv(fd, NULL, 0, MSG_PEEK | MSG_DONTWAIT) == 0) + return -EIO; + return 0; +} + /** * */ diff --git a/src/tcp.h b/src/tcp.h index 91eb2c005..d966aee88 100644 --- a/src/tcp.h +++ b/src/tcp.h @@ -91,6 +91,8 @@ char *tcp_get_str_from_ip(const struct sockaddr *sa, char *dst, size_t maxlen); struct sockaddr *tcp_get_ip_from_str(const char *str, struct sockaddr *sa); +int tcp_socket_dead(int fd); + struct access; uint32_t tcp_connection_count(struct access *aa); diff --git a/src/webui/webui.c b/src/webui/webui.c index 5ea3d8242..c0fd5826d 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -308,8 +308,6 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch, int ptimeout, grace = 20; struct timespec ts; struct timeval tp; - int err = 0; - socklen_t errlen = sizeof(err); streaming_start_t *ss_copy; int64_t mono; @@ -343,7 +341,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch, if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) { /* Check socket status */ - if (getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen) || err) { + if (tcp_socket_dead(hc->hc_fd)) { tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig); run = 0; } else if((!started && dispatch_clock - lastpkt > grace) || @@ -364,7 +362,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch, case SMT_PACKET: lastpkt = dispatch_clock; if(started) { - pktbuf_t *pb;; + pktbuf_t *pb; if (sm->sm_type == SMT_PACKET) pb = ((th_pkt_t*)sm->sm_data)->pkt_payload; else @@ -390,7 +388,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch, streaming_msg_free(sm); mono = getmonoclock() + 2000000; while (getmonoclock() < mono) { - if (getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen) || err) + if (tcp_socket_dead(hc->hc_fd)) break; usleep(50000); } @@ -417,7 +415,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch, break; case SMT_SERVICE_STATUS: - if(getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, &err, &errlen) || err) { + if(tcp_socket_dead(hc->hc_fd)) { tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig); run = 0;