# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.50])
-AC_INIT([shairport-sync], [3.2d67], [mikebrady@eircom.net])
+AC_INIT([shairport-sync], [3.2.2p0], [mikebrady@eircom.net])
AM_INIT_AUTOMAKE
AC_CONFIG_SRCDIR([shairport.c])
AC_CONFIG_HEADERS([config.h])
shairport_sync_advanced_remote_control_set_available(shairportSyncAdvancedRemoteControlSkeleton,
FALSE);
}
-
+
if (argc->progress_string) {
- // debug(1, "Check progress string");
- th = shairport_sync_remote_control_get_progress_string(
- shairportSyncRemoteControlSkeleton);
- if ((th == NULL) || (strcasecmp(th, argc->progress_string) != 0)) {
- // debug(1, "Progress string should be changed");
- shairport_sync_remote_control_set_progress_string(
- shairportSyncRemoteControlSkeleton, argc->progress_string);
- }
+ // debug(1, "Check progress string");
+ th = shairport_sync_remote_control_get_progress_string(shairportSyncRemoteControlSkeleton);
+ if ((th == NULL) || (strcasecmp(th, argc->progress_string) != 0)) {
+ // debug(1, "Progress string should be changed");
+ shairport_sync_remote_control_set_progress_string(shairportSyncRemoteControlSkeleton,
+ argc->progress_string);
+ }
}
switch (argc->player_state) {
if ((argc->track_metadata) && (argc->track_metadata->item_id)) {
char trackidstring[128];
// debug(1, "Set ID using mper ID: \"%u\".",argc->item_id);
- snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", argc->track_metadata->item_id);
+ snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u",
+ argc->track_metadata->item_id);
GVariant *trackid = g_variant_new("o", trackidstring);
g_variant_builder_add(dict_builder, "{sv}", "mpris:trackid", trackid);
}
/* Called whenever a service has been resolved successfully or timed out */
switch (event) {
- case AVAHI_RESOLVER_FAILURE:
- debug(2, "(Resolver) Failed to resolve service '%s' of type '%s' in domain '%s': %s.", name,
- type, domain, avahi_strerror(avahi_client_errno(avahi_service_resolver_get_client(r))));
- break;
- case AVAHI_RESOLVER_FOUND: {
- // char a[AVAHI_ADDRESS_STR_MAX], *t;
- // debug(1, "Resolve callback: Service '%s' of type '%s' in domain '%s':", name, type, domain);
- char *dacpid = strstr(name, "iTunes_Ctrl_");
- if (dacpid) {
- dacpid += strlen("iTunes_Ctrl_");
- if (strcmp(dacpid, dbs->dacp_id) == 0) {
- debug(3, "Client's DACP port: %u.", port);
- #ifdef HAVE_DACP_CLIENT
- dacp_monitor_port_update_callback(dacpid, port);
- #endif
- #ifdef CONFIG_METADATA
- char portstring[20];
- memset(portstring, 0, sizeof(portstring));
- snprintf(portstring, sizeof(portstring), "%u", port);
- send_ssnc_metadata('dapo', strdup(portstring), strlen(portstring), 0);
- #endif
- }
- } else {
- debug(1, "Resolve callback: Can't see a DACP string in a DACP Record!");
+ case AVAHI_RESOLVER_FAILURE:
+ debug(2, "(Resolver) Failed to resolve service '%s' of type '%s' in domain '%s': %s.", name,
+ type, domain, avahi_strerror(avahi_client_errno(avahi_service_resolver_get_client(r))));
+ break;
+ case AVAHI_RESOLVER_FOUND: {
+ // char a[AVAHI_ADDRESS_STR_MAX], *t;
+ // debug(1, "Resolve callback: Service '%s' of type '%s' in domain '%s':", name, type, domain);
+ char *dacpid = strstr(name, "iTunes_Ctrl_");
+ if (dacpid) {
+ dacpid += strlen("iTunes_Ctrl_");
+ if (strcmp(dacpid, dbs->dacp_id) == 0) {
+ debug(3, "Client's DACP port: %u.", port);
+#ifdef HAVE_DACP_CLIENT
+ dacp_monitor_port_update_callback(dacpid, port);
+#endif
+#ifdef CONFIG_METADATA
+ char portstring[20];
+ memset(portstring, 0, sizeof(portstring));
+ snprintf(portstring, sizeof(portstring), "%u", port);
+ send_ssnc_metadata('dapo', strdup(portstring), strlen(portstring), 0);
+#endif
}
+ } else {
+ debug(1, "Resolve callback: Can't see a DACP string in a DACP Record!");
}
}
+ }
// debug(1,"service resolver freed by resolve_callback");
check_avahi_response(1, avahi_service_resolver_free(r));
}
static void egroup_callback(AvahiEntryGroup *g, AvahiEntryGroupState state,
AVAHI_GCC_UNUSED void *userdata) {
switch (state) {
- case AVAHI_ENTRY_GROUP_ESTABLISHED:
- /* The entry group has been established successfully */
- debug(1, "avahi: service '%s' successfully added.", service_name);
- break;
-
- case AVAHI_ENTRY_GROUP_COLLISION: {
- char *n;
-
- /* A service name collision with a remote service
- * happened. Let's pick a new name */
- debug(1, "avahi name collision -- look for another");
- n = avahi_alternative_service_name(service_name);
- if (service_name)
- avahi_free(service_name);
- else
- debug(1, "avahi attempt to free a NULL service name");
- service_name = n;
+ case AVAHI_ENTRY_GROUP_ESTABLISHED:
+ /* The entry group has been established successfully */
+ debug(1, "avahi: service '%s' successfully added.", service_name);
+ break;
- debug(2, "avahi: service name collision, renaming service to '%s'", service_name);
+ case AVAHI_ENTRY_GROUP_COLLISION: {
+ char *n;
- /* And recreate the services */
- register_service(avahi_entry_group_get_client(g));
- break;
- }
+ /* A service name collision with a remote service
+ * happened. Let's pick a new name */
+ debug(1, "avahi name collision -- look for another");
+ n = avahi_alternative_service_name(service_name);
+ if (service_name)
+ avahi_free(service_name);
+ else
+ debug(1, "avahi attempt to free a NULL service name");
+ service_name = n;
- case AVAHI_ENTRY_GROUP_FAILURE:
- debug(1, "avahi: entry group failure: %s",
- avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g))));
- break;
+ debug(2, "avahi: service name collision, renaming service to '%s'", service_name);
- case AVAHI_ENTRY_GROUP_UNCOMMITED:
- debug(2, "avahi: service '%s' group is not yet committed.", service_name);
- break;
+ /* And recreate the services */
+ register_service(avahi_entry_group_get_client(g));
+ break;
+ }
- case AVAHI_ENTRY_GROUP_REGISTERING:
- debug(2, "avahi: service '%s' group is registering.", service_name);
- break;
+ case AVAHI_ENTRY_GROUP_FAILURE:
+ debug(1, "avahi: entry group failure: %s",
+ avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g))));
+ break;
- default:
- debug(1, "avahi: unhandled egroup state: %d", state);
- break;
+ case AVAHI_ENTRY_GROUP_UNCOMMITED:
+ debug(2, "avahi: service '%s' group is not yet committed.", service_name);
+ break;
+
+ case AVAHI_ENTRY_GROUP_REGISTERING:
+ debug(2, "avahi: service '%s' group is registering.", service_name);
+ break;
+
+ default:
+ debug(1, "avahi: unhandled egroup state: %d", state);
+ break;
}
}
}
void avahi_dacp_dont_monitor(void *userdata) {
- debug(3,"avahi_dacp_dont_monitor");
+ debug(3, "avahi_dacp_dont_monitor");
if (userdata) {
dacp_browser_struct *dbs = (dacp_browser_struct *)userdata;
// stop and dispose of everything
avahi_threaded_poll_stop((dbs)->service_poll);
*/
if (dbs->service_poll) {
- avahi_threaded_poll_stop(dbs->service_poll);
+ avahi_threaded_poll_stop(dbs->service_poll);
avahi_threaded_poll_lock(dbs->service_poll);
if (dbs->service_browser)
avahi_service_browser_free(dbs->service_browser);
if (dbs->service_client)
avahi_client_free(dbs->service_client);
- avahi_threaded_poll_unlock(dbs->service_poll);
+ avahi_threaded_poll_unlock(dbs->service_poll);
avahi_threaded_poll_free(dbs->service_poll);
}
free(dbs->dacp_id);
} else {
debug(1, "Avahi DACP Monitor is not running.");
}
- debug(3,"avahi_dacp_dont_monitor exit");
+ debug(3, "avahi_dacp_dont_monitor exit");
}
mdns_backend mdns_avahi = {.name = "avahi",
char *client_ip; // IP number used by the audio source (i.e. the "client"), which is also the DACP
// server
char *server_ip; // IP number used by Shairport Sync
- char *progress_string; // progress string, emitted by the source from time to time
+ char *progress_string; // progress string, emitted by the source from time to time
int player_thread_active; // true if a play thread is running
int dacp_server_active; // true if there's a reachable DACP server (assumed to be the Airplay
// client) ; false otherwise
} else if ((argc->track_metadata) && (argc->track_metadata->item_id)) {
char trackidstring[128];
// debug(1, "Set ID using mper ID: \"%u\".",argc->item_id);
- snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", argc->track_metadata->item_id);
+ snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u",
+ argc->track_metadata->item_id);
GVariant *trackid = g_variant_new("o", trackidstring);
g_variant_builder_add(dict_builder, "{sv}", "mpris:trackid", trackid);
}
// name,(mpris_bus_type==G_BUS_TYPE_SESSION) ? "session" : "system");
pid_t pid = getpid();
char interface_name[256] = "";
- snprintf(interface_name, sizeof(interface_name), "org.mpris.MediaPlayer2.ShairportSync.i%d", pid);
+ snprintf(interface_name, sizeof(interface_name), "org.mpris.MediaPlayer2.ShairportSync.i%d", pid);
GBusType mpris_bus_type = G_BUS_TYPE_SYSTEM;
if (config.mpris_service_bus_type == DBT_session)
mpris_bus_type = G_BUS_TYPE_SESSION;
#include "rtp.h"
#include "dacp.h"
-#include <mosquitto.h>
#include "mqtt.h"
+#include <mosquitto.h>
-//this holds the mosquitto client
+// this holds the mosquitto client
struct mosquitto *global_mosq = NULL;
char *topic = NULL;
int connected = 0;
-//mosquitto logging
-void _cb_log( __attribute__((unused)) struct mosquitto *mosq,
- __attribute__((unused)) void *userdata, int level, const char *str){
- switch(level){
- case MOSQ_LOG_DEBUG:
- debug(1, str);
- break;
- case MOSQ_LOG_INFO:
- debug(2, str);
- break;
- case MOSQ_LOG_NOTICE:
- debug(3, str);
- break;
- case MOSQ_LOG_WARNING:
- inform(str);
- break;
- case MOSQ_LOG_ERR: {
- die("MQTT: Error: %s\n", str);
- }
+// mosquitto logging
+void _cb_log(__attribute__((unused)) struct mosquitto *mosq, __attribute__((unused)) void *userdata,
+ int level, const char *str) {
+ switch (level) {
+ case MOSQ_LOG_DEBUG:
+ debug(1, str);
+ break;
+ case MOSQ_LOG_INFO:
+ debug(2, str);
+ break;
+ case MOSQ_LOG_NOTICE:
+ debug(3, str);
+ break;
+ case MOSQ_LOG_WARNING:
+ inform(str);
+ break;
+ case MOSQ_LOG_ERR: {
+ die("MQTT: Error: %s\n", str);
+ }
}
}
-//mosquitto message handler
-void on_message( __attribute__((unused)) struct mosquitto* mosq,
- __attribute__((unused)) void* userdata, const struct mosquitto_message* msg){
-
- //null-terminate the payload
- char payload[msg->payloadlen+1];
- memcpy(payload,msg->payload,msg->payloadlen);
- payload[msg->payloadlen]=0;
-
- debug(1, "[MQTT]: received Message on topic %s: %s\n",msg->topic, payload);
-
- //All recognized commands
- char* commands[] = {
- "command", "beginff", "beginrew", "mutetoggle", "nextitem", "previtem", "pause",
- "playpause", "play", "stop", "playresume", "shuffle_songs", "volumedown", "volumeup",
- NULL};
-
- int it=0;
-
- //send command if it's a valid one
- while(commands[it++]!=NULL){
- if( (size_t)msg->payloadlen>=strlen(commands[it]) &&
- strncmp(msg->payload, commands[it], strlen(commands[it]))==0
- ){
- debug(1, "[MQTT]: DACP Command: %s\n",commands[it]);
+// mosquitto message handler
+void on_message(__attribute__((unused)) struct mosquitto *mosq,
+ __attribute__((unused)) void *userdata, const struct mosquitto_message *msg) {
+
+ // null-terminate the payload
+ char payload[msg->payloadlen + 1];
+ memcpy(payload, msg->payload, msg->payloadlen);
+ payload[msg->payloadlen] = 0;
+
+ debug(1, "[MQTT]: received Message on topic %s: %s\n", msg->topic, payload);
+
+ // All recognized commands
+ char *commands[] = {"command", "beginff", "beginrew", "mutetoggle", "nextitem",
+ "previtem", "pause", "playpause", "play", "stop",
+ "playresume", "shuffle_songs", "volumedown", "volumeup", NULL};
+
+ int it = 0;
+
+ // send command if it's a valid one
+ while (commands[it++] != NULL) {
+ if ((size_t)msg->payloadlen >= strlen(commands[it]) &&
+ strncmp(msg->payload, commands[it], strlen(commands[it])) == 0) {
+ debug(1, "[MQTT]: DACP Command: %s\n", commands[it]);
send_simple_dacp_command(commands[it]);
break;
}
}
}
-void on_disconnect( __attribute__((unused)) struct mosquitto* mosq,
- __attribute__((unused)) void* userdata,
- __attribute__((unused)) int rc){
+void on_disconnect(__attribute__((unused)) struct mosquitto *mosq,
+ __attribute__((unused)) void *userdata, __attribute__((unused)) int rc) {
connected = 0;
debug(1, "[MQTT]: disconnected");
}
-void on_connect(struct mosquitto* mosq,
- __attribute__((unused)) void* userdata,
- __attribute__((unused)) int rc){
+void on_connect(struct mosquitto *mosq, __attribute__((unused)) void *userdata,
+ __attribute__((unused)) int rc) {
connected = 1;
debug(1, "[MQTT]: connected");
-
- //subscribe if requested
- if(config.mqtt_enable_remote){
- char remotetopic[strlen(config.mqtt_topic)+8];
- snprintf(remotetopic,strlen(config.mqtt_topic)+8,"%s/remote",config.mqtt_topic);
- mosquitto_subscribe(mosq,NULL,remotetopic,0);
+
+ // subscribe if requested
+ if (config.mqtt_enable_remote) {
+ char remotetopic[strlen(config.mqtt_topic) + 8];
+ snprintf(remotetopic, strlen(config.mqtt_topic) + 8, "%s/remote", config.mqtt_topic);
+ mosquitto_subscribe(mosq, NULL, remotetopic, 0);
}
}
-//helper function to publish under a topic and automatically append the main topic
-void mqtt_publish(char* topic, char* data, uint32_t length){
- char fulltopic[strlen(config.mqtt_topic)+strlen(topic)+3];
- snprintf(fulltopic, strlen(config.mqtt_topic)+strlen(topic)+2, "%s/%s", config.mqtt_topic, topic);
- debug(1, "[MQTT]: publishing under %s",fulltopic);
-
+// helper function to publish under a topic and automatically append the main topic
+void mqtt_publish(char *topic, char *data, uint32_t length) {
+ char fulltopic[strlen(config.mqtt_topic) + strlen(topic) + 3];
+ snprintf(fulltopic, strlen(config.mqtt_topic) + strlen(topic) + 2, "%s/%s", config.mqtt_topic,
+ topic);
+ debug(1, "[MQTT]: publishing under %s", fulltopic);
+
int rc;
- if((rc=mosquitto_publish(global_mosq, NULL, fulltopic, length, data, 0, 0))!=MOSQ_ERR_SUCCESS) {
- switch(rc){
- case MOSQ_ERR_NO_CONN:
- debug(1, "[MQTT]: Publish failed: not connected to broker");
- break;
- default:
- debug(1, "[MQTT]: Publish failed: unknown error");
- break;
+ if ((rc = mosquitto_publish(global_mosq, NULL, fulltopic, length, data, 0, 0)) !=
+ MOSQ_ERR_SUCCESS) {
+ switch (rc) {
+ case MOSQ_ERR_NO_CONN:
+ debug(1, "[MQTT]: Publish failed: not connected to broker");
+ break;
+ default:
+ debug(1, "[MQTT]: Publish failed: unknown error");
+ break;
}
}
}
-//handler for incoming metadata
-void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length){
- if(global_mosq==NULL || connected!=1){
+// handler for incoming metadata
+void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length) {
+ if (global_mosq == NULL || connected != 1) {
debug(3, "[MQTT]: Client not connected, skipping metadata handling");
return;
}
- if(config.mqtt_publish_raw){
+ if (config.mqtt_publish_raw) {
uint32_t val;
char topic[] = "____/____";
-
- val=htonl(type);
- memcpy(topic,&val, 4);
- val=htonl(code);
- memcpy(topic+5,&val, 4);
+
+ val = htonl(type);
+ memcpy(topic, &val, 4);
+ val = htonl(code);
+ memcpy(topic + 5, &val, 4);
mqtt_publish(topic, data, length);
}
- if(config.mqtt_publish_parsed){
- if(type=='core'){
+ if (config.mqtt_publish_parsed) {
+ if (type == 'core') {
switch (code) {
- case 'asar':
- mqtt_publish("artist", data, length);
- break;
- case 'asal':
- mqtt_publish("album", data, length);
- break;
- case 'minm':
- mqtt_publish("title", data, length);
- break;
- case 'asgn':
- mqtt_publish("genre", data, length);
- break;
- case 'asfm':
- mqtt_publish("format", data, length);
- break;
+ case 'asar':
+ mqtt_publish("artist", data, length);
+ break;
+ case 'asal':
+ mqtt_publish("album", data, length);
+ break;
+ case 'minm':
+ mqtt_publish("title", data, length);
+ break;
+ case 'asgn':
+ mqtt_publish("genre", data, length);
+ break;
+ case 'asfm':
+ mqtt_publish("format", data, length);
+ break;
}
- }else if(type=='ssnc'){
+ } else if (type == 'ssnc') {
switch (code) {
- case 'asal':
- mqtt_publish("songalbum", data, length);
- break;
- case 'pvol':
- mqtt_publish("volume", data, length);
- break;
- case 'clip':
- mqtt_publish("client_ip", data, length);
- break;
- case 'pbeg':
- mqtt_publish("play_start", data, length);
- break;
- case 'pend':
- mqtt_publish("play_end", data, length);
- break;
- case 'pfls':
- mqtt_publish("play_flush", data, length);
- break;
- case 'prsm':
- mqtt_publish("play_resume", data, length);
- break;
- case 'PICT':
- if(config.mqtt_publish_parsed){
- mqtt_publish("cover", data, length);
- }
- break;
+ case 'asal':
+ mqtt_publish("songalbum", data, length);
+ break;
+ case 'pvol':
+ mqtt_publish("volume", data, length);
+ break;
+ case 'clip':
+ mqtt_publish("client_ip", data, length);
+ break;
+ case 'pbeg':
+ mqtt_publish("play_start", data, length);
+ break;
+ case 'pend':
+ mqtt_publish("play_end", data, length);
+ break;
+ case 'pfls':
+ mqtt_publish("play_flush", data, length);
+ break;
+ case 'prsm':
+ mqtt_publish("play_resume", data, length);
+ break;
+ case 'PICT':
+ if (config.mqtt_publish_parsed) {
+ mqtt_publish("cover", data, length);
+ }
+ break;
}
}
}
return;
}
-
int initialise_mqtt() {
debug(1, "Initialising MQTT");
- if(config.mqtt_hostname==NULL){
+ if (config.mqtt_hostname == NULL) {
debug(1, "[MQTT]: Not initialized, as the hostname is not set");
return 0;
}
int keepalive = 60;
mosquitto_lib_init();
- if( !(global_mosq = mosquitto_new(config.service_name, true, NULL)) ){
+ if (!(global_mosq = mosquitto_new(config.service_name, true, NULL))) {
die("[MQTT]: FATAL: Could not create mosquitto object! %d\n", global_mosq);
}
- if(
- config.mqtt_cafile != NULL ||
- config.mqtt_capath != NULL ||
- config.mqtt_certfile != NULL ||
- config.mqtt_keyfile != NULL
- ){
- if(mosquitto_tls_set(global_mosq,config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile, config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) {
+ if (config.mqtt_cafile != NULL || config.mqtt_capath != NULL || config.mqtt_certfile != NULL ||
+ config.mqtt_keyfile != NULL) {
+ if (mosquitto_tls_set(global_mosq, config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile,
+ config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) {
die("[MQTT]: TLS Setup failed");
}
}
- if(
- config.mqtt_username != NULL ||
- config.mqtt_password != NULL
- ){
- if(mosquitto_username_pw_set(global_mosq,config.mqtt_username,config.mqtt_password) != MOSQ_ERR_SUCCESS) {
+ if (config.mqtt_username != NULL || config.mqtt_password != NULL) {
+ if (mosquitto_username_pw_set(global_mosq, config.mqtt_username, config.mqtt_password) !=
+ MOSQ_ERR_SUCCESS) {
die("[MQTT]: Username/Password set failed");
}
}
mosquitto_log_callback_set(global_mosq, _cb_log);
-
- if(config.mqtt_enable_remote){
+
+ if (config.mqtt_enable_remote) {
mosquitto_message_callback_set(global_mosq, on_message);
}
-
+
mosquitto_disconnect_callback_set(global_mosq, on_disconnect);
mosquitto_connect_callback_set(global_mosq, on_connect);
- if(mosquitto_connect(global_mosq, config.mqtt_hostname, config.mqtt_port, keepalive)){
+ if (mosquitto_connect(global_mosq, config.mqtt_hostname, config.mqtt_port, keepalive)) {
inform("[MQTT]: Could not establish a mqtt connection");
}
- if(mosquitto_loop_start(global_mosq) != MOSQ_ERR_SUCCESS){
+ if (mosquitto_loop_start(global_mosq) != MOSQ_ERR_SUCCESS) {
inform("[MQTT]: Could start MQTT Main loop");
}
#ifndef MQTT_H
#define MQTT_H
-#include <stdint.h>
#include <mosquitto.h>
-
+#include <stdint.h>
int initialise_mqtt();
void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length);
-void mqtt_publish(char* topic, char* data, uint32_t length);
+void mqtt_publish(char *topic, char *data, uint32_t length);
void mqtt_setup();
-void on_connect(struct mosquitto* mosq, void* userdata, int rc);
-void on_disconnect(struct mosquitto* mosq, void* userdata, int rc);
-void on_message(struct mosquitto* mosq, void* userdata, const struct mosquitto_message* msg);
+void on_connect(struct mosquitto *mosq, void *userdata, int rc);
+void on_disconnect(struct mosquitto *mosq, void *userdata, int rc);
+void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg);
void _cb_log(struct mosquitto *mosq, void *userdata, int level, const char *str);
#endif /* #ifndef MQTT_H */
}
if (outsize > toutsize) {
- debug(2,
- "Output from alac_decode larger (%d bytes, not frames) than expected (%d bytes) -- "
- "truncated, but buffer overflow possible! Encrypted = %d.",
+ debug(2, "Output from alac_decode larger (%d bytes, not frames) than expected (%d bytes) -- "
+ "truncated, but buffer overflow possible! Encrypted = %d.",
outsize, toutsize, conn->stream.encrypted);
reply = -1; // output packet is the wrong size
}
*destlen = outsize / conn->input_bytes_per_frame;
if ((outsize % conn->input_bytes_per_frame) != 0)
- debug(1,
- "Number of audio frames (%d) does not correspond exactly to the number of bytes (%d) "
- "and the audio frame size (%d).",
+ debug(1, "Number of audio frames (%d) does not correspond exactly to the number of bytes (%d) "
+ "and the audio frame size (%d).",
*destlen, outsize, conn->input_bytes_per_frame);
return reply;
}
conn->input_bytes_per_frame = conn->input_num_channels * ((conn->input_bit_depth + 7) / 8);
- alac = alac_create(conn->input_bit_depth, conn->input_num_channels);
+ alac = alac_create(conn->input_bit_depth,
+ conn->input_num_channels); // no pthread cancellation point in here
if (!alac)
return 1;
conn->decoder_info = alac;
alac->setinfo_82 = fmtp[9];
alac->setinfo_86 = fmtp[10];
alac->setinfo_8a_rate = fmtp[11];
- alac_allocate_buffers(alac);
+ alac_allocate_buffers(alac); // no pthread cancellation point in here
#ifdef HAVE_APPLE_ALAC
- apple_alac_init(fmtp);
+ apple_alac_init(fmtp); // no pthread cancellation point in here
#endif
return 0;
free(conn->audio_buffer[i].data);
}
-void player_thread_lock_cleanup(void *arg) {
- rtsp_conn_info *conn = (rtsp_conn_info *)arg;
- debug(3, "Cleaning up player_thread_lock.");
- pthread_rwlock_unlock(&conn->player_thread_lock);
-}
-
void player_put_packet(seq_t seqno, uint32_t actual_timestamp, int64_t timestamp, uint8_t *data,
int len, rtsp_conn_info *conn) {
- if (pthread_rwlock_tryrdlock(&conn->player_thread_lock) == 0) {
- pthread_cleanup_push(player_thread_lock_cleanup, (void *)conn);
- if (conn->player_thread != NULL) {
-
- // all timestamps are done at the output rate
- // the "actual_timestamp" is the one that comes in the packet, and is carried over for
- // debugging
- // and checking only.
+ // all timestamps are done at the output rate
+ // the "actual_timestamp" is the one that comes in the packet, and is carried over for
+ // debugging
+ // and checking only.
- int64_t ltimestamp = timestamp * conn->output_sample_ratio;
+ int64_t ltimestamp = timestamp * conn->output_sample_ratio;
- // ignore a request to flush that has been made before the first packet...
- if (conn->packet_count == 0) {
- debug_mutex_lock(&conn->flush_mutex, 1000, 1);
- conn->flush_requested = 0;
- conn->flush_rtp_timestamp = 0;
- debug_mutex_unlock(&conn->flush_mutex, 3);
- }
+ // ignore a request to flush that has been made before the first packet...
+ if (conn->packet_count == 0) {
+ debug_mutex_lock(&conn->flush_mutex, 1000, 1);
+ conn->flush_requested = 0;
+ conn->flush_rtp_timestamp = 0;
+ debug_mutex_unlock(&conn->flush_mutex, 3);
+ }
- debug_mutex_lock(&conn->ab_mutex, 30000, 1);
- conn->packet_count++;
- conn->time_of_last_audio_packet = get_absolute_time_in_fp();
- if (conn->connection_state_to_output) { // if we are supposed to be processing these packets
+ debug_mutex_lock(&conn->ab_mutex, 30000, 1);
+ conn->packet_count++;
+ conn->time_of_last_audio_packet = get_absolute_time_in_fp();
+ if (conn->connection_state_to_output) { // if we are supposed to be processing these packets
+
+ // if (flush_rtp_timestamp != 0)
+ // debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp);
+
+ if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) {
+ debug(3,
+ "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to "
+ "timestamp: %lld.",
+ seqno, ltimestamp, conn->flush_rtp_timestamp);
+ } else {
+ if ((conn->flush_rtp_timestamp != 0x0) &&
+ (ltimestamp > conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time
+ conn->flush_rtp_timestamp = 0x0;
- // if (flush_rtp_timestamp != 0)
- // debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp);
+ abuf_t *abuf = 0;
- if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) {
- debug(
- 3,
- "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to "
- "timestamp: %lld.",
- seqno, ltimestamp, conn->flush_rtp_timestamp);
- } else {
- if ((conn->flush_rtp_timestamp != 0x0) &&
- (ltimestamp >
- conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time
- conn->flush_rtp_timestamp = 0x0;
-
- abuf_t *abuf = 0;
-
- if (!conn->ab_synced) {
- debug(3, "syncing to seqno %u.", seqno);
- conn->ab_write = seqno;
- conn->ab_read = seqno;
- conn->ab_synced = 1;
- }
+ if (!conn->ab_synced) {
+ debug(3, "syncing to seqno %u.", seqno);
+ conn->ab_write = seqno;
+ conn->ab_read = seqno;
+ conn->ab_synced = 1;
+ }
- // here, we should check for missing frames
- int resend_interval = (((250 * 44100) / 352) / 1000); // approximately 250 ms intervals
- const int number_of_resend_attempts = 8;
- int latency_based_resend_interval =
- (conn->latency) / (number_of_resend_attempts * conn->max_frames_per_packet);
- if (latency_based_resend_interval > resend_interval)
- resend_interval = latency_based_resend_interval;
-
- if (conn->resend_interval != resend_interval) {
- debug(2, "Resend interval for latency of %" PRId64 " frames is %d frames.",
- conn->latency, resend_interval);
- conn->resend_interval = resend_interval;
- }
+ // here, we should check for missing frames
+ int resend_interval = (((250 * 44100) / 352) / 1000); // approximately 250 ms intervals
+ const int number_of_resend_attempts = 8;
+ int latency_based_resend_interval =
+ (conn->latency) / (number_of_resend_attempts * conn->max_frames_per_packet);
+ if (latency_based_resend_interval > resend_interval)
+ resend_interval = latency_based_resend_interval;
+
+ if (conn->resend_interval != resend_interval) {
+ debug(2, "Resend interval for latency of %" PRId64 " frames is %d frames.", conn->latency,
+ resend_interval);
+ conn->resend_interval = resend_interval;
+ }
- if (conn->ab_write == seqno) { // expected packet
- abuf = conn->audio_buffer + BUFIDX(seqno);
- conn->ab_write = SUCCESSOR(seqno);
- } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected
- // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8)
- // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and
- // seqno:
- // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno);
- int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read);
- if (gap <= 0)
- debug(1, "Unexpected gap size: %d.", gap);
- int i;
- for (i = 0; i < gap; i++) {
- abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i));
- abuf->ready = 0; // to be sure, to be sure
- abuf->resend_level = 0;
- abuf->timestamp = 0;
- abuf->given_timestamp = 0;
- abuf->sequence_number = 0;
- }
- // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write);
- abuf = conn->audio_buffer + BUFIDX(seqno);
- // rtp_request_resend(ab_write, gap);
- // resend_requests++;
- conn->ab_write = SUCCESSOR(seqno);
- } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played
- conn->late_packets++;
- abuf = conn->audio_buffer + BUFIDX(seqno);
- /*
- if (abuf->ready)
- debug(1,"Late apparently duplicate packet received that is %d packets
- late.",seq_diff(seqno, conn->ab_write, conn->ab_read));
- else
- debug(1,"Late packet received that is %d packets late.",seq_diff(seqno,
- conn->ab_write, conn->ab_read));
- */
- } else { // too late.
+ if (conn->ab_write == seqno) { // expected packet
+ abuf = conn->audio_buffer + BUFIDX(seqno);
+ conn->ab_write = SUCCESSOR(seqno);
+ } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected
+ // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8)
+ // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and
+ // seqno:
+ // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno);
+ int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read);
+ if (gap <= 0)
+ debug(1, "Unexpected gap size: %d.", gap);
+ int i;
+ for (i = 0; i < gap; i++) {
+ abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i));
+ abuf->ready = 0; // to be sure, to be sure
+ abuf->resend_level = 0;
+ abuf->timestamp = 0;
+ abuf->given_timestamp = 0;
+ abuf->sequence_number = 0;
+ }
+ // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write);
+ abuf = conn->audio_buffer + BUFIDX(seqno);
+ // rtp_request_resend(ab_write, gap);
+ // resend_requests++;
+ conn->ab_write = SUCCESSOR(seqno);
+ } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played
+ conn->late_packets++;
+ abuf = conn->audio_buffer + BUFIDX(seqno);
+ /*
+ if (abuf->ready)
+ debug(1,"Late apparently duplicate packet received that is %d packets
+ late.",seq_diff(seqno, conn->ab_write, conn->ab_read));
+ else
+ debug(1,"Late packet received that is %d packets late.",seq_diff(seqno,
+ conn->ab_write, conn->ab_read));
+ */
+ } else { // too late.
- // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno,
- // conn->ab_write, conn->ab_read));
- conn->too_late_packets++;
- }
- // pthread_mutex_unlock(&ab_mutex);
-
- if (abuf) {
- int datalen = conn->max_frames_per_packet;
- if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) {
- abuf->ready = 1;
- abuf->length = datalen;
- abuf->timestamp = ltimestamp;
- abuf->given_timestamp = actual_timestamp;
- abuf->sequence_number = seqno;
- } else {
- debug(1, "Bad audio packet detected and discarded.");
- abuf->ready = 0;
- abuf->resend_level = 0;
- abuf->timestamp = 0;
- abuf->given_timestamp = 0;
- abuf->sequence_number = 0;
- }
- }
+ // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno,
+ // conn->ab_write, conn->ab_read));
+ conn->too_late_packets++;
+ }
+ // pthread_mutex_unlock(&ab_mutex);
+
+ if (abuf) {
+ int datalen = conn->max_frames_per_packet;
+ if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) {
+ abuf->ready = 1;
+ abuf->length = datalen;
+ abuf->timestamp = ltimestamp;
+ abuf->given_timestamp = actual_timestamp;
+ abuf->sequence_number = seqno;
+ } else {
+ debug(1, "Bad audio packet detected and discarded.");
+ abuf->ready = 0;
+ abuf->resend_level = 0;
+ abuf->timestamp = 0;
+ abuf->given_timestamp = 0;
+ abuf->sequence_number = 0;
+ }
+ }
- // pthread_mutex_lock(&ab_mutex);
- int rc = pthread_cond_signal(&conn->flowcontrol);
- if (rc)
- debug(1, "Error signalling flowcontrol.");
-
- // if it's at the expected time, do a look back for missing packets
- // but release the ab_mutex when doing a resend
- if (!conn->ab_buffering) {
- int j;
- for (j = 1; j <= number_of_resend_attempts; j++) {
- // check j times, after a short period of has elapsed, assuming 352 frames per packet
- // the higher the step_exponent, the less it will try. 1 means it will try very
- // hard. 2.0 seems good.
- float step_exponent = 2.0;
- int back_step = (int)(resend_interval * pow(j, step_exponent));
- int k;
- for (k = -1; k <= 1; k++) {
- if ((back_step + k) <
- seq_diff(conn->ab_read, conn->ab_write,
- conn->ab_read)) { // if it's within the range of frames in use...
- int item_to_check = (conn->ab_write - (back_step + k)) & 0xffff;
- seq_t next = item_to_check;
- abuf_t *check_buf = conn->audio_buffer + BUFIDX(next);
- if ((!check_buf->ready) &&
- (check_buf->resend_level <
- j)) { // prevent multiple requests from the same level of lookback
- check_buf->resend_level = j;
- if (config.disable_resend_requests == 0) {
- if (((int)(resend_interval * pow(j + 1, step_exponent)) + k) >=
- seq_diff(conn->ab_read, conn->ab_write, conn->ab_read))
- debug(3,
- "Last-ditch (#%d) resend request for packet %u in range %u to %u. "
- "Looking back %d packets.",
- j, next, conn->ab_read, conn->ab_write, back_step + k);
- debug_mutex_unlock(&conn->ab_mutex, 3);
- rtp_request_resend(next, 1, conn);
- conn->resend_requests++;
- debug_mutex_lock(&conn->ab_mutex, 20000, 1);
- }
- }
+ // pthread_mutex_lock(&ab_mutex);
+ int rc = pthread_cond_signal(&conn->flowcontrol);
+ if (rc)
+ debug(1, "Error signalling flowcontrol.");
+
+ // if it's at the expected time, do a look back for missing packets
+ // but release the ab_mutex when doing a resend
+ if (!conn->ab_buffering) {
+ int j;
+ for (j = 1; j <= number_of_resend_attempts; j++) {
+ // check j times, after a short period of has elapsed, assuming 352 frames per packet
+ // the higher the step_exponent, the less it will try. 1 means it will try very
+ // hard. 2.0 seems good.
+ float step_exponent = 2.0;
+ int back_step = (int)(resend_interval * pow(j, step_exponent));
+ int k;
+ for (k = -1; k <= 1; k++) {
+ if ((back_step + k) <
+ seq_diff(conn->ab_read, conn->ab_write,
+ conn->ab_read)) { // if it's within the range of frames in use...
+ int item_to_check = (conn->ab_write - (back_step + k)) & 0xffff;
+ seq_t next = item_to_check;
+ abuf_t *check_buf = conn->audio_buffer + BUFIDX(next);
+ if ((!check_buf->ready) &&
+ (check_buf->resend_level <
+ j)) { // prevent multiple requests from the same level of lookback
+ check_buf->resend_level = j;
+ if (config.disable_resend_requests == 0) {
+ if (((int)(resend_interval * pow(j + 1, step_exponent)) + k) >=
+ seq_diff(conn->ab_read, conn->ab_write, conn->ab_read))
+ debug(3, "Last-ditch (#%d) resend request for packet %u in range %u to %u. "
+ "Looking back %d packets.",
+ j, next, conn->ab_read, conn->ab_write, back_step + k);
+ debug_mutex_unlock(&conn->ab_mutex, 3);
+ rtp_request_resend(next, 1, conn);
+ conn->resend_requests++;
+ debug_mutex_lock(&conn->ab_mutex, 20000, 1);
}
}
}
}
}
}
- debug_mutex_unlock(&conn->ab_mutex, 3);
- } else {
- debug(1, "player_put_packet discarded packet %d because the player thread was gone.");
}
- pthread_cleanup_pop(1);
- // pthread_rwlock_unlock(&conn->player_thread_lock);
- } else {
- debug(1, "player_put_packet discarded packet %d because the player thread was locked.", seqno);
}
+ debug_mutex_unlock(&conn->ab_mutex, 3);
}
int32_t rand_in_range(int32_t exclusive_range_limit) {
// if (conn->packet_count>500) { //for testing -- about 4 seconds of play first
if ((local_time_now > conn->time_of_last_audio_packet) &&
(local_time_now - conn->time_of_last_audio_packet >= ct << 32)) {
- debug(1,
- "As Yeats almost said, \"Too long a silence / can make a stone of the heart\" "
- "from RTSP conversation %d.",
+ debug(1, "As Yeats almost said, \"Too long a silence / can make a stone of the heart\" "
+ "from RTSP conversation %d.",
conn->connection_number);
conn->stop = 1;
pthread_kill(conn->thread, SIGUSR1);
int64_t sync_error, correction, drift;
} stats_t;
-void *player_thread_func(void *arg) {
- // note, the thread will be started up with the player_thread_lock locked. You must release it
- // quickly.
-
+void player_thread_cleanup_handler(void *arg) {
+ debug(1, "player_thread_cleanup_handler called");
rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+}
- pthread_rwlock_wrlock(&conn->player_thread_lock);
-
- int rc = pthread_mutex_init(&conn->ab_mutex, NULL);
- if (rc)
- debug(1, "Error initialising ab_mutex.");
- rc = pthread_mutex_init(&conn->flush_mutex, NULL);
- if (rc)
- debug(1, "Error initialising flush_mutex.");
-// set the flowcontrol condition variable to wait on a monotonic clock
-#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD
- pthread_condattr_t attr;
- pthread_condattr_init(&attr);
- pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it.
- rc = pthread_cond_init(&conn->flowcontrol, &attr);
-#endif
-#ifdef COMPILE_FOR_OSX
- rc = pthread_cond_init(&conn->flowcontrol, NULL);
-#endif
- if (rc)
- debug(1, "Error initialising flowcontrol condition variable.");
-
- pthread_rwlock_unlock(&conn->player_thread_lock);
-
- // it's safe now
+void *player_thread_func(void *arg) {
+ pthread_cleanup_push(player_thread_cleanup_handler, arg);
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
conn->packet_count = 0;
conn->previous_random_number = 0;
conn->latency = 88200;
}
- config.output->start(config.output_rate, config.output_format);
+ config.output->start(config.output_rate, config.output_format); // will need a corresponding stop
init_decoder((int32_t *)&conn->stream.fmtp,
- conn); // this sets up incoming rate, bit depth, channels
- // must be after decoder init
- init_buffer(conn);
+ conn); // this sets up incoming rate, bit depth, channels.
+ // No pthread cancellation point in here
+ // This must be after init_decoder
+ init_buffer(conn); // will need a corresponding deallocation
if (conn->stream.encrypted) {
#ifdef HAVE_LIBMBEDTLS
// if ((input_rate!=config.output_rate) || (input_bit_depth!=output_bit_depth)) {
// debug(1,"Define tbuf of length
// %d.",output_bytes_per_frame*(max_frames_per_packet*output_sample_ratio+max_frame_size_change));
- tbuf = malloc(
- sizeof(int32_t) * 2 *
- (conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change));
+ tbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio +
+ conn->max_frame_size_change));
if (tbuf == NULL)
die("Failed to allocate memory for the transition buffer.");
sbuf = 0;
// initialise this, because soxr stuffing might be chosen later
- sbuf = malloc(
- sizeof(int32_t) * 2 *
- (conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change));
+ sbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio +
+ conn->max_frame_size_change));
if (sbuf == NULL)
debug(1, "Failed to allocate memory for the sbuf buffer.");
// stop looking elsewhere for DACP stuff
#ifdef HAVE_DACP_CLIENT
- set_dacp_server_information(conn); // this will start scanning when a port is registered by the
+ // this may have pthread cancellation points in it
+ set_dacp_server_information(conn); // this will start scanning when a port is registered by the
// code initiated by the mdns_dacp_monitor
#else
// this is only used for compatability, if dacp stuff isn't enabled.
if (conn->dapo_private_storage)
debug(1, "DACP monitor already initialised?");
else
- conn->dapo_private_storage = mdns_dacp_monitor(conn->dacp_id);
+ conn->dapo_private_storage = mdns_dacp_monitor(conn->dacp_id); // ??
#endif
conn->framesProcessedInThisEpoch = 0;
// set the default volume to whaterver it was before, as stored in the config airplay_volume
debug(3, "Set initial volume to %f.", config.airplay_volume);
- player_volume(config.airplay_volume, conn);
+ player_volume(config.airplay_volume, conn); // ??
int64_t frames_to_drop = 0;
// debug(1, "Play begin");
while (!conn->player_thread_please_stop) {
} else {
// the player may change the contents of the buffer, so it has to be zeroed each time;
// might as well malloc and freee it locally
- memset(silence, 0,
- conn->output_bytes_per_frame * conn->max_frames_per_packet *
- conn->output_sample_ratio);
+ memset(silence, 0, conn->output_bytes_per_frame * conn->max_frames_per_packet *
+ conn->output_sample_ratio);
config.output->play(silence, conn->max_frames_per_packet * conn->output_sample_ratio);
free(silence);
}
} else {
// the player may change the contents of the buffer, so it has to be zeroed each time;
// might as well malloc and freee it locally
- memset(silence, 0,
- conn->output_bytes_per_frame * conn->max_frames_per_packet *
- conn->output_sample_ratio);
+ memset(silence, 0, conn->output_bytes_per_frame * conn->max_frames_per_packet *
+ conn->output_sample_ratio);
config.output->play(silence, conn->max_frames_per_packet * conn->output_sample_ratio);
free(silence);
}
SUCCESSOR(conn->last_seqno_read); // int32_t from seq_t, i.e. uint16_t, so okay.
if (inframe->sequence_number !=
conn->last_seqno_read) { // seq_t, ei.e. uint16_t and int32_t, so okay
- debug(2,
- "Player: packets out of sequence: expected: %u, got: %u, with ab_read: %u "
- "and ab_write: %u.",
+ debug(2, "Player: packets out of sequence: expected: %u, got: %u, with ab_read: %u "
+ "and ab_write: %u.",
conn->last_seqno_read, inframe->sequence_number, conn->ab_read, conn->ab_write);
conn->last_seqno_read = inframe->sequence_number; // reset warning...
}
#ifdef CONFIG_CONVOLUTION
|| config.convolution
#endif
- ) {
+ ) {
int32_t *tbuf32 = (int32_t *)tbuf;
float fbuf_l[inbuflength];
float fbuf_r[inbuflength];
"%*d", /* max buffer occupancy */
9, /* should be 10, but there's an explicit space at the start to ensure
alignment */
- 1000 * moving_average_sync_error / config.output_rate, 10,
- moving_average_correction * 1000000 / (352 * conn->output_sample_ratio), 10,
- moving_average_insertions_plus_deletions * 1000000 /
- (352 * conn->output_sample_ratio),
+ 1000 * moving_average_sync_error / config.output_rate,
+ 10, moving_average_correction * 1000000 / (352 * conn->output_sample_ratio),
+ 10, moving_average_insertions_plus_deletions * 1000000 /
+ (352 * conn->output_sample_ratio),
12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7,
conn->too_late_packets, 7, conn->resend_requests, 7, minimum_dac_queue_size,
5, minimum_buffer_occupancy, 5, maximum_buffer_occupancy);
"%*d", /* max buffer occupancy */
9, /* should be 10, but there's an explicit space at the start to ensure
alignment */
- 1000 * moving_average_sync_error / config.output_rate, 12, play_number, 7,
- conn->missing_packets, 7, conn->late_packets, 7, conn->too_late_packets, 7,
- conn->resend_requests, 7, minimum_dac_queue_size, 5,
- minimum_buffer_occupancy, 5, maximum_buffer_occupancy);
+ 1000 * moving_average_sync_error / config.output_rate,
+ 12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7,
+ conn->too_late_packets, 7, conn->resend_requests, 7,
+ minimum_dac_queue_size, 5, minimum_buffer_occupancy, 5,
+ maximum_buffer_occupancy);
}
} else {
inform(" %*.1f," /* Sync error in milliseconds */
"%*d", /* max buffer occupancy */
9, /* should be 10, but there's an explicit space at the start to ensure
alignment */
- 1000 * moving_average_sync_error / config.output_rate, 12, play_number, 7,
- conn->missing_packets, 7, conn->late_packets, 7, conn->too_late_packets, 7,
- conn->resend_requests, 5, minimum_buffer_occupancy, 5,
- maximum_buffer_occupancy);
+ 1000 * moving_average_sync_error / config.output_rate,
+ 12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7,
+ conn->too_late_packets, 7, conn->resend_requests, 5,
+ minimum_buffer_occupancy, 5, maximum_buffer_occupancy);
}
} else {
inform("No frames received in the last sampling interval.");
free_audio_buffers(conn);
terminate_decoders(conn);
- // remove flow control and mutexes
- rc = pthread_cond_destroy(&conn->flowcontrol);
- if (rc)
- debug(1, "Error destroying flowcontrol condition variable.");
- rc = pthread_mutex_destroy(&conn->flush_mutex);
- if (rc)
- debug(1, "Error destroying flush_mutex variable.");
- rc = pthread_mutex_destroy(&conn->ab_mutex);
- if (rc)
- debug(1, "Error destroying ab_mutex variable.");
debug(2, "Connection %d: player thread terminated.", conn->connection_number);
if (conn->dacp_id) {
free(conn->dacp_id);
free(tbuf);
if (sbuf)
free(sbuf);
+ pthread_cleanup_pop(1);
pthread_exit(NULL);
}
}
void do_flush(int64_t timestamp, rtsp_conn_info *conn) {
+
debug(3, "Flush requested up to %u. It seems as if 0 is special.", timestamp);
debug_mutex_lock(&conn->flush_mutex, 1000, 1);
conn->flush_requested = 1;
// if (timestamp!=0)
conn->flush_rtp_timestamp = timestamp; // flush all packets up to (and including?) this
- debug_mutex_unlock(&conn->flush_mutex, 3);
conn->play_segment_reference_frame = 0;
conn->play_number_after_flush = 0;
+ debug_mutex_unlock(&conn->flush_mutex, 3);
+
#ifdef CONFIG_METADATA
// only send a flush metadata message if the first packet has been seen -- it's a bogus message
// otherwise
send_ssnc_metadata('pfls', NULL, 0, 1);
}
#endif
+
debug(3, "Flush request made.");
}
void player_flush(int64_t timestamp, rtsp_conn_info *conn) {
debug(3, "player_flush");
- if (pthread_rwlock_tryrdlock(&conn->player_thread_lock) == 0) {
- if (conn->player_thread != NULL)
- do_flush(timestamp, conn);
- else
- debug(1, "Flush requested when player thread is gone.");
- pthread_rwlock_unlock(&conn->player_thread_lock);
- } else {
- debug(3, "Can't acquire a read lock for a flush -- ignored.");
- }
+ do_flush(timestamp, conn);
}
int player_play(rtsp_conn_info *conn) {
int rc = pthread_attr_setstacksize(&tattr, size);
if (rc)
debug(1, "Error setting stack size for player_thread: %s", strerror(errno));
-
- // hack alert -- the player thread itself releases the player_thread_lock rwlock as soon as it's
// finished initialising.
pthread_create(pt, &tattr, player_thread_func, (void *)conn);
pthread_attr_destroy(&tattr);
int player_stop(rtsp_conn_info *conn) {
debug(3, "player_stop");
- pthread_rwlock_wrlock(&conn->player_thread_lock);
- debug(3, "player_thread_lock acquired");
if (conn->player_thread) {
debug(3, "player_thread exists");
conn->player_thread_please_stop = 1;
pthread_join(*conn->player_thread, NULL);
free(conn->player_thread);
conn->player_thread = NULL;
- pthread_rwlock_unlock(&conn->player_thread_lock);
#ifdef CONFIG_METADATA
debug(2, "pend");
send_ssnc_metadata('pend', NULL, 0, 1);
command_stop();
return 0;
} else {
- pthread_rwlock_unlock(&conn->player_thread_lock);
- debug(3, "player thread of RTSP conversation %d is already deleted.", conn->connection_number);
+ debug(3, "Connection %d: player thread already deleted.", conn->connection_number);
return -1;
}
}
// pthread_t *ptp;
pthread_t *player_thread;
- pthread_rwlock_t player_thread_lock; // used to control access by "outsiders"
abuf_t audio_buffer[BUFFER_FRAMES];
int max_frames_per_packet, input_num_channels, input_bit_depth, input_rate;
void rtp_audio_receiver_cleanup_handler(void *arg) {
debug(3, "Audio Receiver Cleanup.");
rtsp_conn_info *conn = (rtsp_conn_info *)arg;
- debug(1,"shutdown audio socket.");
- shutdown(conn->audio_socket,SHUT_RDWR);
- debug(1,"close audio socket.");
+ debug(1, "shutdown audio socket.");
+ shutdown(conn->audio_socket, SHUT_RDWR);
+ debug(1, "close audio socket.");
close(conn->audio_socket);
debug(3, "Audio Receiver Cleanup Successful.");
}
void rtp_control_handler_cleanup_handler(void *arg) {
debug(3, "Control Receiver Cleanup.");
rtsp_conn_info *conn = (rtsp_conn_info *)arg;
- debug(1,"shutdown control socket.");
- shutdown(conn->control_socket,SHUT_RDWR);
- debug(1,"close control socket.");
+ debug(1, "shutdown control socket.");
+ shutdown(conn->control_socket, SHUT_RDWR);
+ debug(1, "close control socket.");
close(conn->control_socket);
debug(3, "Control Receiver Cleanup Successful.");
}
rtsp_conn_info *conn = (rtsp_conn_info *)arg;
pthread_cancel(conn->timer_requester);
pthread_join(conn->timer_requester, NULL);
- debug(1,"shutdown timing socket.");
- shutdown(conn->timing_socket,SHUT_RDWR);
- debug(1,"close timing socket.");
+ debug(1, "shutdown timing socket.");
+ shutdown(conn->timing_socket, SHUT_RDWR);
+ debug(1, "close timing socket.");
close(conn->timing_socket);
debug(3, "Timing Receiver Cleanup Successful.");
}
static void *rtsp_conversation_thread_func(void *pconn) {
rtsp_conn_info *conn = pconn;
- // create the player thread lock.
- int rwli = pthread_rwlock_init(&conn->player_thread_lock, NULL);
- if (rwli != 0)
- die("Error %d initialising player_thread_lock for conversation thread %d.", rwli,
- conn->connection_number);
+ int rc = pthread_mutex_init(&conn->flush_mutex, NULL);
+ if (rc)
+ die("Connection %d: error %d initialising flush_mutex.", conn->connection_number, rc);
+ rc = pthread_mutex_init(&conn->ab_mutex, NULL);
+ if (rc)
+ die("Connection %d: error %d initialising ab_mutex.", conn->connection_number, rc);
+// set the flowcontrol condition variable to wait on a monotonic clock
+#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD
+ pthread_condattr_t attr;
+ pthread_condattr_init(&attr);
+ pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it.
+ rc = pthread_cond_init(&conn->flowcontrol, &attr);
+#endif
+#ifdef COMPILE_FOR_OSX
+ rc = pthread_cond_init(&conn->flowcontrol, NULL);
+#endif
+ if (rc)
+ die("Connection %d: error %d initialising flow control condition variable.",
+ conn->connection_number, rc);
rtp_initialise(conn);
debug(2, "Connection %d: RTSP thread terminated.", conn->connection_number);
conn->running = 0;
- // release the player_thread_lock
- int rwld = pthread_rwlock_destroy(&conn->player_thread_lock);
- if (rwld)
- debug(1, "Error %d destroying player_thread_lock for conversation thread %d.", rwld,
- conn->connection_number);
-
+ // remove flow control and mutexes
+ rc = pthread_cond_destroy(&conn->flowcontrol);
+ if (rc)
+ debug(1, "Connection %d: error %d destroying flow control condition variable.",
+ conn->connection_number, rc);
+ rc = pthread_mutex_destroy(&conn->ab_mutex);
+ if (rc)
+ debug(1, "Connection %d: error %d destroying ab_mutex.", conn->connection_number, rc);
+ rc = pthread_mutex_destroy(&conn->flush_mutex);
+ if (rc)
+ debug(1, "Connection %d: error %d destroying flush_mutex.", conn->connection_number, rc);
pthread_exit(NULL);
}
#include <FFTConvolver/convolver.h>
#endif
-static inline int config_set_lookup_bool(config_t* cfg, char* where, int* dst) {
+static inline int config_set_lookup_bool(config_t *cfg, char *where, int *dst) {
const char *str = 0;
if (config_lookup_string(cfg, where, &str)) {
- if (strcasecmp(str, "no") == 0){
- (*dst)=0;
+ if (strcasecmp(str, "no") == 0) {
+ (*dst) = 0;
return 1;
- }else if (strcasecmp(str, "yes") == 0){
- (*dst)=1;
+ } else if (strcasecmp(str, "yes") == 0) {
+ (*dst) = 1;
return 1;
- }else{
+ } else {
die("Invalid %s option choice \"%s\". It should be \"yes\" or \"no\"", where, str);
return 0;
}
- }else{
+ } else {
return 0;
}
}
config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_with_pid_file", &daemonisewith);
/* Get the Just_Daemonize setting. */
- config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_without_pid_file", &daemonisewithout);
+ config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_without_pid_file",
+ &daemonisewithout);
if ((daemonisewith) && (daemonisewithout))
die("Select either daemonize_with_pid_file or daemonize_without_pid_file -- you have "
}
/* Get the statistics setting. */
- if (!config_set_lookup_bool(config.cfg, "general.statistics", &(config.statistics_requested))) {
+ if (!config_set_lookup_bool(config.cfg, "general.statistics",
+ &(config.statistics_requested))) {
warn("The \"general\" \"statistics\" setting is deprecated. Please use the \"diagnostics\" "
"\"statistics\" setting instead.");
}
#endif
#ifdef CONFIG_MQTT
- int tmpval=0;
- config_set_lookup_bool(config.cfg, "mqtt.enabled", &config.mqtt_enabled);
- if(config.mqtt_enabled && !config.metadata_enabled){
- die("You need to have metadata enabled in order to use mqtt");
- }
- if (config_lookup_string(config.cfg, "mqtt.hostname", &str)) {
- config.mqtt_hostname = (char *)str;
- //TODO: Document that, if this is false, whole mqtt func is disabled
- }
- if (config_lookup_int(config.cfg, "mqtt.port", &tmpval)) {
- config.mqtt_port = tmpval;
- }else{
- //TODO: Is this the correct way to set a default value?
- config.mqtt_port = 1883;
- }
-
- if (config_lookup_string(config.cfg, "mqtt.username", &str)) {
- config.mqtt_username = (char *)str;
- }
- if (config_lookup_string(config.cfg, "mqtt.password", &str)) {
- config.mqtt_password = (char *)str;
- }
- int capath=0;
- if (config_lookup_string(config.cfg, "mqtt.capath", &str)) {
- config.mqtt_capath = (char *)str;
- capath=1;
- }
- if (config_lookup_string(config.cfg, "mqtt.cafile", &str)) {
- if(capath)
- die("Supply either mqtt cafile or mqtt capath -- you have supplied both!");
- config.mqtt_cafile = (char *)str;
- }
- int certkeynum=0;
- if (config_lookup_string(config.cfg, "mqtt.certfile", &str)) {
- config.mqtt_certfile = (char *)str;
- certkeynum++;
- }
- if (config_lookup_string(config.cfg, "mqtt.keyfile", &str)) {
- config.mqtt_keyfile = (char *)str;
- certkeynum++;
- }
- if( certkeynum!=0 && certkeynum!=2){
- die("If you want to use TLS Client Authentication, you have to specify "
- "mqtt.certfile AND mqtt.keyfile.\nYou have supplied only one of them.\n"
- "If you do not want to use TLS Client Authentication, leave both empty."
- );
- }
-
- if(config_lookup_string(config.cfg, "mqtt.topic", &str)){
- config.mqtt_topic = (char *)str;
- }
- config_set_lookup_bool(config.cfg, "mqtt.publish_raw", &config.mqtt_publish_raw);
- config_set_lookup_bool(config.cfg, "mqtt.publish_parsed", &config.mqtt_publish_parsed);
- config_set_lookup_bool(config.cfg, "mqtt.publish_cover", &config.mqtt_publish_cover);
- config_set_lookup_bool(config.cfg, "mqtt.enable_remote", &config.mqtt_enable_remote);
+ int tmpval = 0;
+ config_set_lookup_bool(config.cfg, "mqtt.enabled", &config.mqtt_enabled);
+ if (config.mqtt_enabled && !config.metadata_enabled) {
+ die("You need to have metadata enabled in order to use mqtt");
+ }
+ if (config_lookup_string(config.cfg, "mqtt.hostname", &str)) {
+ config.mqtt_hostname = (char *)str;
+ // TODO: Document that, if this is false, whole mqtt func is disabled
+ }
+ if (config_lookup_int(config.cfg, "mqtt.port", &tmpval)) {
+ config.mqtt_port = tmpval;
+ } else {
+ // TODO: Is this the correct way to set a default value?
+ config.mqtt_port = 1883;
+ }
+
+ if (config_lookup_string(config.cfg, "mqtt.username", &str)) {
+ config.mqtt_username = (char *)str;
+ }
+ if (config_lookup_string(config.cfg, "mqtt.password", &str)) {
+ config.mqtt_password = (char *)str;
+ }
+ int capath = 0;
+ if (config_lookup_string(config.cfg, "mqtt.capath", &str)) {
+ config.mqtt_capath = (char *)str;
+ capath = 1;
+ }
+ if (config_lookup_string(config.cfg, "mqtt.cafile", &str)) {
+ if (capath)
+ die("Supply either mqtt cafile or mqtt capath -- you have supplied both!");
+ config.mqtt_cafile = (char *)str;
+ }
+ int certkeynum = 0;
+ if (config_lookup_string(config.cfg, "mqtt.certfile", &str)) {
+ config.mqtt_certfile = (char *)str;
+ certkeynum++;
+ }
+ if (config_lookup_string(config.cfg, "mqtt.keyfile", &str)) {
+ config.mqtt_keyfile = (char *)str;
+ certkeynum++;
+ }
+ if (certkeynum != 0 && certkeynum != 2) {
+ die("If you want to use TLS Client Authentication, you have to specify "
+ "mqtt.certfile AND mqtt.keyfile.\nYou have supplied only one of them.\n"
+ "If you do not want to use TLS Client Authentication, leave both empty.");
+ }
+
+ if (config_lookup_string(config.cfg, "mqtt.topic", &str)) {
+ config.mqtt_topic = (char *)str;
+ }
+ config_set_lookup_bool(config.cfg, "mqtt.publish_raw", &config.mqtt_publish_raw);
+ config_set_lookup_bool(config.cfg, "mqtt.publish_parsed", &config.mqtt_publish_parsed);
+ config_set_lookup_bool(config.cfg, "mqtt.publish_cover", &config.mqtt_publish_cover);
+ config_set_lookup_bool(config.cfg, "mqtt.enable_remote", &config.mqtt_enable_remote);
#endif
free(config_file_real_path);
}
free(i2);
free(i3);
free(vs);
-
+
#ifdef CONFIG_MQTT
// mqtt topic was not set. As we have the service name just now, set it
- if(config.mqtt_topic==NULL){
- int topic_length=1+strlen(config.service_name)+1;
- char* topic=malloc(topic_length+1);
- snprintf(topic,topic_length,"/%s/",config.service_name);
+ if (config.mqtt_topic == NULL) {
+ int topic_length = 1 + strlen(config.service_name) + 1;
+ char *topic = malloc(topic_length + 1);
+ snprintf(topic, topic_length, "/%s/", config.service_name);
config.mqtt_topic = topic;
}
#endif
daemon_pid_file_ident = daemon_log_ident = daemon_ident_from_argv0(argv[0]);
daemon_pid_file_proc = pid_file_proc;
-
+
/* Check if we are called with -D or --disconnectFromOutput parameter */
if (argc >= 2 &&
((strcmp(argv[1], "-D") == 0) || (strcmp(argv[1], "--disconnectFromOutput") == 0))) {
#endif
#ifdef HAVE_MQTT
- if(config.mqtt_enabled){
+ if (config.mqtt_enabled) {
initialise_mqtt();
}
#endif