Many threads have a main loop that checks some exit variable in the loop condition. To stop a thread, another thread sets that exit variable, sends a SIGUSR1 to that thread to get it out of blocking calls like read(), write() or select(), and finally joins it.
There was a race condition though: if the signal arrived after the loop condition was checked but before the blocking system function was entered, the target thread essentially ignored the signal, made the blocking call anyway, and could get stuck forever there.
The only way around this race condition is to block SIGUSR1 in the destination thread during the critical time interval. This requires that the blocking system call and the call to unblock SIGUSR1 be an atomic unit. There is only one system call that makes this possible: pselect(), which takes a signal mask to replace the current signal mask with, but only while pselect() runs. Other functions like read(), recv(), write() and sendto() may only be called if we can be sure that they don't block. For this purpose, the file descriptors must be set to non-blocking mode and we must prepend the call to them with a call to pselect() to block until the socket becomes readable/writable.
int debuglev = 0;
+sigset_t pselect_sigset;
+
int get_requested_connection_state_to_output() { return requested_connection_state_to_output; }
void set_requested_connection_state_to_output(int v) { requested_connection_state_to_output = v; }
#include <libconfig.h>
#include <stdint.h>
#include <sys/socket.h>
+#include <signal.h>
#include "audio.h"
#include "config.h"
void shairport_shutdown();
// void shairport_startup_complete(void);
+extern sigset_t pselect_sigset;
+
#endif // _COMMON_H
#include "common.h"
#include <errno.h>
#include <fcntl.h>
-#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
-#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdlib.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
-#include <signal.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
+#include <fcntl.h>
#include "common.h"
#include "player.h"
#include "rtp.h"
+void memory_barrier();
+
void rtp_initialise(rtsp_conn_info *conn) {
conn->rtp_running = 0;
ssize_t nread;
while (conn->please_stop == 0) {
+ fd_set readfds;
+ FD_ZERO(&readfds);
+ FD_SET(conn->audio_socket, &readfds);
+ do {
+ memory_barrier();
+ } while (conn->please_stop == 0 && pselect(conn->audio_socket + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+ if (conn->please_stop != 0) {
+ break;
+ }
nread = recv(conn->audio_socket, packet, sizeof(packet), 0);
uint64_t local_time_now_fp = get_absolute_time_in_fp();
int64_t sync_rtp_timestamp, rtp_timestamp_less_latency;
ssize_t nread;
while (conn->please_stop == 0) {
+ fd_set readfds;
+ FD_ZERO(&readfds);
+ FD_SET(conn->control_socket, &readfds);
+ do {
+ memory_barrier();
+ } while (conn->please_stop == 0 && pselect(conn->control_socket + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+ if (conn->please_stop != 0) {
+ break;
+ }
nread = recv(conn->control_socket, packet, sizeof(packet), 0);
local_time_now = get_absolute_time_in_fp();
// clock_gettime(CLOCK_MONOTONIC,&tn);
msgsize = sizeof(struct sockaddr_in6);
}
#endif
+ fd_set writefds;
+ FD_ZERO(&writefds);
+ FD_SET(conn->timing_socket, &writefds);
+ do {
+ memory_barrier();
+ } while (conn->timing_sender_stop == 0 && pselect(conn->timing_socket + 1, NULL, &writefds, NULL, NULL, &pselect_sigset) <= 0);
+ if (conn->timing_sender_stop != 0) {
+ break;
+ }
if (sendto(conn->timing_socket, &req, sizeof(req), 0,
(struct sockaddr *)&conn->rtp_client_timing_socket, msgsize) == -1) {
perror("Error sendto-ing to timing socket");
uint64_t first_local_to_remote_time_difference_time;
uint64_t l2rtd = 0;
while (conn->please_stop == 0) {
+ fd_set readfds;
+ FD_ZERO(&readfds);
+ FD_SET(conn->timing_socket, &readfds);
+ do {
+ memory_barrier();
+ } while (conn->please_stop == 0 && pselect(conn->timing_socket + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+ if (conn->please_stop != 0) {
+ break;
+ }
nread = recv(conn->timing_socket, packet, sizeof(packet), 0);
arrival_time = get_absolute_time_in_fp();
// clock_gettime(CLOCK_MONOTONIC,&att);
struct sockaddr_in *sa = (struct sockaddr_in *)&local;
sport = ntohs(sa->sin_port);
}
+ fcntl(local_socket, F_SETFL, O_NONBLOCK);
*sock = local_socket;
return sport;
#include <netinet/in.h>
#include <poll.h>
#include <pthread.h>
-#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int msg_size = -1;
while (msg_size < 0) {
- memory_barrier();
+ fd_set readfds;
+ FD_ZERO(&readfds);
+ FD_SET(conn->fd, &readfds);
+ do {
+ memory_barrier();
+ } while (conn->stop == 0 && pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
if (conn->stop != 0) {
debug(3, "RTSP conversation thread %d shutdown requested.", conn->connection_number);
reply = rtsp_read_request_response_immediate_shutdown_requested;
warning_message_sent = 1;
}
}
+ fd_set readfds;
+ FD_ZERO(&readfds);
+ FD_SET(conn->fd, &readfds);
+ do {
+ memory_barrier();
+ } while (conn->stop == 0 && pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+ if (conn->stop != 0) {
+ debug(1, "RTSP shutdown requested.");
+ reply = rtsp_read_request_response_immediate_shutdown_requested;
+ goto shutdown;
+ }
ssize_t read_chunk = msg_size - inbuf;
if (read_chunk > max_read_chunk)
read_chunk = max_read_chunk;
} else {
int buffer_size = METADATA_SNDBUF;
setsockopt(metadata_sock, SOL_SOCKET, SO_SNDBUF, &buffer_size, sizeof(buffer_size));
+ fcntl(fd, F_SETFL, O_NONBLOCK);
bzero((char *)&metadata_sockaddr, sizeof(metadata_sockaddr));
metadata_sockaddr.sin_family = AF_INET;
metadata_sockaddr.sin_addr.s_addr = inet_addr(config.metadata_sockaddr);
}
static void *rtsp_conversation_thread_func(void *pconn) {
- // SIGUSR1 is used to interrupt this thread if blocked for read
- sigset_t set;
- sigemptyset(&set);
- sigaddset(&set, SIGUSR1);
- pthread_sigmask(SIG_UNBLOCK, &set, NULL);
-
rtsp_conn_info *conn = pconn;
rtp_initialise(conn);
}
debug(3, "RTSP thread %d: RTSP Response:", conn->connection_number);
debug_print_msg_headers(3, resp);
- msg_write_response(conn->fd, resp);
+ fd_set writefds;
+ FD_ZERO(&writefds);
+ FD_SET(conn->fd, &writefds);
+ do {
+ memory_barrier();
+ } while (conn->stop == 0 && pselect(conn->fd + 1, NULL, &writefds, NULL, NULL, &pselect_sigset) <= 0);
+ if (conn->stop == 0) {
+ msg_write_response(conn->fd, resp);
+ }
msg_free(req);
msg_free(resp);
} else {
// conn->thread = rtsp_conversation_thread;
// conn->stop = 0; // record's memory has been zeroed
// conn->authorized = 0; // record's memory has been zeroed
+ fcntl(conn->fd, F_SETFL, O_NONBLOCK);
+
ret = pthread_create(&conn->thread, NULL, rtsp_conversation_thread_func,
conn); // also acts as a memory barrier
if (ret)
#include <memory.h>
#include <net/if.h>
#include <popt.h>
-#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
sigdelset(&set, SIGUSR2);
pthread_sigmask(SIG_BLOCK, &set, NULL);
+ // SIGUSR1 is used to interrupt a thread if blocked in pselect
+ pthread_sigmask(SIG_SETMASK, NULL, &pselect_sigset);
+ sigdelset(&pselect_sigset, SIGUSR1);
+
// setting this to SIG_IGN would prevent signalling any threads.
struct sigaction sa;
memset(&sa, 0, sizeof(sa));