* Transmit a HTTP reply
*/
void
-http_send_header(http_connection_t *hc, int rc, const char *content,
+http_send_header(http_connection_t *hc, int rc, const char *content,
int64_t contentlen,
const char *encoding, const char *location,
int maxage, const char *range,
time_t t;
int sess = 0;
+ lock_assert(&hc->hc_fd_lock);
+
htsbuf_queue_init(&hdrs, 0);
htsbuf_qprintf(&hdrs, "%s %d %s\r\n",
}
#endif
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, rc, content, size,
encoding, location, maxage, 0, NULL, NULL);
else
tvh_write(hc->hc_fd, data, size);
}
+ pthread_mutex_unlock(&hc->hc_fd_lock);
free(data);
}
char *argv[3], *c, *cmdline = NULL, *hdrline = NULL;
int n, r;
+ pthread_mutex_init(&hc->hc_fd_lock, NULL);
http_arg_init(&hc->hc_args);
http_arg_init(&hc->hc_req_args);
htsbuf_queue_init(&spill, 0);
} http_ver_t;
typedef struct http_connection {
+ pthread_mutex_t hc_fd_lock;
int hc_fd;
struct sockaddr_storage *hc_peer;
char *hc_peer_ipstr;
#define RTP_PACKETS 128
#define RTP_PAYLOAD (7*188+12)
+#define RTP_TCP_PAYLOAD (87*188+12+4) /* cca 16kB */
#define RTCP_PAYLOAD (1420)
typedef struct satip_rtp_table {
signal_status_t sig;
int sig_lock;
pthread_mutex_t lock;
+ pthread_mutex_t *tcp_lock;
uint8_t *table_data;
int table_data_len;
} satip_rtp_session_t;
}
static void
-satip_rtp_header(satip_rtp_session_t *rtp)
+satip_rtp_header(satip_rtp_session_t *rtp, struct iovec *v)
{
- struct iovec *v = rtp->um_iovec + rtp->um_packet;
uint8_t *data = v->iov_base;
uint32_t tstamp = dispatch_clock + rtp->seq;
v->iov_len = len;
}
if (v->iov_len == 0)
- satip_rtp_header(rtp);
+ satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet);
return 0;
}
return r;
} else {
rtp->um_packet++;
- satip_rtp_header(rtp);
+ satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet);
}
*_v = rtp->um_iovec + rtp->um_packet;
} else {
dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb);
if (rtp->table_data_len) {
for (i = 0; i < rtp->table_data_len; i += 188) {
- r = satip_rtp_append_data(rtp, &v, data);
+ r = satip_rtp_append_data(rtp, &v, rtp->table_data + i);
if (r < 0)
return r;
}
return 0;
}
+static void
+satip_rtp_tcp_data(satip_rtp_session_t *rtp, uint8_t stream, uint8_t *data, size_t data_len)
+{
+ assert(data_len <= 0xffff);
+ data[0] = '$';
+ data[1] = stream;
+ data[2] = (data_len - 4) >> 8;
+ data[3] = data_len & 0xff;
+ pthread_mutex_lock(rtp->tcp_lock);
+ tvh_write(rtp->fd_rtp, data, data_len);
+ pthread_mutex_unlock(rtp->tcp_lock);
+}
+
+static inline void
+satip_rtp_flush_tcp_data(satip_rtp_session_t *rtp, struct iovec *v)
+{
+ if (v->iov_len)
+ satip_rtp_tcp_data(rtp, 0, v->iov_base, v->iov_len);
+ free(v->iov_base);
+ v->iov_base = NULL;
+ v->iov_len = 0;
+}
+
+static inline int
+satip_rtp_append_tcp_data(satip_rtp_session_t *rtp, struct iovec *v, uint8_t *data, size_t len)
+{
+ if (v->iov_base == NULL) {
+ v->iov_base = malloc(RTP_TCP_PAYLOAD);
+ v->iov_len = 4; /* keep room for RTSP embedded data header */
+ satip_rtp_header(rtp, v);
+ }
+ assert(v->iov_len + len <= RTP_TCP_PAYLOAD);
+ memcpy(v->iov_base + v->iov_len, data, len);
+ v->iov_len += len;
+ if (v->iov_len == RTP_TCP_PAYLOAD)
+ satip_rtp_flush_tcp_data(rtp, v);
+ return 0;
+}
+
+
+static int
+satip_rtp_tcp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len)
+{
+ int i, j, pid, last_pid = -1, r;
+ mpegts_apid_t *pids = rtp->pids.pids;
+ struct iovec v = {NULL, 0};
+ satip_rtp_table_t *tbl;
+
+ assert((len % 188) == 0);
+ if (len > 0)
+ rtp->sig_lock = 1;
+ for ( ; len >= 188 ; data += 188, len -= 188) {
+ pid = ((data[1] & 0x1f) << 8) | data[2];
+ if (pid != last_pid && !rtp->pids.all) {
+ for (i = 0; i < rtp->pids.count; i++) {
+ j = pids[i].pid;
+ if (pid < j) break;
+ if (j == pid) goto found;
+ }
+ continue;
+found:
+ TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
+ if (tbl->pid == pid) {
+ dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb);
+ if (rtp->table_data_len) {
+ satip_rtp_append_tcp_data(rtp, &v, rtp->table_data, rtp->table_data_len);
+ free(rtp->table_data);
+ rtp->table_data = NULL;
+ }
+ break;
+ }
+ if (tbl)
+ continue;
+ last_pid = pid;
+ }
+ r = satip_rtp_append_tcp_data(rtp, &v, data, 188);
+ if (r < 0)
+ return r;
+ }
+ if (v.iov_len)
+ satip_rtp_flush_tcp_data(rtp, &v);
+ return 0;
+}
+
static void
satip_rtp_signal_status(satip_rtp_session_t *rtp, signal_status_t *sig)
{
pktbuf_t *pb;
char peername[50];
int alive = 1, fatal = 0, r;
+ int tcp = rtp->port == RTSP_TCP_DATA;
tcp_get_str_from_ip((struct sockaddr *)&rtp->peer, peername, sizeof(peername));
- tvhdebug("satips", "RTP streaming to %s:%d open", peername, rtp->port);
+ tvhdebug("satips", "RTP streaming to %s:%d open", peername,
+ tcp ? IP_PORT(rtp->peer) : rtp->port);
pthread_mutex_lock(&sq->sq_mutex);
while (rtp->sq && !fatal) {
pb = sm->sm_data;
subscription_add_bytes_out(subs, pktbuf_len(pb));
pthread_mutex_lock(&rtp->lock);
- r = satip_rtp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb));
+ if (tcp)
+ r = satip_rtp_tcp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb));
+ else
+ r = satip_rtp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb));
pthread_mutex_unlock(&rtp->lock);
if (r) fatal = 1;
break;
*/
void satip_rtp_queue(void *id, th_subscription_t *subs,
streaming_queue_t *sq,
+ pthread_mutex_t *tcp_lock,
struct sockaddr_storage *peer, int port,
int fd_rtp, int fd_rtcp,
int frontend, int source, dvb_mux_conf_t *dmc,
mpegts_apids_t *pids, int perm_lock)
{
satip_rtp_session_t *rtp = calloc(1, sizeof(*rtp));
+ int dscp;
if (rtp == NULL)
return;
rtp->id = id;
rtp->peer = *peer;
rtp->peer2 = *peer;
- IP_PORT_SET(rtp->peer2, htons(port + 1));
+ if (port != RTSP_TCP_DATA)
+ IP_PORT_SET(rtp->peer2, htons(port + 1));
rtp->port = port;
rtp->fd_rtp = fd_rtp;
rtp->fd_rtcp = fd_rtcp;
rtp->subs = subs;
rtp->sq = sq;
+ rtp->tcp_lock = tcp_lock;
mpegts_pid_init(&rtp->pids);
mpegts_pid_copy(&rtp->pids, pids);
TAILQ_INIT(&rtp->pmt_tables);
udp_multisend_init(&rtp->um, RTP_PACKETS, RTP_PAYLOAD, &rtp->um_iovec);
- satip_rtp_header(rtp);
+ satip_rtp_header(rtp, rtp->um_iovec);
rtp->frontend = frontend;
rtp->dmc = *dmc;
rtp->source = source;
pthread_mutex_init(&rtp->lock, NULL);
- if (config.dscp >= 0) {
- socket_set_dscp(rtp->fd_rtp, config.dscp, NULL, 0);
- socket_set_dscp(rtp->fd_rtcp, config.dscp, NULL, 0);
- } else {
- socket_set_dscp(rtp->fd_rtp, IPTOS_DSCP_EF, NULL, 0);
- socket_set_dscp(rtp->fd_rtcp, IPTOS_DSCP_EF, NULL, 0);
- }
+ dscp = config.dscp >= 0 ? config.dscp : IPTOS_DSCP_EF;
+ socket_set_dscp(rtp->fd_rtp, dscp, NULL, 0);
+ if (rtp->fd_rtcp >= 0)
+ socket_set_dscp(rtp->fd_rtcp, dscp, NULL, 0);
if (perm_lock) {
rtp->sig.signal_scale = SIGNAL_STATUS_SCALE_RELATIVE;
if (rtp) {
TAILQ_REMOVE(&satip_rtp_sessions, rtp, link);
sq = rtp->sq;
+ pthread_mutex_lock(rtp->tcp_lock);
pthread_mutex_lock(&sq->sq_mutex);
rtp->sq = NULL;
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
pthread_mutex_unlock(&satip_rtp_lock);
+ pthread_mutex_lock(rtp->tcp_lock);
pthread_join(rtp->tid, NULL);
+ pthread_mutex_unlock(rtp->tcp_lock);
udp_multisend_free(&rtp->um);
mpegts_pid_done(&rtp->pids);
while ((tbl = TAILQ_FIRST(&rtp->pmt_tables)) != NULL) {
tcp_get_str_from_ip((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf));
tvhtrace("satips", "RTCP send to %s:%d : %s", addrbuf, IP_PORT(rtp->peer2), msg + 16);
}
- r = sendto(rtp->fd_rtcp, msg, len, 0,
- (struct sockaddr*)&rtp->peer2,
- rtp->peer2.ss_family == AF_INET6 ?
- sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
+ if (rtp->port == RTSP_TCP_DATA) {
+ satip_rtp_tcp_data(rtp, 1, msg, len);
+ r = 0;
+ } else {
+ r = sendto(rtp->fd_rtcp, msg, len, 0,
+ (struct sockaddr*)&rtp->peer2,
+ rtp->peer2.ss_family == AF_INET6 ?
+ sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
+ }
if (r < 0) {
err = errno;
tcp_get_str_from_ip((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf));
#include "htsbuf.h"
#include "config.h"
#include "profile.h"
+#include "streaming.h"
#include "satip/server.h"
#include "input/mpegts/iptv/iptv_private.h"
goto endclean;
satip_rtp_queue((void *)(intptr_t)rs->stream,
rs->subs, &rs->prch.prch_sq,
- hc->hc_peer, rs->rtp_peer_port,
- rs->udp_rtp->fd, rs->udp_rtcp->fd,
+ &hc->hc_fd_lock, hc->hc_peer, rs->rtp_peer_port,
+ rs->udp_rtp ? rs->udp_rtp->fd : -1,
+ rs->udp_rtcp ? rs->udp_rtcp->fd : -1,
rs->frontend, rs->findex, &rs->dmc_tuned,
&rs->pids, rs->perm_lock);
if (!rs->pids.all && rs->pids.count == 0)
const char *s = http_arg_get(&hc->hc_args, "Transport");
const char *u;
int a, b;
- if (!s || strncmp(s, "RTP/AVP;unicast;client_port=", 28))
+ if (!s)
return -1;
- for (s += 28, u = s; isdigit(*u); u++);
- if (*u != '-')
- return -1;
- a = atoi(s);
- for (s = ++u; isdigit(*s); s++);
- if (*s != '\0' && *s != ';')
- return -1;
- b = atoi(u);
- if (a + 1 != b)
- return -1;
- return a;
+ if (strncmp(s, "RTP/AVP;unicast;client_port=", 28) == 0) {
+ for (s += 28, u = s; isdigit(*u); u++);
+ if (*u != '-')
+ return -1;
+ a = atoi(s);
+ for (s = ++u; isdigit(*s); s++);
+ if (*s != '\0' && *s != ';')
+ return -1;
+ b = atoi(u);
+ if (a + 1 != b)
+ return -1;
+ return a;
+ } else if (strncmp(s, "RTP/AVP/TCP;interleaved=0-1", 27) == 0) {
+ return RTSP_TCP_DATA;
+ }
+ return -1;
}
/*
http_arg_set(&args, "Public", "OPTIONS,DESCRIBE,SETUP,PLAY,TEARDOWN");
if (hc->hc_session)
http_arg_set(&args, "Session", hc->hc_session);
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args);
+ pthread_mutex_unlock(&hc->hc_fd_lock);
http_arg_flush(&args);
return 0;
else
snprintf(buf, sizeof(buf), "rtsp://%s", rtsp_ip);
http_arg_set(&args, "Content-Base", buf);
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, HTTP_STATUS_OK, "application/sdp", q.hq_size,
NULL, NULL, 0, NULL, NULL, &args);
tcp_write_queue(hc->hc_fd, &q);
+ pthread_mutex_unlock(&hc->hc_fd_lock);
http_arg_flush(&args);
htsbuf_queue_flush(&q);
return 0;
if (errcode) goto error;
- if (setup) {
+ if (setup && rs->rtp_peer_port != RTSP_TCP_DATA) {
if (udp_bind_double(&rs->udp_rtp, &rs->udp_rtcp,
"satips", "rtsp", "rtcp",
rtsp_ip, 0, NULL,
pthread_mutex_unlock(&rtsp_lock);
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args);
+ pthread_mutex_unlock(&hc->hc_fd_lock);
goto end;
pthread_mutex_unlock(&rtsp_lock);
http_arg_init(&args);
http_arg_set(&args, "Session", session);
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, NULL);
+ pthread_mutex_unlock(&hc->hc_fd_lock);
http_arg_flush(&args);
}
return 0;
snprintf(buf2, sizeof(buf2), "%d", srcs);
http_arg_set(&args, "X-SATIP-Sources", buf2);
}
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, 200, "text/xml", strlen(buf), 0, NULL, 10, 0, NULL, &args);
- http_arg_flush(&args);
-
tvh_write(hc->hc_fd, buf, strlen(buf));
+ pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_arg_flush(&args);
return 0;
#undef MSG
#define MUXCNF_KEEP 1
#define MUXCNF_REJECT 2
+#define RTSP_TCP_DATA 1000000
+
struct satip_server_conf {
idnode_t idnode;
int satip_deviceid;
void satip_rtp_queue(void *id, th_subscription_t *subs,
streaming_queue_t *sq,
+ pthread_mutex_t *tcp_lock,
struct sockaddr_storage *peer, int port,
int fd_rtp, int fd_rtcp,
int frontend, int source,
((struct sockaddr_in6 *)&(storage))->sin6_port : \
((struct sockaddr_in *)&(storage))->sin_port)
#define IP_PORT_SET(storage, port) \
- if ((storage).ss_family == AF_INET6) \
+ do { \
+ if ((storage).ss_family == AF_INET6) \
((struct sockaddr_in6 *)&(storage))->sin6_port = (port); else \
- ((struct sockaddr_in *)&(storage))->sin_port = (port);
+ ((struct sockaddr_in *)&(storage))->sin_port = (port); \
+ } while (0)
typedef struct tcp_server_ops
{
if (!gzip && fb_gzipped(fp))
gzip = "gzip";
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, 200, content, size, gzip, NULL, 10, 0, NULL, NULL);
while (!fb_eof(fp)) {
ssize_t c = fb_read(fp, buf, sizeof(buf));
break;
}
}
+ pthread_mutex_unlock(&hc->hc_fd_lock);
fb_close(fp);
return ret;
pthread_mutex_unlock(&global_lock);
len = strlen(buf);
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, 200, "application/xspf+xml", len, 0, NULL, 10, 0, NULL, NULL);
tvh_write(hc->hc_fd, buf, len);
+ pthread_mutex_unlock(&hc->hc_fd_lock);
free(hostpath);
return 0;
pthread_mutex_unlock(&global_lock);
len = strlen(buf);
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, 200, MIME_M3U, len, 0, NULL, 10, 0, NULL, NULL);
tvh_write(hc->hc_fd, buf, len);
+ pthread_mutex_unlock(&hc->hc_fd_lock);
free(hostpath);
return 0;
return HTTP_STATUS_NOT_ALLOWED;
}
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, range ? HTTP_STATUS_PARTIAL_CONTENT : HTTP_STATUS_OK,
content, content_len, NULL, NULL, 10,
range ? range_buf : NULL, disposition, NULL);
}
}
}
+ pthread_mutex_unlock(&hc->hc_fd_lock);
close(fd);
pthread_mutex_lock(&global_lock);
return HTTP_STATUS_NOT_FOUND;
}
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, 200, NULL, st.st_size, 0, NULL, 10, 0, NULL, NULL);
while (1) {
if (tvh_write(hc->hc_fd, buf, c))
break;
}
+ pthread_mutex_unlock(&hc->hc_fd_lock);
close(fd);
return 0;
}
}
snprintf(buf, sizeof(buf), "tvh_locale={};tvh_locale_lang='';");
+ pthread_mutex_lock(&hc->hc_fd_lock);
http_send_header(hc, 200, "text/javascript; charset=UTF-8", strlen(buf), 0, NULL, 10, 0, NULL, NULL);
tvh_write(hc->hc_fd, buf, strlen(buf));
+ pthread_mutex_unlock(&hc->hc_fd_lock);
return 0;
}
}