]> git.ipfire.org Git - thirdparty/shairport-sync.git/commitdiff
Fixed race condition when stopping other threads with SIGUSR1 612/head
authorBelbo <belbo@gmx.de>
Tue, 15 Aug 2017 20:00:57 +0000 (22:00 +0200)
committerBelbo <belbo@gmx.de>
Wed, 1 Nov 2017 19:46:05 +0000 (20:46 +0100)
Many threads have a main loop that checks some exit variable in the loop condition. To stop a thread, another thread sets that exit variable, sends a SIGUSR1 to that thread to get it out of blocking calls like read(), write() or select(), and finally joins it.

There was a race condition though: if the signal arrived after the loop condition was checked but before the blocking system function was entered, the target thread essentially ignored the signal, made the blocking call anyway, and could get stuck forever there.

The only way around this race condition is to block SIGUSR1 in the destination thread during the critical time interval. This requires that the blocking system call and the call to unblock SIGUSR1 be an atomic unit. There is only one system call that makes this possible: pselect(), which takes a signal mask to replace the current signal mask with, but only while pselect() runs. Other functions like read(), recv(), write() and sendto() may only be called if we can be sure that they don't block. For this purpose, the file descriptors must be set to non-blocking mode and we must prepend the call to them with a call to pselect() to block until the socket becomes readable/writable.

common.c
common.h
mdns_external.c
player.c
rtp.c
rtsp.c
shairport.c

index 36fa183c841b88e3efe71a833c930f28e3086d81..e8cc16ac86ceb4f752b211dd9e77a48a20ec7b58 100644 (file)
--- a/common.c
+++ b/common.c
@@ -92,6 +92,8 @@ shairport_cfg config;
 
 int debuglev = 0;
 
+sigset_t pselect_sigset;
+
 int get_requested_connection_state_to_output() { return requested_connection_state_to_output; }
 
 void set_requested_connection_state_to_output(int v) { requested_connection_state_to_output = v; }
index 4c2584e93a10ef7d5e96355d93f976bdbdcc9cb8..20d142d4c2954756c66aaa5f914db2e1bc2364e2 100644 (file)
--- a/common.h
+++ b/common.h
@@ -4,6 +4,7 @@
 #include <libconfig.h>
 #include <stdint.h>
 #include <sys/socket.h>
+#include <signal.h>
 
 #include "audio.h"
 #include "config.h"
@@ -219,4 +220,6 @@ void command_set_volume(double volume);
 void shairport_shutdown();
 // void shairport_startup_complete(void);
 
+extern sigset_t pselect_sigset;
+
 #endif // _COMMON_H
index 86ad208b663ce046f071430ee350e0eee24dde1a..c4aecd113caf121c6aaae7cc605985f82272e20d 100644 (file)
@@ -28,7 +28,6 @@
 #include "common.h"
 #include <errno.h>
 #include <fcntl.h>
-#include <signal.h>
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
index 92d7e575263a082231f913d80528a98142427174..0b3200a8844b1c454a3203689a8ead73acf0233b 100644 (file)
--- a/player.c
+++ b/player.c
@@ -35,7 +35,6 @@
 #include <limits.h>
 #include <math.h>
 #include <pthread.h>
-#include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <stdlib.h>
diff --git a/rtp.c b/rtp.c
index 89c9c7364024f8f43bd1af84deab3d4210b05e31..ba12744b51813c00db0bd312ab01e1d5d8508c81 100644 (file)
--- a/rtp.c
+++ b/rtp.c
 #include <netdb.h>
 #include <netinet/in.h>
 #include <pthread.h>
-#include <signal.h>
 #include <stdio.h>
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <time.h>
 #include <unistd.h>
+#include <fcntl.h>
 
 #include "common.h"
 #include "player.h"
 #include "rtp.h"
 
+void memory_barrier();
+
 void rtp_initialise(rtsp_conn_info *conn) {
 
   conn->rtp_running = 0;
@@ -81,6 +83,15 @@ void *rtp_audio_receiver(void *arg) {
 
   ssize_t nread;
   while (conn->please_stop == 0) {
+    fd_set readfds;
+    FD_ZERO(&readfds);
+    FD_SET(conn->audio_socket, &readfds);
+    do {
+      memory_barrier();
+    } while (conn->please_stop == 0 && pselect(conn->audio_socket + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+    if (conn->please_stop != 0) {
+      break;
+    }
     nread = recv(conn->audio_socket, packet, sizeof(packet), 0);
 
     uint64_t local_time_now_fp = get_absolute_time_in_fp();
@@ -173,6 +184,15 @@ void *rtp_control_receiver(void *arg) {
   int64_t sync_rtp_timestamp, rtp_timestamp_less_latency;
   ssize_t nread;
   while (conn->please_stop == 0) {
+    fd_set readfds;
+    FD_ZERO(&readfds);
+    FD_SET(conn->control_socket, &readfds);
+    do {
+      memory_barrier();
+    } while (conn->please_stop == 0 && pselect(conn->control_socket + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+    if (conn->please_stop != 0) {
+      break;
+    }
     nread = recv(conn->control_socket, packet, sizeof(packet), 0);
     local_time_now = get_absolute_time_in_fp();
     //        clock_gettime(CLOCK_MONOTONIC,&tn);
@@ -308,6 +328,15 @@ void *rtp_timing_sender(void *arg) {
       msgsize = sizeof(struct sockaddr_in6);
     }
 #endif
+    fd_set writefds;
+    FD_ZERO(&writefds);
+    FD_SET(conn->timing_socket, &writefds);
+    do {
+      memory_barrier();
+    } while (conn->timing_sender_stop == 0 && pselect(conn->timing_socket + 1, NULL, &writefds, NULL, NULL, &pselect_sigset) <= 0);
+    if (conn->timing_sender_stop != 0) {
+      break;
+    }
     if (sendto(conn->timing_socket, &req, sizeof(req), 0,
                (struct sockaddr *)&conn->rtp_client_timing_socket, msgsize) == -1) {
       perror("Error sendto-ing to timing socket");
@@ -344,6 +373,15 @@ void *rtp_timing_receiver(void *arg) {
   uint64_t first_local_to_remote_time_difference_time;
   uint64_t l2rtd = 0;
   while (conn->please_stop == 0) {
+    fd_set readfds;
+    FD_ZERO(&readfds);
+    FD_SET(conn->timing_socket, &readfds);
+    do {
+      memory_barrier();
+    } while (conn->please_stop == 0 && pselect(conn->timing_socket + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+    if (conn->please_stop != 0) {
+      break;
+    }
     nread = recv(conn->timing_socket, packet, sizeof(packet), 0);
     arrival_time = get_absolute_time_in_fp();
     //      clock_gettime(CLOCK_MONOTONIC,&att);
@@ -589,6 +627,7 @@ static int bind_port(int ip_family, const char *self_ip_address, uint32_t scope_
     struct sockaddr_in *sa = (struct sockaddr_in *)&local;
     sport = ntohs(sa->sin_port);
   }
+  fcntl(local_socket, F_SETFL, O_NONBLOCK);
 
   *sock = local_socket;
   return sport;
diff --git a/rtsp.c b/rtsp.c
index 792600a9ad1595befd8d79f932e6ad0e825b7ba2..4e9749e0198aa9b61268272afe47bf69a58f80b1 100644 (file)
--- a/rtsp.c
+++ b/rtsp.c
@@ -34,7 +34,6 @@
 #include <netinet/in.h>
 #include <poll.h>
 #include <pthread.h>
-#include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -466,7 +465,12 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
   int msg_size = -1;
 
   while (msg_size < 0) {
-    memory_barrier();
+    fd_set readfds;
+    FD_ZERO(&readfds);
+    FD_SET(conn->fd, &readfds);
+    do {
+      memory_barrier();
+    } while (conn->stop == 0 && pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
     if (conn->stop != 0) {
       debug(3, "RTSP conversation thread %d shutdown requested.", conn->connection_number);
       reply = rtsp_read_request_response_immediate_shutdown_requested;
@@ -540,6 +544,17 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
         warning_message_sent = 1;
       }
     }
+    fd_set readfds;
+    FD_ZERO(&readfds);
+    FD_SET(conn->fd, &readfds);
+    do {
+      memory_barrier();
+    } while (conn->stop == 0 && pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+    if (conn->stop != 0) {
+      debug(1, "RTSP shutdown requested.");
+      reply = rtsp_read_request_response_immediate_shutdown_requested;
+      goto shutdown;
+    }
     ssize_t read_chunk = msg_size - inbuf;
     if (read_chunk > max_read_chunk)
       read_chunk = max_read_chunk;
@@ -1026,6 +1041,7 @@ void metadata_create(void) {
     } else {
       int buffer_size = METADATA_SNDBUF;
       setsockopt(metadata_sock, SOL_SOCKET, SO_SNDBUF, &buffer_size, sizeof(buffer_size));
+      fcntl(fd, F_SETFL, O_NONBLOCK);
       bzero((char *)&metadata_sockaddr, sizeof(metadata_sockaddr));
       metadata_sockaddr.sin_family = AF_INET;
       metadata_sockaddr.sin_addr.s_addr = inet_addr(config.metadata_sockaddr);
@@ -1773,12 +1789,6 @@ authenticate:
 }
 
 static void *rtsp_conversation_thread_func(void *pconn) {
-  // SIGUSR1 is used to interrupt this thread if blocked for read
-  sigset_t set;
-  sigemptyset(&set);
-  sigaddset(&set, SIGUSR1);
-  pthread_sigmask(SIG_UNBLOCK, &set, NULL);
-
   rtsp_conn_info *conn = pconn;
 
   rtp_initialise(conn);
@@ -1821,7 +1831,15 @@ static void *rtsp_conversation_thread_func(void *pconn) {
       }
       debug(3, "RTSP thread %d: RTSP Response:", conn->connection_number);
       debug_print_msg_headers(3, resp);
-      msg_write_response(conn->fd, resp);
+      fd_set writefds;
+      FD_ZERO(&writefds);
+      FD_SET(conn->fd, &writefds);
+      do {
+        memory_barrier();
+      } while (conn->stop == 0 && pselect(conn->fd + 1, NULL, &writefds, NULL, NULL, &pselect_sigset) <= 0);
+      if (conn->stop == 0) {
+        msg_write_response(conn->fd, resp);
+      }
       msg_free(req);
       msg_free(resp);
     } else {
@@ -2063,6 +2081,8 @@ void rtsp_listen_loop(void) {
       //      conn->thread = rtsp_conversation_thread;
       //      conn->stop = 0; // record's memory has been zeroed
       //      conn->authorized = 0; // record's memory has been zeroed
+      fcntl(conn->fd, F_SETFL, O_NONBLOCK);
+
       ret = pthread_create(&conn->thread, NULL, rtsp_conversation_thread_func,
                            conn); // also acts as a memory barrier
       if (ret)
index 033bb98cc96b06fd968d4d05a49782207857466e..b2f68b4a68a68a2ef42d639b2da45b204dbb1148 100644 (file)
@@ -33,7 +33,6 @@
 #include <memory.h>
 #include <net/if.h>
 #include <popt.h>
-#include <signal.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/stat.h>
@@ -987,6 +986,10 @@ void signal_setup(void) {
   sigdelset(&set, SIGUSR2);
   pthread_sigmask(SIG_BLOCK, &set, NULL);
 
+  // SIGUSR1 is used to interrupt a thread if blocked in pselect
+  pthread_sigmask(SIG_SETMASK, NULL, &pselect_sigset);
+  sigdelset(&pselect_sigset, SIGUSR1);
+
   // setting this to SIG_IGN would prevent signalling any threads.
   struct sigaction sa;
   memset(&sa, 0, sizeof(sa));