From: Mike Brady Date: Sat, 30 May 2020 14:49:00 +0000 (+0100) Subject: Make the pipe, hub and mqtt separate threads with their own metadata queues. If one... X-Git-Tag: 3.3.7d12~63 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5decc1bb060d408427bdfd6483dd5ffd4b727026;p=thirdparty%2Fshairport-sync.git Make the pipe, hub and mqtt separate threads with their own metadata queues. If one blocks, no new metadata is sent to it and the others are unaffected. --- diff --git a/common.c b/common.c index f10ef7bf..9bf0dd14 100644 --- a/common.c +++ b/common.c @@ -1700,3 +1700,13 @@ int string_update_with_size(char **str, int *flag, char *s, size_t len) { } return *flag; } + +// from https://stackoverflow.com/questions/13663617/memdup-function-in-c, with thanks +void* memdup(const void* mem, size_t size) { + void* out = malloc(size); + + if(out != NULL) + memcpy(out, mem, size); + + return out; +} diff --git a/common.h b/common.h index 3540bcd0..93bb9c36 100644 --- a/common.h +++ b/common.h @@ -441,4 +441,7 @@ void malloc_cleanup(void *arg); int string_update_with_size(char **str, int *flag, char *s, size_t len); +// from https://stackoverflow.com/questions/13663617/memdup-function-in-c, with thanks +void* memdup(const void* mem, size_t size); + #endif // _COMMON_H diff --git a/metadata_hub.c b/metadata_hub.c index eb63d2df..2ea442a5 100644 --- a/metadata_hub.c +++ b/metadata_hub.c @@ -123,12 +123,13 @@ void run_metadata_watchers(void) { } void metadata_hub_unlock_hub_mutex_cleanup(__attribute__((unused)) void *arg) { - // debug(1, "metadata_hub_unlock_hub_mutex_cleanup called."); - pthread_rwlock_unlock(&metadata_hub_re_lock); + debug(1, "metadata_hub_unlock_hub_mutex_cleanup called."); + metadata_hub_modify_epilog(0); } char *last_metadata_hub_modify_prolog_file = NULL; int last_metadata_hub_modify_prolog_line; +int metadata_hub_re_lock_access_is_delayed; void _metadata_hub_modify_prolog(const char *filename, const int linenumber) { // always run this before changing an entry or a sequence of entries in the metadata_hub @@ -138,8 +139,9 @@ void _metadata_hub_modify_prolog(const char *filename, const int linenumber) { debug(1, "Metadata_hub write lock at \"%s:%d\" is already taken at \"%s:%d\" -- must wait.", filename, linenumber, last_metadata_hub_modify_prolog_file, last_metadata_hub_modify_prolog_line); else debug(1, "Metadata_hub write lock is already taken by unknown -- must wait."); + metadata_hub_re_lock_access_is_delayed = 0; pthread_rwlock_wrlock(&metadata_hub_re_lock); - debug(1, "Okay -- acquired the metadata_hub write lock."); + debug(1, "Okay -- acquired the metadata_hub write lock at \"%s:%d\".", filename, linenumber); } else { if (last_metadata_hub_modify_prolog_file) { free(last_metadata_hub_modify_prolog_file); @@ -148,6 +150,7 @@ void _metadata_hub_modify_prolog(const char *filename, const int linenumber) { last_metadata_hub_modify_prolog_line = linenumber; debug(3, "Metadata_hub write lock acquired."); } + metadata_hub_re_lock_access_is_delayed = 0; } void _metadata_hub_modify_epilog(int modified, const char *filename, const int linenumber) { @@ -156,9 +159,14 @@ void _metadata_hub_modify_epilog(int modified, const char *filename, const int l if (modified) { run_metadata_watchers(); } - if (last_metadata_hub_modify_prolog_file) { - free(last_metadata_hub_modify_prolog_file); - last_metadata_hub_modify_prolog_file = NULL; + if (metadata_hub_re_lock_access_is_delayed) { + if (last_metadata_hub_modify_prolog_file) { + debug(1, "Metadata_hub write lock taken at \"%s:%d\" is freed at \"%s:%d\".", last_metadata_hub_modify_prolog_file, last_metadata_hub_modify_prolog_line, filename, linenumber); + free(last_metadata_hub_modify_prolog_file); + last_metadata_hub_modify_prolog_file = NULL; + } else { + debug(1, "Metadata_hub write lock taken at an unknown place is freed at \"%s:%d\".", filename, linenumber); + } } pthread_rwlock_unlock(&metadata_hub_re_lock); debug(3, "Metadata_hub write lock unlocked."); @@ -316,10 +324,15 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin // all the following items of metadata are contained in one metadata packet // they are preceded by an 'ssnc' 'mdst' item and followed by an 'ssnc 'mden' item. - debug(1, "metadata_hub_process_metadata type %x, code %x and length %u.", type, code, length); + // we don't set "changed" for them individually; instead we set it when the 'mden' token + // comes in if the metadata_packet_item_changed is set. - char *cs; int changed = 0; + int metadata_packet_item_changed; + metadata_hub_modify_prolog(); + pthread_cleanup_push(metadata_hub_unlock_hub_mutex_cleanup, NULL); + + char *cs; if (type == 'core') { switch (code) { case 'mper': { @@ -334,6 +347,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin metadata_store.item_id_changed = 1; metadata_store.item_id_received = 1; debug(2, "MH Item ID set to: \"%" PRIx64 "\"", metadata_store.item_id); + metadata_packet_item_changed = 1; } } break; case 'astm': { @@ -343,12 +357,14 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin metadata_store.songtime_in_milliseconds = ui; metadata_store.songtime_in_milliseconds_changed = 1; debug(2, "MH Song Time set to: \"%u\"", metadata_store.songtime_in_milliseconds); + metadata_packet_item_changed = 1; } } break; case 'asal': cs = strndup(data, length); if (string_update(&metadata_store.album_name, &metadata_store.album_name_changed, cs)) { debug(2, "MH Album name set to: \"%s\"", metadata_store.album_name); + metadata_packet_item_changed = 1; } free(cs); break; @@ -356,6 +372,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin cs = strndup(data, length); if (string_update(&metadata_store.artist_name, &metadata_store.artist_name_changed, cs)) { debug(2, "MH Artist name set to: \"%s\"", metadata_store.artist_name); + metadata_packet_item_changed = 1; } free(cs); break; @@ -364,6 +381,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin if (string_update(&metadata_store.album_artist_name, &metadata_store.album_artist_name_changed, cs)) { debug(2, "MH Album Artist name set to: \"%s\"", metadata_store.album_artist_name); + metadata_packet_item_changed = 1; } free(cs); break; @@ -371,6 +389,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin cs = strndup(data, length); if (string_update(&metadata_store.comment, &metadata_store.comment_changed, cs)) { debug(2, "MH Comment set to: \"%s\"", metadata_store.comment); + metadata_packet_item_changed = 1; } free(cs); break; @@ -378,6 +397,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin cs = strndup(data, length); if (string_update(&metadata_store.genre, &metadata_store.genre_changed, cs)) { debug(2, "MH Genre set to: \"%s\"", metadata_store.genre); + metadata_packet_item_changed = 1; } free(cs); break; @@ -385,6 +405,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin cs = strndup(data, length); if (string_update(&metadata_store.track_name, &metadata_store.track_name_changed, cs)) { debug(2, "MH Track Name set to: \"%s\"", metadata_store.track_name); + metadata_packet_item_changed = 1; } free(cs); break; @@ -392,6 +413,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin cs = strndup(data, length); if (string_update(&metadata_store.composer, &metadata_store.composer_changed, cs)) { debug(2, "MH Composer set to: \"%s\"", metadata_store.composer); + metadata_packet_item_changed = 1; } free(cs); break; @@ -408,6 +430,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin if (string_update(&metadata_store.song_album_artist, &metadata_store.song_album_artist_changed, cs)) { debug(2, "MH Song Album Artist set to: \"%s\"", metadata_store.song_album_artist); + metadata_packet_item_changed = 1; } free(cs); break; @@ -415,6 +438,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin cs = strndup(data, length); if (string_update(&metadata_store.sort_name, &metadata_store.sort_name_changed, cs)) { debug(2, "MH Sort Name set to: \"%s\"", metadata_store.sort_name); + metadata_packet_item_changed = 1; } free(cs); break; @@ -422,6 +446,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin cs = strndup(data, length); if (string_update(&metadata_store.sort_artist, &metadata_store.sort_artist_changed, cs)) { debug(2, "MH Sort Artist set to: \"%s\"", metadata_store.sort_artist); + metadata_packet_item_changed = 1; } free(cs); break; @@ -429,6 +454,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin cs = strndup(data, length); if (string_update(&metadata_store.sort_album, &metadata_store.sort_album_changed, cs)) { debug(2, "MH Sort Album set to: \"%s\"", metadata_store.sort_album); + metadata_packet_item_changed = 1; } free(cs); break; @@ -436,6 +462,7 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin cs = strndup(data, length); if (string_update(&metadata_store.sort_composer, &metadata_store.sort_composer_changed, cs)) { debug(2, "MH Sort Composer set to: \"%s\"", metadata_store.sort_composer); + metadata_packet_item_changed = 1; } free(cs); default: @@ -468,45 +495,45 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin break; case 'mdst': debug(2, "MH Metadata stream processing start."); - metadata_hub_modify_prolog(); + metadata_packet_item_changed = 0; break; case 'mden': debug(2, "MH Metadata stream processing end."); - metadata_hub_modify_epilog(1); - debug(2, "MH Metadata stream processing epilog complete."); + changed = metadata_packet_item_changed; break; case 'PICT': - metadata_hub_modify_prolog(); - pthread_cleanup_push(metadata_hub_unlock_hub_mutex_cleanup, NULL); debug(2, "MH Picture received, length %u bytes.", length); + char uri[2048]; if ((length > 16) && (strcmp(config.cover_art_cache_dir,"")!=0)) { // if it's okay to write the file + // make this uncancellable + int oldState; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState); // make this un-cancellable char *pathname = metadata_write_image_file(data, length); snprintf(uri, sizeof(uri), "file://%s", pathname); free(pathname); + pthread_setcancelstate(oldState, NULL); + } else { uri[0] = '\0'; } if (string_update(&metadata_store.cover_art_pathname, &metadata_store.cover_art_pathname_changed, uri)) // if the picture's file path is different from the stored one... - metadata_hub_modify_epilog(1); + changed = 1; else - metadata_hub_modify_epilog(0); - pthread_cleanup_pop(0); // don't remove the lock -- it'll have been done + changed = 0; +// pthread_cleanup_pop(0); // don't remove the lock -- it'll have been done break; case 'clip': - metadata_hub_modify_prolog(); cs = strndup(data, length); if (string_update(&metadata_store.client_ip, &metadata_store.client_ip_changed, cs)) { changed = 1; debug(2, "MH Client IP set to: \"%s\"", metadata_store.client_ip); } free(cs); - metadata_hub_modify_epilog(changed); break; case 'prgr': - metadata_hub_modify_prolog(); cs = strndup(data, length); if (string_update(&metadata_store.progress_string, &metadata_store.progress_string_changed, cs)) { @@ -514,61 +541,45 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin debug(2, "MH Progress String set to: \"%s\"", metadata_store.progress_string); } free(cs); - metadata_hub_modify_epilog(changed); break; case 'svip': - metadata_hub_modify_prolog(); cs = strndup(data, length); if (string_update(&metadata_store.server_ip, &metadata_store.server_ip_changed, cs)) { changed = 1; debug(2, "MH Server IP set to: \"%s\"", metadata_store.server_ip); } free(cs); - metadata_hub_modify_epilog(changed); break; case 'abeg': - metadata_hub_modify_prolog(); changed = (metadata_store.active_state != AM_ACTIVE); metadata_store.active_state = AM_ACTIVE; - metadata_hub_modify_epilog(changed); break; case 'aend': - metadata_hub_modify_prolog(); changed = (metadata_store.active_state != AM_INACTIVE); metadata_store.active_state = AM_INACTIVE; - metadata_hub_modify_epilog(changed); break; case 'pbeg': - metadata_hub_modify_prolog(); changed = ((metadata_store.player_state != PS_PLAYING) || (metadata_store.player_thread_active == 0)); metadata_store.player_state = PS_PLAYING; metadata_store.player_thread_active = 1; - metadata_hub_modify_epilog(changed); break; case 'pend': - metadata_hub_modify_prolog(); changed = ((metadata_store.player_state != PS_STOPPED) || (metadata_store.player_thread_active == 1)); metadata_store.player_state = PS_STOPPED; metadata_store.player_thread_active = 0; - metadata_hub_modify_epilog(changed); break; case 'pfls': - metadata_hub_modify_prolog(); changed = (metadata_store.player_state != PS_PAUSED); metadata_store.player_state = PS_PAUSED; - metadata_hub_modify_epilog(changed); break; case 'pffr': // this is sent when the first frame has been received case 'prsm': - metadata_hub_modify_prolog(); changed = (metadata_store.player_state != PS_PLAYING); metadata_store.player_state = PS_PLAYING; - metadata_hub_modify_epilog(changed); break; case 'pvol': { - metadata_hub_modify_prolog(); // Note: it's assumed that the config.airplay volume has already been correctly set. // int32_t actual_volume; // int gv = dacp_get_volume(&actual_volume); @@ -581,7 +592,6 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin metadata_store.airplay_volume = config.airplay_volume; changed = 1; } - metadata_hub_modify_epilog(changed); // change } break; default: { @@ -604,4 +614,6 @@ void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uin } } } + pthread_cleanup_pop(0); // don't remove the lock + metadata_hub_modify_epilog(changed); } diff --git a/player.c b/player.c index 8847b8b5..c7020fc7 100644 --- a/player.c +++ b/player.c @@ -2941,20 +2941,18 @@ void player_volume_without_notification(double airplay_volume, rtsp_conn_info *c #ifdef CONFIG_METADATA // here, send the 'pvol' metadata message when the airplay volume information // is being used by shairport sync to control the output volume - char *dv = malloc(128); // will be freed in the metadata thread - if (dv) { - memset(dv, 0, 128); - if (volume_mode == vol_both) { - // normalise the maximum output to the hardware device's max output - snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, - (scaled_attenuation - max_db + hw_max_db) / 100.0, - (min_db - max_db + hw_max_db) / 100.0, (max_db - max_db + hw_max_db) / 100.0); - } else { - snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, scaled_attenuation / 100.0, - min_db / 100.0, max_db / 100.0); - } - send_ssnc_metadata('pvol', dv, strlen(dv), 1); - } + char dv[128]; + memset(dv, 0, 128); + if (volume_mode == vol_both) { + // normalise the maximum output to the hardware device's max output + snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, + (scaled_attenuation - max_db + hw_max_db) / 100.0, + (min_db - max_db + hw_max_db) / 100.0, (max_db - max_db + hw_max_db) / 100.0); + } else { + snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, scaled_attenuation / 100.0, + min_db / 100.0, max_db / 100.0); + } + send_ssnc_metadata('pvol', dv, strlen(dv), 1); #endif if (config.output->mute) @@ -2973,12 +2971,10 @@ void player_volume_without_notification(double airplay_volume, rtsp_conn_info *c else { // here, send the 'pvol' metadata message when the airplay volume information // is being used by shairport sync to control the output volume - char *dv = malloc(128); // will be freed in the metadata thread - if (dv) { - memset(dv, 0, 128); - snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, 0.0, 0.0, 0.0); - send_ssnc_metadata('pvol', dv, strlen(dv), 1); - } + char dv[128]; + memset(dv, 0, 128); + snprintf(dv, 127, "%.2f,%.2f,%.2f,%.2f", airplay_volume, 0.0, 0.0, 0.0); + send_ssnc_metadata('pvol', dv, strlen(dv), 1); } #endif diff --git a/rtp.c b/rtp.c index f7e12c50..71d6df0e 100644 --- a/rtp.c +++ b/rtp.c @@ -998,8 +998,8 @@ void rtp_setup(SOCKADDR *local, SOCKADDR *remote, uint16_t cport, uint16_t tport conn->rtp_running = 1; #ifdef CONFIG_METADATA - send_ssnc_metadata('clip', strdup(conn->client_ip_string), strlen(conn->client_ip_string), 1); - send_ssnc_metadata('svip', strdup(conn->self_ip_string), strlen(conn->self_ip_string), 1); + send_ssnc_metadata('clip', conn->client_ip_string, strlen(conn->client_ip_string), 1); + send_ssnc_metadata('svip', conn->self_ip_string, strlen(conn->self_ip_string), 1); #endif } } diff --git a/rtsp.c b/rtsp.c index 14807f74..d039a161 100644 --- a/rtsp.c +++ b/rtsp.c @@ -114,7 +114,8 @@ typedef struct { pthread_mutex_t pc_queue_lock; pthread_cond_t pc_queue_item_added_signal; pthread_cond_t pc_queue_item_removed_signal; - size_t item_size; // number of bytes in each item + char *name; + size_t item_size; // number of bytes in each item uint32_t count; // number of items in the queue uint32_t capacity; // maximum number of items uint32_t toq; // first item to take @@ -151,7 +152,11 @@ typedef struct { rtsp_message *carrier; } metadata_package; -void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t number_of_items) { +void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t number_of_items, const char* name) { + if (name) + debug(1, "Creating metadata queue \"%s\".", name); + else + debug(1, "Creating an unnamed metadata queue."); pthread_mutex_init(&the_queue->pc_queue_lock, NULL); pthread_cond_init(&the_queue->pc_queue_item_added_signal, NULL); pthread_cond_init(&the_queue->pc_queue_item_removed_signal, NULL); @@ -161,9 +166,19 @@ void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t the_queue->capacity = number_of_items; the_queue->toq = 0; the_queue->eoq = 0; + if (name == NULL) + the_queue->name = NULL; + else + the_queue->name = strdup(name); } void pc_queue_delete(pc_queue *the_queue) { + if (the_queue->name) + debug(1, "Deleting metadata queue \"%s\".", the_queue->name); + else + debug(1, "Deleting an unnamed metadata queue."); + if (the_queue->name != NULL) + free(the_queue->name); pthread_cond_destroy(&the_queue->pc_queue_item_removed_signal); pthread_cond_destroy(&the_queue->pc_queue_item_added_signal); pthread_mutex_destroy(&the_queue->pc_queue_lock); @@ -185,6 +200,7 @@ void pc_queue_cleanup_handler(void *arg) { } int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) { + int response = 0; int rc; if (the_queue) { if (block == 0) { @@ -196,34 +212,43 @@ int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) { if (rc) debug(1, "Error locking for pc_queue_add_item"); pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue); - while (the_queue->count == the_queue->capacity) { - rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal, &the_queue->pc_queue_lock); - if (rc) - debug(1, "Error waiting for item to be removed"); + // leave this out if you want this to return if the queue is already full + // irrespective of the block flag. + /* + while (the_queue->count == the_queue->capacity) { + rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal, &the_queue->pc_queue_lock); + if (rc) + debug(1, "Error waiting for item to be removed"); + } + */ + if (the_queue->count < the_queue->capacity) { + uint32_t i = the_queue->eoq; + void *p = the_queue->items + the_queue->item_size * i; + // void * p = &the_queue->qbase + the_queue->item_size*the_queue->eoq; + memcpy(p, the_stuff, the_queue->item_size); + + // update the pointer + i++; + if (i == the_queue->capacity) + // fold pointer if necessary + i = 0; + the_queue->eoq = i; + the_queue->count++; + //debug(2,"metadata queue+ \"%s\" %d/%d.", the_queue->name, the_queue->count, the_queue->capacity); + if (the_queue->count == the_queue->capacity) + debug(3, "metadata queue \"%s\": is now full with %d items in it!", the_queue->name, the_queue->count); + rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal); + if (rc) + debug(1, "metadata queue \"%s\": error signalling after pc_queue_add_item", the_queue->name); + } else { + response = EWOULDBLOCK; // a bit arbitrary, this. + debug(3,"metadata queue \"%s\": is already full with %d items in it. Not adding this item to the queue.", the_queue->name, the_queue->count); } - uint32_t i = the_queue->eoq; - void *p = the_queue->items + the_queue->item_size * i; - // void * p = &the_queue->qbase + the_queue->item_size*the_queue->eoq; - memcpy(p, the_stuff, the_queue->item_size); - - // update the pointer - i++; - if (i == the_queue->capacity) - // fold pointer if necessary - i = 0; - the_queue->eoq = i; - the_queue->count++; - debug(2,"metadata queue+ %d/%d.", the_queue->count, the_queue->capacity); - if (the_queue->count == the_queue->capacity) - debug(1, "pc_queue is full with %d items in it!", the_queue->count); - rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal); - if (rc) - debug(1, "Error signalling after pc_queue_add_item"); pthread_cleanup_pop(1); // unlock the queue lock. } else { debug(1, "Adding an item to a NULL queue"); } - return 0; + return response; } int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) { @@ -231,12 +256,12 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) { if (the_queue) { rc = pthread_mutex_lock(&the_queue->pc_queue_lock); if (rc) - debug(1, "Error locking for pc_queue_get_item"); + debug(1, "metadata queue \"%s\": error locking for pc_queue_get_item", the_queue->name); pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue); while (the_queue->count == 0) { rc = pthread_cond_wait(&the_queue->pc_queue_item_added_signal, &the_queue->pc_queue_lock); if (rc) - debug(1, "Error waiting for item to be added"); + debug(1, "metadata queue \"%s\": error waiting for item to be added", the_queue->name); } uint32_t i = the_queue->toq; // void * p = &the_queue->qbase + the_queue->item_size*the_queue->toq; @@ -250,10 +275,10 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) { i = 0; the_queue->toq = i; the_queue->count--; - debug(2,"metadata queue- %d/%d.", the_queue->count, the_queue->capacity); + debug(3,"metadata queue- \"%s\" %d/%d.", the_queue->name, the_queue->count, the_queue->capacity); rc = pthread_cond_signal(&the_queue->pc_queue_item_removed_signal); if (rc) - debug(1, "Error signalling after pc_queue_removed_item"); + debug(1, "metadata queue \"%s\": error signalling after pc_queue_get_item", the_queue->name); pthread_cleanup_pop(1); // unlock the queue lock. } else { debug(1, "Removing an item from a NULL queue"); @@ -417,6 +442,7 @@ void msg_retain(rtsp_message *msg) { debug(1, "Error %d locking reference counter lock"); if (msg > (rtsp_message *)0x00010000) { msg->referenceCount++; + debug(3,"msg_free increment reference counter message %d to %d.", msg->index_number, msg->referenceCount); // debug(1,"msg_retain -- item %d reference count %d.", msg->index_number, msg->referenceCount); rc = pthread_mutex_unlock(&reference_counter_lock); if (rc) @@ -432,6 +458,7 @@ rtsp_message *msg_init(void) { memset(msg, 0, sizeof(rtsp_message)); msg->referenceCount = 1; // from now on, any access to this must be protected with the lock msg->index_number = msg_indexes++; + debug(3,"msg_init message %d", msg->index_number); } else { die("msg_init -- can not allocate memory for rtsp_message %d.", msg_indexes); } @@ -495,6 +522,8 @@ void msg_free(rtsp_message **msgh) { if (*msgh > (rtsp_message *)0x00010000) { rtsp_message *msg = *msgh; msg->referenceCount--; + if (msg->referenceCount) + debug(3,"msg_free decrement reference counter message %d to %d", msg->index_number, msg->referenceCount); if (msg->referenceCount == 0) { unsigned int i; for (i = 0; i < msg->nheaders; i++) { @@ -509,6 +538,7 @@ void msg_free(rtsp_message **msgh) { index = 0x10000; // ensure it doesn't fold to zero. *msgh = (rtsp_message *)(index); // put a version of the index number of the freed message in here + debug(3,"msg_free freed message %d", msg->index_number); free(msg); } else { // debug(1,"msg_free item %d -- decrement reference to @@ -1072,7 +1102,7 @@ void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *req, char *progress = cp + strlen("progress: "); // debug(2, "progress: \"%s\"",progress); // rtpstampstart/rtpstampnow/rtpstampend 44100 per // second - send_ssnc_metadata('prgr', strdup(progress), strlen(progress), 1); + send_ssnc_metadata('prgr', progress, strlen(progress), 1); } else #endif @@ -1220,14 +1250,28 @@ char *base64_encode_so(const unsigned char *data, size_t input_length, char *enc static int fd = -1; // static int dirty = 0; pc_queue metadata_queue; + static int metadata_sock = -1; static struct sockaddr_in metadata_sockaddr; static char *metadata_sockmsg; #define metadata_queue_size 500 metadata_package metadata_queue_items[metadata_queue_size]; - pthread_t metadata_thread; +#ifdef CONFIG_METADATA_HUB +pc_queue metadata_hub_queue; +#define metadata_hub_queue_size 500 +metadata_package metadata_hub_queue_items[metadata_hub_queue_size]; +pthread_t metadata_hub_thread; +#endif + +#ifdef CONFIG_MQTT +pc_queue metadata_mqtt_queue; +#define metadata_mqtt_queue_size 500 +metadata_package metadata_mqtt_queue_items[metadata_mqtt_queue_size]; +pthread_t metadata_mqtt_thread; +#endif + void metadata_create_multicast_socket(void) { if (config.metadata_enabled == 0) return; @@ -1424,13 +1468,6 @@ void metadata_process(uint32_t type, uint32_t code, char *data, uint32_t length) } } -void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) { - debug(2, "metadata_thread_cleanup_function called"); - metadata_delete_multicast_socket(); - metadata_close(); - pc_queue_delete(&metadata_queue); -} - void metadata_pack_cleanup_function(void *arg) { // debug(1, "metadata_pack_cleanup_function called"); metadata_package *pack = (metadata_package *)arg; @@ -1440,55 +1477,141 @@ void metadata_pack_cleanup_function(void *arg) { free(pack->data); } +void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) { + debug(2, "metadata_thread_cleanup_function called"); + metadata_delete_multicast_socket(); + metadata_close(); + pc_queue_delete(&metadata_queue); +} + void *metadata_thread_function(__attribute__((unused)) void *ignore) { // create a pc_queue for passing information to a threaded metadata handler pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package), - metadata_queue_size); + metadata_queue_size, "pipe"); metadata_create_multicast_socket(); metadata_package pack; pthread_cleanup_push(metadata_thread_cleanup_function, NULL); while (1) { pc_queue_get_item(&metadata_queue, &pack); - // debug(1,"pc_queue get item."); pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack); if (config.metadata_enabled) { - // debug(1, "metadata_process type %x, code %x and length %u.", pack.type, pack.code, pack.length); - // metadata_process(pack.type, pack.code, pack.data, pack.length); + if (pack.carrier) { + debug(3, " pipe: type %x, code %x, length %u, message %d.", pack.type, pack.code, pack.length, pack.carrier->index_number); + } else { + debug(3, " pipe: type %x, code %x, length %u.", pack.type, pack.code, pack.length); + } + metadata_process(pack.type, pack.code, pack.data, pack.length); + debug(3, " pipe: done."); + } + pthread_cleanup_pop(1); + } + pthread_cleanup_pop(1); // will never happen + pthread_exit(NULL); +} + #ifdef CONFIG_METADATA_HUB - debug(1, "metadata_hub_process."); - metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length); +void metadata_hub_close(void) { +} + +void metadata_hub_thread_cleanup_function(__attribute__((unused)) void *arg) { + debug(2, "metadata_hub_thread_cleanup_function called"); + metadata_hub_close(); + pc_queue_delete(&metadata_hub_queue); +} + +void *metadata_hub_thread_function(__attribute__((unused)) void *ignore) { + // create a pc_queue for passing information to a threaded metadata handler + pc_queue_init(&metadata_hub_queue, (char *)&metadata_hub_queue_items, sizeof(metadata_package), + metadata_hub_queue_size, "hub"); + metadata_package pack; + pthread_cleanup_push(metadata_hub_thread_cleanup_function, NULL); + while (1) { + pc_queue_get_item(&metadata_hub_queue, &pack); + pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack); + if (pack.carrier) { + debug(3, " hub: type %x, code %x, length %u, message %d.", pack.type, pack.code, pack.length, pack.carrier->index_number); + } else { + debug(3, " hub: type %x, code %x, length %u.", pack.type, pack.code, pack.length); + } + metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length); + debug(3, " hub: done."); + pthread_cleanup_pop(1); + } + pthread_cleanup_pop(1); // will never happen + pthread_exit(NULL); +} #endif #ifdef CONFIG_MQTT - if (config.mqtt_enabled) { - debug(1, "mqtt_process_metadata."); - mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length); - } -#endif - debug(1, "done."); - } +void metadata_mqtt_close(void) { +} + +void metadata_mqtt_thread_cleanup_function(__attribute__((unused)) void *arg) { + debug(2, "metadata_mqtt_thread_cleanup_function called"); + metadata_mqtt_close(); + pc_queue_delete(&metadata_hub_queue); +} + +void *metadata_mqtt_thread_function(__attribute__((unused)) void *ignore) { + // create a pc_queue for passing information to a threaded metadata handler + pc_queue_init(&metadata_mqtt_queue, (char *)&metadata_mqtt_queue_items, sizeof(metadata_package), + metadata_mqtt_queue_size, "mqtt"); + metadata_package pack; + pthread_cleanup_push(metadata_mqtt_thread_cleanup_function, NULL); + while (1) { + pc_queue_get_item(&metadata_mqtt_queue, &pack); + pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack); + if (config.mqtt_enabled) { + if (pack.carrier) { + debug(3, " mqtt: type %x, code %x, length %u, message %d.", pack.type, pack.code, pack.length, pack.carrier->index_number); + } else { + debug(3, " mqtt: type %x, code %x, length %u.", pack.type, pack.code, pack.length); + } + mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length); + debug(3, " mqtt: done."); + } + pthread_cleanup_pop(1); } pthread_cleanup_pop(1); // will never happen pthread_exit(NULL); } +#endif void metadata_init(void) { int ret = pthread_create(&metadata_thread, NULL, metadata_thread_function, NULL); if (ret) debug(1, "Failed to create metadata thread!"); +#ifdef CONFIG_METADATA_HUB + ret = pthread_create(&metadata_hub_thread, NULL, metadata_hub_thread_function, NULL); + if (ret) + debug(1, "Failed to create metadata hub thread!"); +#endif +#ifdef CONFIG_MQTT + ret = pthread_create(&metadata_mqtt_thread, NULL, metadata_mqtt_thread_function, NULL); + if (ret) + debug(1, "Failed to create metadata mqtt thread!"); +#endif metadata_running = 1; } void metadata_stop(void) { if (metadata_running) { debug(2, "metadata_stop called."); +#ifdef CONFIG_MQTT + pthread_join(metadata_mqtt_thread, NULL); + pthread_cancel(metadata_mqtt_thread); +#endif +#ifdef CONFIG_METADATA_HUB + pthread_join(metadata_hub_thread, NULL); + pthread_cancel(metadata_hub_thread); +#endif pthread_cancel(metadata_thread); pthread_join(metadata_thread, NULL); } } -int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier, +int send_metadata_to_queue(pc_queue* queue, uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier, int block) { // parameters: type, code, pointer to data or NULL, length of data or NULL, @@ -1505,37 +1628,64 @@ int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rts // The reading of the parameters is a bit complex // If the rtsp_message field is non-null, then it represents an rtsp_message - // which should be freed - // in the thread handler when the parameter pointed to by the pointer and - // specified by the length - // is finished with - // If the rtsp_message is NULL, then if the pointer is non-null, it points to - // a malloc'ed block - // and should be freed when the thread is finished with it. The length of the - // data in the block is - // given in length + // and the data pointer is assumed to point to something within it. + // The reference counter of the rtsp_message is incremented here and + // should be decremented by the metadata handler when finished. + // If the reference count reduces to zero, the message will be freed. + + // If the rtsp_message is NULL, then if the pointer is non-null then the data it + // points to, of the length specified, is memcpy'd and passed to the metadata + // handler. The handler should free it when done. // If the rtsp_message is NULL and the pointer is also NULL, nothing further // is done. metadata_package pack; pack.type = type; pack.code = code; - pack.data = data; pack.length = length; pack.carrier = carrier; - if (pack.carrier) + pack.data = data; + if (pack.carrier) { msg_retain(pack.carrier); - int rc = pc_queue_add_item(&metadata_queue, &pack, block); - if (rc == EBUSY) { - if (pack.carrier) + } else { + if (data) + pack.data = memdup(data,length); // only if it's not a null + } + int rc = pc_queue_add_item(queue, &pack, block); + if (rc != 0) { + if (pack.carrier) { + if (rc == EWOULDBLOCK) + debug(2, "metadata queue \"%s\" full, dropping message item: type %x, code %x, data %x, length %u, message %d.", queue->name, pack.type, pack.code, pack.data, pack.length, pack.carrier->index_number); msg_free(&pack.carrier); - else if (data) - free(data); - warn("Metadata queue is busy, discarding message of type 0x%08X, code 0x%08X.", type, code); + } else { + if (rc == EWOULDBLOCK) + debug(2, "metadata queue \"%s\" full, dropping data item: type %x, code %x, data %x, length %u.", queue->name, pack.type, pack.code, pack.data, pack.length); + if (pack.data) + free(pack.data); + } } return rc; } +int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier, + int block) { + int rc; + rc = send_metadata_to_queue(&metadata_queue, type, code, data, length, carrier, block); + +#ifdef CONFIG_METADATA_HUB + rc = send_metadata_to_queue(&metadata_hub_queue, type, code, data, length, carrier, block); +#endif + +#ifdef CONFIG_MQTT + rc = send_metadata_to_queue(&metadata_mqtt_queue, type, code, data, length, carrier, block); +#endif + + return rc; +} + + + + static void handle_set_parameter_metadata(__attribute__((unused)) rtsp_conn_info *conn, rtsp_message *req, __attribute__((unused)) rtsp_message *resp) {