]> git.ipfire.org Git - thirdparty/shairport-sync.git/commitdiff
Move ab_mutex, flow control CV and flush mutex initialisation and teardown out of...
authorMike Brady <mikebrady@eircom.net>
Sat, 14 Jul 2018 12:38:29 +0000 (13:38 +0100)
committerMike Brady <mikebrady@eircom.net>
Sat, 14 Jul 2018 12:38:29 +0000 (13:38 +0100)
12 files changed:
configure.ac
dbus-service.c
mdns_avahi.c
metadata_hub.h
mpris-service.c
mqtt.c
mqtt.h
player.c
player.h
rtp.c
rtsp.c
shairport.c

index f047fc15d4fcc143ccbe99c34e53bc04c3c65ef0..4ddb55f5c38a72a9faeaded92c0bd9b070bf51e4 100644 (file)
@@ -2,7 +2,7 @@
 # Process this file with autoconf to produce a configure script.
 
 AC_PREREQ([2.50])
-AC_INIT([shairport-sync], [3.2d67], [mikebrady@eircom.net])
+AC_INIT([shairport-sync], [3.2.2p0], [mikebrady@eircom.net])
 AM_INIT_AUTOMAKE
 AC_CONFIG_SRCDIR([shairport.c])
 AC_CONFIG_HEADERS([config.h])
index ebcd33718659406cf2650e81e1042e7986644938..14ace35b449d3fd9fb10e3e9288fa606db4e807a 100644 (file)
@@ -43,16 +43,15 @@ void dbus_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused))
     shairport_sync_advanced_remote_control_set_available(shairportSyncAdvancedRemoteControlSkeleton,
                                                          FALSE);
   }
-  
+
   if (argc->progress_string) {
-       // debug(1, "Check progress string");
-               th = shairport_sync_remote_control_get_progress_string(
-                               shairportSyncRemoteControlSkeleton);
-               if ((th == NULL) || (strcasecmp(th, argc->progress_string) != 0)) {
-                       // debug(1, "Progress string should be changed");
-                       shairport_sync_remote_control_set_progress_string(
-                                       shairportSyncRemoteControlSkeleton, argc->progress_string);
-               }       
+    // debug(1, "Check progress string");
+    th = shairport_sync_remote_control_get_progress_string(shairportSyncRemoteControlSkeleton);
+    if ((th == NULL) || (strcasecmp(th, argc->progress_string) != 0)) {
+      // debug(1, "Progress string should be changed");
+      shairport_sync_remote_control_set_progress_string(shairportSyncRemoteControlSkeleton,
+                                                        argc->progress_string);
+    }
   }
 
   switch (argc->player_state) {
@@ -162,7 +161,8 @@ void dbus_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused))
   if ((argc->track_metadata) && (argc->track_metadata->item_id)) {
     char trackidstring[128];
     // debug(1, "Set ID using mper ID: \"%u\".",argc->item_id);
-    snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", argc->track_metadata->item_id);
+    snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u",
+             argc->track_metadata->item_id);
     GVariant *trackid = g_variant_new("o", trackidstring);
     g_variant_builder_add(dict_builder, "{sv}", "mpris:trackid", trackid);
   }
index 28aa9eca017a00669a9e88b1af2340be25d6af8b..c54f2df6103ec5ace0a06dc10fc85a8c1247aa98 100644 (file)
@@ -84,33 +84,33 @@ static void resolve_callback(AvahiServiceResolver *r, AVAHI_GCC_UNUSED AvahiIfIn
 
   /* Called whenever a service has been resolved successfully or timed out */
   switch (event) {
-    case AVAHI_RESOLVER_FAILURE:
-      debug(2, "(Resolver) Failed to resolve service '%s' of type '%s' in domain '%s': %s.", name,
-            type, domain, avahi_strerror(avahi_client_errno(avahi_service_resolver_get_client(r))));
-      break;
-    case AVAHI_RESOLVER_FOUND: {
-      //    char a[AVAHI_ADDRESS_STR_MAX], *t;
-      // debug(1, "Resolve callback: Service '%s' of type '%s' in domain '%s':", name, type, domain);
-      char *dacpid = strstr(name, "iTunes_Ctrl_");
-      if (dacpid) {
-        dacpid += strlen("iTunes_Ctrl_");
-        if (strcmp(dacpid, dbs->dacp_id) == 0) {
-          debug(3, "Client's DACP port: %u.", port);
-  #ifdef HAVE_DACP_CLIENT
-          dacp_monitor_port_update_callback(dacpid, port);
-  #endif
-  #ifdef CONFIG_METADATA
-          char portstring[20];
-          memset(portstring, 0, sizeof(portstring));
-          snprintf(portstring, sizeof(portstring), "%u", port);
-          send_ssnc_metadata('dapo', strdup(portstring), strlen(portstring), 0);
-  #endif
-        }
-      } else {
-        debug(1, "Resolve callback: Can't see a DACP string in a DACP Record!");
+  case AVAHI_RESOLVER_FAILURE:
+    debug(2, "(Resolver) Failed to resolve service '%s' of type '%s' in domain '%s': %s.", name,
+          type, domain, avahi_strerror(avahi_client_errno(avahi_service_resolver_get_client(r))));
+    break;
+  case AVAHI_RESOLVER_FOUND: {
+    //    char a[AVAHI_ADDRESS_STR_MAX], *t;
+    // debug(1, "Resolve callback: Service '%s' of type '%s' in domain '%s':", name, type, domain);
+    char *dacpid = strstr(name, "iTunes_Ctrl_");
+    if (dacpid) {
+      dacpid += strlen("iTunes_Ctrl_");
+      if (strcmp(dacpid, dbs->dacp_id) == 0) {
+        debug(3, "Client's DACP port: %u.", port);
+#ifdef HAVE_DACP_CLIENT
+        dacp_monitor_port_update_callback(dacpid, port);
+#endif
+#ifdef CONFIG_METADATA
+        char portstring[20];
+        memset(portstring, 0, sizeof(portstring));
+        snprintf(portstring, sizeof(portstring), "%u", port);
+        send_ssnc_metadata('dapo', strdup(portstring), strlen(portstring), 0);
+#endif
       }
+    } else {
+      debug(1, "Resolve callback: Can't see a DACP string in a DACP Record!");
     }
   }
+  }
   // debug(1,"service resolver freed by resolve_callback");
   check_avahi_response(1, avahi_service_resolver_free(r));
 }
@@ -164,47 +164,47 @@ static void register_service(AvahiClient *c);
 static void egroup_callback(AvahiEntryGroup *g, AvahiEntryGroupState state,
                             AVAHI_GCC_UNUSED void *userdata) {
   switch (state) {
-    case AVAHI_ENTRY_GROUP_ESTABLISHED:
-      /* The entry group has been established successfully */
-      debug(1, "avahi: service '%s' successfully added.", service_name);
-      break;
-
-    case AVAHI_ENTRY_GROUP_COLLISION: {
-      char *n;
-
-      /* A service name collision with a remote service
-       * happened. Let's pick a new name */
-      debug(1, "avahi name collision -- look for another");
-      n = avahi_alternative_service_name(service_name);
-      if (service_name)
-        avahi_free(service_name);
-      else
-        debug(1, "avahi attempt to free a NULL service name");
-      service_name = n;
+  case AVAHI_ENTRY_GROUP_ESTABLISHED:
+    /* The entry group has been established successfully */
+    debug(1, "avahi: service '%s' successfully added.", service_name);
+    break;
 
-      debug(2, "avahi: service name collision, renaming service to '%s'", service_name);
+  case AVAHI_ENTRY_GROUP_COLLISION: {
+    char *n;
 
-      /* And recreate the services */
-      register_service(avahi_entry_group_get_client(g));
-      break;
-    }
+    /* A service name collision with a remote service
+     * happened. Let's pick a new name */
+    debug(1, "avahi name collision -- look for another");
+    n = avahi_alternative_service_name(service_name);
+    if (service_name)
+      avahi_free(service_name);
+    else
+      debug(1, "avahi attempt to free a NULL service name");
+    service_name = n;
 
-    case AVAHI_ENTRY_GROUP_FAILURE:
-      debug(1, "avahi: entry group failure: %s",
-            avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g))));
-      break;
+    debug(2, "avahi: service name collision, renaming service to '%s'", service_name);
 
-    case AVAHI_ENTRY_GROUP_UNCOMMITED:
-      debug(2, "avahi: service '%s' group is not yet committed.", service_name);
-      break;
+    /* And recreate the services */
+    register_service(avahi_entry_group_get_client(g));
+    break;
+  }
 
-    case AVAHI_ENTRY_GROUP_REGISTERING:
-      debug(2, "avahi: service '%s' group is registering.", service_name);
-      break;
+  case AVAHI_ENTRY_GROUP_FAILURE:
+    debug(1, "avahi: entry group failure: %s",
+          avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g))));
+    break;
 
-    default:
-      debug(1, "avahi: unhandled egroup state: %d", state);
-      break;
+  case AVAHI_ENTRY_GROUP_UNCOMMITED:
+    debug(2, "avahi: service '%s' group is not yet committed.", service_name);
+    break;
+
+  case AVAHI_ENTRY_GROUP_REGISTERING:
+    debug(2, "avahi: service '%s' group is registering.", service_name);
+    break;
+
+  default:
+    debug(1, "avahi: unhandled egroup state: %d", state);
+    break;
   }
 }
 
@@ -464,7 +464,7 @@ void *avahi_dacp_monitor(char *dacp_id) {
 }
 
 void avahi_dacp_dont_monitor(void *userdata) {
-  debug(3,"avahi_dacp_dont_monitor");
+  debug(3, "avahi_dacp_dont_monitor");
   if (userdata) {
     dacp_browser_struct *dbs = (dacp_browser_struct *)userdata;
     // stop and dispose of everything
@@ -472,13 +472,13 @@ void avahi_dacp_dont_monitor(void *userdata) {
       avahi_threaded_poll_stop((dbs)->service_poll);
     */
     if (dbs->service_poll) {
-      avahi_threaded_poll_stop(dbs->service_poll);     
+      avahi_threaded_poll_stop(dbs->service_poll);
       avahi_threaded_poll_lock(dbs->service_poll);
       if (dbs->service_browser)
         avahi_service_browser_free(dbs->service_browser);
       if (dbs->service_client)
         avahi_client_free(dbs->service_client);
-      avahi_threaded_poll_unlock(dbs->service_poll); 
+      avahi_threaded_poll_unlock(dbs->service_poll);
       avahi_threaded_poll_free(dbs->service_poll);
     }
     free(dbs->dacp_id);
@@ -487,7 +487,7 @@ void avahi_dacp_dont_monitor(void *userdata) {
   } else {
     debug(1, "Avahi DACP Monitor is not running.");
   }
-  debug(3,"avahi_dacp_dont_monitor exit");
+  debug(3, "avahi_dacp_dont_monitor exit");
 }
 
 mdns_backend mdns_avahi = {.name = "avahi",
index 66368f985fe511d00c491904e49c8d13b21f3d43..b2084c122da66bdd935808d60c7e5320d17ee561 100644 (file)
@@ -56,7 +56,7 @@ typedef struct metadata_bundle {
   char *client_ip; // IP number used by the audio source (i.e. the "client"), which is also the DACP
                    // server
   char *server_ip; // IP number used by Shairport Sync
-  char *progress_string; // progress string, emitted by the source from time to time
+  char *progress_string;    // progress string, emitted by the source from time to time
   int player_thread_active; // true if a play thread is running
   int dacp_server_active;   // true if there's a reachable DACP server (assumed to be the Airplay
                             // client) ; false otherwise
index 3d6cf96bc7f3a400a7c0b4ea444163a11f8b66c3..4901a819a61689a655171b5a0aa4e2a853ff24a3 100644 (file)
@@ -123,7 +123,8 @@ void mpris_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused)
   } else if ((argc->track_metadata) && (argc->track_metadata->item_id)) {
     char trackidstring[128];
     // debug(1, "Set ID using mper ID: \"%u\".",argc->item_id);
-    snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", argc->track_metadata->item_id);
+    snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u",
+             argc->track_metadata->item_id);
     GVariant *trackid = g_variant_new("o", trackidstring);
     g_variant_builder_add(dict_builder, "{sv}", "mpris:trackid", trackid);
   }
@@ -291,7 +292,7 @@ static void on_mpris_name_lost(__attribute__((unused)) GDBusConnection *connecti
   //      name,(mpris_bus_type==G_BUS_TYPE_SESSION) ? "session" : "system");
   pid_t pid = getpid();
   char interface_name[256] = "";
-  snprintf(interface_name,  sizeof(interface_name), "org.mpris.MediaPlayer2.ShairportSync.i%d", pid);
+  snprintf(interface_name, sizeof(interface_name), "org.mpris.MediaPlayer2.ShairportSync.i%d", pid);
   GBusType mpris_bus_type = G_BUS_TYPE_SYSTEM;
   if (config.mpris_service_bus_type == DBT_session)
     mpris_bus_type = G_BUS_TYPE_SESSION;
diff --git a/mqtt.c b/mqtt.c
index c08a4318c5b2d8ee187d9ce5ac89db31484fcaf4..6d160e810f84531b22dbee9d8ea2cb9a2a117683 100644 (file)
--- a/mqtt.c
+++ b/mqtt.c
 #include "rtp.h"
 
 #include "dacp.h"
-#include <mosquitto.h>
 #include "mqtt.h"
+#include <mosquitto.h>
 
-//this holds the mosquitto client
+// this holds the mosquitto client
 struct mosquitto *global_mosq = NULL;
 char *topic = NULL;
 int connected = 0;
 
-//mosquitto logging
-void _cb_log( __attribute__((unused)) struct mosquitto *mosq, 
-              __attribute__((unused)) void *userdata, int level, const char *str){
-  switch(level){
-    case MOSQ_LOG_DEBUG:
-      debug(1, str);
-      break;
-    case MOSQ_LOG_INFO:
-      debug(2, str);
-      break;
-    case MOSQ_LOG_NOTICE:
-      debug(3, str);
-      break;
-    case MOSQ_LOG_WARNING:
-      inform(str);
-      break;
-    case MOSQ_LOG_ERR: {
-      die("MQTT: Error: %s\n", str);
-    }
+// mosquitto logging
+void _cb_log(__attribute__((unused)) struct mosquitto *mosq, __attribute__((unused)) void *userdata,
+             int level, const char *str) {
+  switch (level) {
+  case MOSQ_LOG_DEBUG:
+    debug(1, str);
+    break;
+  case MOSQ_LOG_INFO:
+    debug(2, str);
+    break;
+  case MOSQ_LOG_NOTICE:
+    debug(3, str);
+    break;
+  case MOSQ_LOG_WARNING:
+    inform(str);
+    break;
+  case MOSQ_LOG_ERR: {
+    die("MQTT: Error: %s\n", str);
+  }
   }
 }
 
-//mosquitto message handler
-void on_message( __attribute__((unused)) struct mosquitto* mosq, 
-                 __attribute__((unused)) void* userdata, const struct mosquitto_message* msg){
-  
-  //null-terminate the payload
-  char payload[msg->payloadlen+1];
-  memcpy(payload,msg->payload,msg->payloadlen);
-  payload[msg->payloadlen]=0;
-  
-  debug(1, "[MQTT]: received Message on topic %s: %s\n",msg->topic, payload);
-  
-  //All recognized commands
-  char* commands[] = {
-    "command", "beginff", "beginrew", "mutetoggle", "nextitem", "previtem", "pause",
-    "playpause", "play", "stop", "playresume", "shuffle_songs", "volumedown", "volumeup",
-    NULL};
-    
-  int it=0;
-  
-  //send command if it's a valid one
-  while(commands[it++]!=NULL){
-    if( (size_t)msg->payloadlen>=strlen(commands[it]) && 
-      strncmp(msg->payload, commands[it], strlen(commands[it]))==0
-    ){
-      debug(1, "[MQTT]: DACP Command: %s\n",commands[it]);
+// mosquitto message handler
+void on_message(__attribute__((unused)) struct mosquitto *mosq,
+                __attribute__((unused)) void *userdata, const struct mosquitto_message *msg) {
+
+  // null-terminate the payload
+  char payload[msg->payloadlen + 1];
+  memcpy(payload, msg->payload, msg->payloadlen);
+  payload[msg->payloadlen] = 0;
+
+  debug(1, "[MQTT]: received Message on topic %s: %s\n", msg->topic, payload);
+
+  // All recognized commands
+  char *commands[] = {"command",    "beginff",       "beginrew",   "mutetoggle", "nextitem",
+                      "previtem",   "pause",         "playpause",  "play",       "stop",
+                      "playresume", "shuffle_songs", "volumedown", "volumeup",   NULL};
+
+  int it = 0;
+
+  // send command if it's a valid one
+  while (commands[it++] != NULL) {
+    if ((size_t)msg->payloadlen >= strlen(commands[it]) &&
+        strncmp(msg->payload, commands[it], strlen(commands[it])) == 0) {
+      debug(1, "[MQTT]: DACP Command: %s\n", commands[it]);
       send_simple_dacp_command(commands[it]);
       break;
     }
   }
 }
 
-void on_disconnect( __attribute__((unused)) struct mosquitto* mosq, 
-                    __attribute__((unused)) void* userdata, 
-                    __attribute__((unused)) int rc){
+void on_disconnect(__attribute__((unused)) struct mosquitto *mosq,
+                   __attribute__((unused)) void *userdata, __attribute__((unused)) int rc) {
   connected = 0;
   debug(1, "[MQTT]: disconnected");
 }
 
-void on_connect(struct mosquitto* mosq, 
-                __attribute__((unused)) void* userdata, 
-                __attribute__((unused)) int rc){
+void on_connect(struct mosquitto *mosq, __attribute__((unused)) void *userdata,
+                __attribute__((unused)) int rc) {
   connected = 1;
   debug(1, "[MQTT]: connected");
-  
-  //subscribe if requested
-  if(config.mqtt_enable_remote){
-    char remotetopic[strlen(config.mqtt_topic)+8];
-    snprintf(remotetopic,strlen(config.mqtt_topic)+8,"%s/remote",config.mqtt_topic);
-    mosquitto_subscribe(mosq,NULL,remotetopic,0);
+
+  // subscribe if requested
+  if (config.mqtt_enable_remote) {
+    char remotetopic[strlen(config.mqtt_topic) + 8];
+    snprintf(remotetopic, strlen(config.mqtt_topic) + 8, "%s/remote", config.mqtt_topic);
+    mosquitto_subscribe(mosq, NULL, remotetopic, 0);
   }
 }
 
-//helper function to publish under a topic and automatically append the main topic
-void mqtt_publish(char* topic, char* data, uint32_t length){
-  char fulltopic[strlen(config.mqtt_topic)+strlen(topic)+3];
-  snprintf(fulltopic, strlen(config.mqtt_topic)+strlen(topic)+2, "%s/%s", config.mqtt_topic, topic);
-  debug(1, "[MQTT]: publishing under %s",fulltopic);
-  
+// helper function to publish under a topic and automatically append the main topic
+void mqtt_publish(char *topic, char *data, uint32_t length) {
+  char fulltopic[strlen(config.mqtt_topic) + strlen(topic) + 3];
+  snprintf(fulltopic, strlen(config.mqtt_topic) + strlen(topic) + 2, "%s/%s", config.mqtt_topic,
+           topic);
+  debug(1, "[MQTT]: publishing under %s", fulltopic);
+
   int rc;
-  if((rc=mosquitto_publish(global_mosq, NULL, fulltopic, length, data, 0, 0))!=MOSQ_ERR_SUCCESS) {
-    switch(rc){
-      case MOSQ_ERR_NO_CONN:
-        debug(1, "[MQTT]: Publish failed: not connected to broker");
-        break;
-      default:
-        debug(1, "[MQTT]: Publish failed: unknown error");
-        break;
+  if ((rc = mosquitto_publish(global_mosq, NULL, fulltopic, length, data, 0, 0)) !=
+      MOSQ_ERR_SUCCESS) {
+    switch (rc) {
+    case MOSQ_ERR_NO_CONN:
+      debug(1, "[MQTT]: Publish failed: not connected to broker");
+      break;
+    default:
+      debug(1, "[MQTT]: Publish failed: unknown error");
+      break;
     }
   }
 }
 
-//handler for incoming metadata
-void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length){
-  if(global_mosq==NULL || connected!=1){
+// handler for incoming metadata
+void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length) {
+  if (global_mosq == NULL || connected != 1) {
     debug(3, "[MQTT]: Client not connected, skipping metadata handling");
     return;
   }
-  if(config.mqtt_publish_raw){
+  if (config.mqtt_publish_raw) {
     uint32_t val;
     char topic[] = "____/____";
-    
-    val=htonl(type);
-    memcpy(topic,&val, 4);
-    val=htonl(code);
-    memcpy(topic+5,&val, 4);
+
+    val = htonl(type);
+    memcpy(topic, &val, 4);
+    val = htonl(code);
+    memcpy(topic + 5, &val, 4);
     mqtt_publish(topic, data, length);
   }
-  if(config.mqtt_publish_parsed){
-    if(type=='core'){
+  if (config.mqtt_publish_parsed) {
+    if (type == 'core') {
       switch (code) {
-        case 'asar':
-          mqtt_publish("artist", data, length);
-          break;
-        case 'asal':
-          mqtt_publish("album", data, length);
-          break;
-        case 'minm':
-          mqtt_publish("title", data, length);
-          break;
-        case 'asgn':
-          mqtt_publish("genre", data, length);
-          break;
-        case 'asfm':
-          mqtt_publish("format", data, length);
-          break;
+      case 'asar':
+        mqtt_publish("artist", data, length);
+        break;
+      case 'asal':
+        mqtt_publish("album", data, length);
+        break;
+      case 'minm':
+        mqtt_publish("title", data, length);
+        break;
+      case 'asgn':
+        mqtt_publish("genre", data, length);
+        break;
+      case 'asfm':
+        mqtt_publish("format", data, length);
+        break;
       }
-    }else if(type=='ssnc'){
+    } else if (type == 'ssnc') {
       switch (code) {
-        case 'asal':
-          mqtt_publish("songalbum", data, length);
-          break;
-        case 'pvol':
-          mqtt_publish("volume", data, length);
-          break;
-        case 'clip':
-          mqtt_publish("client_ip", data, length);
-          break;
-        case 'pbeg':
-          mqtt_publish("play_start", data, length);
-          break;
-        case 'pend':
-          mqtt_publish("play_end", data, length);
-          break;
-        case 'pfls':
-          mqtt_publish("play_flush", data, length);
-          break;
-        case 'prsm':
-          mqtt_publish("play_resume", data, length);
-          break;
-        case 'PICT':
-          if(config.mqtt_publish_parsed){
-            mqtt_publish("cover", data, length);
-          }
-          break;
+      case 'asal':
+        mqtt_publish("songalbum", data, length);
+        break;
+      case 'pvol':
+        mqtt_publish("volume", data, length);
+        break;
+      case 'clip':
+        mqtt_publish("client_ip", data, length);
+        break;
+      case 'pbeg':
+        mqtt_publish("play_start", data, length);
+        break;
+      case 'pend':
+        mqtt_publish("play_end", data, length);
+        break;
+      case 'pfls':
+        mqtt_publish("play_flush", data, length);
+        break;
+      case 'prsm':
+        mqtt_publish("play_resume", data, length);
+        break;
+      case 'PICT':
+        if (config.mqtt_publish_parsed) {
+          mqtt_publish("cover", data, length);
+        }
+        break;
       }
     }
   }
@@ -182,50 +180,44 @@ void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t le
   return;
 }
 
-
 int initialise_mqtt() {
   debug(1, "Initialising MQTT");
-  if(config.mqtt_hostname==NULL){
+  if (config.mqtt_hostname == NULL) {
     debug(1, "[MQTT]: Not initialized, as the hostname is not set");
     return 0;
   }
   int keepalive = 60;
   mosquitto_lib_init();
-  if( !(global_mosq = mosquitto_new(config.service_name, true, NULL)) ){
+  if (!(global_mosq = mosquitto_new(config.service_name, true, NULL))) {
     die("[MQTT]: FATAL: Could not create mosquitto object! %d\n", global_mosq);
   }
 
-  if(
-    config.mqtt_cafile != NULL ||
-    config.mqtt_capath != NULL ||
-    config.mqtt_certfile != NULL ||
-    config.mqtt_keyfile != NULL
-  ){
-    if(mosquitto_tls_set(global_mosq,config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile, config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) {
+  if (config.mqtt_cafile != NULL || config.mqtt_capath != NULL || config.mqtt_certfile != NULL ||
+      config.mqtt_keyfile != NULL) {
+    if (mosquitto_tls_set(global_mosq, config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile,
+                          config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) {
       die("[MQTT]: TLS Setup failed");
     }
   }
 
-  if(
-    config.mqtt_username != NULL ||
-    config.mqtt_password != NULL
-  ){
-    if(mosquitto_username_pw_set(global_mosq,config.mqtt_username,config.mqtt_password) != MOSQ_ERR_SUCCESS) {
+  if (config.mqtt_username != NULL || config.mqtt_password != NULL) {
+    if (mosquitto_username_pw_set(global_mosq, config.mqtt_username, config.mqtt_password) !=
+        MOSQ_ERR_SUCCESS) {
       die("[MQTT]: Username/Password set failed");
     }
   }
   mosquitto_log_callback_set(global_mosq, _cb_log);
-  
-  if(config.mqtt_enable_remote){
+
+  if (config.mqtt_enable_remote) {
     mosquitto_message_callback_set(global_mosq, on_message);
   }
-  
+
   mosquitto_disconnect_callback_set(global_mosq, on_disconnect);
   mosquitto_connect_callback_set(global_mosq, on_connect);
-  if(mosquitto_connect(global_mosq, config.mqtt_hostname, config.mqtt_port, keepalive)){
+  if (mosquitto_connect(global_mosq, config.mqtt_hostname, config.mqtt_port, keepalive)) {
     inform("[MQTT]: Could not establish a mqtt connection");
   }
-  if(mosquitto_loop_start(global_mosq) != MOSQ_ERR_SUCCESS){
+  if (mosquitto_loop_start(global_mosq) != MOSQ_ERR_SUCCESS) {
     inform("[MQTT]: Could start MQTT Main loop");
   }
 
diff --git a/mqtt.h b/mqtt.h
index b5ec5ab05e121912dcf6b48655c61a9a6812c947..4bf680edd2f559112bd73d8d6a67f3cfb95ef1bd 100644 (file)
--- a/mqtt.h
+++ b/mqtt.h
@@ -1,15 +1,14 @@
 #ifndef MQTT_H
 #define MQTT_H
-#include <stdint.h>
 #include <mosquitto.h>
-
+#include <stdint.h>
 
 int initialise_mqtt();
 void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length);
-void mqtt_publish(char* topic, char* data, uint32_t length);
+void mqtt_publish(char *topic, char *data, uint32_t length);
 void mqtt_setup();
-void on_connect(struct mosquitto* mosq, void* userdata, int rc);
-void on_disconnect(struct mosquitto* mosq, void* userdata, int rc);
-void on_message(struct mosquitto* mosq, void* userdata, const struct mosquitto_message* msg);
+void on_connect(struct mosquitto *mosq, void *userdata, int rc);
+void on_disconnect(struct mosquitto *mosq, void *userdata, int rc);
+void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg);
 void _cb_log(struct mosquitto *mosq, void *userdata, int level, const char *str);
 #endif /* #ifndef MQTT_H */
index 3c60e24deec916e508332f8fdbff3713e6ac83e5..2a659f6ee02a77e9cbde2f0d591a6a84ea5e479f 100644 (file)
--- a/player.c
+++ b/player.c
@@ -315,18 +315,16 @@ static int alac_decode(short *dest, int *destlen, uint8_t *buf, int len, rtsp_co
   }
 
   if (outsize > toutsize) {
-    debug(2,
-          "Output from alac_decode larger (%d bytes, not frames) than expected (%d bytes) -- "
-          "truncated, but buffer overflow possible! Encrypted = %d.",
+    debug(2, "Output from alac_decode larger (%d bytes, not frames) than expected (%d bytes) -- "
+             "truncated, but buffer overflow possible! Encrypted = %d.",
           outsize, toutsize, conn->stream.encrypted);
     reply = -1; // output packet is the wrong size
   }
 
   *destlen = outsize / conn->input_bytes_per_frame;
   if ((outsize % conn->input_bytes_per_frame) != 0)
-    debug(1,
-          "Number of audio frames (%d) does not correspond exactly to the number of bytes (%d) "
-          "and the audio frame size (%d).",
+    debug(1, "Number of audio frames (%d) does not correspond exactly to the number of bytes (%d) "
+             "and the audio frame size (%d).",
           *destlen, outsize, conn->input_bytes_per_frame);
   return reply;
 }
@@ -420,7 +418,8 @@ static int init_decoder(int32_t fmtp[12], rtsp_conn_info *conn) {
 
   conn->input_bytes_per_frame = conn->input_num_channels * ((conn->input_bit_depth + 7) / 8);
 
-  alac = alac_create(conn->input_bit_depth, conn->input_num_channels);
+  alac = alac_create(conn->input_bit_depth,
+                     conn->input_num_channels); // no pthread cancellation point in here
   if (!alac)
     return 1;
   conn->decoder_info = alac;
@@ -436,10 +435,10 @@ static int init_decoder(int32_t fmtp[12], rtsp_conn_info *conn) {
   alac->setinfo_82 = fmtp[9];
   alac->setinfo_86 = fmtp[10];
   alac->setinfo_8a_rate = fmtp[11];
-  alac_allocate_buffers(alac);
+  alac_allocate_buffers(alac); // no pthread cancellation point in here
 
 #ifdef HAVE_APPLE_ALAC
-  apple_alac_init(fmtp);
+  apple_alac_init(fmtp); // no pthread cancellation point in here
 #endif
 
   return 0;
@@ -465,193 +464,172 @@ static void free_audio_buffers(rtsp_conn_info *conn) {
     free(conn->audio_buffer[i].data);
 }
 
-void player_thread_lock_cleanup(void *arg) {
-  rtsp_conn_info *conn = (rtsp_conn_info *)arg;
-  debug(3, "Cleaning up player_thread_lock.");
-  pthread_rwlock_unlock(&conn->player_thread_lock);
-}
-
 void player_put_packet(seq_t seqno, uint32_t actual_timestamp, int64_t timestamp, uint8_t *data,
                        int len, rtsp_conn_info *conn) {
-  if (pthread_rwlock_tryrdlock(&conn->player_thread_lock) == 0) {
-    pthread_cleanup_push(player_thread_lock_cleanup, (void *)conn);
-    if (conn->player_thread != NULL) {
-
-      // all timestamps are done at the output rate
-      // the "actual_timestamp" is the one that comes in the packet, and is carried over for
-      // debugging
-      // and checking only.
+  // all timestamps are done at the output rate
+  // the "actual_timestamp" is the one that comes in the packet, and is carried over for
+  // debugging
+  // and checking only.
 
-      int64_t ltimestamp = timestamp * conn->output_sample_ratio;
+  int64_t ltimestamp = timestamp * conn->output_sample_ratio;
 
-      // ignore a request to flush that has been made before the first packet...
-      if (conn->packet_count == 0) {
-        debug_mutex_lock(&conn->flush_mutex, 1000, 1);
-        conn->flush_requested = 0;
-        conn->flush_rtp_timestamp = 0;
-        debug_mutex_unlock(&conn->flush_mutex, 3);
-      }
+  // ignore a request to flush that has been made before the first packet...
+  if (conn->packet_count == 0) {
+    debug_mutex_lock(&conn->flush_mutex, 1000, 1);
+    conn->flush_requested = 0;
+    conn->flush_rtp_timestamp = 0;
+    debug_mutex_unlock(&conn->flush_mutex, 3);
+  }
 
-      debug_mutex_lock(&conn->ab_mutex, 30000, 1);
-      conn->packet_count++;
-      conn->time_of_last_audio_packet = get_absolute_time_in_fp();
-      if (conn->connection_state_to_output) { // if we are supposed to be processing these packets
+  debug_mutex_lock(&conn->ab_mutex, 30000, 1);
+  conn->packet_count++;
+  conn->time_of_last_audio_packet = get_absolute_time_in_fp();
+  if (conn->connection_state_to_output) { // if we are supposed to be processing these packets
+
+    //    if (flush_rtp_timestamp != 0)
+    //         debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp);
+
+    if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) {
+      debug(3,
+            "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to "
+            "timestamp: %lld.",
+            seqno, ltimestamp, conn->flush_rtp_timestamp);
+    } else {
+      if ((conn->flush_rtp_timestamp != 0x0) &&
+          (ltimestamp > conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time
+        conn->flush_rtp_timestamp = 0x0;
 
-        //    if (flush_rtp_timestamp != 0)
-        //     debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp);
+      abuf_t *abuf = 0;
 
-        if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) {
-          debug(
-              3,
-              "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to "
-              "timestamp: %lld.",
-              seqno, ltimestamp, conn->flush_rtp_timestamp);
-        } else {
-          if ((conn->flush_rtp_timestamp != 0x0) &&
-              (ltimestamp >
-               conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time
-            conn->flush_rtp_timestamp = 0x0;
-
-          abuf_t *abuf = 0;
-
-          if (!conn->ab_synced) {
-            debug(3, "syncing to seqno %u.", seqno);
-            conn->ab_write = seqno;
-            conn->ab_read = seqno;
-            conn->ab_synced = 1;
-          }
+      if (!conn->ab_synced) {
+        debug(3, "syncing to seqno %u.", seqno);
+        conn->ab_write = seqno;
+        conn->ab_read = seqno;
+        conn->ab_synced = 1;
+      }
 
-          // here, we should check for missing frames
-          int resend_interval = (((250 * 44100) / 352) / 1000); // approximately 250 ms intervals
-          const int number_of_resend_attempts = 8;
-          int latency_based_resend_interval =
-              (conn->latency) / (number_of_resend_attempts * conn->max_frames_per_packet);
-          if (latency_based_resend_interval > resend_interval)
-            resend_interval = latency_based_resend_interval;
-
-          if (conn->resend_interval != resend_interval) {
-            debug(2, "Resend interval for latency of %" PRId64 " frames is %d frames.",
-                  conn->latency, resend_interval);
-            conn->resend_interval = resend_interval;
-          }
+      // here, we should check for missing frames
+      int resend_interval = (((250 * 44100) / 352) / 1000); // approximately 250 ms intervals
+      const int number_of_resend_attempts = 8;
+      int latency_based_resend_interval =
+          (conn->latency) / (number_of_resend_attempts * conn->max_frames_per_packet);
+      if (latency_based_resend_interval > resend_interval)
+        resend_interval = latency_based_resend_interval;
+
+      if (conn->resend_interval != resend_interval) {
+        debug(2, "Resend interval for latency of %" PRId64 " frames is %d frames.", conn->latency,
+              resend_interval);
+        conn->resend_interval = resend_interval;
+      }
 
-          if (conn->ab_write == seqno) { // expected packet
-            abuf = conn->audio_buffer + BUFIDX(seqno);
-            conn->ab_write = SUCCESSOR(seqno);
-          } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected
-            // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8)
-            // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and
-            // seqno:
-            // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno);
-            int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read);
-            if (gap <= 0)
-              debug(1, "Unexpected gap size: %d.", gap);
-            int i;
-            for (i = 0; i < gap; i++) {
-              abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i));
-              abuf->ready = 0; // to be sure, to be sure
-              abuf->resend_level = 0;
-              abuf->timestamp = 0;
-              abuf->given_timestamp = 0;
-              abuf->sequence_number = 0;
-            }
-            // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write);
-            abuf = conn->audio_buffer + BUFIDX(seqno);
-            //        rtp_request_resend(ab_write, gap);
-            //        resend_requests++;
-            conn->ab_write = SUCCESSOR(seqno);
-          } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played
-            conn->late_packets++;
-            abuf = conn->audio_buffer + BUFIDX(seqno);
-            /*
-            if (abuf->ready)
-                    debug(1,"Late apparently duplicate packet received that is %d packets
-            late.",seq_diff(seqno, conn->ab_write, conn->ab_read));
-            else
-                    debug(1,"Late packet received that is %d packets late.",seq_diff(seqno,
-            conn->ab_write, conn->ab_read));
-                  */
-          } else { // too late.
+      if (conn->ab_write == seqno) { // expected packet
+        abuf = conn->audio_buffer + BUFIDX(seqno);
+        conn->ab_write = SUCCESSOR(seqno);
+      } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected
+        // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8)
+        // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and
+        // seqno:
+        // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno);
+        int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read);
+        if (gap <= 0)
+          debug(1, "Unexpected gap size: %d.", gap);
+        int i;
+        for (i = 0; i < gap; i++) {
+          abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i));
+          abuf->ready = 0; // to be sure, to be sure
+          abuf->resend_level = 0;
+          abuf->timestamp = 0;
+          abuf->given_timestamp = 0;
+          abuf->sequence_number = 0;
+        }
+        // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write);
+        abuf = conn->audio_buffer + BUFIDX(seqno);
+        //        rtp_request_resend(ab_write, gap);
+        //        resend_requests++;
+        conn->ab_write = SUCCESSOR(seqno);
+      } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played
+        conn->late_packets++;
+        abuf = conn->audio_buffer + BUFIDX(seqno);
+        /*
+        if (abuf->ready)
+                debug(1,"Late apparently duplicate packet received that is %d packets
+        late.",seq_diff(seqno, conn->ab_write, conn->ab_read));
+        else
+                debug(1,"Late packet received that is %d packets late.",seq_diff(seqno,
+        conn->ab_write, conn->ab_read));
+              */
+      } else { // too late.
 
-            // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno,
-            // conn->ab_write, conn->ab_read));
-            conn->too_late_packets++;
-          }
-          // pthread_mutex_unlock(&ab_mutex);
-
-          if (abuf) {
-            int datalen = conn->max_frames_per_packet;
-            if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) {
-              abuf->ready = 1;
-              abuf->length = datalen;
-              abuf->timestamp = ltimestamp;
-              abuf->given_timestamp = actual_timestamp;
-              abuf->sequence_number = seqno;
-            } else {
-              debug(1, "Bad audio packet detected and discarded.");
-              abuf->ready = 0;
-              abuf->resend_level = 0;
-              abuf->timestamp = 0;
-              abuf->given_timestamp = 0;
-              abuf->sequence_number = 0;
-            }
-          }
+        // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno,
+        // conn->ab_write, conn->ab_read));
+        conn->too_late_packets++;
+      }
+      // pthread_mutex_unlock(&ab_mutex);
+
+      if (abuf) {
+        int datalen = conn->max_frames_per_packet;
+        if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) {
+          abuf->ready = 1;
+          abuf->length = datalen;
+          abuf->timestamp = ltimestamp;
+          abuf->given_timestamp = actual_timestamp;
+          abuf->sequence_number = seqno;
+        } else {
+          debug(1, "Bad audio packet detected and discarded.");
+          abuf->ready = 0;
+          abuf->resend_level = 0;
+          abuf->timestamp = 0;
+          abuf->given_timestamp = 0;
+          abuf->sequence_number = 0;
+        }
+      }
 
-          // pthread_mutex_lock(&ab_mutex);
-          int rc = pthread_cond_signal(&conn->flowcontrol);
-          if (rc)
-            debug(1, "Error signalling flowcontrol.");
-
-          // if it's at the expected time, do a look back for missing packets
-          // but release the ab_mutex when doing a resend
-          if (!conn->ab_buffering) {
-            int j;
-            for (j = 1; j <= number_of_resend_attempts; j++) {
-              // check j times, after a short period of has elapsed, assuming 352 frames per packet
-              // the higher the step_exponent, the less it will try. 1 means it will try very
-              // hard. 2.0 seems good.
-              float step_exponent = 2.0;
-              int back_step = (int)(resend_interval * pow(j, step_exponent));
-              int k;
-              for (k = -1; k <= 1; k++) {
-                if ((back_step + k) <
-                    seq_diff(conn->ab_read, conn->ab_write,
-                             conn->ab_read)) { // if it's within the range of frames in use...
-                  int item_to_check = (conn->ab_write - (back_step + k)) & 0xffff;
-                  seq_t next = item_to_check;
-                  abuf_t *check_buf = conn->audio_buffer + BUFIDX(next);
-                  if ((!check_buf->ready) &&
-                      (check_buf->resend_level <
-                       j)) { // prevent multiple requests from the same level of lookback
-                    check_buf->resend_level = j;
-                    if (config.disable_resend_requests == 0) {
-                      if (((int)(resend_interval * pow(j + 1, step_exponent)) + k) >=
-                          seq_diff(conn->ab_read, conn->ab_write, conn->ab_read))
-                        debug(3,
-                              "Last-ditch (#%d) resend request for packet %u in range %u to %u. "
-                              "Looking back %d packets.",
-                              j, next, conn->ab_read, conn->ab_write, back_step + k);
-                      debug_mutex_unlock(&conn->ab_mutex, 3);
-                      rtp_request_resend(next, 1, conn);
-                      conn->resend_requests++;
-                      debug_mutex_lock(&conn->ab_mutex, 20000, 1);
-                    }
-                  }
+      // pthread_mutex_lock(&ab_mutex);
+      int rc = pthread_cond_signal(&conn->flowcontrol);
+      if (rc)
+        debug(1, "Error signalling flowcontrol.");
+
+      // if it's at the expected time, do a look back for missing packets
+      // but release the ab_mutex when doing a resend
+      if (!conn->ab_buffering) {
+        int j;
+        for (j = 1; j <= number_of_resend_attempts; j++) {
+          // check j times, after a short period of has elapsed, assuming 352 frames per packet
+          // the higher the step_exponent, the less it will try. 1 means it will try very
+          // hard. 2.0 seems good.
+          float step_exponent = 2.0;
+          int back_step = (int)(resend_interval * pow(j, step_exponent));
+          int k;
+          for (k = -1; k <= 1; k++) {
+            if ((back_step + k) <
+                seq_diff(conn->ab_read, conn->ab_write,
+                         conn->ab_read)) { // if it's within the range of frames in use...
+              int item_to_check = (conn->ab_write - (back_step + k)) & 0xffff;
+              seq_t next = item_to_check;
+              abuf_t *check_buf = conn->audio_buffer + BUFIDX(next);
+              if ((!check_buf->ready) &&
+                  (check_buf->resend_level <
+                   j)) { // prevent multiple requests from the same level of lookback
+                check_buf->resend_level = j;
+                if (config.disable_resend_requests == 0) {
+                  if (((int)(resend_interval * pow(j + 1, step_exponent)) + k) >=
+                      seq_diff(conn->ab_read, conn->ab_write, conn->ab_read))
+                    debug(3, "Last-ditch (#%d) resend request for packet %u in range %u to %u. "
+                             "Looking back %d packets.",
+                          j, next, conn->ab_read, conn->ab_write, back_step + k);
+                  debug_mutex_unlock(&conn->ab_mutex, 3);
+                  rtp_request_resend(next, 1, conn);
+                  conn->resend_requests++;
+                  debug_mutex_lock(&conn->ab_mutex, 20000, 1);
                 }
               }
             }
           }
         }
       }
-      debug_mutex_unlock(&conn->ab_mutex, 3);
-    } else {
-      debug(1, "player_put_packet discarded packet %d because the player thread was gone.");
     }
-    pthread_cleanup_pop(1);
-    // pthread_rwlock_unlock(&conn->player_thread_lock);
-  } else {
-    debug(1, "player_put_packet discarded packet %d because the player thread was locked.", seqno);
   }
+  debug_mutex_unlock(&conn->ab_mutex, 3);
 }
 
 int32_t rand_in_range(int32_t exclusive_range_limit) {
@@ -820,9 +798,8 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn) {
       //      if (conn->packet_count>500) { //for testing -- about 4 seconds of play first
       if ((local_time_now > conn->time_of_last_audio_packet) &&
           (local_time_now - conn->time_of_last_audio_packet >= ct << 32)) {
-        debug(1,
-              "As Yeats almost said, \"Too long a silence / can make a stone of the heart\" "
-              "from RTSP conversation %d.",
+        debug(1, "As Yeats almost said, \"Too long a silence / can make a stone of the heart\" "
+                 "from RTSP conversation %d.",
               conn->connection_number);
         conn->stop = 1;
         pthread_kill(conn->thread, SIGUSR1);
@@ -1439,36 +1416,14 @@ typedef struct stats { // statistics for running averages
   int64_t sync_error, correction, drift;
 } stats_t;
 
-void *player_thread_func(void *arg) {
-  // note, the thread will be started up with the player_thread_lock locked. You must release it
-  // quickly.
-
+void player_thread_cleanup_handler(void *arg) {
+  debug(1, "player_thread_cleanup_handler called");
   rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+}
 
-  pthread_rwlock_wrlock(&conn->player_thread_lock);
-
-  int rc = pthread_mutex_init(&conn->ab_mutex, NULL);
-  if (rc)
-    debug(1, "Error initialising ab_mutex.");
-  rc = pthread_mutex_init(&conn->flush_mutex, NULL);
-  if (rc)
-    debug(1, "Error initialising flush_mutex.");
-// set the flowcontrol condition variable to wait on a monotonic clock
-#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD
-  pthread_condattr_t attr;
-  pthread_condattr_init(&attr);
-  pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it.
-  rc = pthread_cond_init(&conn->flowcontrol, &attr);
-#endif
-#ifdef COMPILE_FOR_OSX
-  rc = pthread_cond_init(&conn->flowcontrol, NULL);
-#endif
-  if (rc)
-    debug(1, "Error initialising flowcontrol condition variable.");
-
-  pthread_rwlock_unlock(&conn->player_thread_lock);
-
-  // it's safe now
+void *player_thread_func(void *arg) {
+  pthread_cleanup_push(player_thread_cleanup_handler, arg);
+  rtsp_conn_info *conn = (rtsp_conn_info *)arg;
 
   conn->packet_count = 0;
   conn->previous_random_number = 0;
@@ -1487,12 +1442,13 @@ void *player_thread_func(void *arg) {
     conn->latency = 88200;
   }
 
-  config.output->start(config.output_rate, config.output_format);
+  config.output->start(config.output_rate, config.output_format); // will need a corresponding stop
 
   init_decoder((int32_t *)&conn->stream.fmtp,
-               conn); // this sets up incoming rate, bit depth, channels
-  // must be after decoder init
-  init_buffer(conn);
+               conn); // this sets up incoming rate, bit depth, channels.
+                      // No pthread cancellation point in here
+  // This must be after init_decoder
+  init_buffer(conn); // will need a corresponding deallocation
 
   if (conn->stream.encrypted) {
 #ifdef HAVE_LIBMBEDTLS
@@ -1639,18 +1595,16 @@ void *player_thread_func(void *arg) {
   // if ((input_rate!=config.output_rate) || (input_bit_depth!=output_bit_depth)) {
   // debug(1,"Define tbuf of length
   // %d.",output_bytes_per_frame*(max_frames_per_packet*output_sample_ratio+max_frame_size_change));
-  tbuf = malloc(
-      sizeof(int32_t) * 2 *
-      (conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change));
+  tbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio +
+                                       conn->max_frame_size_change));
   if (tbuf == NULL)
     die("Failed to allocate memory for the transition buffer.");
 
   sbuf = 0;
   // initialise this, because soxr stuffing might be chosen later
 
-  sbuf = malloc(
-      sizeof(int32_t) * 2 *
-      (conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change));
+  sbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio +
+                                       conn->max_frame_size_change));
   if (sbuf == NULL)
     debug(1, "Failed to allocate memory for the sbuf buffer.");
 
@@ -1670,7 +1624,8 @@ void *player_thread_func(void *arg) {
 
 // stop looking elsewhere for DACP stuff
 #ifdef HAVE_DACP_CLIENT
-  set_dacp_server_information(conn); // this will start scanning when a port is registered by the
+  // this may have pthread cancellation points in it
+  set_dacp_server_information(conn); //  this will start scanning when a port is registered by the
                                      // code initiated by the mdns_dacp_monitor
 #else
   // this is only used for compatability, if dacp stuff isn't enabled.
@@ -1679,7 +1634,7 @@ void *player_thread_func(void *arg) {
   if (conn->dapo_private_storage)
     debug(1, "DACP monitor already initialised?");
   else
-    conn->dapo_private_storage = mdns_dacp_monitor(conn->dacp_id);
+    conn->dapo_private_storage = mdns_dacp_monitor(conn->dacp_id); // ??
 #endif
 
   conn->framesProcessedInThisEpoch = 0;
@@ -1726,7 +1681,7 @@ void *player_thread_func(void *arg) {
   // set the default volume to whaterver it was before, as stored in the config airplay_volume
   debug(3, "Set initial volume to %f.", config.airplay_volume);
 
-  player_volume(config.airplay_volume, conn);
+  player_volume(config.airplay_volume, conn); // ??
   int64_t frames_to_drop = 0;
   // debug(1, "Play begin");
   while (!conn->player_thread_please_stop) {
@@ -1752,9 +1707,8 @@ void *player_thread_func(void *arg) {
           } else {
             // the player may change the contents of the buffer, so it has to be zeroed each time;
             // might as well malloc and freee it locally
-            memset(silence, 0,
-                   conn->output_bytes_per_frame * conn->max_frames_per_packet *
-                       conn->output_sample_ratio);
+            memset(silence, 0, conn->output_bytes_per_frame * conn->max_frames_per_packet *
+                                   conn->output_sample_ratio);
             config.output->play(silence, conn->max_frames_per_packet * conn->output_sample_ratio);
             free(silence);
           }
@@ -1774,9 +1728,8 @@ void *player_thread_func(void *arg) {
           } else {
             // the player may change the contents of the buffer, so it has to be zeroed each time;
             // might as well malloc and freee it locally
-            memset(silence, 0,
-                   conn->output_bytes_per_frame * conn->max_frames_per_packet *
-                       conn->output_sample_ratio);
+            memset(silence, 0, conn->output_bytes_per_frame * conn->max_frames_per_packet *
+                                   conn->output_sample_ratio);
             config.output->play(silence, conn->max_frames_per_packet * conn->output_sample_ratio);
             free(silence);
           }
@@ -1923,9 +1876,8 @@ void *player_thread_func(void *arg) {
                 SUCCESSOR(conn->last_seqno_read); // int32_t from seq_t, i.e. uint16_t, so okay.
             if (inframe->sequence_number !=
                 conn->last_seqno_read) { // seq_t, ei.e. uint16_t and int32_t, so okay
-              debug(2,
-                    "Player: packets out of sequence: expected: %u, got: %u, with ab_read: %u "
-                    "and ab_write: %u.",
+              debug(2, "Player: packets out of sequence: expected: %u, got: %u, with ab_read: %u "
+                       "and ab_write: %u.",
                     conn->last_seqno_read, inframe->sequence_number, conn->ab_read, conn->ab_write);
               conn->last_seqno_read = inframe->sequence_number; // reset warning...
             }
@@ -2076,7 +2028,7 @@ void *player_thread_func(void *arg) {
 #ifdef CONFIG_CONVOLUTION
                   || config.convolution
 #endif
-              ) {
+                  ) {
                 int32_t *tbuf32 = (int32_t *)tbuf;
                 float fbuf_l[inbuflength];
                 float fbuf_r[inbuflength];
@@ -2278,10 +2230,10 @@ void *player_thread_func(void *arg) {
                       "%*d",    /* max buffer occupancy */
                       9, /* should be 10, but there's an explicit space at the start to ensure
                             alignment */
-                      1000 * moving_average_sync_error / config.output_rate, 10,
-                      moving_average_correction * 1000000 / (352 * conn->output_sample_ratio), 10,
-                      moving_average_insertions_plus_deletions * 1000000 /
-                          (352 * conn->output_sample_ratio),
+                      1000 * moving_average_sync_error / config.output_rate,
+                      10, moving_average_correction * 1000000 / (352 * conn->output_sample_ratio),
+                      10, moving_average_insertions_plus_deletions * 1000000 /
+                              (352 * conn->output_sample_ratio),
                       12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7,
                       conn->too_late_packets, 7, conn->resend_requests, 7, minimum_dac_queue_size,
                       5, minimum_buffer_occupancy, 5, maximum_buffer_occupancy);
@@ -2297,10 +2249,11 @@ void *player_thread_func(void *arg) {
                          "%*d",    /* max buffer occupancy */
                          9, /* should be 10, but there's an explicit space at the start to ensure
                                alignment */
-                         1000 * moving_average_sync_error / config.output_rate, 12, play_number, 7,
-                         conn->missing_packets, 7, conn->late_packets, 7, conn->too_late_packets, 7,
-                         conn->resend_requests, 7, minimum_dac_queue_size, 5,
-                         minimum_buffer_occupancy, 5, maximum_buffer_occupancy);
+                         1000 * moving_average_sync_error / config.output_rate,
+                         12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7,
+                         conn->too_late_packets, 7, conn->resend_requests, 7,
+                         minimum_dac_queue_size, 5, minimum_buffer_occupancy, 5,
+                         maximum_buffer_occupancy);
                 }
               } else {
                 inform(" %*.1f," /* Sync error in milliseconds */
@@ -2313,10 +2266,10 @@ void *player_thread_func(void *arg) {
                        "%*d",    /* max buffer occupancy */
                        9, /* should be 10, but there's an explicit space at the start to ensure
                              alignment */
-                       1000 * moving_average_sync_error / config.output_rate, 12, play_number, 7,
-                       conn->missing_packets, 7, conn->late_packets, 7, conn->too_late_packets, 7,
-                       conn->resend_requests, 5, minimum_buffer_occupancy, 5,
-                       maximum_buffer_occupancy);
+                       1000 * moving_average_sync_error / config.output_rate,
+                       12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7,
+                       conn->too_late_packets, 7, conn->resend_requests, 5,
+                       minimum_buffer_occupancy, 5, maximum_buffer_occupancy);
               }
             } else {
               inform("No frames received in the last sampling interval.");
@@ -2381,16 +2334,6 @@ void *player_thread_func(void *arg) {
 
   free_audio_buffers(conn);
   terminate_decoders(conn);
-  // remove flow control and mutexes
-  rc = pthread_cond_destroy(&conn->flowcontrol);
-  if (rc)
-    debug(1, "Error destroying flowcontrol condition variable.");
-  rc = pthread_mutex_destroy(&conn->flush_mutex);
-  if (rc)
-    debug(1, "Error destroying flush_mutex variable.");
-  rc = pthread_mutex_destroy(&conn->ab_mutex);
-  if (rc)
-    debug(1, "Error destroying ab_mutex variable.");
   debug(2, "Connection %d: player thread terminated.", conn->connection_number);
   if (conn->dacp_id) {
     free(conn->dacp_id);
@@ -2402,6 +2345,7 @@ void *player_thread_func(void *arg) {
     free(tbuf);
   if (sbuf)
     free(sbuf);
+  pthread_cleanup_pop(1);
   pthread_exit(NULL);
 }
 
@@ -2640,14 +2584,16 @@ void player_volume(double airplay_volume, rtsp_conn_info *conn) {
 }
 
 void do_flush(int64_t timestamp, rtsp_conn_info *conn) {
+
   debug(3, "Flush requested up to %u. It seems as if 0 is special.", timestamp);
   debug_mutex_lock(&conn->flush_mutex, 1000, 1);
   conn->flush_requested = 1;
   // if (timestamp!=0)
   conn->flush_rtp_timestamp = timestamp; // flush all packets up to (and including?) this
-  debug_mutex_unlock(&conn->flush_mutex, 3);
   conn->play_segment_reference_frame = 0;
   conn->play_number_after_flush = 0;
+  debug_mutex_unlock(&conn->flush_mutex, 3);
+
 #ifdef CONFIG_METADATA
   // only send a flush metadata message if the first packet has been seen -- it's a bogus message
   // otherwise
@@ -2656,20 +2602,13 @@ void do_flush(int64_t timestamp, rtsp_conn_info *conn) {
     send_ssnc_metadata('pfls', NULL, 0, 1);
   }
 #endif
+
   debug(3, "Flush request made.");
 }
 
 void player_flush(int64_t timestamp, rtsp_conn_info *conn) {
   debug(3, "player_flush");
-  if (pthread_rwlock_tryrdlock(&conn->player_thread_lock) == 0) {
-    if (conn->player_thread != NULL)
-      do_flush(timestamp, conn);
-    else
-      debug(1, "Flush requested when player thread is gone.");
-    pthread_rwlock_unlock(&conn->player_thread_lock);
-  } else {
-    debug(3, "Can't acquire a read lock for a flush -- ignored.");
-  }
+  do_flush(timestamp, conn);
 }
 
 int player_play(rtsp_conn_info *conn) {
@@ -2694,8 +2633,6 @@ int player_play(rtsp_conn_info *conn) {
   int rc = pthread_attr_setstacksize(&tattr, size);
   if (rc)
     debug(1, "Error setting stack size for player_thread: %s", strerror(errno));
-
-  // hack alert -- the player thread itself releases the player_thread_lock rwlock as soon as it's
   // finished initialising.
   pthread_create(pt, &tattr, player_thread_func, (void *)conn);
   pthread_attr_destroy(&tattr);
@@ -2704,8 +2641,6 @@ int player_play(rtsp_conn_info *conn) {
 
 int player_stop(rtsp_conn_info *conn) {
   debug(3, "player_stop");
-  pthread_rwlock_wrlock(&conn->player_thread_lock);
-  debug(3, "player_thread_lock acquired");
   if (conn->player_thread) {
     debug(3, "player_thread exists");
     conn->player_thread_please_stop = 1;
@@ -2715,7 +2650,6 @@ int player_stop(rtsp_conn_info *conn) {
     pthread_join(*conn->player_thread, NULL);
     free(conn->player_thread);
     conn->player_thread = NULL;
-    pthread_rwlock_unlock(&conn->player_thread_lock);
 #ifdef CONFIG_METADATA
     debug(2, "pend");
     send_ssnc_metadata('pend', NULL, 0, 1);
@@ -2723,8 +2657,7 @@ int player_stop(rtsp_conn_info *conn) {
     command_stop();
     return 0;
   } else {
-    pthread_rwlock_unlock(&conn->player_thread_lock);
-    debug(3, "player thread of RTSP conversation %d is already deleted.", conn->connection_number);
+    debug(3, "Connection %d: player thread already deleted.", conn->connection_number);
     return -1;
   }
 }
index 3274d6487a65ba7fcb62a3deda1cad2207dbc629..4c12c803805762681c06d86ab520142808d89df0 100644 (file)
--- a/player.h
+++ b/player.h
@@ -85,7 +85,6 @@ typedef struct {
 
   // pthread_t *ptp;
   pthread_t *player_thread;
-  pthread_rwlock_t player_thread_lock; // used to control access by "outsiders"
 
   abuf_t audio_buffer[BUFFER_FRAMES];
   int max_frames_per_packet, input_num_channels, input_bit_depth, input_rate;
diff --git a/rtp.c b/rtp.c
index 6a588b0b0b9c755fb1b28c9d109922898e80ab24..c86d3fa075d562e395d444e6093a5aa24e57f4c1 100644 (file)
--- a/rtp.c
+++ b/rtp.c
@@ -68,9 +68,9 @@ void rtp_terminate(rtsp_conn_info *conn) {
 void rtp_audio_receiver_cleanup_handler(void *arg) {
   debug(3, "Audio Receiver Cleanup.");
   rtsp_conn_info *conn = (rtsp_conn_info *)arg;
-  debug(1,"shutdown audio socket.");
-  shutdown(conn->audio_socket,SHUT_RDWR);  
-  debug(1,"close audio socket.");
+  debug(1, "shutdown audio socket.");
+  shutdown(conn->audio_socket, SHUT_RDWR);
+  debug(1, "close audio socket.");
   close(conn->audio_socket);
   debug(3, "Audio Receiver Cleanup Successful.");
 }
@@ -191,9 +191,9 @@ void *rtp_audio_receiver(void *arg) {
 void rtp_control_handler_cleanup_handler(void *arg) {
   debug(3, "Control Receiver Cleanup.");
   rtsp_conn_info *conn = (rtsp_conn_info *)arg;
-  debug(1,"shutdown control socket.");
-  shutdown(conn->control_socket,SHUT_RDWR);  
-  debug(1,"close control socket.");
+  debug(1, "shutdown control socket.");
+  shutdown(conn->control_socket, SHUT_RDWR);
+  debug(1, "close control socket.");
   close(conn->control_socket);
   debug(3, "Control Receiver Cleanup Successful.");
 }
@@ -477,9 +477,9 @@ void rtp_timing_receiver_cleanup_handler(void *arg) {
   rtsp_conn_info *conn = (rtsp_conn_info *)arg;
   pthread_cancel(conn->timer_requester);
   pthread_join(conn->timer_requester, NULL);
-  debug(1,"shutdown timing socket.");
-  shutdown(conn->timing_socket,SHUT_RDWR);  
-  debug(1,"close timing socket.");
+  debug(1, "shutdown timing socket.");
+  shutdown(conn->timing_socket, SHUT_RDWR);
+  debug(1, "close timing socket.");
   close(conn->timing_socket);
   debug(3, "Timing Receiver Cleanup Successful.");
 }
diff --git a/rtsp.c b/rtsp.c
index b66e8954041c4d18fceb7f053f2e6de182d8b21e..00a168baedf4fc3b4d858e7fd0db5ba8e268bbe0 100644 (file)
--- a/rtsp.c
+++ b/rtsp.c
@@ -1887,11 +1887,25 @@ authenticate:
 static void *rtsp_conversation_thread_func(void *pconn) {
   rtsp_conn_info *conn = pconn;
 
-  // create the player thread lock.
-  int rwli = pthread_rwlock_init(&conn->player_thread_lock, NULL);
-  if (rwli != 0)
-    die("Error %d initialising player_thread_lock for conversation thread %d.", rwli,
-        conn->connection_number);
+  int rc = pthread_mutex_init(&conn->flush_mutex, NULL);
+  if (rc)
+    die("Connection %d: error %d initialising flush_mutex.", conn->connection_number, rc);
+  rc = pthread_mutex_init(&conn->ab_mutex, NULL);
+  if (rc)
+    die("Connection %d: error %d initialising ab_mutex.", conn->connection_number, rc);
+// set the flowcontrol condition variable to wait on a monotonic clock
+#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD
+  pthread_condattr_t attr;
+  pthread_condattr_init(&attr);
+  pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it.
+  rc = pthread_cond_init(&conn->flowcontrol, &attr);
+#endif
+#ifdef COMPILE_FOR_OSX
+  rc = pthread_cond_init(&conn->flowcontrol, NULL);
+#endif
+  if (rc)
+    die("Connection %d: error %d initialising flow control condition variable.",
+        conn->connection_number, rc);
 
   rtp_initialise(conn);
 
@@ -2005,12 +2019,17 @@ static void *rtsp_conversation_thread_func(void *pconn) {
   debug(2, "Connection %d: RTSP thread terminated.", conn->connection_number);
   conn->running = 0;
 
-  // release the player_thread_lock
-  int rwld = pthread_rwlock_destroy(&conn->player_thread_lock);
-  if (rwld)
-    debug(1, "Error %d destroying player_thread_lock for conversation thread %d.", rwld,
-          conn->connection_number);
-
+  // remove flow control and mutexes
+  rc = pthread_cond_destroy(&conn->flowcontrol);
+  if (rc)
+    debug(1, "Connection %d: error %d destroying flow control condition variable.",
+          conn->connection_number, rc);
+  rc = pthread_mutex_destroy(&conn->ab_mutex);
+  if (rc)
+    debug(1, "Connection %d: error %d destroying ab_mutex.", conn->connection_number, rc);
+  rc = pthread_mutex_destroy(&conn->flush_mutex);
+  if (rc)
+    debug(1, "Connection %d: error %d destroying flush_mutex.", conn->connection_number, rc);
   pthread_exit(NULL);
 }
 
index c5450ad797dd0c47e001705d72a161b188a6de8a..925211075e1dda7c12581c7dc4852f4b849cdd51 100644 (file)
 #include <FFTConvolver/convolver.h>
 #endif
 
-static inline int config_set_lookup_bool(config_t* cfg, char* where, int* dst) {
+static inline int config_set_lookup_bool(config_t *cfg, char *where, int *dst) {
   const char *str = 0;
   if (config_lookup_string(cfg, where, &str)) {
-    if (strcasecmp(str, "no") == 0){
-      (*dst)=0;
+    if (strcasecmp(str, "no") == 0) {
+      (*dst) = 0;
       return 1;
-    }else if (strcasecmp(str, "yes") == 0){
-      (*dst)=1;
+    } else if (strcasecmp(str, "yes") == 0) {
+      (*dst) = 1;
       return 1;
-    }else{
+    } else {
       die("Invalid %s option choice \"%s\". It should be \"yes\" or \"no\"", where, str);
       return 0;
     }
-  }else{
+  } else {
     return 0;
   }
 }
@@ -404,7 +404,8 @@ int parse_options(int argc, char **argv) {
       config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_with_pid_file", &daemonisewith);
 
       /* Get the Just_Daemonize setting. */
-      config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_without_pid_file", &daemonisewithout);
+      config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_without_pid_file",
+                             &daemonisewithout);
 
       if ((daemonisewith) && (daemonisewithout))
         die("Select either daemonize_with_pid_file or daemonize_without_pid_file -- you have "
@@ -474,7 +475,8 @@ int parse_options(int argc, char **argv) {
       }
 
       /* Get the statistics setting. */
-      if (!config_set_lookup_bool(config.cfg, "general.statistics", &(config.statistics_requested))) {
+      if (!config_set_lookup_bool(config.cfg, "general.statistics",
+                                  &(config.statistics_requested))) {
         warn("The \"general\" \"statistics\" setting is deprecated. Please use the \"diagnostics\" "
              "\"statistics\" setting instead.");
       }
@@ -881,61 +883,60 @@ int parse_options(int argc, char **argv) {
 #endif
 
 #ifdef CONFIG_MQTT
-      int tmpval=0;
-      config_set_lookup_bool(config.cfg, "mqtt.enabled", &config.mqtt_enabled);
-      if(config.mqtt_enabled && !config.metadata_enabled){
-        die("You need to have metadata enabled in order to use mqtt");
-      }
-      if (config_lookup_string(config.cfg, "mqtt.hostname", &str)) {
-        config.mqtt_hostname = (char *)str;
-        //TODO: Document that, if this is false, whole mqtt func is disabled
-      }
-      if (config_lookup_int(config.cfg, "mqtt.port", &tmpval)) {
-        config.mqtt_port = tmpval;
-      }else{
-        //TODO: Is this the correct way to set a default value?
-        config.mqtt_port = 1883;
-      }
-      
-      if (config_lookup_string(config.cfg, "mqtt.username", &str)) {
-        config.mqtt_username = (char *)str;
-      }
-      if (config_lookup_string(config.cfg, "mqtt.password", &str)) {
-        config.mqtt_password = (char *)str;
-      }
-      int capath=0;
-      if (config_lookup_string(config.cfg, "mqtt.capath", &str)) {
-        config.mqtt_capath = (char *)str;
-        capath=1;
-      }
-      if (config_lookup_string(config.cfg, "mqtt.cafile", &str)) {
-        if(capath)
-          die("Supply either mqtt cafile or mqtt capath -- you have supplied both!");
-        config.mqtt_cafile = (char *)str;
-      }
-      int certkeynum=0;
-      if (config_lookup_string(config.cfg, "mqtt.certfile", &str)) {
-        config.mqtt_certfile = (char *)str;
-        certkeynum++;
-      }
-      if (config_lookup_string(config.cfg, "mqtt.keyfile", &str)) {
-        config.mqtt_keyfile = (char *)str;
-        certkeynum++;
-      }
-      if( certkeynum!=0 && certkeynum!=2){
-        die("If you want to use TLS Client Authentication, you have to specify "
-            "mqtt.certfile AND mqtt.keyfile.\nYou have supplied only one of them.\n"
-            "If you do not want to use TLS Client Authentication, leave both empty."
-        );
-      }
-      
-      if(config_lookup_string(config.cfg, "mqtt.topic", &str)){
-        config.mqtt_topic = (char *)str;
-      }
-      config_set_lookup_bool(config.cfg, "mqtt.publish_raw", &config.mqtt_publish_raw);
-      config_set_lookup_bool(config.cfg, "mqtt.publish_parsed", &config.mqtt_publish_parsed);
-      config_set_lookup_bool(config.cfg, "mqtt.publish_cover", &config.mqtt_publish_cover);
-      config_set_lookup_bool(config.cfg, "mqtt.enable_remote", &config.mqtt_enable_remote);
+    int tmpval = 0;
+    config_set_lookup_bool(config.cfg, "mqtt.enabled", &config.mqtt_enabled);
+    if (config.mqtt_enabled && !config.metadata_enabled) {
+      die("You need to have metadata enabled in order to use mqtt");
+    }
+    if (config_lookup_string(config.cfg, "mqtt.hostname", &str)) {
+      config.mqtt_hostname = (char *)str;
+      // TODO: Document that, if this is false, whole mqtt func is disabled
+    }
+    if (config_lookup_int(config.cfg, "mqtt.port", &tmpval)) {
+      config.mqtt_port = tmpval;
+    } else {
+      // TODO: Is this the correct way to set a default value?
+      config.mqtt_port = 1883;
+    }
+
+    if (config_lookup_string(config.cfg, "mqtt.username", &str)) {
+      config.mqtt_username = (char *)str;
+    }
+    if (config_lookup_string(config.cfg, "mqtt.password", &str)) {
+      config.mqtt_password = (char *)str;
+    }
+    int capath = 0;
+    if (config_lookup_string(config.cfg, "mqtt.capath", &str)) {
+      config.mqtt_capath = (char *)str;
+      capath = 1;
+    }
+    if (config_lookup_string(config.cfg, "mqtt.cafile", &str)) {
+      if (capath)
+        die("Supply either mqtt cafile or mqtt capath -- you have supplied both!");
+      config.mqtt_cafile = (char *)str;
+    }
+    int certkeynum = 0;
+    if (config_lookup_string(config.cfg, "mqtt.certfile", &str)) {
+      config.mqtt_certfile = (char *)str;
+      certkeynum++;
+    }
+    if (config_lookup_string(config.cfg, "mqtt.keyfile", &str)) {
+      config.mqtt_keyfile = (char *)str;
+      certkeynum++;
+    }
+    if (certkeynum != 0 && certkeynum != 2) {
+      die("If you want to use TLS Client Authentication, you have to specify "
+          "mqtt.certfile AND mqtt.keyfile.\nYou have supplied only one of them.\n"
+          "If you do not want to use TLS Client Authentication, leave both empty.");
+    }
+
+    if (config_lookup_string(config.cfg, "mqtt.topic", &str)) {
+      config.mqtt_topic = (char *)str;
+    }
+    config_set_lookup_bool(config.cfg, "mqtt.publish_raw", &config.mqtt_publish_raw);
+    config_set_lookup_bool(config.cfg, "mqtt.publish_parsed", &config.mqtt_publish_parsed);
+    config_set_lookup_bool(config.cfg, "mqtt.publish_cover", &config.mqtt_publish_cover);
+    config_set_lookup_bool(config.cfg, "mqtt.enable_remote", &config.mqtt_enable_remote);
 #endif
     free(config_file_real_path);
   }
@@ -1035,13 +1036,13 @@ int parse_options(int argc, char **argv) {
   free(i2);
   free(i3);
   free(vs);
-  
+
 #ifdef CONFIG_MQTT
   // mqtt topic was not set. As we have the service name just now, set it
-  if(config.mqtt_topic==NULL){
-    int topic_length=1+strlen(config.service_name)+1;
-    char* topic=malloc(topic_length+1);
-    snprintf(topic,topic_length,"/%s/",config.service_name);
+  if (config.mqtt_topic == NULL) {
+    int topic_length = 1 + strlen(config.service_name) + 1;
+    char *topic = malloc(topic_length + 1);
+    snprintf(topic, topic_length, "/%s/", config.service_name);
     config.mqtt_topic = topic;
   }
 #endif
@@ -1251,7 +1252,7 @@ int main(int argc, char **argv) {
   daemon_pid_file_ident = daemon_log_ident = daemon_ident_from_argv0(argv[0]);
 
   daemon_pid_file_proc = pid_file_proc;
-  
+
   /* Check if we are called with -D or --disconnectFromOutput parameter */
   if (argc >= 2 &&
       ((strcmp(argv[1], "-D") == 0) || (strcmp(argv[1], "--disconnectFromOutput") == 0))) {
@@ -1577,7 +1578,7 @@ int main(int argc, char **argv) {
 #endif
 
 #ifdef HAVE_MQTT
-  if(config.mqtt_enabled){
+  if (config.mqtt_enabled) {
     initialise_mqtt();
   }
 #endif