]> git.ipfire.org Git - thirdparty/shairport-sync.git/commitdiff
Make the pipe, hub and mqtt separate threads with their own metadata queues. If one...
authorMike Brady <mikebradydublin@icloud.com>
Sat, 30 May 2020 14:49:00 +0000 (15:49 +0100)
committerMike Brady <mikebradydublin@icloud.com>
Sat, 30 May 2020 14:49:00 +0000 (15:49 +0100)
common.c
common.h
metadata_hub.c
player.c
rtp.c
rtsp.c

index f10ef7bf70b79fc1b2752d3f2d271c15c6e6ab93..9bf0dd14b052e767588c76aec3f1d104ba0e78b9 100644 (file)
--- a/common.c
+++ b/common.c
@@ -1700,3 +1700,13 @@ int string_update_with_size(char **str, int *flag, char *s, size_t len) {
   }
   return *flag;
 }
+
+// from https://stackoverflow.com/questions/13663617/memdup-function-in-c, with thanks
+void* memdup(const void* mem, size_t size) {
+   void* out = malloc(size);
+
+   if(out != NULL)
+       memcpy(out, mem, size);
+
+   return out;
+}
index 3540bcd074e11a36098566520a0c0d89130f1df1..93bb9c3633e4ee89d938c01453ee1de67e0ea571 100644 (file)
--- a/common.h
+++ b/common.h
@@ -441,4 +441,7 @@ void malloc_cleanup(void *arg);
 
 int string_update_with_size(char **str, int *flag, char *s, size_t len);
 
+// from https://stackoverflow.com/questions/13663617/memdup-function-in-c, with thanks
+void* memdup(const void* mem, size_t size);
+
 #endif // _COMMON_H
index eb63d2dfd9ea236b38058cbcceab5dae91e29d81..2ea442a58c74c9c26dabbb498913a1ff73d05467 100644 (file)
@@ -123,12 +123,13 @@ void run_metadata_watchers(void) {
 }
 
 void metadata_hub_unlock_hub_mutex_cleanup(__attribute__((unused)) void *arg) {
-  // debug(1, "metadata_hub_unlock_hub_mutex_cleanup called.");
-  pthread_rwlock_unlock(&metadata_hub_re_lock);
+  debug(1, "metadata_hub_unlock_hub_mutex_cleanup called.");
+  metadata_hub_modify_epilog(0);
 }
 
 char *last_metadata_hub_modify_prolog_file = NULL;
 int last_metadata_hub_modify_prolog_line;
+int metadata_hub_re_lock_access_is_delayed;
 
 void _metadata_hub_modify_prolog(const char *filename, const int linenumber) {
   // always run this before changing an entry or a sequence of entries in the metadata_hub
@@ -138,8 +139,9 @@ void _metadata_hub_modify_prolog(const char *filename, const int linenumber) {
        debug(1, "Metadata_hub write lock at \"%s:%d\" is already taken at \"%s:%d\" -- must wait.", filename, linenumber, last_metadata_hub_modify_prolog_file, last_metadata_hub_modify_prolog_line);
     else
        debug(1, "Metadata_hub write lock is already taken by unknown -- must wait.");
+    metadata_hub_re_lock_access_is_delayed = 0;
     pthread_rwlock_wrlock(&metadata_hub_re_lock);
-    debug(1, "Okay -- acquired the metadata_hub write lock.");
+    debug(1, "Okay -- acquired the metadata_hub write lock at \"%s:%d\".", filename, linenumber);
   } else {
        if (last_metadata_hub_modify_prolog_file) {
                free(last_metadata_hub_modify_prolog_file);
@@ -148,6 +150,7 @@ void _metadata_hub_modify_prolog(const char *filename, const int linenumber) {
        last_metadata_hub_modify_prolog_line = linenumber;
     debug(3, "Metadata_hub write lock acquired.");
   }
+  metadata_hub_re_lock_access_is_delayed = 0;
 }
 
 void _metadata_hub_modify_epilog(int modified, const char *filename, const int linenumber) {
@@ -156,9 +159,14 @@ void _metadata_hub_modify_epilog(int modified, const char *filename, const int l
   if (modified) {
     run_metadata_watchers();
   }
-  if (last_metadata_hub_modify_prolog_file) {
-               free(last_metadata_hub_modify_prolog_file);
-               last_metadata_hub_modify_prolog_file = NULL;
+  if (metadata_hub_re_lock_access_is_delayed) {
+               if (last_metadata_hub_modify_prolog_file) {
+                               debug(1, "Metadata_hub write lock taken at \"%s:%d\" is freed at \"%s:%d\".", last_metadata_hub_modify_prolog_file, last_metadata_hub_modify_prolog_line, filename, linenumber);
+                               free(last_metadata_hub_modify_prolog_file);
+                               last_metadata_hub_modify_prolog_file = NULL;
+               } else {
+                               debug(1, "Metadata_hub write lock taken at an unknown place is freed at \"%s:%d\".", filename, linenumber);
+               }
   }
   pthread_rwlock_unlock(&metadata_hub_re_lock);
   debug(3, "Metadata_hub write lock unlocked.");
@@ -316,10 +324,15 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
 
   // all the following items of metadata are contained in one metadata packet
   // they are preceded by an 'ssnc' 'mdst' item and followed by an 'ssnc 'mden' item.
-  debug(1, "metadata_hub_process_metadata type %x, code %x and length %u.", type, code, length);
+  // we don't set "changed" for them individually; instead we set it when the  'mden' token
+  // comes in if the metadata_packet_item_changed is set.
 
-  char *cs;
   int changed = 0;
+  int metadata_packet_item_changed;
+  metadata_hub_modify_prolog();
+  pthread_cleanup_push(metadata_hub_unlock_hub_mutex_cleanup, NULL);
+
+  char *cs;
   if (type == 'core') {
     switch (code) {
     case 'mper': {
@@ -334,6 +347,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
         metadata_store.item_id_changed = 1;
         metadata_store.item_id_received = 1;
         debug(2, "MH Item ID set to: \"%" PRIx64 "\"", metadata_store.item_id);
+        metadata_packet_item_changed = 1;
       }
     } break;
     case 'astm': {
@@ -343,12 +357,14 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
         metadata_store.songtime_in_milliseconds = ui;
         metadata_store.songtime_in_milliseconds_changed = 1;
         debug(2, "MH Song Time set to: \"%u\"", metadata_store.songtime_in_milliseconds);
+        metadata_packet_item_changed = 1;
       }
     } break;
     case 'asal':
       cs = strndup(data, length);
       if (string_update(&metadata_store.album_name, &metadata_store.album_name_changed, cs)) {
         debug(2, "MH Album name set to: \"%s\"", metadata_store.album_name);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -356,6 +372,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       cs = strndup(data, length);
       if (string_update(&metadata_store.artist_name, &metadata_store.artist_name_changed, cs)) {
         debug(2, "MH Artist name set to: \"%s\"", metadata_store.artist_name);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -364,6 +381,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       if (string_update(&metadata_store.album_artist_name,
                         &metadata_store.album_artist_name_changed, cs)) {
         debug(2, "MH Album Artist name set to: \"%s\"", metadata_store.album_artist_name);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -371,6 +389,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       cs = strndup(data, length);
       if (string_update(&metadata_store.comment, &metadata_store.comment_changed, cs)) {
         debug(2, "MH Comment set to: \"%s\"", metadata_store.comment);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -378,6 +397,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       cs = strndup(data, length);
       if (string_update(&metadata_store.genre, &metadata_store.genre_changed, cs)) {
         debug(2, "MH Genre set to: \"%s\"", metadata_store.genre);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -385,6 +405,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       cs = strndup(data, length);
       if (string_update(&metadata_store.track_name, &metadata_store.track_name_changed, cs)) {
         debug(2, "MH Track Name set to: \"%s\"", metadata_store.track_name);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -392,6 +413,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       cs = strndup(data, length);
       if (string_update(&metadata_store.composer, &metadata_store.composer_changed, cs)) {
         debug(2, "MH Composer set to: \"%s\"", metadata_store.composer);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -408,6 +430,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       if (string_update(&metadata_store.song_album_artist,
                         &metadata_store.song_album_artist_changed, cs)) {
         debug(2, "MH Song Album Artist set to: \"%s\"", metadata_store.song_album_artist);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -415,6 +438,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       cs = strndup(data, length);
       if (string_update(&metadata_store.sort_name, &metadata_store.sort_name_changed, cs)) {
         debug(2, "MH Sort Name set to: \"%s\"", metadata_store.sort_name);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -422,6 +446,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       cs = strndup(data, length);
       if (string_update(&metadata_store.sort_artist, &metadata_store.sort_artist_changed, cs)) {
         debug(2, "MH Sort Artist set to: \"%s\"", metadata_store.sort_artist);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -429,6 +454,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       cs = strndup(data, length);
       if (string_update(&metadata_store.sort_album, &metadata_store.sort_album_changed, cs)) {
         debug(2, "MH Sort Album set to: \"%s\"", metadata_store.sort_album);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
       break;
@@ -436,6 +462,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       cs = strndup(data, length);
       if (string_update(&metadata_store.sort_composer, &metadata_store.sort_composer_changed, cs)) {
         debug(2, "MH Sort Composer set to: \"%s\"", metadata_store.sort_composer);
+        metadata_packet_item_changed = 1;
       }
       free(cs);
     default:
@@ -468,45 +495,45 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
       break;
     case 'mdst':
       debug(2, "MH Metadata stream processing start.");
-      metadata_hub_modify_prolog();
+      metadata_packet_item_changed = 0;
       break;
     case 'mden':
       debug(2, "MH Metadata stream processing end.");
-      metadata_hub_modify_epilog(1);
-      debug(2, "MH Metadata stream processing epilog complete.");
+      changed = metadata_packet_item_changed;
       break;
     case 'PICT':
-      metadata_hub_modify_prolog();
-      pthread_cleanup_push(metadata_hub_unlock_hub_mutex_cleanup, NULL);
       debug(2, "MH Picture received, length %u bytes.", length);
+
       char uri[2048];
       if ((length > 16) && (strcmp(config.cover_art_cache_dir,"")!=0)) { // if it's okay to write the file
+       // make this uncancellable
+                               int oldState;
+                               pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState); // make this un-cancellable
         char *pathname = metadata_write_image_file(data, length);
         snprintf(uri, sizeof(uri), "file://%s", pathname);
         free(pathname);
+        pthread_setcancelstate(oldState, NULL);
+
       } else {
         uri[0] = '\0';
       }
       if (string_update(&metadata_store.cover_art_pathname,
                         &metadata_store.cover_art_pathname_changed,
                         uri)) // if the picture's file path is different from the stored one...
-        metadata_hub_modify_epilog(1);
+        changed = 1;
       else
-        metadata_hub_modify_epilog(0);
-      pthread_cleanup_pop(0); // don't remove the lock -- it'll have been done
+        changed = 0;
+//      pthread_cleanup_pop(0); // don't remove the lock -- it'll have been done
       break;
     case 'clip':
-      metadata_hub_modify_prolog();
       cs = strndup(data, length);
       if (string_update(&metadata_store.client_ip, &metadata_store.client_ip_changed, cs)) {
         changed = 1;
         debug(2, "MH Client IP set to: \"%s\"", metadata_store.client_ip);
       }
       free(cs);
-      metadata_hub_modify_epilog(changed);
       break;
     case 'prgr':
-      metadata_hub_modify_prolog();
       cs = strndup(data, length);
       if (string_update(&metadata_store.progress_string, &metadata_store.progress_string_changed,
                         cs)) {
@@ -514,61 +541,45 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
         debug(2, "MH Progress String set to: \"%s\"", metadata_store.progress_string);
       }
       free(cs);
-      metadata_hub_modify_epilog(changed);
       break;
     case 'svip':
-      metadata_hub_modify_prolog();
       cs = strndup(data, length);
       if (string_update(&metadata_store.server_ip, &metadata_store.server_ip_changed, cs)) {
         changed = 1;
         debug(2, "MH Server IP set to: \"%s\"", metadata_store.server_ip);
       }
       free(cs);
-      metadata_hub_modify_epilog(changed);
       break;
     case 'abeg':
-      metadata_hub_modify_prolog();
       changed = (metadata_store.active_state != AM_ACTIVE);
       metadata_store.active_state = AM_ACTIVE;
-      metadata_hub_modify_epilog(changed);
       break;
     case 'aend':
-      metadata_hub_modify_prolog();
       changed = (metadata_store.active_state != AM_INACTIVE);
       metadata_store.active_state = AM_INACTIVE;
-      metadata_hub_modify_epilog(changed);
       break;
     case 'pbeg':
-      metadata_hub_modify_prolog();
       changed = ((metadata_store.player_state != PS_PLAYING) ||
                  (metadata_store.player_thread_active == 0));
       metadata_store.player_state = PS_PLAYING;
       metadata_store.player_thread_active = 1;
-      metadata_hub_modify_epilog(changed);
       break;
     case 'pend':
-      metadata_hub_modify_prolog();
       changed = ((metadata_store.player_state != PS_STOPPED) ||
                  (metadata_store.player_thread_active == 1));
       metadata_store.player_state = PS_STOPPED;
       metadata_store.player_thread_active = 0;
-      metadata_hub_modify_epilog(changed);
       break;
     case 'pfls':
-      metadata_hub_modify_prolog();
       changed = (metadata_store.player_state != PS_PAUSED);
       metadata_store.player_state = PS_PAUSED;
-      metadata_hub_modify_epilog(changed);
       break;
     case 'pffr': // this is sent when the first frame has been received
     case 'prsm':
-      metadata_hub_modify_prolog();
       changed = (metadata_store.player_state != PS_PLAYING);
       metadata_store.player_state = PS_PLAYING;
-      metadata_hub_modify_epilog(changed);
       break;
     case 'pvol': {
-      metadata_hub_modify_prolog();
       // Note: it's assumed that the config.airplay volume has already been correctly set.
       // int32_t actual_volume;
       // int gv = dacp_get_volume(&actual_volume);
@@ -581,7 +592,6 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
         metadata_store.airplay_volume = config.airplay_volume;
         changed = 1;
       }
-      metadata_hub_modify_epilog(changed); // change
     } break;
 
     default: {
@@ -604,4 +614,6 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin
     }
     }
   }
+  pthread_cleanup_pop(0); // don't remove the lock
+  metadata_hub_modify_epilog(changed);
 }
index 8847b8b5c1d3a5376d60e1369cfae74ea7251c1f..c7020fc7fad57758cb0b31f2190b2e7935811c9c 100644 (file)
--- a/player.c
+++ b/player.c
@@ -2941,20 +2941,18 @@ void player_volume_without_notification(double airplay_volume, rtsp_conn_info *c
 #ifdef CONFIG_METADATA
       // here, send the 'pvol' metadata message when the airplay volume information
       // is being used by shairport sync to control the output volume
-      char *dv = malloc(128); // will be freed in the metadata thread
-      if (dv) {
-        memset(dv, 0, 128);
-        if (volume_mode == vol_both) {
-          // normalise the maximum output to the hardware device's max output
-          snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume,
-                   (scaled_attenuation - max_db + hw_max_db) / 100.0,
-                   (min_db - max_db + hw_max_db) / 100.0, (max_db - max_db + hw_max_db) / 100.0);
-        } else {
-          snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, scaled_attenuation / 100.0,
-                   min_db / 100.0, max_db / 100.0);
-        }
-        send_ssnc_metadata('pvol', dv, strlen(dv), 1);
-      }
+      char dv[128];
+                       memset(dv, 0, 128);
+                       if (volume_mode == vol_both) {
+                               // normalise the maximum output to the hardware device's max output
+                               snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume,
+                                                                (scaled_attenuation - max_db + hw_max_db) / 100.0,
+                                                                (min_db - max_db + hw_max_db) / 100.0, (max_db - max_db + hw_max_db) / 100.0);
+                       } else {
+                               snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, scaled_attenuation / 100.0,
+                                                                min_db / 100.0, max_db / 100.0);
+                       }
+                       send_ssnc_metadata('pvol', dv, strlen(dv), 1);
 #endif
 
       if (config.output->mute)
@@ -2973,12 +2971,10 @@ void player_volume_without_notification(double airplay_volume, rtsp_conn_info *c
   else {
     // here, send the 'pvol' metadata message when the airplay volume information
     // is being used by shairport sync to control the output volume
-    char *dv = malloc(128); // will be freed in the metadata thread
-    if (dv) {
-      memset(dv, 0, 128);
-      snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, 0.0, 0.0, 0.0);
-      send_ssnc_metadata('pvol', dv, strlen(dv), 1);
-    }
+               char dv[128];
+               memset(dv, 0, 128);
+               snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, 0.0, 0.0, 0.0);
+               send_ssnc_metadata('pvol', dv, strlen(dv), 1);
   }
 #endif
 
diff --git a/rtp.c b/rtp.c
index f7e12c500f368f6fa4334921891115ccc5ce1b81..71d6df0e5387fa76c11931d1cb2bf0534e66eeca 100644 (file)
--- a/rtp.c
+++ b/rtp.c
@@ -998,8 +998,8 @@ void rtp_setup(SOCKADDR *local, SOCKADDR *remote, uint16_t cport, uint16_t tport
     conn->rtp_running = 1;
 
 #ifdef CONFIG_METADATA
-    send_ssnc_metadata('clip', strdup(conn->client_ip_string), strlen(conn->client_ip_string), 1);
-    send_ssnc_metadata('svip', strdup(conn->self_ip_string), strlen(conn->self_ip_string), 1);
+    send_ssnc_metadata('clip', conn->client_ip_string, strlen(conn->client_ip_string), 1);
+    send_ssnc_metadata('svip', conn->self_ip_string, strlen(conn->self_ip_string), 1);
 #endif
   }
 }
diff --git a/rtsp.c b/rtsp.c
index 14807f745fee341f6cde44d5c406b5796218fc91..d039a161ef662067ecd5c749f633ce179b3b07ee 100644 (file)
--- a/rtsp.c
+++ b/rtsp.c
@@ -114,7 +114,8 @@ typedef struct {
   pthread_mutex_t pc_queue_lock;
   pthread_cond_t pc_queue_item_added_signal;
   pthread_cond_t pc_queue_item_removed_signal;
-  size_t item_size;  // number of bytes in each item
+  char *name;
+       size_t item_size;  // number of bytes in each item
   uint32_t count;    // number of items in the queue
   uint32_t capacity; // maximum number of items
   uint32_t toq;      // first item to take
@@ -151,7 +152,11 @@ typedef struct {
   rtsp_message *carrier;
 } metadata_package;
 
-void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t number_of_items) {
+void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t number_of_items, const char* name) {
+       if (name)
+               debug(1, "Creating metadata queue \"%s\".", name);
+       else
+               debug(1, "Creating an unnamed metadata queue.");
   pthread_mutex_init(&the_queue->pc_queue_lock, NULL);
   pthread_cond_init(&the_queue->pc_queue_item_added_signal, NULL);
   pthread_cond_init(&the_queue->pc_queue_item_removed_signal, NULL);
@@ -161,9 +166,19 @@ void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t
   the_queue->capacity = number_of_items;
   the_queue->toq = 0;
   the_queue->eoq = 0;
+  if (name == NULL)
+       the_queue->name = NULL;
+  else
+       the_queue->name = strdup(name);
 }
 
 void pc_queue_delete(pc_queue *the_queue) {
+       if (the_queue->name)
+               debug(1, "Deleting metadata queue \"%s\".", the_queue->name);
+       else
+               debug(1, "Deleting an unnamed metadata queue.");
+       if (the_queue->name != NULL)
+               free(the_queue->name);
   pthread_cond_destroy(&the_queue->pc_queue_item_removed_signal);
   pthread_cond_destroy(&the_queue->pc_queue_item_added_signal);
   pthread_mutex_destroy(&the_queue->pc_queue_lock);
@@ -185,6 +200,7 @@ void pc_queue_cleanup_handler(void *arg) {
 }
 
 int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) {
+       int response = 0;
   int rc;
   if (the_queue) {
     if (block == 0) {
@@ -196,34 +212,43 @@ int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) {
     if (rc)
       debug(1, "Error locking for pc_queue_add_item");
     pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue);
-    while (the_queue->count == the_queue->capacity) {
-      rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal, &the_queue->pc_queue_lock);
-      if (rc)
-        debug(1, "Error waiting for item to be removed");
+    // leave this out if you want this to return if the queue is already full
+    // irrespective of the block flag.
+    /*
+               while (the_queue->count == the_queue->capacity) {
+                       rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal, &the_queue->pc_queue_lock);
+                       if (rc)
+                               debug(1, "Error waiting for item to be removed");
+               }
+               */
+    if (the_queue->count < the_queue->capacity) {
+                       uint32_t i = the_queue->eoq;
+                       void *p = the_queue->items + the_queue->item_size * i;
+                       //    void * p = &the_queue->qbase + the_queue->item_size*the_queue->eoq;
+                       memcpy(p, the_stuff, the_queue->item_size);
+
+                       // update the pointer
+                       i++;
+                       if (i == the_queue->capacity)
+                               // fold pointer if necessary
+                               i = 0;
+                       the_queue->eoq = i;
+                       the_queue->count++;
+                       //debug(2,"metadata queue+ \"%s\" %d/%d.", the_queue->name, the_queue->count, the_queue->capacity);
+                       if (the_queue->count == the_queue->capacity)
+                               debug(3, "metadata queue \"%s\": is now full with %d items in it!", the_queue->name, the_queue->count);
+                       rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal);
+                       if (rc)
+                               debug(1, "metadata queue \"%s\": error signalling after pc_queue_add_item", the_queue->name);
+    } else {
+       response = EWOULDBLOCK; // a bit arbitrary, this.
+       debug(3,"metadata queue \"%s\": is already full with %d items in it. Not adding this item to the queue.", the_queue->name, the_queue->count);
     }
-    uint32_t i = the_queue->eoq;
-    void *p = the_queue->items + the_queue->item_size * i;
-    //    void * p = &the_queue->qbase + the_queue->item_size*the_queue->eoq;
-    memcpy(p, the_stuff, the_queue->item_size);
-
-    // update the pointer
-    i++;
-    if (i == the_queue->capacity)
-      // fold pointer if necessary
-      i = 0;
-    the_queue->eoq = i;
-    the_queue->count++;
-    debug(2,"metadata queue+ %d/%d.", the_queue->count, the_queue->capacity);
-    if (the_queue->count == the_queue->capacity)
-      debug(1, "pc_queue is full with %d items in it!", the_queue->count);
-    rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal);
-    if (rc)
-      debug(1, "Error signalling after pc_queue_add_item");
     pthread_cleanup_pop(1); // unlock the queue lock.
   } else {
     debug(1, "Adding an item to a NULL queue");
   }
-  return 0;
+  return response;
 }
 
 int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) {
@@ -231,12 +256,12 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) {
   if (the_queue) {
     rc = pthread_mutex_lock(&the_queue->pc_queue_lock);
     if (rc)
-      debug(1, "Error locking for pc_queue_get_item");
+      debug(1, "metadata queue \"%s\": error locking for pc_queue_get_item", the_queue->name);
     pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue);
     while (the_queue->count == 0) {
       rc = pthread_cond_wait(&the_queue->pc_queue_item_added_signal, &the_queue->pc_queue_lock);
       if (rc)
-        debug(1, "Error waiting for item to be added");
+        debug(1, "metadata queue \"%s\": error waiting for item to be added", the_queue->name);
     }
     uint32_t i = the_queue->toq;
     //    void * p = &the_queue->qbase + the_queue->item_size*the_queue->toq;
@@ -250,10 +275,10 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) {
       i = 0;
     the_queue->toq = i;
     the_queue->count--;
-    debug(2,"metadata queue- %d/%d.", the_queue->count, the_queue->capacity);
+    debug(3,"metadata queue- \"%s\" %d/%d.", the_queue->name, the_queue->count, the_queue->capacity);
     rc = pthread_cond_signal(&the_queue->pc_queue_item_removed_signal);
     if (rc)
-      debug(1, "Error signalling after pc_queue_removed_item");
+      debug(1, "metadata queue \"%s\": error signalling after pc_queue_get_item", the_queue->name);
     pthread_cleanup_pop(1); // unlock the queue lock.
   } else {
     debug(1, "Removing an item from a NULL queue");
@@ -417,6 +442,7 @@ void msg_retain(rtsp_message *msg) {
     debug(1, "Error %d locking reference counter lock");
   if (msg > (rtsp_message *)0x00010000) {
     msg->referenceCount++;
+       debug(3,"msg_free increment reference counter message %d to %d.", msg->index_number,  msg->referenceCount);
     // debug(1,"msg_retain -- item %d reference count %d.", msg->index_number, msg->referenceCount);
     rc = pthread_mutex_unlock(&reference_counter_lock);
     if (rc)
@@ -432,6 +458,7 @@ rtsp_message *msg_init(void) {
     memset(msg, 0, sizeof(rtsp_message));
     msg->referenceCount = 1; // from now on, any access to this must be protected with the lock
     msg->index_number = msg_indexes++;
+    debug(3,"msg_init message %d", msg->index_number);
   } else {
     die("msg_init -- can not allocate memory for rtsp_message %d.", msg_indexes);
   }
@@ -495,6 +522,8 @@ void msg_free(rtsp_message **msgh) {
   if (*msgh > (rtsp_message *)0x00010000) {
     rtsp_message *msg = *msgh;
     msg->referenceCount--;
+       if (msg->referenceCount)
+               debug(3,"msg_free decrement reference counter message %d to %d", msg->index_number, msg->referenceCount);
     if (msg->referenceCount == 0) {
       unsigned int i;
       for (i = 0; i < msg->nheaders; i++) {
@@ -509,6 +538,7 @@ void msg_free(rtsp_message **msgh) {
         index = 0x10000; // ensure it doesn't fold to zero.
       *msgh =
           (rtsp_message *)(index); // put a version of the index number of the freed message in here
+                 debug(3,"msg_free freed message %d", msg->index_number);
       free(msg);
     } else {
       // debug(1,"msg_free item %d -- decrement reference to
@@ -1072,7 +1102,7 @@ void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *req,
       char *progress = cp + strlen("progress: ");
       // debug(2, "progress: \"%s\"",progress); // rtpstampstart/rtpstampnow/rtpstampend 44100 per
       // second
-      send_ssnc_metadata('prgr', strdup(progress), strlen(progress), 1);
+      send_ssnc_metadata('prgr', progress, strlen(progress), 1);
 
     } else
 #endif
@@ -1220,14 +1250,28 @@ char *base64_encode_so(const unsigned char *data, size_t input_length, char *enc
 static int fd = -1;
 // static int dirty = 0;
 pc_queue metadata_queue;
+
 static int metadata_sock = -1;
 static struct sockaddr_in metadata_sockaddr;
 static char *metadata_sockmsg;
 #define metadata_queue_size 500
 metadata_package metadata_queue_items[metadata_queue_size];
-
 pthread_t metadata_thread;
 
+#ifdef CONFIG_METADATA_HUB
+pc_queue metadata_hub_queue;
+#define metadata_hub_queue_size 500
+metadata_package metadata_hub_queue_items[metadata_hub_queue_size];
+pthread_t metadata_hub_thread;
+#endif
+
+#ifdef CONFIG_MQTT
+pc_queue metadata_mqtt_queue;
+#define metadata_mqtt_queue_size 500
+metadata_package metadata_mqtt_queue_items[metadata_mqtt_queue_size];
+pthread_t metadata_mqtt_thread;
+#endif
+
 void metadata_create_multicast_socket(void) {
   if (config.metadata_enabled == 0)
     return;
@@ -1424,13 +1468,6 @@ void metadata_process(uint32_t type, uint32_t code, char *data, uint32_t length)
   }
 }
 
-void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) {
-  debug(2, "metadata_thread_cleanup_function called");
-  metadata_delete_multicast_socket();
-  metadata_close();
-  pc_queue_delete(&metadata_queue);
-}
-
 void metadata_pack_cleanup_function(void *arg) {
   // debug(1, "metadata_pack_cleanup_function called");
   metadata_package *pack = (metadata_package *)arg;
@@ -1440,55 +1477,141 @@ void metadata_pack_cleanup_function(void *arg) {
     free(pack->data);
 }
 
+void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) {
+  debug(2, "metadata_thread_cleanup_function called");
+  metadata_delete_multicast_socket();
+  metadata_close();
+  pc_queue_delete(&metadata_queue);
+}
+
 void *metadata_thread_function(__attribute__((unused)) void *ignore) {
   // create a pc_queue for passing information to a threaded metadata handler
   pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package),
-                metadata_queue_size);
+                metadata_queue_size, "pipe");
   metadata_create_multicast_socket();
   metadata_package pack;
   pthread_cleanup_push(metadata_thread_cleanup_function, NULL);
   while (1) {
     pc_queue_get_item(&metadata_queue, &pack);
-    // debug(1,"pc_queue get item.");
     pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack);
     if (config.metadata_enabled) {
-       // debug(1, "metadata_process type %x, code %x and length %u.", pack.type, pack.code, pack.length);
-      // metadata_process(pack.type, pack.code, pack.data, pack.length);
+       if (pack.carrier) {
+               debug(3, "     pipe: type %x, code %x, length %u, message %d.", pack.type, pack.code, pack.length, pack.carrier->index_number);
+       } else {
+               debug(3, "     pipe: type %x, code %x, length %u.", pack.type, pack.code, pack.length);
+       }
+      metadata_process(pack.type, pack.code, pack.data, pack.length);
+      debug(3, "     pipe: done.");
+    }
+    pthread_cleanup_pop(1);
+  }
+  pthread_cleanup_pop(1); // will never happen
+  pthread_exit(NULL);
+}
+
 #ifdef CONFIG_METADATA_HUB
-       debug(1, "metadata_hub_process.");
-      metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length);
+void metadata_hub_close(void) {
+}
+
+void metadata_hub_thread_cleanup_function(__attribute__((unused)) void *arg) {
+  debug(2, "metadata_hub_thread_cleanup_function called");
+  metadata_hub_close();
+  pc_queue_delete(&metadata_hub_queue);
+}
+
+void *metadata_hub_thread_function(__attribute__((unused)) void *ignore) {
+  // create a pc_queue for passing information to a threaded metadata handler
+  pc_queue_init(&metadata_hub_queue, (char *)&metadata_hub_queue_items, sizeof(metadata_package),
+                metadata_hub_queue_size, "hub");
+  metadata_package pack;
+  pthread_cleanup_push(metadata_hub_thread_cleanup_function, NULL);
+  while (1) {
+    pc_queue_get_item(&metadata_hub_queue, &pack);
+    pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack);
+       if (pack.carrier) {
+               debug(3, "                    hub: type %x, code %x, length %u, message %d.", pack.type, pack.code, pack.length, pack.carrier->index_number);
+       } else {
+               debug(3, "                    hub: type %x, code %x, length %u.", pack.type, pack.code, pack.length);
+       }
+               metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length);
+               debug(3, "                    hub: done.");
+    pthread_cleanup_pop(1);
+  }
+  pthread_cleanup_pop(1); // will never happen
+  pthread_exit(NULL);
+}
 #endif
 
 #ifdef CONFIG_MQTT
-      if (config.mqtt_enabled) {
-               debug(1, "mqtt_process_metadata.");
-        mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length);
-      }
-#endif
-               debug(1, "done.");
-    }
+void metadata_mqtt_close(void) {
+}
+
+void metadata_mqtt_thread_cleanup_function(__attribute__((unused)) void *arg) {
+  debug(2, "metadata_mqtt_thread_cleanup_function called");
+  metadata_mqtt_close();
+  pc_queue_delete(&metadata_hub_queue);
+}
+
+void *metadata_mqtt_thread_function(__attribute__((unused)) void *ignore) {
+  // create a pc_queue for passing information to a threaded metadata handler
+  pc_queue_init(&metadata_mqtt_queue, (char *)&metadata_mqtt_queue_items, sizeof(metadata_package),
+                metadata_mqtt_queue_size, "mqtt");
+  metadata_package pack;
+  pthread_cleanup_push(metadata_mqtt_thread_cleanup_function, NULL);
+  while (1) {
+    pc_queue_get_item(&metadata_mqtt_queue, &pack);
+    pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack);
+               if (config.mqtt_enabled) {
+       if (pack.carrier) {
+               debug(3, "                                        mqtt: type %x, code %x, length %u, message %d.", pack.type, pack.code, pack.length, pack.carrier->index_number);
+       } else {
+               debug(3, "                                        mqtt: type %x, code %x, length %u.", pack.type, pack.code, pack.length);
+       }
+                       mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length);
+                       debug(3, "                                        mqtt: done.");
+               }
+
     pthread_cleanup_pop(1);
   }
   pthread_cleanup_pop(1); // will never happen
   pthread_exit(NULL);
 }
+#endif
 
 void metadata_init(void) {
   int ret = pthread_create(&metadata_thread, NULL, metadata_thread_function, NULL);
   if (ret)
     debug(1, "Failed to create metadata thread!");
+#ifdef CONFIG_METADATA_HUB
+  ret = pthread_create(&metadata_hub_thread, NULL, metadata_hub_thread_function, NULL);
+  if (ret)
+    debug(1, "Failed to create metadata hub thread!");
+#endif
+#ifdef CONFIG_MQTT
+  ret = pthread_create(&metadata_mqtt_thread, NULL, metadata_mqtt_thread_function, NULL);
+  if (ret)
+    debug(1, "Failed to create metadata mqtt thread!");
+#endif
   metadata_running = 1;
 }
 
 void metadata_stop(void) {
   if (metadata_running) {
     debug(2, "metadata_stop called.");
+#ifdef CONFIG_MQTT
+    pthread_join(metadata_mqtt_thread, NULL);
+    pthread_cancel(metadata_mqtt_thread);
+#endif
+#ifdef CONFIG_METADATA_HUB
+    pthread_join(metadata_hub_thread, NULL);
+    pthread_cancel(metadata_hub_thread);
+#endif
     pthread_cancel(metadata_thread);
     pthread_join(metadata_thread, NULL);
   }
 }
 
-int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier,
+int send_metadata_to_queue(pc_queue* queue, uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier,
                   int block) {
 
   // parameters: type, code, pointer to data or NULL, length of data or NULL,
@@ -1505,37 +1628,64 @@ int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rts
 
   // The reading of the parameters is a bit complex
   // If the rtsp_message field is non-null, then it represents an rtsp_message
-  // which should be freed
-  // in the thread handler when the parameter pointed to by the pointer and
-  // specified by the length
-  // is finished with
-  // If the rtsp_message is NULL, then if the pointer is non-null, it points to
-  // a malloc'ed block
-  // and should be freed when the thread is finished with it. The length of the
-  // data in the block is
-  // given in length
+  // and the data pointer is assumed to point to something within it.
+  // The reference counter of the rtsp_message is incremented here and
+  // should be decremented by the metadata handler when finished.
+  // If the reference count reduces to zero, the message will be freed.
+
+  // If the rtsp_message is NULL, then if the pointer is non-null then the data it
+  // points to, of the length specified, is memcpy'd and passed to the metadata
+  // handler. The handler should free it when done.
   // If the rtsp_message is NULL and the pointer is also NULL, nothing further
   // is done.
 
   metadata_package pack;
   pack.type = type;
   pack.code = code;
-  pack.data = data;
   pack.length = length;
   pack.carrier = carrier;
-  if (pack.carrier)
+  pack.data = data;
+  if (pack.carrier) {
     msg_retain(pack.carrier);
-  int rc = pc_queue_add_item(&metadata_queue, &pack, block);
-  if (rc == EBUSY) {
-    if (pack.carrier)
+  } else {
+       if (data)
+               pack.data = memdup(data,length); // only if it's not a null
+  }
+  int rc = pc_queue_add_item(queue, &pack, block);
+  if (rc != 0) {
+    if (pack.carrier) {
+       if (rc == EWOULDBLOCK)
+               debug(2, "metadata queue \"%s\" full, dropping message item: type %x, code %x, data %x, length %u, message %d.", queue->name, pack.type, pack.code, pack.data, pack.length, pack.carrier->index_number);
       msg_free(&pack.carrier);
-    else if (data)
-      free(data);
-    warn("Metadata queue is busy, discarding message of type 0x%08X, code 0x%08X.", type, code);
+    } else {
+       if (rc == EWOULDBLOCK)
+               debug(2, "metadata queue \"%s\" full, dropping data item: type %x, code %x, data %x, length %u.", queue->name, pack.type, pack.code, pack.data, pack.length);
+       if (pack.data)
+       free(pack.data);
+    }
   }
   return rc;
 }
 
+int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier,
+                  int block) {
+       int rc;
+       rc = send_metadata_to_queue(&metadata_queue, type, code, data, length, carrier, block);
+
+#ifdef CONFIG_METADATA_HUB
+       rc = send_metadata_to_queue(&metadata_hub_queue, type, code, data, length, carrier, block);
+#endif
+
+#ifdef CONFIG_MQTT
+       rc = send_metadata_to_queue(&metadata_mqtt_queue, type, code, data, length, carrier, block);
+#endif
+
+       return rc;
+}
+
+
+
+
 static void handle_set_parameter_metadata(__attribute__((unused)) rtsp_conn_info *conn,
                                           rtsp_message *req,
                                           __attribute__((unused)) rtsp_message *resp) {