TAILQ_ENTRY(upnp_data) data_link;
struct sockaddr_storage storage;
htsbuf_queue_t queue;
+ int delay_ms;
} upnp_data_t;
TAILQ_HEAD(upnp_data_queue_write, upnp_data);
*
*/
void
-upnp_send( htsbuf_queue_t *q, struct sockaddr_storage *storage )
+upnp_send( htsbuf_queue_t *q, struct sockaddr_storage *storage, int delay_ms )
{
upnp_data_t *data;
data->storage = upnp_ipv4_multicast;
else
data->storage = *storage;
+ data->delay_ms = delay_ms;
pthread_mutex_lock(&upnp_lock);
TAILQ_INSERT_TAIL(&upnp_data_write, data, data_link);
pthread_mutex_unlock(&upnp_lock);
struct sockaddr_storage ip;
socklen_t iplen;
size_t size;
- int r;
+ int r, delay_ms;
multicast = udp_bind("upnp", "upnp_thread_multicast",
"239.255.255.250", 1900,
ev[1].data.ptr = unicast;
tvhpoll_add(poll, ev, 2);
+ delay_ms = 0;
+
while (upnp_running && multicast->fd >= 0) {
- r = tvhpoll_wait(poll, ev, 2, 1000);
+ r = tvhpoll_wait(poll, ev, 2, delay_ms ?: 1000);
+ if (r == 0) /* timeout */
+ delay_ms = 0;
while (r-- > 0) {
if ((ev[r].events & TVHPOLL_IN) != 0) {
}
}
- while (1) {
+ while (delay_ms == 0) {
pthread_mutex_lock(&upnp_lock);
data = TAILQ_FIRST(&upnp_data_write);
- if (data)
- TAILQ_REMOVE(&upnp_data_write, data, data_link);
+ if (data) {
+ delay_ms = data->delay_ms;
+ data->delay_ms = 0;
+ if (!delay_ms) {
+ TAILQ_REMOVE(&upnp_data_write, data, data_link);
+ } else {
+ data = NULL;
+ }
+ }
pthread_mutex_unlock(&upnp_lock);
if (data == NULL)
break;
udp_write_queue(unicast, &data->queue, &data->storage);
htsbuf_queue_flush(&data->queue);
free(data);
+ delay_ms = 0;
}
}
+ /* flush the write queue (byebye messages) */
+ while (1) {
+ pthread_mutex_lock(&upnp_lock);
+ data = TAILQ_FIRST(&upnp_data_write);
+ if (data)
+ TAILQ_REMOVE(&upnp_data_write, data, data_link);
+ pthread_mutex_unlock(&upnp_lock);
+ if (data == NULL)
+ break;
+ usleep((long)data->delay_ms * 1000);
+ udp_write_queue(unicast, &data->queue, &data->storage);
+ htsbuf_queue_flush(&data->queue);
+ free(data);
+ }
+
error:
upnp_running = 0;
tvhpoll_destroy(poll);