From: Mike Brady Date: Sat, 14 Jul 2018 12:38:29 +0000 (+0100) Subject: Move ab_mutex, flow control CV and flush mutex initialisation and teardown out of... X-Git-Tag: 3.3RC0~286^2~30 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c2e3fa5aa7ccb0d658869e4d70d9240c9f6153ec;p=thirdparty%2Fshairport-sync.git Move ab_mutex, flow control CV and flush mutex initialisation and teardown out of the player thread. Thus remove need for player theread lock. Format some of the surce files. --- diff --git a/configure.ac b/configure.ac index f047fc15..4ddb55f5 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ([2.50]) -AC_INIT([shairport-sync], [3.2d67], [mikebrady@eircom.net]) +AC_INIT([shairport-sync], [3.2.2p0], [mikebrady@eircom.net]) AM_INIT_AUTOMAKE AC_CONFIG_SRCDIR([shairport.c]) AC_CONFIG_HEADERS([config.h]) diff --git a/dbus-service.c b/dbus-service.c index ebcd3371..14ace35b 100644 --- a/dbus-service.c +++ b/dbus-service.c @@ -43,16 +43,15 @@ void dbus_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused)) shairport_sync_advanced_remote_control_set_available(shairportSyncAdvancedRemoteControlSkeleton, FALSE); } - + if (argc->progress_string) { - // debug(1, "Check progress string"); - th = shairport_sync_remote_control_get_progress_string( - shairportSyncRemoteControlSkeleton); - if ((th == NULL) || (strcasecmp(th, argc->progress_string) != 0)) { - // debug(1, "Progress string should be changed"); - shairport_sync_remote_control_set_progress_string( - shairportSyncRemoteControlSkeleton, argc->progress_string); - } + // debug(1, "Check progress string"); + th = shairport_sync_remote_control_get_progress_string(shairportSyncRemoteControlSkeleton); + if ((th == NULL) || (strcasecmp(th, argc->progress_string) != 0)) { + // debug(1, "Progress string should be changed"); + shairport_sync_remote_control_set_progress_string(shairportSyncRemoteControlSkeleton, + argc->progress_string); + } } switch (argc->player_state) { @@ -162,7 +161,8 @@ void dbus_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused)) if ((argc->track_metadata) && (argc->track_metadata->item_id)) { char trackidstring[128]; // debug(1, "Set ID using mper ID: \"%u\".",argc->item_id); - snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", argc->track_metadata->item_id); + snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", + argc->track_metadata->item_id); GVariant *trackid = g_variant_new("o", trackidstring); g_variant_builder_add(dict_builder, "{sv}", "mpris:trackid", trackid); } diff --git a/mdns_avahi.c b/mdns_avahi.c index 28aa9eca..c54f2df6 100644 --- a/mdns_avahi.c +++ b/mdns_avahi.c @@ -84,33 +84,33 @@ static void resolve_callback(AvahiServiceResolver *r, AVAHI_GCC_UNUSED AvahiIfIn /* Called whenever a service has been resolved successfully or timed out */ switch (event) { - case AVAHI_RESOLVER_FAILURE: - debug(2, "(Resolver) Failed to resolve service '%s' of type '%s' in domain '%s': %s.", name, - type, domain, avahi_strerror(avahi_client_errno(avahi_service_resolver_get_client(r)))); - break; - case AVAHI_RESOLVER_FOUND: { - // char a[AVAHI_ADDRESS_STR_MAX], *t; - // debug(1, "Resolve callback: Service '%s' of type '%s' in domain '%s':", name, type, domain); - char *dacpid = strstr(name, "iTunes_Ctrl_"); - if (dacpid) { - dacpid += strlen("iTunes_Ctrl_"); - if (strcmp(dacpid, dbs->dacp_id) == 0) { - debug(3, "Client's DACP port: %u.", port); - #ifdef HAVE_DACP_CLIENT - dacp_monitor_port_update_callback(dacpid, port); - #endif - #ifdef CONFIG_METADATA - char portstring[20]; - memset(portstring, 0, sizeof(portstring)); - snprintf(portstring, sizeof(portstring), "%u", port); - send_ssnc_metadata('dapo', strdup(portstring), strlen(portstring), 0); - #endif - } - } else { - debug(1, "Resolve callback: Can't see a DACP string in a DACP Record!"); + case AVAHI_RESOLVER_FAILURE: + debug(2, "(Resolver) Failed to resolve service '%s' of type '%s' in domain '%s': %s.", name, + type, domain, avahi_strerror(avahi_client_errno(avahi_service_resolver_get_client(r)))); + break; + case AVAHI_RESOLVER_FOUND: { + // char a[AVAHI_ADDRESS_STR_MAX], *t; + // debug(1, "Resolve callback: Service '%s' of type '%s' in domain '%s':", name, type, domain); + char *dacpid = strstr(name, "iTunes_Ctrl_"); + if (dacpid) { + dacpid += strlen("iTunes_Ctrl_"); + if (strcmp(dacpid, dbs->dacp_id) == 0) { + debug(3, "Client's DACP port: %u.", port); +#ifdef HAVE_DACP_CLIENT + dacp_monitor_port_update_callback(dacpid, port); +#endif +#ifdef CONFIG_METADATA + char portstring[20]; + memset(portstring, 0, sizeof(portstring)); + snprintf(portstring, sizeof(portstring), "%u", port); + send_ssnc_metadata('dapo', strdup(portstring), strlen(portstring), 0); +#endif } + } else { + debug(1, "Resolve callback: Can't see a DACP string in a DACP Record!"); } } + } // debug(1,"service resolver freed by resolve_callback"); check_avahi_response(1, avahi_service_resolver_free(r)); } @@ -164,47 +164,47 @@ static void register_service(AvahiClient *c); static void egroup_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, AVAHI_GCC_UNUSED void *userdata) { switch (state) { - case AVAHI_ENTRY_GROUP_ESTABLISHED: - /* The entry group has been established successfully */ - debug(1, "avahi: service '%s' successfully added.", service_name); - break; - - case AVAHI_ENTRY_GROUP_COLLISION: { - char *n; - - /* A service name collision with a remote service - * happened. Let's pick a new name */ - debug(1, "avahi name collision -- look for another"); - n = avahi_alternative_service_name(service_name); - if (service_name) - avahi_free(service_name); - else - debug(1, "avahi attempt to free a NULL service name"); - service_name = n; + case AVAHI_ENTRY_GROUP_ESTABLISHED: + /* The entry group has been established successfully */ + debug(1, "avahi: service '%s' successfully added.", service_name); + break; - debug(2, "avahi: service name collision, renaming service to '%s'", service_name); + case AVAHI_ENTRY_GROUP_COLLISION: { + char *n; - /* And recreate the services */ - register_service(avahi_entry_group_get_client(g)); - break; - } + /* A service name collision with a remote service + * happened. Let's pick a new name */ + debug(1, "avahi name collision -- look for another"); + n = avahi_alternative_service_name(service_name); + if (service_name) + avahi_free(service_name); + else + debug(1, "avahi attempt to free a NULL service name"); + service_name = n; - case AVAHI_ENTRY_GROUP_FAILURE: - debug(1, "avahi: entry group failure: %s", - avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g)))); - break; + debug(2, "avahi: service name collision, renaming service to '%s'", service_name); - case AVAHI_ENTRY_GROUP_UNCOMMITED: - debug(2, "avahi: service '%s' group is not yet committed.", service_name); - break; + /* And recreate the services */ + register_service(avahi_entry_group_get_client(g)); + break; + } - case AVAHI_ENTRY_GROUP_REGISTERING: - debug(2, "avahi: service '%s' group is registering.", service_name); - break; + case AVAHI_ENTRY_GROUP_FAILURE: + debug(1, "avahi: entry group failure: %s", + avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g)))); + break; - default: - debug(1, "avahi: unhandled egroup state: %d", state); - break; + case AVAHI_ENTRY_GROUP_UNCOMMITED: + debug(2, "avahi: service '%s' group is not yet committed.", service_name); + break; + + case AVAHI_ENTRY_GROUP_REGISTERING: + debug(2, "avahi: service '%s' group is registering.", service_name); + break; + + default: + debug(1, "avahi: unhandled egroup state: %d", state); + break; } } @@ -464,7 +464,7 @@ void *avahi_dacp_monitor(char *dacp_id) { } void avahi_dacp_dont_monitor(void *userdata) { - debug(3,"avahi_dacp_dont_monitor"); + debug(3, "avahi_dacp_dont_monitor"); if (userdata) { dacp_browser_struct *dbs = (dacp_browser_struct *)userdata; // stop and dispose of everything @@ -472,13 +472,13 @@ void avahi_dacp_dont_monitor(void *userdata) { avahi_threaded_poll_stop((dbs)->service_poll); */ if (dbs->service_poll) { - avahi_threaded_poll_stop(dbs->service_poll); + avahi_threaded_poll_stop(dbs->service_poll); avahi_threaded_poll_lock(dbs->service_poll); if (dbs->service_browser) avahi_service_browser_free(dbs->service_browser); if (dbs->service_client) avahi_client_free(dbs->service_client); - avahi_threaded_poll_unlock(dbs->service_poll); + avahi_threaded_poll_unlock(dbs->service_poll); avahi_threaded_poll_free(dbs->service_poll); } free(dbs->dacp_id); @@ -487,7 +487,7 @@ void avahi_dacp_dont_monitor(void *userdata) { } else { debug(1, "Avahi DACP Monitor is not running."); } - debug(3,"avahi_dacp_dont_monitor exit"); + debug(3, "avahi_dacp_dont_monitor exit"); } mdns_backend mdns_avahi = {.name = "avahi", diff --git a/metadata_hub.h b/metadata_hub.h index 66368f98..b2084c12 100644 --- a/metadata_hub.h +++ b/metadata_hub.h @@ -56,7 +56,7 @@ typedef struct metadata_bundle { char *client_ip; // IP number used by the audio source (i.e. the "client"), which is also the DACP // server char *server_ip; // IP number used by Shairport Sync - char *progress_string; // progress string, emitted by the source from time to time + char *progress_string; // progress string, emitted by the source from time to time int player_thread_active; // true if a play thread is running int dacp_server_active; // true if there's a reachable DACP server (assumed to be the Airplay // client) ; false otherwise diff --git a/mpris-service.c b/mpris-service.c index 3d6cf96b..4901a819 100644 --- a/mpris-service.c +++ b/mpris-service.c @@ -123,7 +123,8 @@ void mpris_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused) } else if ((argc->track_metadata) && (argc->track_metadata->item_id)) { char trackidstring[128]; // debug(1, "Set ID using mper ID: \"%u\".",argc->item_id); - snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", argc->track_metadata->item_id); + snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", + argc->track_metadata->item_id); GVariant *trackid = g_variant_new("o", trackidstring); g_variant_builder_add(dict_builder, "{sv}", "mpris:trackid", trackid); } @@ -291,7 +292,7 @@ static void on_mpris_name_lost(__attribute__((unused)) GDBusConnection *connecti // name,(mpris_bus_type==G_BUS_TYPE_SESSION) ? "session" : "system"); pid_t pid = getpid(); char interface_name[256] = ""; - snprintf(interface_name, sizeof(interface_name), "org.mpris.MediaPlayer2.ShairportSync.i%d", pid); + snprintf(interface_name, sizeof(interface_name), "org.mpris.MediaPlayer2.ShairportSync.i%d", pid); GBusType mpris_bus_type = G_BUS_TYPE_SYSTEM; if (config.mpris_service_bus_type == DBT_session) mpris_bus_type = G_BUS_TYPE_SESSION; diff --git a/mqtt.c b/mqtt.c index c08a4318..6d160e81 100644 --- a/mqtt.c +++ b/mqtt.c @@ -11,170 +11,168 @@ #include "rtp.h" #include "dacp.h" -#include #include "mqtt.h" +#include -//this holds the mosquitto client +// this holds the mosquitto client struct mosquitto *global_mosq = NULL; char *topic = NULL; int connected = 0; -//mosquitto logging -void _cb_log( __attribute__((unused)) struct mosquitto *mosq, - __attribute__((unused)) void *userdata, int level, const char *str){ - switch(level){ - case MOSQ_LOG_DEBUG: - debug(1, str); - break; - case MOSQ_LOG_INFO: - debug(2, str); - break; - case MOSQ_LOG_NOTICE: - debug(3, str); - break; - case MOSQ_LOG_WARNING: - inform(str); - break; - case MOSQ_LOG_ERR: { - die("MQTT: Error: %s\n", str); - } +// mosquitto logging +void _cb_log(__attribute__((unused)) struct mosquitto *mosq, __attribute__((unused)) void *userdata, + int level, const char *str) { + switch (level) { + case MOSQ_LOG_DEBUG: + debug(1, str); + break; + case MOSQ_LOG_INFO: + debug(2, str); + break; + case MOSQ_LOG_NOTICE: + debug(3, str); + break; + case MOSQ_LOG_WARNING: + inform(str); + break; + case MOSQ_LOG_ERR: { + die("MQTT: Error: %s\n", str); + } } } -//mosquitto message handler -void on_message( __attribute__((unused)) struct mosquitto* mosq, - __attribute__((unused)) void* userdata, const struct mosquitto_message* msg){ - - //null-terminate the payload - char payload[msg->payloadlen+1]; - memcpy(payload,msg->payload,msg->payloadlen); - payload[msg->payloadlen]=0; - - debug(1, "[MQTT]: received Message on topic %s: %s\n",msg->topic, payload); - - //All recognized commands - char* commands[] = { - "command", "beginff", "beginrew", "mutetoggle", "nextitem", "previtem", "pause", - "playpause", "play", "stop", "playresume", "shuffle_songs", "volumedown", "volumeup", - NULL}; - - int it=0; - - //send command if it's a valid one - while(commands[it++]!=NULL){ - if( (size_t)msg->payloadlen>=strlen(commands[it]) && - strncmp(msg->payload, commands[it], strlen(commands[it]))==0 - ){ - debug(1, "[MQTT]: DACP Command: %s\n",commands[it]); +// mosquitto message handler +void on_message(__attribute__((unused)) struct mosquitto *mosq, + __attribute__((unused)) void *userdata, const struct mosquitto_message *msg) { + + // null-terminate the payload + char payload[msg->payloadlen + 1]; + memcpy(payload, msg->payload, msg->payloadlen); + payload[msg->payloadlen] = 0; + + debug(1, "[MQTT]: received Message on topic %s: %s\n", msg->topic, payload); + + // All recognized commands + char *commands[] = {"command", "beginff", "beginrew", "mutetoggle", "nextitem", + "previtem", "pause", "playpause", "play", "stop", + "playresume", "shuffle_songs", "volumedown", "volumeup", NULL}; + + int it = 0; + + // send command if it's a valid one + while (commands[it++] != NULL) { + if ((size_t)msg->payloadlen >= strlen(commands[it]) && + strncmp(msg->payload, commands[it], strlen(commands[it])) == 0) { + debug(1, "[MQTT]: DACP Command: %s\n", commands[it]); send_simple_dacp_command(commands[it]); break; } } } -void on_disconnect( __attribute__((unused)) struct mosquitto* mosq, - __attribute__((unused)) void* userdata, - __attribute__((unused)) int rc){ +void on_disconnect(__attribute__((unused)) struct mosquitto *mosq, + __attribute__((unused)) void *userdata, __attribute__((unused)) int rc) { connected = 0; debug(1, "[MQTT]: disconnected"); } -void on_connect(struct mosquitto* mosq, - __attribute__((unused)) void* userdata, - __attribute__((unused)) int rc){ +void on_connect(struct mosquitto *mosq, __attribute__((unused)) void *userdata, + __attribute__((unused)) int rc) { connected = 1; debug(1, "[MQTT]: connected"); - - //subscribe if requested - if(config.mqtt_enable_remote){ - char remotetopic[strlen(config.mqtt_topic)+8]; - snprintf(remotetopic,strlen(config.mqtt_topic)+8,"%s/remote",config.mqtt_topic); - mosquitto_subscribe(mosq,NULL,remotetopic,0); + + // subscribe if requested + if (config.mqtt_enable_remote) { + char remotetopic[strlen(config.mqtt_topic) + 8]; + snprintf(remotetopic, strlen(config.mqtt_topic) + 8, "%s/remote", config.mqtt_topic); + mosquitto_subscribe(mosq, NULL, remotetopic, 0); } } -//helper function to publish under a topic and automatically append the main topic -void mqtt_publish(char* topic, char* data, uint32_t length){ - char fulltopic[strlen(config.mqtt_topic)+strlen(topic)+3]; - snprintf(fulltopic, strlen(config.mqtt_topic)+strlen(topic)+2, "%s/%s", config.mqtt_topic, topic); - debug(1, "[MQTT]: publishing under %s",fulltopic); - +// helper function to publish under a topic and automatically append the main topic +void mqtt_publish(char *topic, char *data, uint32_t length) { + char fulltopic[strlen(config.mqtt_topic) + strlen(topic) + 3]; + snprintf(fulltopic, strlen(config.mqtt_topic) + strlen(topic) + 2, "%s/%s", config.mqtt_topic, + topic); + debug(1, "[MQTT]: publishing under %s", fulltopic); + int rc; - if((rc=mosquitto_publish(global_mosq, NULL, fulltopic, length, data, 0, 0))!=MOSQ_ERR_SUCCESS) { - switch(rc){ - case MOSQ_ERR_NO_CONN: - debug(1, "[MQTT]: Publish failed: not connected to broker"); - break; - default: - debug(1, "[MQTT]: Publish failed: unknown error"); - break; + if ((rc = mosquitto_publish(global_mosq, NULL, fulltopic, length, data, 0, 0)) != + MOSQ_ERR_SUCCESS) { + switch (rc) { + case MOSQ_ERR_NO_CONN: + debug(1, "[MQTT]: Publish failed: not connected to broker"); + break; + default: + debug(1, "[MQTT]: Publish failed: unknown error"); + break; } } } -//handler for incoming metadata -void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length){ - if(global_mosq==NULL || connected!=1){ +// handler for incoming metadata +void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length) { + if (global_mosq == NULL || connected != 1) { debug(3, "[MQTT]: Client not connected, skipping metadata handling"); return; } - if(config.mqtt_publish_raw){ + if (config.mqtt_publish_raw) { uint32_t val; char topic[] = "____/____"; - - val=htonl(type); - memcpy(topic,&val, 4); - val=htonl(code); - memcpy(topic+5,&val, 4); + + val = htonl(type); + memcpy(topic, &val, 4); + val = htonl(code); + memcpy(topic + 5, &val, 4); mqtt_publish(topic, data, length); } - if(config.mqtt_publish_parsed){ - if(type=='core'){ + if (config.mqtt_publish_parsed) { + if (type == 'core') { switch (code) { - case 'asar': - mqtt_publish("artist", data, length); - break; - case 'asal': - mqtt_publish("album", data, length); - break; - case 'minm': - mqtt_publish("title", data, length); - break; - case 'asgn': - mqtt_publish("genre", data, length); - break; - case 'asfm': - mqtt_publish("format", data, length); - break; + case 'asar': + mqtt_publish("artist", data, length); + break; + case 'asal': + mqtt_publish("album", data, length); + break; + case 'minm': + mqtt_publish("title", data, length); + break; + case 'asgn': + mqtt_publish("genre", data, length); + break; + case 'asfm': + mqtt_publish("format", data, length); + break; } - }else if(type=='ssnc'){ + } else if (type == 'ssnc') { switch (code) { - case 'asal': - mqtt_publish("songalbum", data, length); - break; - case 'pvol': - mqtt_publish("volume", data, length); - break; - case 'clip': - mqtt_publish("client_ip", data, length); - break; - case 'pbeg': - mqtt_publish("play_start", data, length); - break; - case 'pend': - mqtt_publish("play_end", data, length); - break; - case 'pfls': - mqtt_publish("play_flush", data, length); - break; - case 'prsm': - mqtt_publish("play_resume", data, length); - break; - case 'PICT': - if(config.mqtt_publish_parsed){ - mqtt_publish("cover", data, length); - } - break; + case 'asal': + mqtt_publish("songalbum", data, length); + break; + case 'pvol': + mqtt_publish("volume", data, length); + break; + case 'clip': + mqtt_publish("client_ip", data, length); + break; + case 'pbeg': + mqtt_publish("play_start", data, length); + break; + case 'pend': + mqtt_publish("play_end", data, length); + break; + case 'pfls': + mqtt_publish("play_flush", data, length); + break; + case 'prsm': + mqtt_publish("play_resume", data, length); + break; + case 'PICT': + if (config.mqtt_publish_parsed) { + mqtt_publish("cover", data, length); + } + break; } } } @@ -182,50 +180,44 @@ void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t le return; } - int initialise_mqtt() { debug(1, "Initialising MQTT"); - if(config.mqtt_hostname==NULL){ + if (config.mqtt_hostname == NULL) { debug(1, "[MQTT]: Not initialized, as the hostname is not set"); return 0; } int keepalive = 60; mosquitto_lib_init(); - if( !(global_mosq = mosquitto_new(config.service_name, true, NULL)) ){ + if (!(global_mosq = mosquitto_new(config.service_name, true, NULL))) { die("[MQTT]: FATAL: Could not create mosquitto object! %d\n", global_mosq); } - if( - config.mqtt_cafile != NULL || - config.mqtt_capath != NULL || - config.mqtt_certfile != NULL || - config.mqtt_keyfile != NULL - ){ - if(mosquitto_tls_set(global_mosq,config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile, config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) { + if (config.mqtt_cafile != NULL || config.mqtt_capath != NULL || config.mqtt_certfile != NULL || + config.mqtt_keyfile != NULL) { + if (mosquitto_tls_set(global_mosq, config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile, + config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) { die("[MQTT]: TLS Setup failed"); } } - if( - config.mqtt_username != NULL || - config.mqtt_password != NULL - ){ - if(mosquitto_username_pw_set(global_mosq,config.mqtt_username,config.mqtt_password) != MOSQ_ERR_SUCCESS) { + if (config.mqtt_username != NULL || config.mqtt_password != NULL) { + if (mosquitto_username_pw_set(global_mosq, config.mqtt_username, config.mqtt_password) != + MOSQ_ERR_SUCCESS) { die("[MQTT]: Username/Password set failed"); } } mosquitto_log_callback_set(global_mosq, _cb_log); - - if(config.mqtt_enable_remote){ + + if (config.mqtt_enable_remote) { mosquitto_message_callback_set(global_mosq, on_message); } - + mosquitto_disconnect_callback_set(global_mosq, on_disconnect); mosquitto_connect_callback_set(global_mosq, on_connect); - if(mosquitto_connect(global_mosq, config.mqtt_hostname, config.mqtt_port, keepalive)){ + if (mosquitto_connect(global_mosq, config.mqtt_hostname, config.mqtt_port, keepalive)) { inform("[MQTT]: Could not establish a mqtt connection"); } - if(mosquitto_loop_start(global_mosq) != MOSQ_ERR_SUCCESS){ + if (mosquitto_loop_start(global_mosq) != MOSQ_ERR_SUCCESS) { inform("[MQTT]: Could start MQTT Main loop"); } diff --git a/mqtt.h b/mqtt.h index b5ec5ab0..4bf680ed 100644 --- a/mqtt.h +++ b/mqtt.h @@ -1,15 +1,14 @@ #ifndef MQTT_H #define MQTT_H -#include #include - +#include int initialise_mqtt(); void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length); -void mqtt_publish(char* topic, char* data, uint32_t length); +void mqtt_publish(char *topic, char *data, uint32_t length); void mqtt_setup(); -void on_connect(struct mosquitto* mosq, void* userdata, int rc); -void on_disconnect(struct mosquitto* mosq, void* userdata, int rc); -void on_message(struct mosquitto* mosq, void* userdata, const struct mosquitto_message* msg); +void on_connect(struct mosquitto *mosq, void *userdata, int rc); +void on_disconnect(struct mosquitto *mosq, void *userdata, int rc); +void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg); void _cb_log(struct mosquitto *mosq, void *userdata, int level, const char *str); #endif /* #ifndef MQTT_H */ diff --git a/player.c b/player.c index 3c60e24d..2a659f6e 100644 --- a/player.c +++ b/player.c @@ -315,18 +315,16 @@ static int alac_decode(short *dest, int *destlen, uint8_t *buf, int len, rtsp_co } if (outsize > toutsize) { - debug(2, - "Output from alac_decode larger (%d bytes, not frames) than expected (%d bytes) -- " - "truncated, but buffer overflow possible! Encrypted = %d.", + debug(2, "Output from alac_decode larger (%d bytes, not frames) than expected (%d bytes) -- " + "truncated, but buffer overflow possible! Encrypted = %d.", outsize, toutsize, conn->stream.encrypted); reply = -1; // output packet is the wrong size } *destlen = outsize / conn->input_bytes_per_frame; if ((outsize % conn->input_bytes_per_frame) != 0) - debug(1, - "Number of audio frames (%d) does not correspond exactly to the number of bytes (%d) " - "and the audio frame size (%d).", + debug(1, "Number of audio frames (%d) does not correspond exactly to the number of bytes (%d) " + "and the audio frame size (%d).", *destlen, outsize, conn->input_bytes_per_frame); return reply; } @@ -420,7 +418,8 @@ static int init_decoder(int32_t fmtp[12], rtsp_conn_info *conn) { conn->input_bytes_per_frame = conn->input_num_channels * ((conn->input_bit_depth + 7) / 8); - alac = alac_create(conn->input_bit_depth, conn->input_num_channels); + alac = alac_create(conn->input_bit_depth, + conn->input_num_channels); // no pthread cancellation point in here if (!alac) return 1; conn->decoder_info = alac; @@ -436,10 +435,10 @@ static int init_decoder(int32_t fmtp[12], rtsp_conn_info *conn) { alac->setinfo_82 = fmtp[9]; alac->setinfo_86 = fmtp[10]; alac->setinfo_8a_rate = fmtp[11]; - alac_allocate_buffers(alac); + alac_allocate_buffers(alac); // no pthread cancellation point in here #ifdef HAVE_APPLE_ALAC - apple_alac_init(fmtp); + apple_alac_init(fmtp); // no pthread cancellation point in here #endif return 0; @@ -465,193 +464,172 @@ static void free_audio_buffers(rtsp_conn_info *conn) { free(conn->audio_buffer[i].data); } -void player_thread_lock_cleanup(void *arg) { - rtsp_conn_info *conn = (rtsp_conn_info *)arg; - debug(3, "Cleaning up player_thread_lock."); - pthread_rwlock_unlock(&conn->player_thread_lock); -} - void player_put_packet(seq_t seqno, uint32_t actual_timestamp, int64_t timestamp, uint8_t *data, int len, rtsp_conn_info *conn) { - if (pthread_rwlock_tryrdlock(&conn->player_thread_lock) == 0) { - pthread_cleanup_push(player_thread_lock_cleanup, (void *)conn); - if (conn->player_thread != NULL) { - - // all timestamps are done at the output rate - // the "actual_timestamp" is the one that comes in the packet, and is carried over for - // debugging - // and checking only. + // all timestamps are done at the output rate + // the "actual_timestamp" is the one that comes in the packet, and is carried over for + // debugging + // and checking only. - int64_t ltimestamp = timestamp * conn->output_sample_ratio; + int64_t ltimestamp = timestamp * conn->output_sample_ratio; - // ignore a request to flush that has been made before the first packet... - if (conn->packet_count == 0) { - debug_mutex_lock(&conn->flush_mutex, 1000, 1); - conn->flush_requested = 0; - conn->flush_rtp_timestamp = 0; - debug_mutex_unlock(&conn->flush_mutex, 3); - } + // ignore a request to flush that has been made before the first packet... + if (conn->packet_count == 0) { + debug_mutex_lock(&conn->flush_mutex, 1000, 1); + conn->flush_requested = 0; + conn->flush_rtp_timestamp = 0; + debug_mutex_unlock(&conn->flush_mutex, 3); + } - debug_mutex_lock(&conn->ab_mutex, 30000, 1); - conn->packet_count++; - conn->time_of_last_audio_packet = get_absolute_time_in_fp(); - if (conn->connection_state_to_output) { // if we are supposed to be processing these packets + debug_mutex_lock(&conn->ab_mutex, 30000, 1); + conn->packet_count++; + conn->time_of_last_audio_packet = get_absolute_time_in_fp(); + if (conn->connection_state_to_output) { // if we are supposed to be processing these packets + + // if (flush_rtp_timestamp != 0) + // debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp); + + if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) { + debug(3, + "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to " + "timestamp: %lld.", + seqno, ltimestamp, conn->flush_rtp_timestamp); + } else { + if ((conn->flush_rtp_timestamp != 0x0) && + (ltimestamp > conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time + conn->flush_rtp_timestamp = 0x0; - // if (flush_rtp_timestamp != 0) - // debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp); + abuf_t *abuf = 0; - if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) { - debug( - 3, - "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to " - "timestamp: %lld.", - seqno, ltimestamp, conn->flush_rtp_timestamp); - } else { - if ((conn->flush_rtp_timestamp != 0x0) && - (ltimestamp > - conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time - conn->flush_rtp_timestamp = 0x0; - - abuf_t *abuf = 0; - - if (!conn->ab_synced) { - debug(3, "syncing to seqno %u.", seqno); - conn->ab_write = seqno; - conn->ab_read = seqno; - conn->ab_synced = 1; - } + if (!conn->ab_synced) { + debug(3, "syncing to seqno %u.", seqno); + conn->ab_write = seqno; + conn->ab_read = seqno; + conn->ab_synced = 1; + } - // here, we should check for missing frames - int resend_interval = (((250 * 44100) / 352) / 1000); // approximately 250 ms intervals - const int number_of_resend_attempts = 8; - int latency_based_resend_interval = - (conn->latency) / (number_of_resend_attempts * conn->max_frames_per_packet); - if (latency_based_resend_interval > resend_interval) - resend_interval = latency_based_resend_interval; - - if (conn->resend_interval != resend_interval) { - debug(2, "Resend interval for latency of %" PRId64 " frames is %d frames.", - conn->latency, resend_interval); - conn->resend_interval = resend_interval; - } + // here, we should check for missing frames + int resend_interval = (((250 * 44100) / 352) / 1000); // approximately 250 ms intervals + const int number_of_resend_attempts = 8; + int latency_based_resend_interval = + (conn->latency) / (number_of_resend_attempts * conn->max_frames_per_packet); + if (latency_based_resend_interval > resend_interval) + resend_interval = latency_based_resend_interval; + + if (conn->resend_interval != resend_interval) { + debug(2, "Resend interval for latency of %" PRId64 " frames is %d frames.", conn->latency, + resend_interval); + conn->resend_interval = resend_interval; + } - if (conn->ab_write == seqno) { // expected packet - abuf = conn->audio_buffer + BUFIDX(seqno); - conn->ab_write = SUCCESSOR(seqno); - } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected - // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8) - // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and - // seqno: - // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno); - int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read); - if (gap <= 0) - debug(1, "Unexpected gap size: %d.", gap); - int i; - for (i = 0; i < gap; i++) { - abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i)); - abuf->ready = 0; // to be sure, to be sure - abuf->resend_level = 0; - abuf->timestamp = 0; - abuf->given_timestamp = 0; - abuf->sequence_number = 0; - } - // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write); - abuf = conn->audio_buffer + BUFIDX(seqno); - // rtp_request_resend(ab_write, gap); - // resend_requests++; - conn->ab_write = SUCCESSOR(seqno); - } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played - conn->late_packets++; - abuf = conn->audio_buffer + BUFIDX(seqno); - /* - if (abuf->ready) - debug(1,"Late apparently duplicate packet received that is %d packets - late.",seq_diff(seqno, conn->ab_write, conn->ab_read)); - else - debug(1,"Late packet received that is %d packets late.",seq_diff(seqno, - conn->ab_write, conn->ab_read)); - */ - } else { // too late. + if (conn->ab_write == seqno) { // expected packet + abuf = conn->audio_buffer + BUFIDX(seqno); + conn->ab_write = SUCCESSOR(seqno); + } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected + // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8) + // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and + // seqno: + // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno); + int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read); + if (gap <= 0) + debug(1, "Unexpected gap size: %d.", gap); + int i; + for (i = 0; i < gap; i++) { + abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i)); + abuf->ready = 0; // to be sure, to be sure + abuf->resend_level = 0; + abuf->timestamp = 0; + abuf->given_timestamp = 0; + abuf->sequence_number = 0; + } + // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write); + abuf = conn->audio_buffer + BUFIDX(seqno); + // rtp_request_resend(ab_write, gap); + // resend_requests++; + conn->ab_write = SUCCESSOR(seqno); + } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played + conn->late_packets++; + abuf = conn->audio_buffer + BUFIDX(seqno); + /* + if (abuf->ready) + debug(1,"Late apparently duplicate packet received that is %d packets + late.",seq_diff(seqno, conn->ab_write, conn->ab_read)); + else + debug(1,"Late packet received that is %d packets late.",seq_diff(seqno, + conn->ab_write, conn->ab_read)); + */ + } else { // too late. - // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno, - // conn->ab_write, conn->ab_read)); - conn->too_late_packets++; - } - // pthread_mutex_unlock(&ab_mutex); - - if (abuf) { - int datalen = conn->max_frames_per_packet; - if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) { - abuf->ready = 1; - abuf->length = datalen; - abuf->timestamp = ltimestamp; - abuf->given_timestamp = actual_timestamp; - abuf->sequence_number = seqno; - } else { - debug(1, "Bad audio packet detected and discarded."); - abuf->ready = 0; - abuf->resend_level = 0; - abuf->timestamp = 0; - abuf->given_timestamp = 0; - abuf->sequence_number = 0; - } - } + // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno, + // conn->ab_write, conn->ab_read)); + conn->too_late_packets++; + } + // pthread_mutex_unlock(&ab_mutex); + + if (abuf) { + int datalen = conn->max_frames_per_packet; + if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) { + abuf->ready = 1; + abuf->length = datalen; + abuf->timestamp = ltimestamp; + abuf->given_timestamp = actual_timestamp; + abuf->sequence_number = seqno; + } else { + debug(1, "Bad audio packet detected and discarded."); + abuf->ready = 0; + abuf->resend_level = 0; + abuf->timestamp = 0; + abuf->given_timestamp = 0; + abuf->sequence_number = 0; + } + } - // pthread_mutex_lock(&ab_mutex); - int rc = pthread_cond_signal(&conn->flowcontrol); - if (rc) - debug(1, "Error signalling flowcontrol."); - - // if it's at the expected time, do a look back for missing packets - // but release the ab_mutex when doing a resend - if (!conn->ab_buffering) { - int j; - for (j = 1; j <= number_of_resend_attempts; j++) { - // check j times, after a short period of has elapsed, assuming 352 frames per packet - // the higher the step_exponent, the less it will try. 1 means it will try very - // hard. 2.0 seems good. - float step_exponent = 2.0; - int back_step = (int)(resend_interval * pow(j, step_exponent)); - int k; - for (k = -1; k <= 1; k++) { - if ((back_step + k) < - seq_diff(conn->ab_read, conn->ab_write, - conn->ab_read)) { // if it's within the range of frames in use... - int item_to_check = (conn->ab_write - (back_step + k)) & 0xffff; - seq_t next = item_to_check; - abuf_t *check_buf = conn->audio_buffer + BUFIDX(next); - if ((!check_buf->ready) && - (check_buf->resend_level < - j)) { // prevent multiple requests from the same level of lookback - check_buf->resend_level = j; - if (config.disable_resend_requests == 0) { - if (((int)(resend_interval * pow(j + 1, step_exponent)) + k) >= - seq_diff(conn->ab_read, conn->ab_write, conn->ab_read)) - debug(3, - "Last-ditch (#%d) resend request for packet %u in range %u to %u. " - "Looking back %d packets.", - j, next, conn->ab_read, conn->ab_write, back_step + k); - debug_mutex_unlock(&conn->ab_mutex, 3); - rtp_request_resend(next, 1, conn); - conn->resend_requests++; - debug_mutex_lock(&conn->ab_mutex, 20000, 1); - } - } + // pthread_mutex_lock(&ab_mutex); + int rc = pthread_cond_signal(&conn->flowcontrol); + if (rc) + debug(1, "Error signalling flowcontrol."); + + // if it's at the expected time, do a look back for missing packets + // but release the ab_mutex when doing a resend + if (!conn->ab_buffering) { + int j; + for (j = 1; j <= number_of_resend_attempts; j++) { + // check j times, after a short period of has elapsed, assuming 352 frames per packet + // the higher the step_exponent, the less it will try. 1 means it will try very + // hard. 2.0 seems good. + float step_exponent = 2.0; + int back_step = (int)(resend_interval * pow(j, step_exponent)); + int k; + for (k = -1; k <= 1; k++) { + if ((back_step + k) < + seq_diff(conn->ab_read, conn->ab_write, + conn->ab_read)) { // if it's within the range of frames in use... + int item_to_check = (conn->ab_write - (back_step + k)) & 0xffff; + seq_t next = item_to_check; + abuf_t *check_buf = conn->audio_buffer + BUFIDX(next); + if ((!check_buf->ready) && + (check_buf->resend_level < + j)) { // prevent multiple requests from the same level of lookback + check_buf->resend_level = j; + if (config.disable_resend_requests == 0) { + if (((int)(resend_interval * pow(j + 1, step_exponent)) + k) >= + seq_diff(conn->ab_read, conn->ab_write, conn->ab_read)) + debug(3, "Last-ditch (#%d) resend request for packet %u in range %u to %u. " + "Looking back %d packets.", + j, next, conn->ab_read, conn->ab_write, back_step + k); + debug_mutex_unlock(&conn->ab_mutex, 3); + rtp_request_resend(next, 1, conn); + conn->resend_requests++; + debug_mutex_lock(&conn->ab_mutex, 20000, 1); } } } } } } - debug_mutex_unlock(&conn->ab_mutex, 3); - } else { - debug(1, "player_put_packet discarded packet %d because the player thread was gone."); } - pthread_cleanup_pop(1); - // pthread_rwlock_unlock(&conn->player_thread_lock); - } else { - debug(1, "player_put_packet discarded packet %d because the player thread was locked.", seqno); } + debug_mutex_unlock(&conn->ab_mutex, 3); } int32_t rand_in_range(int32_t exclusive_range_limit) { @@ -820,9 +798,8 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { // if (conn->packet_count>500) { //for testing -- about 4 seconds of play first if ((local_time_now > conn->time_of_last_audio_packet) && (local_time_now - conn->time_of_last_audio_packet >= ct << 32)) { - debug(1, - "As Yeats almost said, \"Too long a silence / can make a stone of the heart\" " - "from RTSP conversation %d.", + debug(1, "As Yeats almost said, \"Too long a silence / can make a stone of the heart\" " + "from RTSP conversation %d.", conn->connection_number); conn->stop = 1; pthread_kill(conn->thread, SIGUSR1); @@ -1439,36 +1416,14 @@ typedef struct stats { // statistics for running averages int64_t sync_error, correction, drift; } stats_t; -void *player_thread_func(void *arg) { - // note, the thread will be started up with the player_thread_lock locked. You must release it - // quickly. - +void player_thread_cleanup_handler(void *arg) { + debug(1, "player_thread_cleanup_handler called"); rtsp_conn_info *conn = (rtsp_conn_info *)arg; +} - pthread_rwlock_wrlock(&conn->player_thread_lock); - - int rc = pthread_mutex_init(&conn->ab_mutex, NULL); - if (rc) - debug(1, "Error initialising ab_mutex."); - rc = pthread_mutex_init(&conn->flush_mutex, NULL); - if (rc) - debug(1, "Error initialising flush_mutex."); -// set the flowcontrol condition variable to wait on a monotonic clock -#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD - pthread_condattr_t attr; - pthread_condattr_init(&attr); - pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it. - rc = pthread_cond_init(&conn->flowcontrol, &attr); -#endif -#ifdef COMPILE_FOR_OSX - rc = pthread_cond_init(&conn->flowcontrol, NULL); -#endif - if (rc) - debug(1, "Error initialising flowcontrol condition variable."); - - pthread_rwlock_unlock(&conn->player_thread_lock); - - // it's safe now +void *player_thread_func(void *arg) { + pthread_cleanup_push(player_thread_cleanup_handler, arg); + rtsp_conn_info *conn = (rtsp_conn_info *)arg; conn->packet_count = 0; conn->previous_random_number = 0; @@ -1487,12 +1442,13 @@ void *player_thread_func(void *arg) { conn->latency = 88200; } - config.output->start(config.output_rate, config.output_format); + config.output->start(config.output_rate, config.output_format); // will need a corresponding stop init_decoder((int32_t *)&conn->stream.fmtp, - conn); // this sets up incoming rate, bit depth, channels - // must be after decoder init - init_buffer(conn); + conn); // this sets up incoming rate, bit depth, channels. + // No pthread cancellation point in here + // This must be after init_decoder + init_buffer(conn); // will need a corresponding deallocation if (conn->stream.encrypted) { #ifdef HAVE_LIBMBEDTLS @@ -1639,18 +1595,16 @@ void *player_thread_func(void *arg) { // if ((input_rate!=config.output_rate) || (input_bit_depth!=output_bit_depth)) { // debug(1,"Define tbuf of length // %d.",output_bytes_per_frame*(max_frames_per_packet*output_sample_ratio+max_frame_size_change)); - tbuf = malloc( - sizeof(int32_t) * 2 * - (conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change)); + tbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio + + conn->max_frame_size_change)); if (tbuf == NULL) die("Failed to allocate memory for the transition buffer."); sbuf = 0; // initialise this, because soxr stuffing might be chosen later - sbuf = malloc( - sizeof(int32_t) * 2 * - (conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change)); + sbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio + + conn->max_frame_size_change)); if (sbuf == NULL) debug(1, "Failed to allocate memory for the sbuf buffer."); @@ -1670,7 +1624,8 @@ void *player_thread_func(void *arg) { // stop looking elsewhere for DACP stuff #ifdef HAVE_DACP_CLIENT - set_dacp_server_information(conn); // this will start scanning when a port is registered by the + // this may have pthread cancellation points in it + set_dacp_server_information(conn); // this will start scanning when a port is registered by the // code initiated by the mdns_dacp_monitor #else // this is only used for compatability, if dacp stuff isn't enabled. @@ -1679,7 +1634,7 @@ void *player_thread_func(void *arg) { if (conn->dapo_private_storage) debug(1, "DACP monitor already initialised?"); else - conn->dapo_private_storage = mdns_dacp_monitor(conn->dacp_id); + conn->dapo_private_storage = mdns_dacp_monitor(conn->dacp_id); // ?? #endif conn->framesProcessedInThisEpoch = 0; @@ -1726,7 +1681,7 @@ void *player_thread_func(void *arg) { // set the default volume to whaterver it was before, as stored in the config airplay_volume debug(3, "Set initial volume to %f.", config.airplay_volume); - player_volume(config.airplay_volume, conn); + player_volume(config.airplay_volume, conn); // ?? int64_t frames_to_drop = 0; // debug(1, "Play begin"); while (!conn->player_thread_please_stop) { @@ -1752,9 +1707,8 @@ void *player_thread_func(void *arg) { } else { // the player may change the contents of the buffer, so it has to be zeroed each time; // might as well malloc and freee it locally - memset(silence, 0, - conn->output_bytes_per_frame * conn->max_frames_per_packet * - conn->output_sample_ratio); + memset(silence, 0, conn->output_bytes_per_frame * conn->max_frames_per_packet * + conn->output_sample_ratio); config.output->play(silence, conn->max_frames_per_packet * conn->output_sample_ratio); free(silence); } @@ -1774,9 +1728,8 @@ void *player_thread_func(void *arg) { } else { // the player may change the contents of the buffer, so it has to be zeroed each time; // might as well malloc and freee it locally - memset(silence, 0, - conn->output_bytes_per_frame * conn->max_frames_per_packet * - conn->output_sample_ratio); + memset(silence, 0, conn->output_bytes_per_frame * conn->max_frames_per_packet * + conn->output_sample_ratio); config.output->play(silence, conn->max_frames_per_packet * conn->output_sample_ratio); free(silence); } @@ -1923,9 +1876,8 @@ void *player_thread_func(void *arg) { SUCCESSOR(conn->last_seqno_read); // int32_t from seq_t, i.e. uint16_t, so okay. if (inframe->sequence_number != conn->last_seqno_read) { // seq_t, ei.e. uint16_t and int32_t, so okay - debug(2, - "Player: packets out of sequence: expected: %u, got: %u, with ab_read: %u " - "and ab_write: %u.", + debug(2, "Player: packets out of sequence: expected: %u, got: %u, with ab_read: %u " + "and ab_write: %u.", conn->last_seqno_read, inframe->sequence_number, conn->ab_read, conn->ab_write); conn->last_seqno_read = inframe->sequence_number; // reset warning... } @@ -2076,7 +2028,7 @@ void *player_thread_func(void *arg) { #ifdef CONFIG_CONVOLUTION || config.convolution #endif - ) { + ) { int32_t *tbuf32 = (int32_t *)tbuf; float fbuf_l[inbuflength]; float fbuf_r[inbuflength]; @@ -2278,10 +2230,10 @@ void *player_thread_func(void *arg) { "%*d", /* max buffer occupancy */ 9, /* should be 10, but there's an explicit space at the start to ensure alignment */ - 1000 * moving_average_sync_error / config.output_rate, 10, - moving_average_correction * 1000000 / (352 * conn->output_sample_ratio), 10, - moving_average_insertions_plus_deletions * 1000000 / - (352 * conn->output_sample_ratio), + 1000 * moving_average_sync_error / config.output_rate, + 10, moving_average_correction * 1000000 / (352 * conn->output_sample_ratio), + 10, moving_average_insertions_plus_deletions * 1000000 / + (352 * conn->output_sample_ratio), 12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7, conn->too_late_packets, 7, conn->resend_requests, 7, minimum_dac_queue_size, 5, minimum_buffer_occupancy, 5, maximum_buffer_occupancy); @@ -2297,10 +2249,11 @@ void *player_thread_func(void *arg) { "%*d", /* max buffer occupancy */ 9, /* should be 10, but there's an explicit space at the start to ensure alignment */ - 1000 * moving_average_sync_error / config.output_rate, 12, play_number, 7, - conn->missing_packets, 7, conn->late_packets, 7, conn->too_late_packets, 7, - conn->resend_requests, 7, minimum_dac_queue_size, 5, - minimum_buffer_occupancy, 5, maximum_buffer_occupancy); + 1000 * moving_average_sync_error / config.output_rate, + 12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7, + conn->too_late_packets, 7, conn->resend_requests, 7, + minimum_dac_queue_size, 5, minimum_buffer_occupancy, 5, + maximum_buffer_occupancy); } } else { inform(" %*.1f," /* Sync error in milliseconds */ @@ -2313,10 +2266,10 @@ void *player_thread_func(void *arg) { "%*d", /* max buffer occupancy */ 9, /* should be 10, but there's an explicit space at the start to ensure alignment */ - 1000 * moving_average_sync_error / config.output_rate, 12, play_number, 7, - conn->missing_packets, 7, conn->late_packets, 7, conn->too_late_packets, 7, - conn->resend_requests, 5, minimum_buffer_occupancy, 5, - maximum_buffer_occupancy); + 1000 * moving_average_sync_error / config.output_rate, + 12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7, + conn->too_late_packets, 7, conn->resend_requests, 5, + minimum_buffer_occupancy, 5, maximum_buffer_occupancy); } } else { inform("No frames received in the last sampling interval."); @@ -2381,16 +2334,6 @@ void *player_thread_func(void *arg) { free_audio_buffers(conn); terminate_decoders(conn); - // remove flow control and mutexes - rc = pthread_cond_destroy(&conn->flowcontrol); - if (rc) - debug(1, "Error destroying flowcontrol condition variable."); - rc = pthread_mutex_destroy(&conn->flush_mutex); - if (rc) - debug(1, "Error destroying flush_mutex variable."); - rc = pthread_mutex_destroy(&conn->ab_mutex); - if (rc) - debug(1, "Error destroying ab_mutex variable."); debug(2, "Connection %d: player thread terminated.", conn->connection_number); if (conn->dacp_id) { free(conn->dacp_id); @@ -2402,6 +2345,7 @@ void *player_thread_func(void *arg) { free(tbuf); if (sbuf) free(sbuf); + pthread_cleanup_pop(1); pthread_exit(NULL); } @@ -2640,14 +2584,16 @@ void player_volume(double airplay_volume, rtsp_conn_info *conn) { } void do_flush(int64_t timestamp, rtsp_conn_info *conn) { + debug(3, "Flush requested up to %u. It seems as if 0 is special.", timestamp); debug_mutex_lock(&conn->flush_mutex, 1000, 1); conn->flush_requested = 1; // if (timestamp!=0) conn->flush_rtp_timestamp = timestamp; // flush all packets up to (and including?) this - debug_mutex_unlock(&conn->flush_mutex, 3); conn->play_segment_reference_frame = 0; conn->play_number_after_flush = 0; + debug_mutex_unlock(&conn->flush_mutex, 3); + #ifdef CONFIG_METADATA // only send a flush metadata message if the first packet has been seen -- it's a bogus message // otherwise @@ -2656,20 +2602,13 @@ void do_flush(int64_t timestamp, rtsp_conn_info *conn) { send_ssnc_metadata('pfls', NULL, 0, 1); } #endif + debug(3, "Flush request made."); } void player_flush(int64_t timestamp, rtsp_conn_info *conn) { debug(3, "player_flush"); - if (pthread_rwlock_tryrdlock(&conn->player_thread_lock) == 0) { - if (conn->player_thread != NULL) - do_flush(timestamp, conn); - else - debug(1, "Flush requested when player thread is gone."); - pthread_rwlock_unlock(&conn->player_thread_lock); - } else { - debug(3, "Can't acquire a read lock for a flush -- ignored."); - } + do_flush(timestamp, conn); } int player_play(rtsp_conn_info *conn) { @@ -2694,8 +2633,6 @@ int player_play(rtsp_conn_info *conn) { int rc = pthread_attr_setstacksize(&tattr, size); if (rc) debug(1, "Error setting stack size for player_thread: %s", strerror(errno)); - - // hack alert -- the player thread itself releases the player_thread_lock rwlock as soon as it's // finished initialising. pthread_create(pt, &tattr, player_thread_func, (void *)conn); pthread_attr_destroy(&tattr); @@ -2704,8 +2641,6 @@ int player_play(rtsp_conn_info *conn) { int player_stop(rtsp_conn_info *conn) { debug(3, "player_stop"); - pthread_rwlock_wrlock(&conn->player_thread_lock); - debug(3, "player_thread_lock acquired"); if (conn->player_thread) { debug(3, "player_thread exists"); conn->player_thread_please_stop = 1; @@ -2715,7 +2650,6 @@ int player_stop(rtsp_conn_info *conn) { pthread_join(*conn->player_thread, NULL); free(conn->player_thread); conn->player_thread = NULL; - pthread_rwlock_unlock(&conn->player_thread_lock); #ifdef CONFIG_METADATA debug(2, "pend"); send_ssnc_metadata('pend', NULL, 0, 1); @@ -2723,8 +2657,7 @@ int player_stop(rtsp_conn_info *conn) { command_stop(); return 0; } else { - pthread_rwlock_unlock(&conn->player_thread_lock); - debug(3, "player thread of RTSP conversation %d is already deleted.", conn->connection_number); + debug(3, "Connection %d: player thread already deleted.", conn->connection_number); return -1; } } diff --git a/player.h b/player.h index 3274d648..4c12c803 100644 --- a/player.h +++ b/player.h @@ -85,7 +85,6 @@ typedef struct { // pthread_t *ptp; pthread_t *player_thread; - pthread_rwlock_t player_thread_lock; // used to control access by "outsiders" abuf_t audio_buffer[BUFFER_FRAMES]; int max_frames_per_packet, input_num_channels, input_bit_depth, input_rate; diff --git a/rtp.c b/rtp.c index 6a588b0b..c86d3fa0 100644 --- a/rtp.c +++ b/rtp.c @@ -68,9 +68,9 @@ void rtp_terminate(rtsp_conn_info *conn) { void rtp_audio_receiver_cleanup_handler(void *arg) { debug(3, "Audio Receiver Cleanup."); rtsp_conn_info *conn = (rtsp_conn_info *)arg; - debug(1,"shutdown audio socket."); - shutdown(conn->audio_socket,SHUT_RDWR); - debug(1,"close audio socket."); + debug(1, "shutdown audio socket."); + shutdown(conn->audio_socket, SHUT_RDWR); + debug(1, "close audio socket."); close(conn->audio_socket); debug(3, "Audio Receiver Cleanup Successful."); } @@ -191,9 +191,9 @@ void *rtp_audio_receiver(void *arg) { void rtp_control_handler_cleanup_handler(void *arg) { debug(3, "Control Receiver Cleanup."); rtsp_conn_info *conn = (rtsp_conn_info *)arg; - debug(1,"shutdown control socket."); - shutdown(conn->control_socket,SHUT_RDWR); - debug(1,"close control socket."); + debug(1, "shutdown control socket."); + shutdown(conn->control_socket, SHUT_RDWR); + debug(1, "close control socket."); close(conn->control_socket); debug(3, "Control Receiver Cleanup Successful."); } @@ -477,9 +477,9 @@ void rtp_timing_receiver_cleanup_handler(void *arg) { rtsp_conn_info *conn = (rtsp_conn_info *)arg; pthread_cancel(conn->timer_requester); pthread_join(conn->timer_requester, NULL); - debug(1,"shutdown timing socket."); - shutdown(conn->timing_socket,SHUT_RDWR); - debug(1,"close timing socket."); + debug(1, "shutdown timing socket."); + shutdown(conn->timing_socket, SHUT_RDWR); + debug(1, "close timing socket."); close(conn->timing_socket); debug(3, "Timing Receiver Cleanup Successful."); } diff --git a/rtsp.c b/rtsp.c index b66e8954..00a168ba 100644 --- a/rtsp.c +++ b/rtsp.c @@ -1887,11 +1887,25 @@ authenticate: static void *rtsp_conversation_thread_func(void *pconn) { rtsp_conn_info *conn = pconn; - // create the player thread lock. - int rwli = pthread_rwlock_init(&conn->player_thread_lock, NULL); - if (rwli != 0) - die("Error %d initialising player_thread_lock for conversation thread %d.", rwli, - conn->connection_number); + int rc = pthread_mutex_init(&conn->flush_mutex, NULL); + if (rc) + die("Connection %d: error %d initialising flush_mutex.", conn->connection_number, rc); + rc = pthread_mutex_init(&conn->ab_mutex, NULL); + if (rc) + die("Connection %d: error %d initialising ab_mutex.", conn->connection_number, rc); +// set the flowcontrol condition variable to wait on a monotonic clock +#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD + pthread_condattr_t attr; + pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it. + rc = pthread_cond_init(&conn->flowcontrol, &attr); +#endif +#ifdef COMPILE_FOR_OSX + rc = pthread_cond_init(&conn->flowcontrol, NULL); +#endif + if (rc) + die("Connection %d: error %d initialising flow control condition variable.", + conn->connection_number, rc); rtp_initialise(conn); @@ -2005,12 +2019,17 @@ static void *rtsp_conversation_thread_func(void *pconn) { debug(2, "Connection %d: RTSP thread terminated.", conn->connection_number); conn->running = 0; - // release the player_thread_lock - int rwld = pthread_rwlock_destroy(&conn->player_thread_lock); - if (rwld) - debug(1, "Error %d destroying player_thread_lock for conversation thread %d.", rwld, - conn->connection_number); - + // remove flow control and mutexes + rc = pthread_cond_destroy(&conn->flowcontrol); + if (rc) + debug(1, "Connection %d: error %d destroying flow control condition variable.", + conn->connection_number, rc); + rc = pthread_mutex_destroy(&conn->ab_mutex); + if (rc) + debug(1, "Connection %d: error %d destroying ab_mutex.", conn->connection_number, rc); + rc = pthread_mutex_destroy(&conn->flush_mutex); + if (rc) + debug(1, "Connection %d: error %d destroying flush_mutex.", conn->connection_number, rc); pthread_exit(NULL); } diff --git a/shairport.c b/shairport.c index c5450ad7..92521107 100644 --- a/shairport.c +++ b/shairport.c @@ -95,20 +95,20 @@ #include #endif -static inline int config_set_lookup_bool(config_t* cfg, char* where, int* dst) { +static inline int config_set_lookup_bool(config_t *cfg, char *where, int *dst) { const char *str = 0; if (config_lookup_string(cfg, where, &str)) { - if (strcasecmp(str, "no") == 0){ - (*dst)=0; + if (strcasecmp(str, "no") == 0) { + (*dst) = 0; return 1; - }else if (strcasecmp(str, "yes") == 0){ - (*dst)=1; + } else if (strcasecmp(str, "yes") == 0) { + (*dst) = 1; return 1; - }else{ + } else { die("Invalid %s option choice \"%s\". It should be \"yes\" or \"no\"", where, str); return 0; } - }else{ + } else { return 0; } } @@ -404,7 +404,8 @@ int parse_options(int argc, char **argv) { config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_with_pid_file", &daemonisewith); /* Get the Just_Daemonize setting. */ - config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_without_pid_file", &daemonisewithout); + config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_without_pid_file", + &daemonisewithout); if ((daemonisewith) && (daemonisewithout)) die("Select either daemonize_with_pid_file or daemonize_without_pid_file -- you have " @@ -474,7 +475,8 @@ int parse_options(int argc, char **argv) { } /* Get the statistics setting. */ - if (!config_set_lookup_bool(config.cfg, "general.statistics", &(config.statistics_requested))) { + if (!config_set_lookup_bool(config.cfg, "general.statistics", + &(config.statistics_requested))) { warn("The \"general\" \"statistics\" setting is deprecated. Please use the \"diagnostics\" " "\"statistics\" setting instead."); } @@ -881,61 +883,60 @@ int parse_options(int argc, char **argv) { #endif #ifdef CONFIG_MQTT - int tmpval=0; - config_set_lookup_bool(config.cfg, "mqtt.enabled", &config.mqtt_enabled); - if(config.mqtt_enabled && !config.metadata_enabled){ - die("You need to have metadata enabled in order to use mqtt"); - } - if (config_lookup_string(config.cfg, "mqtt.hostname", &str)) { - config.mqtt_hostname = (char *)str; - //TODO: Document that, if this is false, whole mqtt func is disabled - } - if (config_lookup_int(config.cfg, "mqtt.port", &tmpval)) { - config.mqtt_port = tmpval; - }else{ - //TODO: Is this the correct way to set a default value? - config.mqtt_port = 1883; - } - - if (config_lookup_string(config.cfg, "mqtt.username", &str)) { - config.mqtt_username = (char *)str; - } - if (config_lookup_string(config.cfg, "mqtt.password", &str)) { - config.mqtt_password = (char *)str; - } - int capath=0; - if (config_lookup_string(config.cfg, "mqtt.capath", &str)) { - config.mqtt_capath = (char *)str; - capath=1; - } - if (config_lookup_string(config.cfg, "mqtt.cafile", &str)) { - if(capath) - die("Supply either mqtt cafile or mqtt capath -- you have supplied both!"); - config.mqtt_cafile = (char *)str; - } - int certkeynum=0; - if (config_lookup_string(config.cfg, "mqtt.certfile", &str)) { - config.mqtt_certfile = (char *)str; - certkeynum++; - } - if (config_lookup_string(config.cfg, "mqtt.keyfile", &str)) { - config.mqtt_keyfile = (char *)str; - certkeynum++; - } - if( certkeynum!=0 && certkeynum!=2){ - die("If you want to use TLS Client Authentication, you have to specify " - "mqtt.certfile AND mqtt.keyfile.\nYou have supplied only one of them.\n" - "If you do not want to use TLS Client Authentication, leave both empty." - ); - } - - if(config_lookup_string(config.cfg, "mqtt.topic", &str)){ - config.mqtt_topic = (char *)str; - } - config_set_lookup_bool(config.cfg, "mqtt.publish_raw", &config.mqtt_publish_raw); - config_set_lookup_bool(config.cfg, "mqtt.publish_parsed", &config.mqtt_publish_parsed); - config_set_lookup_bool(config.cfg, "mqtt.publish_cover", &config.mqtt_publish_cover); - config_set_lookup_bool(config.cfg, "mqtt.enable_remote", &config.mqtt_enable_remote); + int tmpval = 0; + config_set_lookup_bool(config.cfg, "mqtt.enabled", &config.mqtt_enabled); + if (config.mqtt_enabled && !config.metadata_enabled) { + die("You need to have metadata enabled in order to use mqtt"); + } + if (config_lookup_string(config.cfg, "mqtt.hostname", &str)) { + config.mqtt_hostname = (char *)str; + // TODO: Document that, if this is false, whole mqtt func is disabled + } + if (config_lookup_int(config.cfg, "mqtt.port", &tmpval)) { + config.mqtt_port = tmpval; + } else { + // TODO: Is this the correct way to set a default value? + config.mqtt_port = 1883; + } + + if (config_lookup_string(config.cfg, "mqtt.username", &str)) { + config.mqtt_username = (char *)str; + } + if (config_lookup_string(config.cfg, "mqtt.password", &str)) { + config.mqtt_password = (char *)str; + } + int capath = 0; + if (config_lookup_string(config.cfg, "mqtt.capath", &str)) { + config.mqtt_capath = (char *)str; + capath = 1; + } + if (config_lookup_string(config.cfg, "mqtt.cafile", &str)) { + if (capath) + die("Supply either mqtt cafile or mqtt capath -- you have supplied both!"); + config.mqtt_cafile = (char *)str; + } + int certkeynum = 0; + if (config_lookup_string(config.cfg, "mqtt.certfile", &str)) { + config.mqtt_certfile = (char *)str; + certkeynum++; + } + if (config_lookup_string(config.cfg, "mqtt.keyfile", &str)) { + config.mqtt_keyfile = (char *)str; + certkeynum++; + } + if (certkeynum != 0 && certkeynum != 2) { + die("If you want to use TLS Client Authentication, you have to specify " + "mqtt.certfile AND mqtt.keyfile.\nYou have supplied only one of them.\n" + "If you do not want to use TLS Client Authentication, leave both empty."); + } + + if (config_lookup_string(config.cfg, "mqtt.topic", &str)) { + config.mqtt_topic = (char *)str; + } + config_set_lookup_bool(config.cfg, "mqtt.publish_raw", &config.mqtt_publish_raw); + config_set_lookup_bool(config.cfg, "mqtt.publish_parsed", &config.mqtt_publish_parsed); + config_set_lookup_bool(config.cfg, "mqtt.publish_cover", &config.mqtt_publish_cover); + config_set_lookup_bool(config.cfg, "mqtt.enable_remote", &config.mqtt_enable_remote); #endif free(config_file_real_path); } @@ -1035,13 +1036,13 @@ int parse_options(int argc, char **argv) { free(i2); free(i3); free(vs); - + #ifdef CONFIG_MQTT // mqtt topic was not set. As we have the service name just now, set it - if(config.mqtt_topic==NULL){ - int topic_length=1+strlen(config.service_name)+1; - char* topic=malloc(topic_length+1); - snprintf(topic,topic_length,"/%s/",config.service_name); + if (config.mqtt_topic == NULL) { + int topic_length = 1 + strlen(config.service_name) + 1; + char *topic = malloc(topic_length + 1); + snprintf(topic, topic_length, "/%s/", config.service_name); config.mqtt_topic = topic; } #endif @@ -1251,7 +1252,7 @@ int main(int argc, char **argv) { daemon_pid_file_ident = daemon_log_ident = daemon_ident_from_argv0(argv[0]); daemon_pid_file_proc = pid_file_proc; - + /* Check if we are called with -D or --disconnectFromOutput parameter */ if (argc >= 2 && ((strcmp(argv[1], "-D") == 0) || (strcmp(argv[1], "--disconnectFromOutput") == 0))) { @@ -1577,7 +1578,7 @@ int main(int argc, char **argv) { #endif #ifdef HAVE_MQTT - if(config.mqtt_enabled){ + if (config.mqtt_enabled) { initialise_mqtt(); } #endif