From: Jaroslav Kysela Date: Thu, 27 Jul 2017 14:32:50 +0000 (+0200) Subject: satip server: remove the rtp/id layer X-Git-Tag: v4.2.4~73 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6fad116abb97cef3293969b5477866179d8682be;p=thirdparty%2Ftvheadend.git satip server: remove the rtp/id layer --- diff --git a/src/satip/rtp.c b/src/satip/rtp.c index 644c05d35..889db1d93 100644 --- a/src/satip/rtp.c +++ b/src/satip/rtp.c @@ -46,7 +46,6 @@ typedef struct satip_rtp_table { typedef struct satip_rtp_session { TAILQ_ENTRY(satip_rtp_session) link; pthread_t tid; - void *id; struct sockaddr_storage peer; struct sockaddr_storage peer2; int port; @@ -435,35 +434,20 @@ satip_rtp_thread(void *aux) /* * */ -static satip_rtp_session_t * -satip_rtp_find(void *id) -{ - satip_rtp_session_t *rtp; - - TAILQ_FOREACH(rtp, &satip_rtp_sessions, link) - if (rtp->id == id) - break; - return rtp; -} - -/* - * - */ -void satip_rtp_queue(void *id, th_subscription_t *subs, - streaming_queue_t *sq, - pthread_mutex_t *tcp_lock, - struct sockaddr_storage *peer, int port, - int fd_rtp, int fd_rtcp, - int frontend, int source, dvb_mux_conf_t *dmc, - mpegts_apids_t *pids, int allow_data, int perm_lock) +void *satip_rtp_queue(th_subscription_t *subs, + streaming_queue_t *sq, + pthread_mutex_t *tcp_lock, + struct sockaddr_storage *peer, int port, + int fd_rtp, int fd_rtcp, + int frontend, int source, dvb_mux_conf_t *dmc, + mpegts_apids_t *pids, int allow_data, int perm_lock) { satip_rtp_session_t *rtp = calloc(1, sizeof(*rtp)); int dscp; if (rtp == NULL) - return; + return NULL; - rtp->id = id; rtp->peer = *peer; rtp->peer2 = *peer; if (port != RTSP_TCP_DATA) @@ -506,108 +490,104 @@ void satip_rtp_queue(void *id, th_subscription_t *subs, TAILQ_INSERT_TAIL(&satip_rtp_sessions, rtp, link); tvhthread_create(&rtp->tid, NULL, satip_rtp_thread, rtp, "satip-rtp"); pthread_mutex_unlock(&satip_rtp_lock); + return rtp; } -void satip_rtp_allow_data(void *id) +void satip_rtp_allow_data(void *_rtp) { - satip_rtp_session_t *rtp; + satip_rtp_session_t *rtp = _rtp; + if (rtp == NULL) + return; pthread_mutex_lock(&satip_rtp_lock); - rtp = satip_rtp_find(id); - if (rtp) - atomic_set(&rtp->allow_data, 1); + atomic_set(&rtp->allow_data, 1); pthread_mutex_unlock(&satip_rtp_lock); } -void satip_rtp_update_pids(void *id, mpegts_apids_t *pids) +void satip_rtp_update_pids(void *_rtp, mpegts_apids_t *pids) { - satip_rtp_session_t *rtp; + satip_rtp_session_t *rtp = _rtp; + if (rtp == NULL) + return; pthread_mutex_lock(&satip_rtp_lock); - rtp = satip_rtp_find(id); - if (rtp) { - pthread_mutex_lock(&rtp->lock); - mpegts_pid_copy(&rtp->pids, pids); - pthread_mutex_unlock(&rtp->lock); - } + pthread_mutex_lock(&rtp->lock); + mpegts_pid_copy(&rtp->pids, pids); + pthread_mutex_unlock(&rtp->lock); pthread_mutex_unlock(&satip_rtp_lock); } -void satip_rtp_update_pmt_pids(void *id, mpegts_apids_t *pmt_pids) +void satip_rtp_update_pmt_pids(void *_rtp, mpegts_apids_t *pmt_pids) { - satip_rtp_session_t *rtp; + satip_rtp_session_t *rtp = _rtp; satip_rtp_table_t *tbl, *tbl_next; int i, pid; + if (rtp == NULL) + return; pthread_mutex_lock(&satip_rtp_lock); - rtp = satip_rtp_find(id); - if (rtp) { - pthread_mutex_lock(&rtp->lock); + pthread_mutex_lock(&rtp->lock); + TAILQ_FOREACH(tbl, &rtp->pmt_tables, link) + if (!mpegts_pid_rexists(pmt_pids, tbl->pid)) + tbl->remove_mark = 1; + for (i = 0; i < pmt_pids->count; i++) { + pid = pmt_pids->pids[i].pid; TAILQ_FOREACH(tbl, &rtp->pmt_tables, link) - if (!mpegts_pid_rexists(pmt_pids, tbl->pid)) - tbl->remove_mark = 1; - for (i = 0; i < pmt_pids->count; i++) { - pid = pmt_pids->pids[i].pid; - TAILQ_FOREACH(tbl, &rtp->pmt_tables, link) - if (tbl->pid == pid) - break; - if (!tbl) { - tbl = calloc(1, sizeof(*tbl)); - dvb_table_parse_init(&tbl->tbl, "satip-pmt", LS_TBL_SATIP, pid, - DVB_PMT_BASE, DVB_PMT_MASK, rtp); - tbl->pid = pid; - TAILQ_INSERT_TAIL(&rtp->pmt_tables, tbl, link); - } + if (tbl->pid == pid) + break; + if (!tbl) { + tbl = calloc(1, sizeof(*tbl)); + dvb_table_parse_init(&tbl->tbl, "satip-pmt", LS_TBL_SATIP, pid, + DVB_PMT_BASE, DVB_PMT_MASK, rtp); + tbl->pid = pid; + TAILQ_INSERT_TAIL(&rtp->pmt_tables, tbl, link); } - for (tbl = TAILQ_FIRST(&rtp->pmt_tables); tbl; tbl = tbl_next){ - tbl_next = TAILQ_NEXT(tbl, link); - if (tbl->remove_mark) { - TAILQ_REMOVE(&rtp->pmt_tables, tbl, link); - free(tbl); - } + } + for (tbl = TAILQ_FIRST(&rtp->pmt_tables); tbl; tbl = tbl_next){ + tbl_next = TAILQ_NEXT(tbl, link); + if (tbl->remove_mark) { + TAILQ_REMOVE(&rtp->pmt_tables, tbl, link); + free(tbl); } - pthread_mutex_unlock(&rtp->lock); } + pthread_mutex_unlock(&rtp->lock); pthread_mutex_unlock(&satip_rtp_lock); } -void satip_rtp_close(void *id) +void satip_rtp_close(void *_rtp) { - satip_rtp_session_t *rtp; + satip_rtp_session_t *rtp = _rtp; satip_rtp_table_t *tbl; streaming_queue_t *sq; + if (rtp == NULL) + return; pthread_mutex_lock(&satip_rtp_lock); - rtp = satip_rtp_find(id); tvhtrace(LS_SATIPS, "rtp close %p", rtp); - if (rtp) { - TAILQ_REMOVE(&satip_rtp_sessions, rtp, link); - sq = rtp->sq; - pthread_mutex_lock(&sq->sq_mutex); - rtp->sq = NULL; - tvh_cond_signal(&sq->sq_cond, 0); - pthread_mutex_unlock(&sq->sq_mutex); - pthread_mutex_unlock(&satip_rtp_lock); - if (rtp->port == RTSP_TCP_DATA) - pthread_mutex_lock(rtp->tcp_lock); - pthread_join(rtp->tid, NULL); - if (rtp->port == RTSP_TCP_DATA) { - pthread_mutex_unlock(rtp->tcp_lock); - free(rtp->tcp_data.iov_base); - } else { - udp_multisend_free(&rtp->um); - } - mpegts_pid_done(&rtp->pids); - while ((tbl = TAILQ_FIRST(&rtp->pmt_tables)) != NULL) { - dvb_table_parse_done(&tbl->tbl); - TAILQ_REMOVE(&rtp->pmt_tables, tbl, link); - free(tbl); - } - pthread_mutex_destroy(&rtp->lock); - free(rtp); + TAILQ_REMOVE(&satip_rtp_sessions, rtp, link); + sq = rtp->sq; + pthread_mutex_lock(&sq->sq_mutex); + rtp->sq = NULL; + tvh_cond_signal(&sq->sq_cond, 0); + pthread_mutex_unlock(&sq->sq_mutex); + pthread_mutex_unlock(&satip_rtp_lock); + if (rtp->port == RTSP_TCP_DATA) + pthread_mutex_lock(rtp->tcp_lock); + pthread_join(rtp->tid, NULL); + if (rtp->port == RTSP_TCP_DATA) { + pthread_mutex_unlock(rtp->tcp_lock); + free(rtp->tcp_data.iov_base); } else { - pthread_mutex_unlock(&satip_rtp_lock); + udp_multisend_free(&rtp->um); } + mpegts_pid_done(&rtp->pids); + while ((tbl = TAILQ_FIRST(&rtp->pmt_tables)) != NULL) { + dvb_table_parse_done(&tbl->tbl); + TAILQ_REMOVE(&rtp->pmt_tables, tbl, link); + free(tbl); + } + pthread_mutex_destroy(&rtp->lock); + free(rtp); } /* @@ -822,19 +802,18 @@ satip_status_build(satip_rtp_session_t *rtp, char *buf, int len) /* * */ -int satip_rtp_status(void *id, char *buf, int len) +int satip_rtp_status(void *_rtp, char *buf, int len) { - satip_rtp_session_t *rtp; + satip_rtp_session_t *rtp = _rtp; int r = 0; buf[0] = '\0'; + if (rtp == NULL) + return 0; pthread_mutex_lock(&satip_rtp_lock); - rtp = satip_rtp_find(id); - if (rtp) { - pthread_mutex_lock(&rtp->lock); - r = satip_status_build(rtp, buf, len); - pthread_mutex_unlock(&rtp->lock); - } + pthread_mutex_lock(&rtp->lock); + r = satip_status_build(rtp, buf, len); + pthread_mutex_unlock(&rtp->lock); pthread_mutex_unlock(&satip_rtp_lock); return r; } diff --git a/src/satip/rtsp.c b/src/satip/rtsp.c index 753243084..c8c3d6287 100644 --- a/src/satip/rtsp.c +++ b/src/satip/rtsp.c @@ -69,6 +69,7 @@ typedef struct session { int rtp_peer_port; udp_connection_t *udp_rtp; udp_connection_t *udp_rtcp; + void *rtp_handle; http_connection_t *old_hc; LIST_HEAD(, slave_subscription) slaves; } session_t; @@ -381,7 +382,8 @@ rtsp_clean(session_t *rs) slave_subscription_t *sub; if (rs->state == STATE_PLAY) { - satip_rtp_close((void *)(intptr_t)rs->stream); + satip_rtp_close(rs->rtp_handle); + rs->rtp_handle = NULL; rs->state = STATE_DESCRIBE; } if (rs->subs) { @@ -494,7 +496,7 @@ end: if (s->s_pmt_pid <= 0 || s->s_pmt_pid >= 8191) continue; mpegts_pid_add(&pmt_pids, s->s_pmt_pid, MPS_WEIGHT_PMT); } - satip_rtp_update_pmt_pids((void *)(intptr_t)rs->stream, &pmt_pids); + satip_rtp_update_pmt_pids(rs->rtp_handle, &pmt_pids); mpegts_pid_done(&pmt_pids); } } @@ -609,22 +611,22 @@ pids: mpegts_pid_add(&rs->pids, 0, MPS_WEIGHT_RAW); svc = (mpegts_service_t *)rs->subs->ths_raw_service; svc->s_update_pids(svc, &rs->pids); - satip_rtp_update_pids((void *)(intptr_t)rs->stream, &rs->pids); + satip_rtp_update_pids(rs->rtp_handle, &rs->pids); if (rs->used_weight != weight && weight > 0) subscription_set_weight(rs->subs, rs->used_weight = weight); } if (cmd == RTSP_CMD_PLAY && rs->state != STATE_PLAY) { if (rs->mux == NULL) goto endclean; - satip_rtp_queue((void *)(intptr_t)rs->stream, - rs->subs, &rs->prch.prch_sq, - &hc->hc_fd_lock, hc->hc_peer, rs->rtp_peer_port, - rs->udp_rtp ? rs->udp_rtp->fd : hc->hc_fd, - rs->udp_rtcp ? rs->udp_rtcp->fd : -1, - rs->findex, rs->src, &rs->dmc_tuned, - &rs->pids, - ocmd == RTSP_CMD_PLAY || oldstate == STATE_PLAY, - rs->perm_lock); + rs->rtp_handle = + satip_rtp_queue(rs->subs, &rs->prch.prch_sq, + &hc->hc_fd_lock, hc->hc_peer, rs->rtp_peer_port, + rs->udp_rtp ? rs->udp_rtp->fd : hc->hc_fd, + rs->udp_rtcp ? rs->udp_rtcp->fd : -1, + rs->findex, rs->src, &rs->dmc_tuned, + &rs->pids, + ocmd == RTSP_CMD_PLAY || oldstate == STATE_PLAY, + rs->perm_lock); rs->tcp_data = rs->udp_rtp ? NULL : hc; if (!rs->pids.all && rs->pids.count == 0) mpegts_pid_add(&rs->pids, 0, MPS_WEIGHT_RAW); @@ -632,7 +634,7 @@ pids: svc->s_update_pids(svc, &rs->pids); rs->state = STATE_PLAY; } else if (ocmd == RTSP_CMD_PLAY) { - satip_rtp_allow_data((void *)(intptr_t)rs->stream); + satip_rtp_allow_data(rs->rtp_handle); } rtsp_manage_descramble(rs); pthread_mutex_unlock(&global_lock); @@ -1260,7 +1262,7 @@ rtsp_describe_session(session_t *rs, htsbuf_queue_t *q) else htsbuf_append_str(q, "c=IN IP4 0.0.0.0\r\n"); if (rs->state == STATE_PLAY || rs->state == STATE_SETUP) { - satip_rtp_status((void *)(intptr_t)rs->stream, buf, sizeof(buf)); + satip_rtp_status(rs->rtp_handle, buf, sizeof(buf)); htsbuf_qprintf(q, "a=fmtp:33 %s\r\n", buf); htsbuf_qprintf(q, "a=%s\r\n", rs->state == STATE_SETUP ? "inactive" : "sendonly"); } else { @@ -1524,7 +1526,8 @@ rtsp_flush_requests(http_connection_t *hc) rtsp_close_session(rs); rtsp_free_session(rs); } else if (rs->tcp_data == hc) { - satip_rtp_close((void *)(intptr_t)rs->stream); + satip_rtp_close(rs->rtp_handle); + rs->rtp_handle = NULL; rs->tcp_data = NULL; rs->state = STATE_SETUP; } @@ -1594,7 +1597,8 @@ rtsp_serve(int fd, void **opaque, struct sockaddr_storage *peer, static void rtsp_close_session(session_t *rs) { - satip_rtp_close((void *)(intptr_t)rs->stream); + satip_rtp_close(rs->rtp_handle); + rs->rtp_handle = NULL; rs->state = STATE_DESCRIBE; udp_close(rs->udp_rtp); rs->udp_rtp = NULL; diff --git a/src/satip/server.h b/src/satip/server.h index 471027fcb..d42d74559 100644 --- a/src/satip/server.h +++ b/src/satip/server.h @@ -65,24 +65,24 @@ extern struct satip_server_conf satip_server_conf; extern const idclass_t satip_server_class; -void satip_rtp_queue(void *id, th_subscription_t *subs, - streaming_queue_t *sq, - pthread_mutex_t *tcp_lock, - struct sockaddr_storage *peer, int port, - int fd_rtp, int fd_rtcp, - int frontend, int source, - dvb_mux_conf_t *dmc, - mpegts_apids_t *pids, - int allow_data, int perm_lock); -void satip_rtp_update(void *id, th_subscription_t *subs, +void *satip_rtp_queue(th_subscription_t *subs, + streaming_queue_t *sq, + pthread_mutex_t *tcp_lock, + struct sockaddr_storage *peer, int port, + int fd_rtp, int fd_rtcp, + int frontend, int source, + dvb_mux_conf_t *dmc, + mpegts_apids_t *pids, + int allow_data, int perm_lock); +void satip_rtp_update(void *_rtp, th_subscription_t *subs, streaming_queue_t *sq, int frontend, int source, dvb_mux_conf_t *dmc, mpegts_apids_t *pids, mpegts_apids_t *pmt_pids); -void satip_rtp_allow_data(void *id); -void satip_rtp_update_pids(void *id, mpegts_apids_t *pids); -void satip_rtp_update_pmt_pids(void *id, mpegts_apids_t *pmt_pids); +void satip_rtp_allow_data(void *_rtp); +void satip_rtp_update_pids(void *_rtp, mpegts_apids_t *pids); +void satip_rtp_update_pmt_pids(void *_rtp, mpegts_apids_t *pmt_pids); int satip_rtp_status(void *id, char *buf, int len); void satip_rtp_close(void *id);