From: Mike Brady Date: Fri, 22 Jan 2016 17:47:25 +0000 (+0000) Subject: Begin to clean up the 'stop' signalling among the threads -- try to remove as many... X-Git-Tag: 2.7.7~4^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=21fb39d22f8ccf88eaf271d71a6dd595316c6f17;p=thirdparty%2Fshairport-sync.git Begin to clean up the 'stop' signalling among the threads -- try to remove as many globals as possible --- diff --git a/player.c b/player.c index 0b24a67e..856b3142 100644 --- a/player.c +++ b/player.c @@ -81,7 +81,7 @@ static int sampling_rate, frame_size; static aes_context dctx; #endif -static pthread_t player_thread; +//static pthread_t player_thread = NULL; static int please_stop; static int encrypted; // Normally the audio is encrypted, but it may not be @@ -893,6 +893,13 @@ typedef struct stats { // statistics for running averages } stats_t; static void *player_thread_func(void *arg) { + int threads_stop = 0; + // create and start the timing, control and audio receiver threads + pthread_t rtp_audio_thread, rtp_control_thread, rtp_timing_thread; + pthread_create(&rtp_audio_thread, NULL, &rtp_audio_receiver, (void *)&threads_stop); + pthread_create(&rtp_control_thread, NULL, &rtp_control_receiver, (void *)&threads_stop); + pthread_create(&rtp_timing_thread, NULL, &rtp_timing_receiver, (void *)&threads_stop); + session_corrections = 0; play_segment_reference_frame = 0; // zero signals that we are not in a play segment // check that there are enough buffers to accommodate the desired latency and the latency offset @@ -1219,8 +1226,22 @@ static void *player_thread_func(void *arg) { } } } + if (config.output->stop) + config.output->stop(); free(outbuf); free(silence); + debug(1,"Shut down audio, control and timing threads"); + + pthread_kill(rtp_audio_thread, SIGUSR1); + pthread_kill(rtp_control_thread, SIGUSR1); + pthread_kill(rtp_timing_thread, SIGUSR1); + pthread_join(rtp_timing_thread, NULL); + debug(1,"timing thread joined"); + pthread_join(rtp_audio_thread, NULL); + debug(1,"audio thread joined"); + pthread_join(rtp_control_thread, NULL); + debug(1,"control thread joined"); + debug(1,"Player thread exit"); return 0; } @@ -1389,7 +1410,9 @@ void player_flush(uint32_t timestamp) { #endif } -int player_play(stream_cfg *stream) { +int player_play(stream_cfg *stream, pthread_t *player_thread) { + //if (*player_thread!=NULL) + // die("Trying to create a second player thread for this RTSP session"); packet_count = 0; encrypted = stream->encrypted; if (config.buffer_start_fill > BUFFER_FRAMES) @@ -1434,23 +1457,26 @@ int player_play(stream_cfg *stream) { rc = pthread_attr_setstacksize(&tattr, size); if (rc) debug(1, "Error setting stack size for player_thread: %s", strerror(errno)); - pthread_create(&player_thread, &tattr, player_thread_func, NULL); + pthread_create(player_thread, &tattr, player_thread_func, NULL); pthread_attr_destroy(&tattr); return 0; } -void player_stop(void) { - please_stop = 1; - pthread_cond_signal(&flowcontrol); // tell it to give up - pthread_join(player_thread, NULL); -#ifdef CONFIG_METADATA - send_ssnc_metadata('pend', NULL, 0, 1); -#endif - config.output->stop(); - command_stop(); - free_buffer(); - free_decoder(); - int rc = pthread_cond_destroy(&flowcontrol); - if (rc) - debug(1, "Error destroying condition variable."); +void player_stop(pthread_t *player_thread) { + //if (*thread==NULL) + // debug(1,"Trying to stop a non-existent player thread"); + // else { + please_stop = 1; + pthread_cond_signal(&flowcontrol); // tell it to give up + pthread_join(*player_thread, NULL); + #ifdef CONFIG_METADATA + send_ssnc_metadata('pend', NULL, 0, 1); + #endif + command_stop(); + free_buffer(); + free_decoder(); + int rc = pthread_cond_destroy(&flowcontrol); + if (rc) + debug(1, "Error destroying condition variable."); + // } } diff --git a/player.h b/player.h index b9cc900a..9012b0cb 100644 --- a/player.h +++ b/player.h @@ -14,8 +14,8 @@ typedef uint16_t seq_t; // wrapped number between two seq_t. int32_t seq_diff(seq_t a, seq_t b); -int player_play(stream_cfg *cfg); -void player_stop(void); +int player_play(stream_cfg *cfg, pthread_t *thread); +void player_stop(pthread_t *thread); void player_volume(double f); void player_flush(uint32_t timestamp); diff --git a/rtp.c b/rtp.c index baf95124..2870e612 100644 --- a/rtp.c +++ b/rtp.c @@ -61,7 +61,6 @@ typedef struct time_ping_record { // only one RTP session can be active at a time. static int running = 0; -static int please_shutdown; static char client_ip_string[INET6_ADDRSTRLEN]; // the ip string pointing to the client static short client_ip_family; // AF_INET / AF_INET6 @@ -72,7 +71,7 @@ static SOCKADDR rtp_client_timing_socket; // a socket pointing to the timing po static int audio_socket; // our local [server] audio socket static int control_socket; // our local [server] control socket static int timing_socket; // local timing socket -static pthread_t rtp_audio_thread, rtp_control_thread, rtp_timing_thread; +//static pthread_t rtp_audio_thread, rtp_control_thread, rtp_timing_thread; static uint32_t reference_timestamp; static uint64_t reference_timestamp_time; @@ -96,16 +95,16 @@ static pthread_mutex_t reference_time_mutex = PTHREAD_MUTEX_INITIALIZER; uint64_t static local_to_remote_time_difference; // used to switch between local and remote clocks -static void *rtp_audio_receiver(void *arg) { +void *rtp_audio_receiver(void *arg) { // we inherit the signal mask (SIGUSR1) + int *stop = arg; // when set to 1, we should stop + int32_t last_seqno = -1; uint8_t packet[2048], *pktp; ssize_t nread; - while (1) { - if (please_shutdown) - break; + while (*stop==0) { nread = recv(audio_socket, packet, sizeof(packet), 0); if (nread < 0) break; @@ -158,17 +157,18 @@ static void *rtp_audio_receiver(void *arg) { return NULL; } -static void *rtp_control_receiver(void *arg) { +void *rtp_control_receiver(void *arg) { // we inherit the signal mask (SIGUSR1) + + int *stop = arg; // when set to 1, we should stop + reference_timestamp = 0; // nothing valid received yet uint8_t packet[2048], *pktp; struct timespec tn; uint64_t remote_time_of_sync, local_time_now, remote_time_now; uint32_t sync_rtp_timestamp, rtp_timestamp_less_latency; ssize_t nread; - while (1) { - if (please_shutdown) - break; + while (*stop==0) { nread = recv(control_socket, packet, sizeof(packet), 0); local_time_now = get_absolute_time_in_fp(); // clock_gettime(CLOCK_MONOTONIC,&tn); @@ -259,7 +259,8 @@ static void *rtp_control_receiver(void *arg) { return NULL; } -static void *rtp_timing_sender(void *arg) { +void *rtp_timing_sender(void *arg) { + int *stop = arg; // the parameter points to this request to stop thing struct timing_request { char leader; char type; @@ -280,9 +281,8 @@ static void *rtp_timing_sender(void *arg) { time_ping_count = 0; // we inherit the signal mask (SIGUSR1) - while (1) { - if (please_shutdown) - break; + while (*stop==0) { + // debug(1,"Send a timing request"); if (!running) die("rtp_timing_sender called without active stream!"); @@ -314,12 +314,16 @@ static void *rtp_timing_sender(void *arg) { return NULL; } -static void *rtp_timing_receiver(void *arg) { +void *rtp_timing_receiver(void *arg) { // we inherit the signal mask (SIGUSR1) + + int *stop = arg; // when set to 1, we should stop + uint8_t packet[2048], *pktp; ssize_t nread; + int request_stop = 0; pthread_t timer_requester; - pthread_create(&timer_requester, NULL, &rtp_timing_sender, NULL); + pthread_create(&timer_requester, NULL, &rtp_timing_sender, (void *)&request_stop); // struct timespec att; uint64_t distant_receive_time, distant_transmit_time, arrival_time, return_time, transit_time, processing_time; @@ -331,9 +335,7 @@ static void *rtp_timing_receiver(void *arg) { uint64_t first_local_to_remote_time_difference = 0; uint64_t first_local_to_remote_time_difference_time; uint64_t l2rtd = 0; - while (1) { - if (please_shutdown) - break; + while (*stop==0) { nread = recv(timing_socket, packet, sizeof(packet), 0); arrival_time = get_absolute_time_in_fp(); // clock_gettime(CLOCK_MONOTONIC,&att); @@ -497,7 +499,8 @@ static void *rtp_timing_receiver(void *arg) { } } - debug(1, "Timing RTP thread interrupted. terminating."); + debug(1, "Timing thread interrupted. terminating."); + request_stop = 1; void *retval; pthread_kill(timer_requester, SIGUSR1); pthread_join(timer_requester, &retval); @@ -655,11 +658,10 @@ void rtp_setup(SOCKADDR *remote, int cport, int tport, uint32_t active_remote, i debug(2, "listening for audio, control and timing on ports %d, %d, %d.", *lsport, *lcport, *ltport); - please_shutdown = 0; reference_timestamp = 0; - pthread_create(&rtp_audio_thread, NULL, &rtp_audio_receiver, NULL); - pthread_create(&rtp_control_thread, NULL, &rtp_control_receiver, NULL); - pthread_create(&rtp_timing_thread, NULL, &rtp_timing_receiver, NULL); + //pthread_create(&rtp_audio_thread, NULL, &rtp_audio_receiver, NULL); + //pthread_create(&rtp_control_thread, NULL, &rtp_control_receiver, NULL); + //pthread_create(&rtp_timing_thread, NULL, &rtp_timing_receiver, NULL); running = 1; request_sent = 0; @@ -682,18 +684,18 @@ void clear_reference_timestamp(void) { void rtp_shutdown(void) { if (!running) - die("rtp_shutdown called without active stream!"); + debug(1,"rtp_shutdown called without active stream!"); debug(2, "shutting down RTP thread"); - please_shutdown = 1; - void *retval; - reference_timestamp = 0; - pthread_kill(rtp_audio_thread, SIGUSR1); - pthread_join(rtp_audio_thread, &retval); - pthread_kill(rtp_control_thread, SIGUSR1); - pthread_join(rtp_control_thread, &retval); - pthread_kill(rtp_timing_thread, SIGUSR1); - pthread_join(rtp_timing_thread, &retval); + clear_reference_timestamp(); +// debug(1,"Shut down audio, control and timing threads"); +// usleep(3000000); // hack +// pthread_kill(rtp_audio_thread, SIGUSR1); +// pthread_kill(rtp_control_thread, SIGUSR1); +// pthread_kill(rtp_timing_thread, SIGUSR1); +// pthread_join(rtp_audio_thread, &retval); +// pthread_join(rtp_control_thread, &retval); +// pthread_join(rtp_timing_thread, &retval); running = 0; } diff --git a/rtp.h b/rtp.h index 8045d2bb..cc0776da 100644 --- a/rtp.h +++ b/rtp.h @@ -5,6 +5,10 @@ #include "player.h" +void *rtp_audio_receiver(void *arg); +void *rtp_control_receiver(void *arg); +void *rtp_timing_receiver(void *arg); + void rtp_setup(SOCKADDR *remote, int controlport, int timingport, uint32_t active_remote, int *local_server_port, int *local_control_port, int *local_timing_port); void rtp_shutdown(void); diff --git a/rtsp.c b/rtsp.c index ac173b31..7d6063f0 100644 --- a/rtsp.c +++ b/rtsp.c @@ -92,6 +92,7 @@ typedef struct { SOCKADDR remote; int running; pthread_t thread; + pthread_t player_thread; } rtsp_conn_info; #ifdef CONFIG_METADATA @@ -426,7 +427,7 @@ static int msg_handle_line(rtsp_message **pmsg, char *line) { *p = 0; p += 2; msg_add_header(msg, line, p); - debug(2, " %s: %s.", line, p); + // debug(2, " %s: %s.", line, p); return -1; } else { char *cl = msg_get_header(msg, "Content-Length"); @@ -568,7 +569,7 @@ static void msg_write_response(int fd, rtsp_message *resp) { p += n; for (i = 0; i < resp->nheaders; i++) { - debug(2, " %s: %s.", resp->name[i], resp->value[i]); +// debug(3, " %s: %s.", resp->name[i], resp->value[i]); n = snprintf(p, pktfree, "%s: %s\r\n", resp->name[i], resp->value[i]); pktfree -= n; p += n; @@ -764,7 +765,7 @@ static void handle_setup(rtsp_conn_info *conn, rtsp_message *req, rtsp_message * strcat(hdr, q); // should unsplice the timing port entry } - player_play(&conn->stream); + player_play(&conn->stream,&conn->player_thread); // the tread better be null char *resphdr = alloca(200); *resphdr = 0; @@ -1535,12 +1536,12 @@ static void *rtsp_conversation_thread_func(void *pconn) { int method_selected = 0; for (mh = method_handlers; mh->method; mh++) { if (!strcmp(mh->method, req->method)) { - //debug(1,"RTSP Packet received of type \"%s\":",mh->method), - //msg_print_debug_headers(req); + // debug(1,"RTSP Packet received of type \"%s\":",mh->method), + // msg_print_debug_headers(req); method_selected = 1; mh->handler(conn, req, resp); - //debug(1,"RTSP Response:"); - //msg_print_debug_headers(resp); + // debug(1,"RTSP Response:"); + // msg_print_debug_headers(resp); break; } } @@ -1557,20 +1558,20 @@ static void *rtsp_conversation_thread_func(void *pconn) { } } while (reply != rtsp_read_request_response_shutdown_requested); - debug(1, "closing RTSP connection."); + debug(1, "Now closing RTSP connection."); if (conn->fd > 0) close(conn->fd); if (rtsp_playing()) { + player_stop(&conn->player_thread); // might be less noisy doing this first rtp_shutdown(); - player_stop(); pthread_mutex_unlock(&play_lock); - please_shutdown = 0; pthread_mutex_unlock(&playing_mutex); } if (auth_nonce) free(auth_nonce); conn->running = 0; - debug(2, "terminating RTSP thread."); + debug(2, "Now terminating RTSP conversation thread."); + please_shutdown = 0; return NULL; } @@ -1699,7 +1700,7 @@ void rtsp_listen_loop(void) { memset(conn, 0, sizeof(rtsp_conn_info)); socklen_t slen = sizeof(conn->remote); - debug(1, "new RTSP connection."); + debug(1, "New RTSP connection on port %d",config.port); conn->fd = accept(acceptfd, (struct sockaddr *)&conn->remote, &slen); if (conn->fd < 0) { perror("failed to accept connection");