From: Jaroslav Kysela Date: Fri, 4 Mar 2016 11:46:49 +0000 (+0100) Subject: use monotonic thread conditions where appropriate X-Git-Tag: v4.2.1~965 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=6ff670943ce70a8f279e99306a1d49a78c6b6949;p=thirdparty%2Ftvheadend.git use monotonic thread conditions where appropriate --- diff --git a/src/descrambler/capmt.c b/src/descrambler/capmt.c index c2ca1a462..9cc694253 100644 --- a/src/descrambler/capmt.c +++ b/src/descrambler/capmt.c @@ -256,7 +256,7 @@ typedef struct capmt_adapter { typedef struct capmt { caclient_t; - pthread_cond_t capmt_cond; + tvh_cond_t capmt_cond; struct capmt_service_list capmt_services; @@ -1641,7 +1641,6 @@ capmt_thread(void *aux) capmt_t *capmt = aux; capmt_adapter_t *ca; capmt_opaque_t *t; - struct timespec ts; int d, i, j, fatal; tvhlog(LOG_INFO, "capmt", "%s active", capmt_name(capmt)); @@ -1677,7 +1676,7 @@ capmt_thread(void *aux) pthread_mutex_lock(&capmt->capmt_mutex); while(capmt->capmt_running && capmt->cac_enabled == 0) - pthread_cond_wait(&capmt->capmt_cond, &capmt->capmt_mutex); + tvh_cond_wait(&capmt->capmt_cond, &capmt->capmt_mutex); pthread_mutex_unlock(&capmt->capmt_mutex); @@ -1770,12 +1769,10 @@ capmt_thread(void *aux) d = 60; } - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += d; - tvhlog(LOG_INFO, "capmt", "%s: Automatic reconnection attempt in in %d seconds", idnode_get_title(&capmt->cac_id, NULL), d); - pthread_cond_timedwait(&capmt->capmt_cond, &capmt->capmt_mutex, &ts); + tvh_cond_timedwait(&capmt->capmt_cond, &capmt->capmt_mutex, + getmonoclock() + d * MONOCLOCK_RESOLUTION); pthread_mutex_unlock(&capmt->capmt_mutex); } @@ -2195,7 +2192,7 @@ capmt_service_start(caclient_t *cac, service_t *s) LIST_INSERT_HEAD(&capmt->capmt_services, ct, ct_link); /* wake-up idle thread */ - pthread_cond_signal(&capmt->capmt_cond); + tvh_cond_signal(&capmt->capmt_cond, 0); fin: pthread_mutex_unlock(&capmt->capmt_mutex); @@ -2245,7 +2242,7 @@ capmt_conf_changed(caclient_t *cac) } pthread_mutex_lock(&capmt->capmt_mutex); capmt->capmt_reconfigure = 1; - pthread_cond_signal(&capmt->capmt_cond); + tvh_cond_signal(&capmt->capmt_cond, 0); pthread_mutex_unlock(&capmt->capmt_mutex); tvh_write(capmt->capmt_pipe.wr, "", 1); } else { @@ -2254,7 +2251,7 @@ capmt_conf_changed(caclient_t *cac) pthread_mutex_lock(&capmt->capmt_mutex); capmt->capmt_running = 0; capmt->capmt_reconfigure = 0; - pthread_cond_signal(&capmt->capmt_cond); + tvh_cond_signal(&capmt->capmt_cond, 0); tid = capmt->capmt_tid; pthread_mutex_unlock(&capmt->capmt_mutex); tvh_write(capmt->capmt_pipe.wr, "", 1); @@ -2319,7 +2316,7 @@ caclient_t *capmt_create(void) capmt_t *capmt = calloc(1, sizeof(*capmt)); pthread_mutex_init(&capmt->capmt_mutex, NULL); - pthread_cond_init(&capmt->capmt_cond, NULL); + tvh_cond_init(&capmt->capmt_cond); TAILQ_INIT(&capmt->capmt_writeq); tvh_pipe(O_NONBLOCK, &capmt->capmt_pipe); diff --git a/src/descrambler/cwc.c b/src/descrambler/cwc.c index 6cbc0cdd5..9f15631f4 100644 --- a/src/descrambler/cwc.c +++ b/src/descrambler/cwc.c @@ -198,11 +198,11 @@ typedef struct cwc { pthread_t cwc_tid; - pthread_cond_t cwc_cond; + tvh_cond_t cwc_cond; pthread_mutex_t cwc_mutex; pthread_mutex_t cwc_writer_mutex; - pthread_cond_t cwc_writer_cond; + tvh_cond_t cwc_writer_cond; int cwc_writer_running; struct cwc_message_queue cwc_writeq; @@ -443,7 +443,7 @@ cwc_send_msg(cwc_t *cwc, const uint8_t *msg, size_t len, int sid, int enq, uint1 cm->cm_len = len; pthread_mutex_lock(&cwc->cwc_writer_mutex); TAILQ_INSERT_TAIL(&cwc->cwc_writeq, cm, cm_link); - pthread_cond_signal(&cwc->cwc_writer_cond); + tvh_cond_signal(&cwc->cwc_writer_cond, 0); pthread_mutex_unlock(&cwc->cwc_writer_mutex); } else { if (tvh_write(cwc->cwc_fd, buf, len)) @@ -1046,7 +1046,6 @@ cwc_writer_thread(void *aux) { cwc_t *cwc = aux; cwc_message_t *cm; - struct timespec ts; int r; pthread_mutex_lock(&cwc->cwc_writer_mutex); @@ -1068,10 +1067,9 @@ cwc_writer_thread(void *aux) /* If nothing is to be sent in CWC_KEEPALIVE_INTERVAL seconds we need to send a keepalive */ - ts.tv_sec = time(NULL) + CWC_KEEPALIVE_INTERVAL; - ts.tv_nsec = 0; - r = pthread_cond_timedwait(&cwc->cwc_writer_cond, - &cwc->cwc_writer_mutex, &ts); + r = tvh_cond_timedwait(&cwc->cwc_writer_cond, + &cwc->cwc_writer_mutex, + getmonoclock() + CWC_KEEPALIVE_INTERVAL * MONOCLOCK_RESOLUTION); if(r == ETIMEDOUT) cwc_send_ka(cwc); } @@ -1146,7 +1144,7 @@ cwc_session(cwc_t *cwc) * We do all requests from now on in a separate thread */ cwc->cwc_writer_running = 1; - pthread_cond_init(&cwc->cwc_writer_cond, NULL); + tvh_cond_init(&cwc->cwc_writer_cond); pthread_mutex_init(&cwc->cwc_writer_mutex, NULL); TAILQ_INIT(&cwc->cwc_writeq); tvhthread_create(&writer_thread_id, NULL, cwc_writer_thread, cwc, "cwc-writer"); @@ -1168,7 +1166,7 @@ cwc_session(cwc_t *cwc) */ shutdown(cwc->cwc_fd, SHUT_RDWR); cwc->cwc_writer_running = 0; - pthread_cond_signal(&cwc->cwc_writer_cond); + tvh_cond_signal(&cwc->cwc_writer_cond, 0); pthread_join(writer_thread_id, NULL); tvhlog(LOG_DEBUG, "cwc", "Write thread joined"); } @@ -1184,7 +1182,6 @@ cwc_thread(void *aux) char errbuf[100]; char hostname[256]; int port; - struct timespec ts; int attempts = 0; pthread_mutex_lock(&cwc->cwc_mutex); @@ -1240,14 +1237,13 @@ cwc_thread(void *aux) caclient_set_status((caclient_t *)cwc, CACLIENT_STATUS_DISCONNECTED); d = 3; - ts.tv_sec = time(NULL) + d; - ts.tv_nsec = 0; tvhlog(LOG_INFO, "cwc", "%s:%i: Automatic connection attempt in %d seconds", cwc->cwc_hostname, cwc->cwc_port, d-1); - pthread_cond_timedwait(&cwc->cwc_cond, &cwc->cwc_mutex, &ts); + tvh_cond_timedwait(&cwc->cwc_cond, &cwc->cwc_mutex, + getmonoclock() + d * MONOCLOCK_RESOLUTION); } tvhlog(LOG_INFO, "cwc", "%s:%i inactive", @@ -1737,14 +1733,14 @@ cwc_conf_changed(caclient_t *cac) cwc->cwc_reconfigure = 1; if(cwc->cwc_fd >= 0) shutdown(cwc->cwc_fd, SHUT_RDWR); - pthread_cond_signal(&cwc->cwc_cond); + tvh_cond_signal(&cwc->cwc_cond, 0); pthread_mutex_unlock(&cwc->cwc_mutex); } else { if (!cwc->cwc_running) return; pthread_mutex_lock(&cwc->cwc_mutex); cwc->cwc_running = 0; - pthread_cond_signal(&cwc->cwc_cond); + tvh_cond_signal(&cwc->cwc_cond, 0); tid = cwc->cwc_tid; if (cwc->cwc_fd >= 0) shutdown(cwc->cwc_fd, SHUT_RDWR); @@ -1872,7 +1868,7 @@ caclient_t *cwc_create(void) cwc_t *cwc = calloc(1, sizeof(*cwc)); pthread_mutex_init(&cwc->cwc_mutex, NULL); - pthread_cond_init(&cwc->cwc_cond, NULL); + tvh_cond_init(&cwc->cwc_cond); cwc->cac_free = cwc_free; cwc->cac_start = cwc_service_start; cwc->cac_conf_changed = cwc_conf_changed; diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index f0a1290f2..b2460dc0d 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -1266,7 +1266,7 @@ dvr_thread(void *aux) while(run) { sm = TAILQ_FIRST(&sq->sq_queue); if(sm == NULL) { - pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); + tvh_cond_wait(&sq->sq_cond, &sq->sq_mutex); continue; } streaming_queue_remove(sq, sm); diff --git a/src/htsp_server.c b/src/htsp_server.c index fa1e1b43d..1d4f9ff1a 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -164,7 +164,7 @@ typedef struct htsp_connection { struct htsp_msg_q_queue htsp_active_output_queues; pthread_mutex_t htsp_out_mutex; - pthread_cond_t htsp_out_cond; + tvh_cond_t htsp_out_cond; htsp_msg_q_t htsp_hmq_ctrl; htsp_msg_q_t htsp_hmq_epg; @@ -415,7 +415,7 @@ htsp_send(htsp_connection_t *htsp, htsmsg_t *m, pktbuf_t *pb, hmq->hmq_length++; hmq->hmq_payload += payloadsize; - pthread_cond_signal(&htsp->htsp_out_cond); + tvh_cond_signal(&htsp->htsp_out_cond, 0); pthread_mutex_unlock(&htsp->htsp_out_mutex); } @@ -3150,7 +3150,7 @@ htsp_write_scheduler(void *aux) if((hmq = TAILQ_FIRST(&htsp->htsp_active_output_queues)) == NULL) { /* Nothing to be done, go to sleep */ - pthread_cond_wait(&htsp->htsp_out_cond, &htsp->htsp_out_mutex); + tvh_cond_wait(&htsp->htsp_out_cond, &htsp->htsp_out_mutex); continue; } @@ -3268,7 +3268,7 @@ htsp_serve(int fd, void **opaque, struct sockaddr_storage *source, pthread_mutex_lock(&htsp.htsp_out_mutex); htsp.htsp_writer_run = 0; - pthread_cond_signal(&htsp.htsp_out_cond); + tvh_cond_signal(&htsp.htsp_out_cond, 0); pthread_mutex_unlock(&htsp.htsp_out_mutex); pthread_join(htsp.htsp_writer_thread, NULL); diff --git a/src/httpc.c b/src/httpc.c index f736aac0f..1fe13cd43 100644 --- a/src/httpc.c +++ b/src/httpc.c @@ -85,7 +85,7 @@ static int http_running; static tvhpoll_t *http_poll; static TAILQ_HEAD(,http_client) http_clients; static pthread_mutex_t http_lock; -static pthread_cond_t http_cond; +static tvh_cond_t http_cond; static th_pipe_t http_pipe; static char *http_user_agent; @@ -1352,7 +1352,7 @@ http_client_thread ( void *p ) continue; } if (hc->hc_shutdown_wait) { - pthread_cond_broadcast(&http_cond); + tvh_cond_signal(&http_cond, 1); /* Disable the poll looping for this moment */ http_client_poll_dir(hc, 0, 0); pthread_mutex_unlock(&http_lock); @@ -1364,7 +1364,7 @@ http_client_thread ( void *p ) pthread_mutex_lock(&http_lock); hc->hc_running = 0; if (hc->hc_shutdown_wait) - pthread_cond_broadcast(&http_cond); + tvh_cond_signal(&http_cond, 1); pthread_mutex_unlock(&http_lock); } } @@ -1529,7 +1529,7 @@ http_client_close ( http_client_t *hc ) pthread_mutex_lock(&http_lock); hc->hc_shutdown_wait = 1; while (hc->hc_running) - pthread_cond_wait(&http_cond, &http_lock); + tvh_cond_wait(&http_cond, &http_lock); if (hc->hc_efd) { memset(&ev, 0, sizeof(ev)); ev.fd = hc->hc_fd; @@ -1572,7 +1572,7 @@ http_client_init ( const char *user_agent ) /* Setup list */ pthread_mutex_init(&http_lock, NULL); - pthread_cond_init(&http_cond, NULL); + tvh_cond_init(&http_cond); TAILQ_INIT(&http_clients); /* Setup pipe */ diff --git a/src/idnode.c b/src/idnode.c index 9c00319d0..2e97f0db3 100644 --- a/src/idnode.c +++ b/src/idnode.c @@ -46,7 +46,7 @@ static RB_HEAD(,idclass_link) idclasses; static RB_HEAD(,idclass_link) idrootclasses; static TAILQ_HEAD(,idnode_save) idnodes_save; -pthread_cond_t save_cond; +tvh_cond_t save_cond; pthread_t save_tid; static int save_running; static gtimer_t save_timer; @@ -1096,7 +1096,7 @@ idnode_savefn ( idnode_t *self, char *filename, size_t fsize ) static void idnode_save_trigger_thread_cb( void *aux ) { - pthread_cond_signal(&save_cond); + tvh_cond_signal(&save_cond, 0); } static void @@ -1703,7 +1703,7 @@ save_thread ( void *aux ) if (ise) gtimer_arm(&save_timer, idnode_save_trigger_thread_cb, NULL, (ise->ise_reqtime + IDNODE_SAVE_DELAY) - dispatch_clock); - pthread_cond_wait(&save_cond, &global_lock); + tvh_cond_wait(&save_cond, &global_lock); continue; } m = idnode_savefn(ise->ise_node, filename, sizeof(filename)); @@ -1749,7 +1749,7 @@ idnode_init(void) RB_INIT(&idrootclasses); TAILQ_INIT(&idnodes_save); - pthread_cond_init(&save_cond, NULL); + tvh_cond_init(&save_cond); save_running = 1; tvhthread_create(&save_tid, NULL, save_thread, NULL, "save"); } @@ -1760,7 +1760,7 @@ idnode_done(void) idclass_link_t *il; save_running = 0; - pthread_cond_signal(&save_cond); + tvh_cond_signal(&save_cond, 0); pthread_join(save_tid, NULL); gtimer_disarm(&save_timer); diff --git a/src/imagecache.c b/src/imagecache.c index 4ad03b8f5..bf9c5440f 100644 --- a/src/imagecache.c +++ b/src/imagecache.c @@ -117,7 +117,7 @@ const idclass_t imagecache_class = { } }; -static pthread_cond_t imagecache_cond; +static tvh_cond_t imagecache_cond; static TAILQ_HEAD(, imagecache_image) imagecache_queue; static gtimer_t imagecache_timer; #endif @@ -179,7 +179,7 @@ imagecache_image_add ( imagecache_image_t *img ) if (strncasecmp("file://", img->url, 7)) { img->state = QUEUED; TAILQ_INSERT_TAIL(&imagecache_queue, img, q_link); - pthread_cond_broadcast(&imagecache_cond); + tvh_cond_signal(&imagecache_cond, 1); } else { time(&img->updated); } @@ -330,7 +330,7 @@ error: } else { tvhdebug("imagecache", "downloaded %s", img->url); } - pthread_cond_broadcast(&imagecache_cond); + tvh_cond_signal(&imagecache_cond, 1); return res; }; @@ -345,13 +345,13 @@ imagecache_thread ( void *p ) /* Check we're enabled */ if (!imagecache_conf.enabled) { - pthread_cond_wait(&imagecache_cond, &global_lock); + tvh_cond_wait(&imagecache_cond, &global_lock); continue; } /* Get entry */ if (!(img = TAILQ_FIRST(&imagecache_queue))) { - pthread_cond_wait(&imagecache_cond, &global_lock); + tvh_cond_wait(&imagecache_cond, &global_lock); continue; } @@ -412,7 +412,7 @@ imagecache_init ( void ) /* Create threads */ #if ENABLE_IMAGECACHE - pthread_cond_init(&imagecache_cond, NULL); + tvh_cond_init(&imagecache_cond); TAILQ_INIT(&imagecache_queue); #endif @@ -493,7 +493,7 @@ imagecache_done ( void ) imagecache_image_t *img; #if ENABLE_IMAGECACHE - pthread_cond_broadcast(&imagecache_cond); + tvh_cond_signal(&imagecache_cond, 1); pthread_join(imagecache_tid, NULL); #endif while ((img = RB_FIRST(&imagecache_by_id)) != NULL) @@ -513,7 +513,7 @@ imagecache_save ( idnode_t *self, char *filename, size_t fsize ) htsmsg_t *c = htsmsg_create_map(); idnode_save(&imagecache_conf.idnode, c); snprintf(filename, fsize, "imagecache/config"); - pthread_cond_broadcast(&imagecache_cond); + tvh_cond_signal(&imagecache_cond, 1); return c; } @@ -660,17 +660,14 @@ imagecache_open ( uint32_t id ) #if ENABLE_IMAGECACHE else if (imagecache_conf.enabled) { int e; - struct timespec ts; /* Use existing */ if (i->updated) { /* Wait */ } else if (i->state == FETCHING) { - time(&ts.tv_sec); - ts.tv_nsec = 0; - ts.tv_sec += 5; - e = pthread_cond_timedwait(&imagecache_cond, &global_lock, &ts); + e = tvh_cond_timedwait(&imagecache_cond, &global_lock, + getmonoclock() + 5 * MONOCLOCK_RESOLUTION); if (e == ETIMEDOUT) return -1; diff --git a/src/input/mpegts.h b/src/input/mpegts.h index f644e3913..252f4397c 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -683,7 +683,7 @@ struct mpegts_input // Note: this section is protected by mi_input_lock pthread_t mi_input_tid; pthread_mutex_t mi_input_lock; - pthread_cond_t mi_input_cond; + tvh_cond_t mi_input_cond; TAILQ_HEAD(,mpegts_packet) mi_input_queue; /* Data processing/output */ @@ -696,7 +696,7 @@ struct mpegts_input /* Table processing */ pthread_t mi_table_tid; - pthread_cond_t mi_table_cond; + tvh_cond_t mi_table_cond; mpegts_table_feed_queue_t mi_table_queue; /* DBus */ diff --git a/src/input/mpegts/iptv/iptv_file.c b/src/input/mpegts/iptv/iptv_file.c index eaff540e9..6470482e9 100644 --- a/src/input/mpegts/iptv/iptv_file.c +++ b/src/input/mpegts/iptv/iptv_file.c @@ -31,7 +31,7 @@ typedef struct file_priv { int fd; int shutdown; pthread_t tid; - pthread_cond_t cond; + tvh_cond_t cond; } file_priv_t; /* @@ -47,6 +47,7 @@ iptv_file_thread ( void *aux ) int fd = fp->fd, pause = 0; char buf[32*1024]; off_t off = 0; + int n; #if defined(PLATFORM_DARWIN) fcntl(fd, F_NOCACHE, 1); @@ -56,7 +57,9 @@ iptv_file_thread ( void *aux ) while (!fp->shutdown && pause) { clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += 1; - if (pthread_cond_timedwait(&fp->cond, &iptv_lock, &ts) == ETIMEDOUT) + n = tvh_cond_timedwait(&fp->cond, &iptv_lock, + getmonoclock() + 1 * MONOCLOCK_RESOLUTION); + if (n == ETIMEDOUT) break; } if (fp->shutdown) @@ -102,7 +105,7 @@ iptv_file_start ( iptv_mux_t *im, const char *raw, const url_t *url ) fp = calloc(1, sizeof(*fp)); fp->fd = fd; - pthread_cond_init(&fp->cond, NULL); + tvh_cond_init(&fp->cond); im->im_data = fp; iptv_input_mux_started(im); tvhthread_create(&fp->tid, NULL, iptv_file_thread, im, "iptvfile"); @@ -118,10 +121,10 @@ iptv_file_stop if (rd > 0) close(rd); fp->shutdown = 1; - pthread_cond_signal(&fp->cond); + tvh_cond_signal(&fp->cond, 0); pthread_mutex_unlock(&iptv_lock); pthread_join(fp->tid, NULL); - pthread_cond_destroy(&fp->cond); + tvh_cond_destroy(&fp->cond); pthread_mutex_lock(&iptv_lock); free(im->im_data); im->im_data = NULL; diff --git a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c index 805c52d4d..e34fb01d3 100644 --- a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c +++ b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c @@ -772,7 +772,7 @@ linuxdvb_frontend_monitor ( void *aux ) pthread_mutex_lock(&lfe->lfe_dvr_lock); tvhthread_create(&lfe->lfe_dvr_thread, NULL, linuxdvb_frontend_input_thread, lfe, "lnxdvb-front"); - pthread_cond_wait(&lfe->lfe_dvr_cond, &lfe->lfe_dvr_lock); + tvh_cond_wait(&lfe->lfe_dvr_cond, &lfe->lfe_dvr_lock); pthread_mutex_unlock(&lfe->lfe_dvr_lock); /* Table handlers */ @@ -1167,7 +1167,7 @@ linuxdvb_frontend_input_thread ( void *aux ) pthread_mutex_lock(&lfe->lfe_dvr_lock); lfe->mi_display_name((mpegts_input_t*)lfe, name, sizeof(name)); mmi = LIST_FIRST(&lfe->mi_mux_active); - pthread_cond_signal(&lfe->lfe_dvr_cond); + tvh_cond_signal(&lfe->lfe_dvr_cond, 0); pthread_mutex_unlock(&lfe->lfe_dvr_lock); if (mmi == NULL) return NULL; @@ -1953,7 +1953,7 @@ linuxdvb_frontend_create /* DVR lock/cond */ pthread_mutex_init(&lfe->lfe_dvr_lock, NULL); - pthread_cond_init(&lfe->lfe_dvr_cond, NULL); + tvh_cond_init(&lfe->lfe_dvr_cond); mpegts_pid_init(&lfe->lfe_pids); /* Create satconf */ diff --git a/src/input/mpegts/linuxdvb/linuxdvb_private.h b/src/input/mpegts/linuxdvb/linuxdvb_private.h index 09b5bcf09..e1300f55e 100644 --- a/src/input/mpegts/linuxdvb/linuxdvb_private.h +++ b/src/input/mpegts/linuxdvb/linuxdvb_private.h @@ -126,7 +126,7 @@ struct linuxdvb_frontend pthread_t lfe_dvr_thread; th_pipe_t lfe_dvr_pipe; pthread_mutex_t lfe_dvr_lock; - pthread_cond_t lfe_dvr_cond; + tvh_cond_t lfe_dvr_cond; mpegts_apids_t lfe_pids; int lfe_pids_max; diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index 51c4cb950..2e4eae20f 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -1118,7 +1118,7 @@ mpegts_input_recv_packets pthread_mutex_lock(&mi->mi_input_lock); if (mmi->mmi_mux->mm_active == mmi) { TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link); - pthread_cond_signal(&mi->mi_input_cond); + tvh_cond_signal(&mi->mi_input_cond, 0); } else { free(mp); } @@ -1434,7 +1434,7 @@ done: /* Wake table */ if (table_wakeup) - pthread_cond_signal(&mi->mi_table_cond); + tvh_cond_signal(&mi->mi_table_cond, 0); /* Bandwidth monitoring */ llen = tsb - mpkt->mp_data; @@ -1460,7 +1460,7 @@ mpegts_input_thread ( void * p ) tvhtrace("mpegts", "input %s got %zu bytes", buf, bytes); bytes = 0; } - pthread_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock); + tvh_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock); continue; } TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link); @@ -1522,7 +1522,7 @@ mpegts_input_table_thread ( void *aux ) /* Wait for data */ if (!(mtf = TAILQ_FIRST(&mi->mi_table_queue))) { - pthread_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock); + tvh_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock); continue; } TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link); @@ -1699,12 +1699,12 @@ mpegts_input_thread_stop ( mpegts_input_t *mi ) /* Stop input thread */ pthread_mutex_lock(&mi->mi_input_lock); - pthread_cond_signal(&mi->mi_input_cond); + tvh_cond_signal(&mi->mi_input_cond, 0); pthread_mutex_unlock(&mi->mi_input_lock); /* Stop table thread */ pthread_mutex_lock(&mi->mi_output_lock); - pthread_cond_signal(&mi->mi_table_cond); + tvh_cond_signal(&mi->mi_table_cond, 0); pthread_mutex_unlock(&mi->mi_output_lock); /* Join threads (relinquish lock due to potential deadlock) */ @@ -1787,11 +1787,11 @@ mpegts_input_create0 /* Init input/output structures */ pthread_mutex_init(&mi->mi_input_lock, NULL); - pthread_cond_init(&mi->mi_input_cond, NULL); + tvh_cond_init(&mi->mi_input_cond); TAILQ_INIT(&mi->mi_input_queue); pthread_mutex_init(&mi->mi_output_lock, NULL); - pthread_cond_init(&mi->mi_table_cond, NULL); + tvh_cond_init(&mi->mi_table_cond); TAILQ_INIT(&mi->mi_table_queue); /* Defaults */ @@ -1853,7 +1853,7 @@ mpegts_input_delete ( mpegts_input_t *mi, int delconf ) mpegts_input_thread_stop(mi); pthread_mutex_destroy(&mi->mi_output_lock); - pthread_cond_destroy(&mi->mi_table_cond); + tvh_cond_destroy(&mi->mi_table_cond); free(mi->mi_name); free(mi->mi_linked); free(mi); diff --git a/src/input/mpegts/tvhdhomerun/tvhdhomerun.c b/src/input/mpegts/tvhdhomerun/tvhdhomerun.c index a52c03bbd..49f88dfaf 100644 --- a/src/input/mpegts/tvhdhomerun/tvhdhomerun.c +++ b/src/input/mpegts/tvhdhomerun/tvhdhomerun.c @@ -78,7 +78,7 @@ static struct tvhdhomerun_discovery_queue tvhdhomerun_discoveries; static pthread_t tvhdhomerun_discovery_tid; static pthread_mutex_t tvhdhomerun_discovery_lock; -static pthread_cond_t tvhdhomerun_discovery_cond; +static tvh_cond_t tvhdhomerun_discovery_cond; static const char * tvhdhomerun_device_class_get_title( idnode_t *in, const char *lang ) @@ -346,8 +346,6 @@ static void * tvhdhomerun_device_discovery_thread( void *aux ) { struct hdhomerun_discover_device_t result_list[MAX_HDHOMERUN_DEVICES]; - struct timespec ts; - struct timeval tp; int numDevices, brk; while (tvheadend_running) { @@ -379,12 +377,9 @@ tvhdhomerun_device_discovery_thread( void *aux ) pthread_mutex_lock(&tvhdhomerun_discovery_lock); brk = 0; if (tvheadend_running) { - gettimeofday(&tp, NULL); - ts.tv_sec = tp.tv_sec + 15; - ts.tv_nsec = tp.tv_usec * 1000; - brk = pthread_cond_timedwait(&tvhdhomerun_discovery_cond, - &tvhdhomerun_discovery_lock, - &ts); + brk = tvh_cond_timedwait(&tvhdhomerun_discovery_cond, + &tvhdhomerun_discovery_lock, + getmonoclock() + 15 * MONOCLOCK_RESOLUTION); brk = !ERRNO_AGAIN(brk) && brk != ETIMEDOUT; } pthread_mutex_unlock(&tvhdhomerun_discovery_lock); @@ -406,7 +401,7 @@ void tvhdhomerun_init ( void ) } TAILQ_INIT(&tvhdhomerun_discoveries); pthread_mutex_init(&tvhdhomerun_discovery_lock, NULL); - pthread_cond_init(&tvhdhomerun_discovery_cond, NULL); + tvh_cond_init(&tvhdhomerun_discovery_cond); tvhthread_create(&tvhdhomerun_discovery_tid, NULL, tvhdhomerun_device_discovery_thread, NULL, "hdhm-disc"); @@ -418,7 +413,7 @@ void tvhdhomerun_done ( void ) tvhdhomerun_discovery_t *d, *nd; pthread_mutex_lock(&tvhdhomerun_discovery_lock); - pthread_cond_signal(&tvhdhomerun_discovery_cond); + tvh_cond_signal(&tvhdhomerun_discovery_cond, 0); pthread_mutex_unlock(&tvhdhomerun_discovery_lock); pthread_join(tvhdhomerun_discovery_tid, NULL); diff --git a/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c b/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c index fba6ef7dd..e149edd95 100644 --- a/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c +++ b/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c @@ -71,7 +71,7 @@ tvhdhomerun_frontend_input_thread ( void *aux ) pthread_mutex_lock(&hfe->hf_input_thread_mutex); hfe->mi_display_name((mpegts_input_t*)hfe, buf, sizeof(buf)); mmi = LIST_FIRST(&hfe->mi_mux_active); - pthread_cond_signal(&hfe->hf_input_thread_cond); + tvh_cond_signal(&hfe->hf_input_thread_cond, 0); pthread_mutex_unlock(&hfe->hf_input_thread_mutex); if (mmi == NULL) return NULL; @@ -255,7 +255,7 @@ tvhdhomerun_frontend_monitor_cb( void *aux ) tvh_pipe(O_NONBLOCK, &hfe->hf_input_thread_pipe); pthread_mutex_lock(&hfe->hf_input_thread_mutex); tvhthread_create(&hfe->hf_input_thread, NULL, tvhdhomerun_frontend_input_thread, hfe, "hdhm-front"); - pthread_cond_wait(&hfe->hf_input_thread_cond, &hfe->hf_input_thread_mutex); + tvh_cond_wait(&hfe->hf_input_thread_cond, &hfe->hf_input_thread_mutex); pthread_mutex_unlock(&hfe->hf_input_thread_mutex); /* install table handlers */ @@ -735,7 +735,7 @@ tvhdhomerun_frontend_create(tvhdhomerun_device_t *hd, struct hdhomerun_discover_ /* mutex init */ pthread_mutex_init(&hfe->hf_hdhomerun_device_mutex, NULL); pthread_mutex_init(&hfe->hf_input_thread_mutex, NULL); - pthread_cond_init(&hfe->hf_input_thread_cond, NULL); + tvh_cond_init(&hfe->hf_input_thread_cond); return hfe; } diff --git a/src/input/mpegts/tvhdhomerun/tvhdhomerun_private.h b/src/input/mpegts/tvhdhomerun/tvhdhomerun_private.h index bf000d6c4..58904af52 100644 --- a/src/input/mpegts/tvhdhomerun/tvhdhomerun_private.h +++ b/src/input/mpegts/tvhdhomerun/tvhdhomerun_private.h @@ -101,7 +101,7 @@ struct tvhdhomerun_frontend // input thread.. pthread_t hf_input_thread; pthread_mutex_t hf_input_thread_mutex; /* used in condition signaling */ - pthread_cond_t hf_input_thread_cond; /* used in condition signaling */ + tvh_cond_t hf_input_thread_cond; /* used in condition signaling */ th_pipe_t hf_input_thread_pipe; /* IPC with input thread */ uint8_t hf_input_thread_running; // Indicates if input_thread is running. uint8_t hf_input_thread_terminating; // Used for terminating the input_thread. diff --git a/src/main.c b/src/main.c index 339e084f8..641736214 100644 --- a/src/main.c +++ b/src/main.c @@ -176,7 +176,7 @@ pthread_mutex_t atomic_lock; static LIST_HEAD(, gtimer) gtimers; static pthread_cond_t gtimer_cond; static TAILQ_HEAD(, tasklet) tasklets; -static pthread_cond_t tasklet_cond; +static tvh_cond_t tasklet_cond; static pthread_t tasklet_tid; static void @@ -359,7 +359,7 @@ tasklet_arm(tasklet_t *tsk, tsk_callback_t *callback, void *opaque) TAILQ_INSERT_TAIL(&tasklets, tsk, tsk_link); if (TAILQ_FIRST(&tasklets) == tsk) - pthread_cond_signal(&tasklet_cond); + tvh_cond_signal(&tasklet_cond, 0); pthread_mutex_unlock(&tasklet_lock); } @@ -415,7 +415,7 @@ tasklet_thread ( void *aux ) while (tvheadend_running) { tsk = TAILQ_FIRST(&tasklets); if (tsk == NULL) { - pthread_cond_wait(&tasklet_cond, &tasklet_lock); + tvh_cond_wait(&tasklet_cond, &tasklet_lock); continue; } if (tsk->tsk_callback) { @@ -609,7 +609,7 @@ main(int argc, char **argv) pthread_mutex_init(&tasklet_lock, NULL); pthread_mutex_init(&atomic_lock, NULL); pthread_cond_init(>imer_cond, NULL); - pthread_cond_init(&tasklet_cond, NULL); + tvh_cond_init(&tasklet_cond); TAILQ_INIT(&tasklets); /* Defaults */ @@ -1157,7 +1157,7 @@ main(int argc, char **argv) tvhftrace("main", api_done); tvhtrace("main", "tasklet enter"); - pthread_cond_signal(&tasklet_cond); + tvh_cond_signal(&tasklet_cond, 0); pthread_join(tasklet_tid, NULL); tvhtrace("main", "tasklet thread end"); tasklet_flush(); diff --git a/src/notify.c b/src/notify.c index 79f26d2af..f039f1330 100644 --- a/src/notify.c +++ b/src/notify.c @@ -26,7 +26,7 @@ #include "notify.h" #include "webui/webui.h" -static pthread_cond_t notify_cond; +static tvh_cond_t notify_cond; static pthread_mutex_t notify_mutex; static htsmsg_t *notify_queue; static pthread_t notify_tid; @@ -75,7 +75,7 @@ notify_delayed(const char *id, const char *event, const char *action) if (strcmp(htsmsg_field_get_str(f) ?: "", id) == 0) goto skip; htsmsg_add_str(e, NULL, id); - pthread_cond_signal(¬ify_cond); + tvh_cond_signal(¬ify_cond, 0); skip: pthread_mutex_unlock(¬ify_mutex); } @@ -92,7 +92,7 @@ notify_thread ( void *p ) /* Get queue */ if (!notify_queue) { - pthread_cond_wait(¬ify_cond, ¬ify_mutex); + tvh_cond_wait(¬ify_cond, ¬ify_mutex); continue; } q = notify_queue; @@ -126,13 +126,13 @@ void notify_init( void ) { notify_queue = NULL; pthread_mutex_init(¬ify_mutex, NULL); - pthread_cond_init(¬ify_cond, NULL); + tvh_cond_init(¬ify_cond); tvhthread_create(¬ify_tid, NULL, notify_thread, NULL, "notify"); } void notify_done( void ) { - pthread_cond_signal(¬ify_cond); + tvh_cond_signal(¬ify_cond, 0); pthread_join(notify_tid, NULL); pthread_mutex_lock(¬ify_mutex); htsmsg_destroy(notify_queue); diff --git a/src/satip/rtp.c b/src/satip/rtp.c index f97972168..154bf3d0d 100644 --- a/src/satip/rtp.c +++ b/src/satip/rtp.c @@ -376,7 +376,7 @@ satip_rtp_thread(void *aux) continue; } } - pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); + tvh_cond_wait(&sq->sq_cond, &sq->sq_mutex); continue; } streaming_queue_remove(sq, sm); @@ -566,7 +566,7 @@ void satip_rtp_close(void *id) sq = rtp->sq; pthread_mutex_lock(&sq->sq_mutex); rtp->sq = NULL; - pthread_cond_signal(&sq->sq_cond); + tvh_cond_signal(&sq->sq_cond, 0); pthread_mutex_unlock(&sq->sq_mutex); pthread_mutex_unlock(&satip_rtp_lock); if (rtp->port == RTSP_TCP_DATA) diff --git a/src/service.c b/src/service.c index 6230df9f4..75316c8bb 100644 --- a/src/service.c +++ b/src/service.c @@ -964,7 +964,6 @@ service_create0 TAILQ_INSERT_TAIL(&service_all, t, s_all_link); pthread_mutex_init(&t->s_stream_mutex, NULL); - pthread_cond_init(&t->s_tss_cond, NULL); t->s_type = service_type; t->s_type_user = ST_UNSET; t->s_source_type = source_type; @@ -1264,8 +1263,6 @@ service_send_streaming_status(service_t *t) streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_code(SMT_SERVICE_STATUS, t->s_streaming_status)); - - pthread_cond_broadcast(&t->s_tss_cond); } /** @@ -1398,7 +1395,7 @@ service_build_stream_start(service_t *t) */ static pthread_mutex_t pending_save_mutex; -static pthread_cond_t pending_save_cond; +static tvh_cond_t pending_save_cond; static struct service_queue pending_save_queue; /** @@ -1416,7 +1413,7 @@ service_request_save(service_t *t, int restart) t->s_ps_onqueue = 1 + !!restart; TAILQ_INSERT_TAIL(&pending_save_queue, t, s_ps_link); service_ref(t); - pthread_cond_signal(&pending_save_cond); + tvh_cond_signal(&pending_save_cond, 0); } else if (restart) { t->s_ps_onqueue = 2; // upgrade to restart too } @@ -1460,7 +1457,7 @@ service_saver(void *aux) while(tvheadend_running) { if((t = TAILQ_FIRST(&pending_save_queue)) == NULL) { - pthread_cond_wait(&pending_save_cond, &pending_save_mutex); + tvh_cond_wait(&pending_save_cond, &pending_save_mutex); continue; } assert(t->s_ps_onqueue != 0); @@ -1500,7 +1497,7 @@ service_init(void) TAILQ_INIT(&service_raw_all); TAILQ_INIT(&service_raw_remove); pthread_mutex_init(&pending_save_mutex, NULL); - pthread_cond_init(&pending_save_cond, NULL); + tvh_cond_init(&pending_save_cond); tvhthread_create(&service_saver_tid, NULL, service_saver, NULL, "service"); } @@ -1509,7 +1506,9 @@ service_done(void) { service_t *t; - pthread_cond_signal(&pending_save_cond); + pthread_mutex_lock(&pending_save_mutex); + tvh_cond_signal(&pending_save_cond, 0); + pthread_mutex_unlock(&pending_save_mutex); pthread_join(service_saver_tid, NULL); pthread_mutex_lock(&global_lock); diff --git a/src/service.h b/src/service.h index 3c28659be..de13a8607 100644 --- a/src/service.h +++ b/src/service.h @@ -411,12 +411,6 @@ typedef struct service { */ pthread_mutex_t s_stream_mutex; - - /** - * Condition variable to singal when streaming_status changes - * interlocked with s_stream_mutex - */ - pthread_cond_t s_tss_cond; /** * */ diff --git a/src/service_mapper.c b/src/service_mapper.c index 7b63324e8..b3f1a73cc 100644 --- a/src/service_mapper.c +++ b/src/service_mapper.c @@ -41,7 +41,7 @@ typedef struct service_mapper_item { } service_mapper_item_t; static service_mapper_status_t service_mapper_stat; -static pthread_cond_t service_mapper_cond; +static tvh_cond_t service_mapper_cond; static TAILQ_HEAD(, service_mapper_item) service_mapper_queue; service_mapper_t service_mapper_conf; @@ -132,7 +132,7 @@ service_mapper_start ( const service_mapper_conf_t *conf, htsmsg_t *uuids ) api_service_mapper_notify(); /* Signal */ - if (qd) pthread_cond_signal(&service_mapper_cond); + if (qd) tvh_cond_signal(&service_mapper_cond, 0); } /* @@ -312,7 +312,7 @@ service_mapper_thread ( void *aux ) working = 0; tvhinfo("service_mapper", "idle"); } - pthread_cond_wait(&service_mapper_cond, &global_lock); + tvh_cond_wait(&service_mapper_cond, &global_lock); if (!tvheadend_running) break; } @@ -354,7 +354,7 @@ service_mapper_thread ( void *aux ) /* Wait for message */ while((sm = TAILQ_FIRST(&sq->sq_queue)) == NULL) { - pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); + tvh_cond_wait(&sq->sq_cond, &sq->sq_mutex); if (!tvheadend_running) break; } @@ -557,7 +557,7 @@ void service_mapper_init ( void ) htsmsg_t *m; TAILQ_INIT(&service_mapper_queue); - pthread_cond_init(&service_mapper_cond, NULL); + tvh_cond_init(&service_mapper_cond); tvhthread_create(&service_mapper_tid, NULL, service_mapper_thread, NULL, "svcmap"); /* Defaults */ @@ -576,7 +576,7 @@ void service_mapper_init ( void ) void service_mapper_done ( void ) { - pthread_cond_signal(&service_mapper_cond); + tvh_cond_signal(&service_mapper_cond, 0); pthread_join(service_mapper_tid, NULL); htsmsg_destroy(service_mapper_conf.services); service_mapper_conf.services = NULL; diff --git a/src/streaming.c b/src/streaming.c index 319e92473..d5273b60a 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -82,7 +82,7 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm) sq->sq_size += streaming_message_data_size(sm); } - pthread_cond_signal(&sq->sq_cond); + tvh_cond_signal(&sq->sq_cond, 0); pthread_mutex_unlock(&sq->sq_mutex); } @@ -105,7 +105,7 @@ streaming_queue_init(streaming_queue_t *sq, int reject_filter, size_t maxsize) streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter); pthread_mutex_init(&sq->sq_mutex, NULL); - pthread_cond_init(&sq->sq_cond, NULL); + tvh_cond_init(&sq->sq_cond); TAILQ_INIT(&sq->sq_queue); sq->sq_maxsize = maxsize; @@ -121,7 +121,7 @@ streaming_queue_deinit(streaming_queue_t *sq) sq->sq_size = 0; streaming_queue_clear(&sq->sq_queue); pthread_mutex_destroy(&sq->sq_mutex); - pthread_cond_destroy(&sq->sq_cond); + tvh_cond_destroy(&sq->sq_cond); } /** diff --git a/src/timeshift/timeshift_filemgr.c b/src/timeshift/timeshift_filemgr.c index 0af05b942..6613168a1 100644 --- a/src/timeshift/timeshift_filemgr.c +++ b/src/timeshift/timeshift_filemgr.c @@ -36,7 +36,7 @@ static int timeshift_reaper_run; static timeshift_file_list_t timeshift_reaper_list; static pthread_t timeshift_reaper_thread; static pthread_mutex_t timeshift_reaper_lock; -static pthread_cond_t timeshift_reaper_cond; +static tvh_cond_t timeshift_reaper_cond; uint64_t timeshift_total_size; uint64_t timeshift_total_ram_size; @@ -58,7 +58,7 @@ static void* timeshift_reaper_callback ( void *p ) /* Get next */ tsf = TAILQ_FIRST(×hift_reaper_list); if (!tsf) { - pthread_cond_wait(×hift_reaper_cond, ×hift_reaper_lock); + tvh_cond_wait(×hift_reaper_cond, ×hift_reaper_lock); continue; } TAILQ_REMOVE(×hift_reaper_list, tsf, link); @@ -111,7 +111,7 @@ static void timeshift_reaper_remove ( timeshift_file_t *tsf ) } pthread_mutex_lock(×hift_reaper_lock); TAILQ_INSERT_TAIL(×hift_reaper_list, tsf, link); - pthread_cond_signal(×hift_reaper_cond); + tvh_cond_signal(×hift_reaper_cond, 0); pthread_mutex_unlock(×hift_reaper_lock); } @@ -239,7 +239,7 @@ static timeshift_file_t * timeshift_filemgr_file_init timeshift_file_t *tsf; tsf = calloc(1, sizeof(timeshift_file_t)); - tsf->time = start_time / (1000000LL * TIMESHIFT_FILE_PERIOD); + tsf->time = start_time / (MONOCLOCK_RESOLUTION * TIMESHIFT_FILE_PERIOD); tsf->last = start_time; tsf->wfd = -1; tsf->rfd = -1; @@ -272,7 +272,7 @@ timeshift_file_t *timeshift_filemgr_get ( timeshift_t *ts, int64_t start_time ) /* Store to file */ tsf_tl = TAILQ_LAST(&ts->files, timeshift_file_list); - time = start_time / (1000000LL * TIMESHIFT_FILE_PERIOD); + time = start_time / (MONOCLOCK_RESOLUTION * TIMESHIFT_FILE_PERIOD); if (!tsf_tl || tsf_tl->time < time || (tsf_tl->ram && tsf_tl->woff >= timeshift_conf.ram_segment_size)) { tsf_hd = TAILQ_FIRST(&ts->files); @@ -450,7 +450,7 @@ void timeshift_filemgr_init ( void ) /* Start the reaper thread */ timeshift_reaper_run = 1; pthread_mutex_init(×hift_reaper_lock, NULL); - pthread_cond_init(×hift_reaper_cond, NULL); + tvh_cond_init(×hift_reaper_cond); TAILQ_INIT(×hift_reaper_list); tvhthread_create(×hift_reaper_thread, NULL, timeshift_reaper_callback, NULL, "tshift-reap"); @@ -466,7 +466,7 @@ void timeshift_filemgr_term ( void ) /* Wait for thread */ pthread_mutex_lock(×hift_reaper_lock); timeshift_reaper_run = 0; - pthread_cond_signal(×hift_reaper_cond); + tvh_cond_signal(×hift_reaper_cond, 0); pthread_mutex_unlock(×hift_reaper_lock); pthread_join(timeshift_reaper_thread, NULL); diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index 4e625518c..3f512d700 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -281,7 +281,7 @@ static int _timeshift_skip { timeshift_index_iframe_t *tsi = seek->frame; timeshift_file_t *tsf = seek->file, *tsf_last; - int64_t sec = req_time / (1000000 * TIMESHIFT_FILE_PERIOD); + int64_t sec = req_time / (MONOCLOCK_RESOLUTION * TIMESHIFT_FILE_PERIOD); int back = (req_time < cur_time) ? 1 : 0; int end = 0; @@ -773,7 +773,7 @@ void *timeshift_reader ( void *p ) /* Done */ if (!run || !seek->file || ((ts->state != TS_PLAY && !skip))) { - if (mono_now >= (mono_last_status + 1000000)) { + if (mono_now >= (mono_last_status + MONOCLOCK_RESOLUTION)) { timeshift_status(ts, last_time); mono_last_status = mono_now; } @@ -865,7 +865,7 @@ void *timeshift_reader ( void *p ) } /* Periodic timeshift status */ - if (mono_now >= (mono_last_status + 1000000)) { + if (mono_now >= (mono_last_status + MONOCLOCK_RESOLUTION)) { timeshift_status(ts, last_time); mono_last_status = mono_now; } diff --git a/src/timeshift/timeshift_writer.c b/src/timeshift/timeshift_writer.c index e5c47cf67..37eaa0d9f 100644 --- a/src/timeshift/timeshift_writer.c +++ b/src/timeshift/timeshift_writer.c @@ -398,7 +398,7 @@ void *timeshift_writer ( void *aux ) /* Get message */ sm = TAILQ_FIRST(&sq->sq_queue); if (sm == NULL) { - pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); + tvh_cond_wait(&sq->sq_cond, &sq->sq_mutex); continue; } streaming_queue_remove(sq, sm); diff --git a/src/tvheadend.h b/src/tvheadend.h index 7e4e0d6ea..873dc79fa 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -112,6 +112,14 @@ lock_assert0(pthread_mutex_t *l, const char *file, int line) #define lock_assert(l) lock_assert0(l, __FILE__, __LINE__) +/* + * + */ + +typedef struct { + pthread_cond_t cond; +} tvh_cond_t; + /* * Commercial status @@ -563,7 +571,7 @@ typedef struct streaming_queue { streaming_target_t sq_st; pthread_mutex_t sq_mutex; /* Protects sp_queue */ - pthread_cond_t sq_cond; /* Condvar for signalling new packets */ + tvh_cond_t sq_cond; /* Condvar for signalling new packets */ size_t sq_maxsize; /* Max queue size (bytes) */ size_t sq_size; /* Actual queue size (bytes) - only data */ @@ -624,6 +632,8 @@ static inline int clock_gettime(int clk_id, struct timespec* t) { } #endif +#define MONOCLOCK_RESOLUTION 1000000LL /* microseconds */ + static inline int64_t getmonoclock(void) { @@ -631,7 +641,8 @@ getmonoclock(void) clock_gettime(CLOCK_MONOTONIC_COARSE, &tp); - return tp.tv_sec * 1000000ULL + (tp.tv_nsec / 1000); + return tp.tv_sec * MONOCLOCK_RESOLUTION + + (tp.tv_nsec / (1000000000LL/MONOCLOCK_RESOLUTION)); } int sri_to_rate(int sri); @@ -689,6 +700,16 @@ int tvhtread_renice(int value); int tvh_mutex_timedlock(pthread_mutex_t *mutex, int64_t usec); +int tvh_cond_init(tvh_cond_t *cond); + +int tvh_cond_destroy(tvh_cond_t *cond); + +int tvh_cond_signal(tvh_cond_t *cond, int broadcast); + +int tvh_cond_wait(tvh_cond_t *cond, pthread_mutex_t *mutex); + +int tvh_cond_timedwait(tvh_cond_t *cond, pthread_mutex_t *mutex, int64_t monoclock); + int tvh_open(const char *pathname, int flags, mode_t mode); int tvh_socket(int domain, int type, int protocol); diff --git a/src/tvhlog.c b/src/tvhlog.c index d265ff613..7831121b4 100644 --- a/src/tvhlog.c +++ b/src/tvhlog.c @@ -40,7 +40,7 @@ htsmsg_t *tvhlog_debug; htsmsg_t *tvhlog_trace; pthread_t tvhlog_tid; pthread_mutex_t tvhlog_mutex; -pthread_cond_t tvhlog_cond; +tvh_cond_t tvhlog_cond; TAILQ_HEAD(,tvhlog_msg) tvhlog_queue; int tvhlog_queue_size; int tvhlog_queue_full; @@ -237,7 +237,7 @@ tvhlog_thread ( void *p ) // but overall performance will be higher fp = NULL; } - pthread_cond_wait(&tvhlog_cond, &tvhlog_mutex); + tvh_cond_wait(&tvhlog_cond, &tvhlog_mutex); continue; } TAILQ_REMOVE(&tvhlog_queue, msg, link); @@ -336,7 +336,7 @@ void tvhlogv ( const char *file, int line, if (tvhlog_run) { TAILQ_INSERT_TAIL(&tvhlog_queue, msg, link); tvhlog_queue_size++; - pthread_cond_signal(&tvhlog_cond); + tvh_cond_signal(&tvhlog_cond, 0); } else { #endif FILE *fp = NULL; @@ -438,7 +438,7 @@ tvhlog_init ( int level, int options, const char *path ) tvhlog_run = 1; openlog("tvheadend", LOG_PID, LOG_DAEMON); pthread_mutex_init(&tvhlog_mutex, NULL); - pthread_cond_init(&tvhlog_cond, NULL); + tvh_cond_init(&tvhlog_cond); TAILQ_INIT(&tvhlog_queue); } @@ -455,7 +455,7 @@ tvhlog_end ( void ) tvhlog_msg_t *msg; pthread_mutex_lock(&tvhlog_mutex); tvhlog_run = 0; - pthread_cond_signal(&tvhlog_cond); + tvh_cond_signal(&tvhlog_cond, 0); pthread_mutex_unlock(&tvhlog_mutex); pthread_join(tvhlog_tid, NULL); pthread_mutex_lock(&tvhlog_mutex); diff --git a/src/webui/comet.c b/src/webui/comet.c index fded6f566..0f539b277 100644 --- a/src/webui/comet.c +++ b/src/webui/comet.c @@ -39,7 +39,7 @@ #include "tcp.h" static pthread_mutex_t comet_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t comet_cond = PTHREAD_COND_INITIALIZER; +static tvh_cond_t comet_cond; #define MAILBOX_UNUSED_TIMEOUT 20 #define MAILBOX_EMPTY_REPLY_TIMEOUT 10 @@ -239,8 +239,6 @@ comet_mailbox_poll(http_connection_t *hc, const char *remain, void *opaque) const char *cometid = http_arg_get(&hc->hc_req_args, "boxid"); const char *immediate = http_arg_get(&hc->hc_req_args, "immediate"); int im = immediate ? atoi(immediate) : 0; - time_t reqtime; - struct timespec ts; htsmsg_t *m; if(!im) @@ -249,7 +247,7 @@ comet_mailbox_poll(http_connection_t *hc, const char *remain, void *opaque) pthread_mutex_lock(&comet_mutex); if (!comet_running) { pthread_mutex_unlock(&comet_mutex); - return 400; + return HTTP_STATUS_BAD_REQUEST; } if(cometid != NULL) @@ -262,15 +260,11 @@ comet_mailbox_poll(http_connection_t *hc, const char *remain, void *opaque) comet_access_update(hc, cmb); comet_serverIpPort(hc, cmb); } - time(&reqtime); - - ts.tv_sec = reqtime + 10; - ts.tv_nsec = 0; - cmb->cmb_last_used = 0; /* Make sure we're not flushed out */ if(!im && cmb->cmb_messages == NULL) { - pthread_cond_timedwait(&comet_cond, &comet_mutex, &ts); + tvh_cond_timedwait(&comet_cond, &comet_mutex, + getmonoclock() + 10 * MONOCLOCK_RESOLUTION); if (!comet_running) { pthread_mutex_unlock(&comet_mutex); return 400; @@ -322,7 +316,7 @@ comet_mailbox_dbg(http_connection_t *hc, const char *remain, void *opaque) htsmsg_add_str(m, "logtxt", buf); htsmsg_add_msg(cmb->cmb_messages, NULL, m); - pthread_cond_broadcast(&comet_cond); + tvh_cond_signal(&comet_cond, 1); } } pthread_mutex_unlock(&comet_mutex); @@ -338,6 +332,7 @@ void comet_init(void) { pthread_mutex_lock(&comet_mutex); + tvh_cond_init(&comet_cond); comet_running = 1; pthread_mutex_unlock(&comet_mutex); http_path_add("/comet/poll", NULL, comet_mailbox_poll, ACCESS_WEB_INTERFACE); @@ -406,6 +401,6 @@ comet_mailbox_add_message(htsmsg_t *m, int isdebug, int rewrite) } } - pthread_cond_broadcast(&comet_cond); + tvh_cond_signal(&comet_cond, 1); pthread_mutex_unlock(&comet_mutex); } diff --git a/src/webui/webui.c b/src/webui/webui.c index fc7fbda88..fbd4b17f7 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -317,8 +317,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch, muxer_t *mux = prch->prch_muxer; time_t lastpkt; int ptimeout, grace = 20; - struct timespec ts; - struct timeval tp; + struct timeval tp; streaming_start_t *ss_copy; int64_t mono; @@ -345,12 +344,8 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch, pthread_mutex_lock(&sq->sq_mutex); sm = TAILQ_FIRST(&sq->sq_queue); if(sm == NULL) { - gettimeofday(&tp, NULL); - ts.tv_sec = tp.tv_sec + 1; - ts.tv_nsec = tp.tv_usec * 1000; - - if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) { - + if(tvh_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, + getmonoclock() + 1 * MONOCLOCK_RESOLUTION) == ETIMEDOUT) { /* Check socket status */ if (tcp_socket_dead(hc->hc_fd)) { tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig); diff --git a/src/wrappers.c b/src/wrappers.c index c94b07cf2..aa9b788e6 100644 --- a/src/wrappers.c +++ b/src/wrappers.c @@ -18,22 +18,9 @@ #include #endif -int -tvh_mutex_timedlock - ( pthread_mutex_t *mutex, int64_t usec ) -{ - int64_t finish = getmonoclock() + usec; - int retcode; - - while ((retcode = pthread_mutex_trylock (mutex)) == EBUSY) { - if (getmonoclock() >= finish) - return ETIMEDOUT; - - usleep(10000); - } - - return retcode; -} +/* + * filedescriptor routines + */ int tvh_open(const char *pathname, int flags, mode_t mode) @@ -128,6 +115,10 @@ tvh_fopen(const char *filename, const char *mode) return f; } +/* + * thread routines + */ + static void doquit(int sig) { } @@ -207,6 +198,82 @@ tvhtread_renice(int value) return ret; } +int +tvh_mutex_timedlock + ( pthread_mutex_t *mutex, int64_t usec ) +{ + int64_t finish = getmonoclock() + usec; + int retcode; + + while ((retcode = pthread_mutex_trylock (mutex)) == EBUSY) { + if (getmonoclock() >= finish) + return ETIMEDOUT; + + usleep(10000); + } + + return retcode; +} + +/* + * thread condition variables - monotonic clocks + */ + +int +tvh_cond_init + ( tvh_cond_t *cond ) +{ + int r; + + pthread_condattr_t attr; + pthread_condattr_init(&attr); + r = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + if (r) { + fprintf(stderr, "Unable to set monotonic clocks for conditions! (%d)", r); + abort(); + } + return pthread_cond_init(&cond->cond, &attr); +} + +int +tvh_cond_destroy + ( tvh_cond_t *cond ) +{ + return pthread_cond_destroy(&cond->cond); +} + +int +tvh_cond_signal + ( tvh_cond_t *cond, int broadcast ) +{ + if (broadcast) + return pthread_cond_broadcast(&cond->cond); + else + return pthread_cond_signal(&cond->cond); +} + +int +tvh_cond_wait + ( tvh_cond_t *cond, pthread_mutex_t *mutex) +{ + return pthread_cond_wait(&cond->cond, mutex); +} + +int +tvh_cond_timedwait + ( tvh_cond_t *cond, pthread_mutex_t *mutex, int64_t monoclock ) +{ + struct timespec ts; + ts.tv_sec = monoclock / MONOCLOCK_RESOLUTION; + ts.tv_nsec = (monoclock % MONOCLOCK_RESOLUTION) * + (1000000000ULL/MONOCLOCK_RESOLUTION); + return pthread_cond_timedwait(&cond->cond, mutex, &ts); +} + +/* + * qsort + */ + #if ! ENABLE_QSORT_R /* * qsort_r wrapper for pre GLIBC 2.8