}
void metadata_hub_unlock_hub_mutex_cleanup(__attribute__((unused)) void *arg) {
- // debug(1, "metadata_hub_unlock_hub_mutex_cleanup called.");
- pthread_rwlock_unlock(&metadata_hub_re_lock);
+ debug(1, "metadata_hub_unlock_hub_mutex_cleanup called.");
+ metadata_hub_modify_epilog(0);
}
char *last_metadata_hub_modify_prolog_file = NULL;
int last_metadata_hub_modify_prolog_line;
+int metadata_hub_re_lock_access_is_delayed;
void _metadata_hub_modify_prolog(const char *filename, const int linenumber) {
// always run this before changing an entry or a sequence of entries in the metadata_hub
debug(1, "Metadata_hub write lock at \"%s:%d\" is already taken at \"%s:%d\" -- must wait.", filename, linenumber, last_metadata_hub_modify_prolog_file, last_metadata_hub_modify_prolog_line);
else
debug(1, "Metadata_hub write lock is already taken by unknown -- must wait.");
+ metadata_hub_re_lock_access_is_delayed = 0;
pthread_rwlock_wrlock(&metadata_hub_re_lock);
- debug(1, "Okay -- acquired the metadata_hub write lock.");
+ debug(1, "Okay -- acquired the metadata_hub write lock at \"%s:%d\".", filename, linenumber);
} else {
if (last_metadata_hub_modify_prolog_file) {
free(last_metadata_hub_modify_prolog_file);
last_metadata_hub_modify_prolog_line = linenumber;
debug(3, "Metadata_hub write lock acquired.");
}
+ metadata_hub_re_lock_access_is_delayed = 0;
}
void _metadata_hub_modify_epilog(int modified, const char *filename, const int linenumber) {
if (modified) {
run_metadata_watchers();
}
- if (last_metadata_hub_modify_prolog_file) {
- free(last_metadata_hub_modify_prolog_file);
- last_metadata_hub_modify_prolog_file = NULL;
+ if (metadata_hub_re_lock_access_is_delayed) {
+ if (last_metadata_hub_modify_prolog_file) {
+ debug(1, "Metadata_hub write lock taken at \"%s:%d\" is freed at \"%s:%d\".", last_metadata_hub_modify_prolog_file, last_metadata_hub_modify_prolog_line, filename, linenumber);
+ free(last_metadata_hub_modify_prolog_file);
+ last_metadata_hub_modify_prolog_file = NULL;
+ } else {
+ debug(1, "Metadata_hub write lock taken at an unknown place is freed at \"%s:%d\".", filename, linenumber);
+ }
}
pthread_rwlock_unlock(&metadata_hub_re_lock);
debug(3, "Metadata_hub write lock unlocked.");
// all the following items of metadata are contained in one metadata packet
// they are preceded by an 'ssnc' 'mdst' item and followed by an 'ssnc 'mden' item.
- debug(1, "metadata_hub_process_metadata type %x, code %x and length %u.", type, code, length);
+ // we don't set "changed" for them individually; instead we set it when the 'mden' token
+ // comes in if the metadata_packet_item_changed is set.
- char *cs;
int changed = 0;
+ int metadata_packet_item_changed;
+ metadata_hub_modify_prolog();
+ pthread_cleanup_push(metadata_hub_unlock_hub_mutex_cleanup, NULL);
+
+ char *cs;
if (type == 'core') {
switch (code) {
case 'mper': {
metadata_store.item_id_changed = 1;
metadata_store.item_id_received = 1;
debug(2, "MH Item ID set to: \"%" PRIx64 "\"", metadata_store.item_id);
+ metadata_packet_item_changed = 1;
}
} break;
case 'astm': {
metadata_store.songtime_in_milliseconds = ui;
metadata_store.songtime_in_milliseconds_changed = 1;
debug(2, "MH Song Time set to: \"%u\"", metadata_store.songtime_in_milliseconds);
+ metadata_packet_item_changed = 1;
}
} break;
case 'asal':
cs = strndup(data, length);
if (string_update(&metadata_store.album_name, &metadata_store.album_name_changed, cs)) {
debug(2, "MH Album name set to: \"%s\"", metadata_store.album_name);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
cs = strndup(data, length);
if (string_update(&metadata_store.artist_name, &metadata_store.artist_name_changed, cs)) {
debug(2, "MH Artist name set to: \"%s\"", metadata_store.artist_name);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
if (string_update(&metadata_store.album_artist_name,
&metadata_store.album_artist_name_changed, cs)) {
debug(2, "MH Album Artist name set to: \"%s\"", metadata_store.album_artist_name);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
cs = strndup(data, length);
if (string_update(&metadata_store.comment, &metadata_store.comment_changed, cs)) {
debug(2, "MH Comment set to: \"%s\"", metadata_store.comment);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
cs = strndup(data, length);
if (string_update(&metadata_store.genre, &metadata_store.genre_changed, cs)) {
debug(2, "MH Genre set to: \"%s\"", metadata_store.genre);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
cs = strndup(data, length);
if (string_update(&metadata_store.track_name, &metadata_store.track_name_changed, cs)) {
debug(2, "MH Track Name set to: \"%s\"", metadata_store.track_name);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
cs = strndup(data, length);
if (string_update(&metadata_store.composer, &metadata_store.composer_changed, cs)) {
debug(2, "MH Composer set to: \"%s\"", metadata_store.composer);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
if (string_update(&metadata_store.song_album_artist,
&metadata_store.song_album_artist_changed, cs)) {
debug(2, "MH Song Album Artist set to: \"%s\"", metadata_store.song_album_artist);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
cs = strndup(data, length);
if (string_update(&metadata_store.sort_name, &metadata_store.sort_name_changed, cs)) {
debug(2, "MH Sort Name set to: \"%s\"", metadata_store.sort_name);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
cs = strndup(data, length);
if (string_update(&metadata_store.sort_artist, &metadata_store.sort_artist_changed, cs)) {
debug(2, "MH Sort Artist set to: \"%s\"", metadata_store.sort_artist);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
cs = strndup(data, length);
if (string_update(&metadata_store.sort_album, &metadata_store.sort_album_changed, cs)) {
debug(2, "MH Sort Album set to: \"%s\"", metadata_store.sort_album);
+ metadata_packet_item_changed = 1;
}
free(cs);
break;
cs = strndup(data, length);
if (string_update(&metadata_store.sort_composer, &metadata_store.sort_composer_changed, cs)) {
debug(2, "MH Sort Composer set to: \"%s\"", metadata_store.sort_composer);
+ metadata_packet_item_changed = 1;
}
free(cs);
default:
break;
case 'mdst':
debug(2, "MH Metadata stream processing start.");
- metadata_hub_modify_prolog();
+ metadata_packet_item_changed = 0;
break;
case 'mden':
debug(2, "MH Metadata stream processing end.");
- metadata_hub_modify_epilog(1);
- debug(2, "MH Metadata stream processing epilog complete.");
+ changed = metadata_packet_item_changed;
break;
case 'PICT':
- metadata_hub_modify_prolog();
- pthread_cleanup_push(metadata_hub_unlock_hub_mutex_cleanup, NULL);
debug(2, "MH Picture received, length %u bytes.", length);
+
char uri[2048];
if ((length > 16) && (strcmp(config.cover_art_cache_dir,"")!=0)) { // if it's okay to write the file
+ // make this uncancellable
+ int oldState;
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState); // make this un-cancellable
char *pathname = metadata_write_image_file(data, length);
snprintf(uri, sizeof(uri), "file://%s", pathname);
free(pathname);
+ pthread_setcancelstate(oldState, NULL);
+
} else {
uri[0] = '\0';
}
if (string_update(&metadata_store.cover_art_pathname,
&metadata_store.cover_art_pathname_changed,
uri)) // if the picture's file path is different from the stored one...
- metadata_hub_modify_epilog(1);
+ changed = 1;
else
- metadata_hub_modify_epilog(0);
- pthread_cleanup_pop(0); // don't remove the lock -- it'll have been done
+ changed = 0;
+// pthread_cleanup_pop(0); // don't remove the lock -- it'll have been done
break;
case 'clip':
- metadata_hub_modify_prolog();
cs = strndup(data, length);
if (string_update(&metadata_store.client_ip, &metadata_store.client_ip_changed, cs)) {
changed = 1;
debug(2, "MH Client IP set to: \"%s\"", metadata_store.client_ip);
}
free(cs);
- metadata_hub_modify_epilog(changed);
break;
case 'prgr':
- metadata_hub_modify_prolog();
cs = strndup(data, length);
if (string_update(&metadata_store.progress_string, &metadata_store.progress_string_changed,
cs)) {
debug(2, "MH Progress String set to: \"%s\"", metadata_store.progress_string);
}
free(cs);
- metadata_hub_modify_epilog(changed);
break;
case 'svip':
- metadata_hub_modify_prolog();
cs = strndup(data, length);
if (string_update(&metadata_store.server_ip, &metadata_store.server_ip_changed, cs)) {
changed = 1;
debug(2, "MH Server IP set to: \"%s\"", metadata_store.server_ip);
}
free(cs);
- metadata_hub_modify_epilog(changed);
break;
case 'abeg':
- metadata_hub_modify_prolog();
changed = (metadata_store.active_state != AM_ACTIVE);
metadata_store.active_state = AM_ACTIVE;
- metadata_hub_modify_epilog(changed);
break;
case 'aend':
- metadata_hub_modify_prolog();
changed = (metadata_store.active_state != AM_INACTIVE);
metadata_store.active_state = AM_INACTIVE;
- metadata_hub_modify_epilog(changed);
break;
case 'pbeg':
- metadata_hub_modify_prolog();
changed = ((metadata_store.player_state != PS_PLAYING) ||
(metadata_store.player_thread_active == 0));
metadata_store.player_state = PS_PLAYING;
metadata_store.player_thread_active = 1;
- metadata_hub_modify_epilog(changed);
break;
case 'pend':
- metadata_hub_modify_prolog();
changed = ((metadata_store.player_state != PS_STOPPED) ||
(metadata_store.player_thread_active == 1));
metadata_store.player_state = PS_STOPPED;
metadata_store.player_thread_active = 0;
- metadata_hub_modify_epilog(changed);
break;
case 'pfls':
- metadata_hub_modify_prolog();
changed = (metadata_store.player_state != PS_PAUSED);
metadata_store.player_state = PS_PAUSED;
- metadata_hub_modify_epilog(changed);
break;
case 'pffr': // this is sent when the first frame has been received
case 'prsm':
- metadata_hub_modify_prolog();
changed = (metadata_store.player_state != PS_PLAYING);
metadata_store.player_state = PS_PLAYING;
- metadata_hub_modify_epilog(changed);
break;
case 'pvol': {
- metadata_hub_modify_prolog();
// Note: it's assumed that the config.airplay volume has already been correctly set.
// int32_t actual_volume;
// int gv = dacp_get_volume(&actual_volume);
metadata_store.airplay_volume = config.airplay_volume;
changed = 1;
}
- metadata_hub_modify_epilog(changed); // change
} break;
default: {
}
}
}
+ pthread_cleanup_pop(0); // don't remove the lock
+ metadata_hub_modify_epilog(changed);
}
pthread_mutex_t pc_queue_lock;
pthread_cond_t pc_queue_item_added_signal;
pthread_cond_t pc_queue_item_removed_signal;
- size_t item_size; // number of bytes in each item
+ char *name;
+ size_t item_size; // number of bytes in each item
uint32_t count; // number of items in the queue
uint32_t capacity; // maximum number of items
uint32_t toq; // first item to take
rtsp_message *carrier;
} metadata_package;
-void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t number_of_items) {
+void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t number_of_items, const char* name) {
+ if (name)
+ debug(1, "Creating metadata queue \"%s\".", name);
+ else
+ debug(1, "Creating an unnamed metadata queue.");
pthread_mutex_init(&the_queue->pc_queue_lock, NULL);
pthread_cond_init(&the_queue->pc_queue_item_added_signal, NULL);
pthread_cond_init(&the_queue->pc_queue_item_removed_signal, NULL);
the_queue->capacity = number_of_items;
the_queue->toq = 0;
the_queue->eoq = 0;
+ if (name == NULL)
+ the_queue->name = NULL;
+ else
+ the_queue->name = strdup(name);
}
void pc_queue_delete(pc_queue *the_queue) {
+ if (the_queue->name)
+ debug(1, "Deleting metadata queue \"%s\".", the_queue->name);
+ else
+ debug(1, "Deleting an unnamed metadata queue.");
+ if (the_queue->name != NULL)
+ free(the_queue->name);
pthread_cond_destroy(&the_queue->pc_queue_item_removed_signal);
pthread_cond_destroy(&the_queue->pc_queue_item_added_signal);
pthread_mutex_destroy(&the_queue->pc_queue_lock);
}
int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) {
+ int response = 0;
int rc;
if (the_queue) {
if (block == 0) {
if (rc)
debug(1, "Error locking for pc_queue_add_item");
pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue);
- while (the_queue->count == the_queue->capacity) {
- rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal, &the_queue->pc_queue_lock);
- if (rc)
- debug(1, "Error waiting for item to be removed");
+ // leave this out if you want this to return if the queue is already full
+ // irrespective of the block flag.
+ /*
+ while (the_queue->count == the_queue->capacity) {
+ rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal, &the_queue->pc_queue_lock);
+ if (rc)
+ debug(1, "Error waiting for item to be removed");
+ }
+ */
+ if (the_queue->count < the_queue->capacity) {
+ uint32_t i = the_queue->eoq;
+ void *p = the_queue->items + the_queue->item_size * i;
+ // void * p = &the_queue->qbase + the_queue->item_size*the_queue->eoq;
+ memcpy(p, the_stuff, the_queue->item_size);
+
+ // update the pointer
+ i++;
+ if (i == the_queue->capacity)
+ // fold pointer if necessary
+ i = 0;
+ the_queue->eoq = i;
+ the_queue->count++;
+ //debug(2,"metadata queue+ \"%s\" %d/%d.", the_queue->name, the_queue->count, the_queue->capacity);
+ if (the_queue->count == the_queue->capacity)
+ debug(3, "metadata queue \"%s\": is now full with %d items in it!", the_queue->name, the_queue->count);
+ rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal);
+ if (rc)
+ debug(1, "metadata queue \"%s\": error signalling after pc_queue_add_item", the_queue->name);
+ } else {
+ response = EWOULDBLOCK; // a bit arbitrary, this.
+ debug(3,"metadata queue \"%s\": is already full with %d items in it. Not adding this item to the queue.", the_queue->name, the_queue->count);
}
- uint32_t i = the_queue->eoq;
- void *p = the_queue->items + the_queue->item_size * i;
- // void * p = &the_queue->qbase + the_queue->item_size*the_queue->eoq;
- memcpy(p, the_stuff, the_queue->item_size);
-
- // update the pointer
- i++;
- if (i == the_queue->capacity)
- // fold pointer if necessary
- i = 0;
- the_queue->eoq = i;
- the_queue->count++;
- debug(2,"metadata queue+ %d/%d.", the_queue->count, the_queue->capacity);
- if (the_queue->count == the_queue->capacity)
- debug(1, "pc_queue is full with %d items in it!", the_queue->count);
- rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal);
- if (rc)
- debug(1, "Error signalling after pc_queue_add_item");
pthread_cleanup_pop(1); // unlock the queue lock.
} else {
debug(1, "Adding an item to a NULL queue");
}
- return 0;
+ return response;
}
int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) {
if (the_queue) {
rc = pthread_mutex_lock(&the_queue->pc_queue_lock);
if (rc)
- debug(1, "Error locking for pc_queue_get_item");
+ debug(1, "metadata queue \"%s\": error locking for pc_queue_get_item", the_queue->name);
pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue);
while (the_queue->count == 0) {
rc = pthread_cond_wait(&the_queue->pc_queue_item_added_signal, &the_queue->pc_queue_lock);
if (rc)
- debug(1, "Error waiting for item to be added");
+ debug(1, "metadata queue \"%s\": error waiting for item to be added", the_queue->name);
}
uint32_t i = the_queue->toq;
// void * p = &the_queue->qbase + the_queue->item_size*the_queue->toq;
i = 0;
the_queue->toq = i;
the_queue->count--;
- debug(2,"metadata queue- %d/%d.", the_queue->count, the_queue->capacity);
+ debug(3,"metadata queue- \"%s\" %d/%d.", the_queue->name, the_queue->count, the_queue->capacity);
rc = pthread_cond_signal(&the_queue->pc_queue_item_removed_signal);
if (rc)
- debug(1, "Error signalling after pc_queue_removed_item");
+ debug(1, "metadata queue \"%s\": error signalling after pc_queue_get_item", the_queue->name);
pthread_cleanup_pop(1); // unlock the queue lock.
} else {
debug(1, "Removing an item from a NULL queue");
debug(1, "Error %d locking reference counter lock");
if (msg > (rtsp_message *)0x00010000) {
msg->referenceCount++;
+ debug(3,"msg_free increment reference counter message %d to %d.", msg->index_number, msg->referenceCount);
// debug(1,"msg_retain -- item %d reference count %d.", msg->index_number, msg->referenceCount);
rc = pthread_mutex_unlock(&reference_counter_lock);
if (rc)
memset(msg, 0, sizeof(rtsp_message));
msg->referenceCount = 1; // from now on, any access to this must be protected with the lock
msg->index_number = msg_indexes++;
+ debug(3,"msg_init message %d", msg->index_number);
} else {
die("msg_init -- can not allocate memory for rtsp_message %d.", msg_indexes);
}
if (*msgh > (rtsp_message *)0x00010000) {
rtsp_message *msg = *msgh;
msg->referenceCount--;
+ if (msg->referenceCount)
+ debug(3,"msg_free decrement reference counter message %d to %d", msg->index_number, msg->referenceCount);
if (msg->referenceCount == 0) {
unsigned int i;
for (i = 0; i < msg->nheaders; i++) {
index = 0x10000; // ensure it doesn't fold to zero.
*msgh =
(rtsp_message *)(index); // put a version of the index number of the freed message in here
+ debug(3,"msg_free freed message %d", msg->index_number);
free(msg);
} else {
// debug(1,"msg_free item %d -- decrement reference to
char *progress = cp + strlen("progress: ");
// debug(2, "progress: \"%s\"",progress); // rtpstampstart/rtpstampnow/rtpstampend 44100 per
// second
- send_ssnc_metadata('prgr', strdup(progress), strlen(progress), 1);
+ send_ssnc_metadata('prgr', progress, strlen(progress), 1);
} else
#endif
static int fd = -1;
// static int dirty = 0;
pc_queue metadata_queue;
+
static int metadata_sock = -1;
static struct sockaddr_in metadata_sockaddr;
static char *metadata_sockmsg;
#define metadata_queue_size 500
metadata_package metadata_queue_items[metadata_queue_size];
-
pthread_t metadata_thread;
+#ifdef CONFIG_METADATA_HUB
+pc_queue metadata_hub_queue;
+#define metadata_hub_queue_size 500
+metadata_package metadata_hub_queue_items[metadata_hub_queue_size];
+pthread_t metadata_hub_thread;
+#endif
+
+#ifdef CONFIG_MQTT
+pc_queue metadata_mqtt_queue;
+#define metadata_mqtt_queue_size 500
+metadata_package metadata_mqtt_queue_items[metadata_mqtt_queue_size];
+pthread_t metadata_mqtt_thread;
+#endif
+
void metadata_create_multicast_socket(void) {
if (config.metadata_enabled == 0)
return;
}
}
-void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) {
- debug(2, "metadata_thread_cleanup_function called");
- metadata_delete_multicast_socket();
- metadata_close();
- pc_queue_delete(&metadata_queue);
-}
-
void metadata_pack_cleanup_function(void *arg) {
// debug(1, "metadata_pack_cleanup_function called");
metadata_package *pack = (metadata_package *)arg;
free(pack->data);
}
+void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) {
+ debug(2, "metadata_thread_cleanup_function called");
+ metadata_delete_multicast_socket();
+ metadata_close();
+ pc_queue_delete(&metadata_queue);
+}
+
void *metadata_thread_function(__attribute__((unused)) void *ignore) {
// create a pc_queue for passing information to a threaded metadata handler
pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package),
- metadata_queue_size);
+ metadata_queue_size, "pipe");
metadata_create_multicast_socket();
metadata_package pack;
pthread_cleanup_push(metadata_thread_cleanup_function, NULL);
while (1) {
pc_queue_get_item(&metadata_queue, &pack);
- // debug(1,"pc_queue get item.");
pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack);
if (config.metadata_enabled) {
- // debug(1, "metadata_process type %x, code %x and length %u.", pack.type, pack.code, pack.length);
- // metadata_process(pack.type, pack.code, pack.data, pack.length);
+ if (pack.carrier) {
+ debug(3, " pipe: type %x, code %x, length %u, message %d.", pack.type, pack.code, pack.length, pack.carrier->index_number);
+ } else {
+ debug(3, " pipe: type %x, code %x, length %u.", pack.type, pack.code, pack.length);
+ }
+ metadata_process(pack.type, pack.code, pack.data, pack.length);
+ debug(3, " pipe: done.");
+ }
+ pthread_cleanup_pop(1);
+ }
+ pthread_cleanup_pop(1); // will never happen
+ pthread_exit(NULL);
+}
+
#ifdef CONFIG_METADATA_HUB
- debug(1, "metadata_hub_process.");
- metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length);
+void metadata_hub_close(void) {
+}
+
+void metadata_hub_thread_cleanup_function(__attribute__((unused)) void *arg) {
+ debug(2, "metadata_hub_thread_cleanup_function called");
+ metadata_hub_close();
+ pc_queue_delete(&metadata_hub_queue);
+}
+
+void *metadata_hub_thread_function(__attribute__((unused)) void *ignore) {
+ // create a pc_queue for passing information to a threaded metadata handler
+ pc_queue_init(&metadata_hub_queue, (char *)&metadata_hub_queue_items, sizeof(metadata_package),
+ metadata_hub_queue_size, "hub");
+ metadata_package pack;
+ pthread_cleanup_push(metadata_hub_thread_cleanup_function, NULL);
+ while (1) {
+ pc_queue_get_item(&metadata_hub_queue, &pack);
+ pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack);
+ if (pack.carrier) {
+ debug(3, " hub: type %x, code %x, length %u, message %d.", pack.type, pack.code, pack.length, pack.carrier->index_number);
+ } else {
+ debug(3, " hub: type %x, code %x, length %u.", pack.type, pack.code, pack.length);
+ }
+ metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length);
+ debug(3, " hub: done.");
+ pthread_cleanup_pop(1);
+ }
+ pthread_cleanup_pop(1); // will never happen
+ pthread_exit(NULL);
+}
#endif
#ifdef CONFIG_MQTT
- if (config.mqtt_enabled) {
- debug(1, "mqtt_process_metadata.");
- mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length);
- }
-#endif
- debug(1, "done.");
- }
+void metadata_mqtt_close(void) {
+}
+
+void metadata_mqtt_thread_cleanup_function(__attribute__((unused)) void *arg) {
+ debug(2, "metadata_mqtt_thread_cleanup_function called");
+ metadata_mqtt_close();
+ pc_queue_delete(&metadata_hub_queue);
+}
+
+void *metadata_mqtt_thread_function(__attribute__((unused)) void *ignore) {
+ // create a pc_queue for passing information to a threaded metadata handler
+ pc_queue_init(&metadata_mqtt_queue, (char *)&metadata_mqtt_queue_items, sizeof(metadata_package),
+ metadata_mqtt_queue_size, "mqtt");
+ metadata_package pack;
+ pthread_cleanup_push(metadata_mqtt_thread_cleanup_function, NULL);
+ while (1) {
+ pc_queue_get_item(&metadata_mqtt_queue, &pack);
+ pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack);
+ if (config.mqtt_enabled) {
+ if (pack.carrier) {
+ debug(3, " mqtt: type %x, code %x, length %u, message %d.", pack.type, pack.code, pack.length, pack.carrier->index_number);
+ } else {
+ debug(3, " mqtt: type %x, code %x, length %u.", pack.type, pack.code, pack.length);
+ }
+ mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length);
+ debug(3, " mqtt: done.");
+ }
+
pthread_cleanup_pop(1);
}
pthread_cleanup_pop(1); // will never happen
pthread_exit(NULL);
}
+#endif
void metadata_init(void) {
int ret = pthread_create(&metadata_thread, NULL, metadata_thread_function, NULL);
if (ret)
debug(1, "Failed to create metadata thread!");
+#ifdef CONFIG_METADATA_HUB
+ ret = pthread_create(&metadata_hub_thread, NULL, metadata_hub_thread_function, NULL);
+ if (ret)
+ debug(1, "Failed to create metadata hub thread!");
+#endif
+#ifdef CONFIG_MQTT
+ ret = pthread_create(&metadata_mqtt_thread, NULL, metadata_mqtt_thread_function, NULL);
+ if (ret)
+ debug(1, "Failed to create metadata mqtt thread!");
+#endif
metadata_running = 1;
}
void metadata_stop(void) {
if (metadata_running) {
debug(2, "metadata_stop called.");
+#ifdef CONFIG_MQTT
+ pthread_join(metadata_mqtt_thread, NULL);
+ pthread_cancel(metadata_mqtt_thread);
+#endif
+#ifdef CONFIG_METADATA_HUB
+ pthread_join(metadata_hub_thread, NULL);
+ pthread_cancel(metadata_hub_thread);
+#endif
pthread_cancel(metadata_thread);
pthread_join(metadata_thread, NULL);
}
}
-int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier,
+int send_metadata_to_queue(pc_queue* queue, uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier,
int block) {
// parameters: type, code, pointer to data or NULL, length of data or NULL,
// The reading of the parameters is a bit complex
// If the rtsp_message field is non-null, then it represents an rtsp_message
- // which should be freed
- // in the thread handler when the parameter pointed to by the pointer and
- // specified by the length
- // is finished with
- // If the rtsp_message is NULL, then if the pointer is non-null, it points to
- // a malloc'ed block
- // and should be freed when the thread is finished with it. The length of the
- // data in the block is
- // given in length
+ // and the data pointer is assumed to point to something within it.
+ // The reference counter of the rtsp_message is incremented here and
+ // should be decremented by the metadata handler when finished.
+ // If the reference count reduces to zero, the message will be freed.
+
+ // If the rtsp_message is NULL, then if the pointer is non-null then the data it
+ // points to, of the length specified, is memcpy'd and passed to the metadata
+ // handler. The handler should free it when done.
// If the rtsp_message is NULL and the pointer is also NULL, nothing further
// is done.
metadata_package pack;
pack.type = type;
pack.code = code;
- pack.data = data;
pack.length = length;
pack.carrier = carrier;
- if (pack.carrier)
+ pack.data = data;
+ if (pack.carrier) {
msg_retain(pack.carrier);
- int rc = pc_queue_add_item(&metadata_queue, &pack, block);
- if (rc == EBUSY) {
- if (pack.carrier)
+ } else {
+ if (data)
+ pack.data = memdup(data,length); // only if it's not a null
+ }
+ int rc = pc_queue_add_item(queue, &pack, block);
+ if (rc != 0) {
+ if (pack.carrier) {
+ if (rc == EWOULDBLOCK)
+ debug(2, "metadata queue \"%s\" full, dropping message item: type %x, code %x, data %x, length %u, message %d.", queue->name, pack.type, pack.code, pack.data, pack.length, pack.carrier->index_number);
msg_free(&pack.carrier);
- else if (data)
- free(data);
- warn("Metadata queue is busy, discarding message of type 0x%08X, code 0x%08X.", type, code);
+ } else {
+ if (rc == EWOULDBLOCK)
+ debug(2, "metadata queue \"%s\" full, dropping data item: type %x, code %x, data %x, length %u.", queue->name, pack.type, pack.code, pack.data, pack.length);
+ if (pack.data)
+ free(pack.data);
+ }
}
return rc;
}
+int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier,
+ int block) {
+ int rc;
+ rc = send_metadata_to_queue(&metadata_queue, type, code, data, length, carrier, block);
+
+#ifdef CONFIG_METADATA_HUB
+ rc = send_metadata_to_queue(&metadata_hub_queue, type, code, data, length, carrier, block);
+#endif
+
+#ifdef CONFIG_MQTT
+ rc = send_metadata_to_queue(&metadata_mqtt_queue, type, code, data, length, carrier, block);
+#endif
+
+ return rc;
+}
+
+
+
+
static void handle_set_parameter_metadata(__attribute__((unused)) rtsp_conn_info *conn,
rtsp_message *req,
__attribute__((unused)) rtsp_message *resp) {