void
htsbuf_data_free(htsbuf_queue_t *hq, htsbuf_data_t *hd)
{
+ hq->hq_size -= hd->hd_data_size - hd->hd_data_off;
TAILQ_REMOVE(&hq->hq_q, hd, hd_link);
free(hd->hd_data);
free(hd);
time_t t;
int sess = 0;
- lock_assert(&hc->hc_fd_lock);
+ assert(atomic_get(&hc->hc_extra_insend) > 0);
htsbuf_queue_init(&hdrs, 0);
}
#endif
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, rc, content, size,
encoding, location, maxage, 0, NULL, NULL);
else
tvh_write(hc->hc_fd, data, size);
}
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
free(data);
}
http_send_reply(hc, HTTP_STATUS_OK, "text/css", NULL, loc, 0);
}
+/**
+ *
+ */
+void
+http_extra_destroy(http_connection_t *hc)
+{
+ htsbuf_data_t *hd, *hd_next;
+
+ pthread_mutex_lock(&hc->hc_extra_lock);
+ for (hd = TAILQ_FIRST(&hc->hc_extra.hq_q); hd; hd = hd_next) {
+ hd_next = TAILQ_NEXT(hd, hd_link);
+ if (hd->hd_data_off <= 0) {
+ htsbuf_data_free(&hc->hc_extra, hd);
+ atomic_dec(&hc->hc_extra_chunks, 1);
+ }
+ }
+ pthread_mutex_unlock(&hc->hc_extra_lock);
+}
+
+/**
+ *
+ */
+int
+http_extra_flush(http_connection_t *hc)
+{
+ htsbuf_data_t *hd;
+ int r = 0, serr;
+
+ if (atomic_add(&hc->hc_extra_insend, 1) != 0)
+ goto fin;
+
+ while (1) {
+ pthread_mutex_lock(&hc->hc_extra_lock);
+ hd = TAILQ_FIRST(&hc->hc_extra.hq_q);
+ do {
+ r = send(hc->hc_fd, hd->hd_data + hd->hd_data_off,
+ hd->hd_data_size - hd->hd_data_off,
+ MSG_DONTWAIT | (TAILQ_NEXT(hd, hd_link) ? MSG_MORE : 0));
+ serr = errno;
+ } while (r < 0 && serr == EINTR);
+ if (r + hd->hd_data_off >= hd->hd_data_size) {
+ atomic_dec(&hc->hc_extra_chunks, 1);
+ htsbuf_data_free(&hc->hc_extra, hd);
+ } else if (r > 0) {
+ hd->hd_data_off += r;
+ hc->hc_extra.hq_size -= r;
+ }
+ pthread_mutex_unlock(&hc->hc_extra_lock);
+
+ if (r < 0) {
+ if (ERRNO_AGAIN(serr))
+ r = 0;
+ break;
+ }
+ }
+
+fin:
+ atomic_dec(&hc->hc_extra_insend, 1);
+ return r;
+}
+
+/**
+ *
+ */
+int
+http_extra_flush_partial(http_connection_t *hc)
+{
+ htsbuf_data_t *hd;
+ int r = 0;
+ unsigned int off, size;
+ void *data = NULL;
+
+ atomic_add(&hc->hc_extra_insend, 1);
+
+ pthread_mutex_lock(&hc->hc_extra_lock);
+ hd = TAILQ_FIRST(&hc->hc_extra.hq_q);
+ if (hd && hd->hd_data_off > 0) {
+ data = hd->hd_data;
+ hd->hd_data = NULL;
+ off = hd->hd_data_off;
+ size = hd->hd_data_size;
+ atomic_dec(&hc->hc_extra_chunks, 1);
+ htsbuf_data_free(&hc->hc_extra, hd);
+ }
+ pthread_mutex_unlock(&hc->hc_extra_lock);
+ if (data == NULL)
+ goto finish;
+
+ r = tvh_write(hc->hc_fd, data + off, size - off);
+ free(data);
+
+finish:
+ atomic_dec(&hc->hc_extra_insend, 1);
+ return r;
+}
+
+/**
+ *
+ */
+int
+http_extra_send(http_connection_t *hc, const void *data, size_t data_len)
+{
+ uint8_t *b = malloc(data_len);
+ memcpy(b, data, data_len);
+ return http_extra_send_prealloc(hc, b, data_len);
+}
+
+/**
+ *
+ */
+int
+http_extra_send_prealloc(http_connection_t *hc, const void *data, size_t data_len)
+{
+ if (data == NULL) return 0;
+ pthread_mutex_lock(&hc->hc_extra_lock);
+ if (hc->hc_extra.hq_size <= 1024*1024) {
+ atomic_add(&hc->hc_extra_chunks, 1);
+ htsbuf_append_prealloc(&hc->hc_extra, data, data_len);
+ } else {
+ free((void *)data);
+ }
+ pthread_mutex_unlock(&hc->hc_extra_lock);
+ return http_extra_flush(hc);
+}
+
/**
*
*/
static int
http_cmd_options(http_connection_t *hc)
{
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, HTTP_STATUS_OK, NULL, INT64_MIN,
NULL, NULL, -1, 0, NULL, NULL);
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
return 0;
}
char *argv[3], *c, *s, *cmdline = NULL, *hdrline = NULL;
int n, r, delim;
- pthread_mutex_init(&hc->hc_fd_lock, NULL);
+ pthread_mutex_init(&hc->hc_extra_lock, NULL);
http_arg_init(&hc->hc_args);
http_arg_init(&hc->hc_req_args);
htsbuf_queue_init(&spill, 0);
htsbuf_queue_init(&hc->hc_reply, 0);
+ htsbuf_queue_init(&hc->hc_extra, 0);
+ atomic_set(&hc->hc_extra_insend, 0);
+ atomic_set(&hc->hc_extra_chunks, 0);
do {
hc->hc_no_output = 0;
free(hdrline);
free(cmdline);
htsbuf_queue_flush(&spill);
+ htsbuf_queue_flush(&hc->hc_extra);
free(hc->hc_nonce);
hc->hc_nonce = NULL;
#include "htsbuf.h"
#include "url.h"
#include "tvhpoll.h"
- #include "access.h"
+#include "access.h"
+#include "atomic.h"
struct channel;
struct http_path;
} http_ver_t;
typedef struct http_connection {
- pthread_mutex_t hc_fd_lock;
int hc_fd;
struct sockaddr_storage *hc_peer;
char *hc_peer_ipstr;
htsbuf_queue_t hc_reply;
+ int hc_extra_insend;
+ pthread_mutex_t hc_extra_lock;
+ int hc_extra_chunks;
+ htsbuf_queue_t hc_extra;
+
http_arg_list_t hc_args;
http_arg_list_t hc_req_args; /* Argumets from GET or POST request */
void http_css_import(http_connection_t *hc, const char *location);
+void http_extra_destroy(http_connection_t *hc);
+
+int http_extra_flush(http_connection_t *hc);
+
+int http_extra_flush_partial(http_connection_t *hc);
+
+int http_extra_send(http_connection_t *hc, const void *data, size_t data_len);
+
+int http_extra_send_prealloc(http_connection_t *hc, const void *data, size_t data_len);
+
+static inline void http_send_begin(http_connection_t *hc)
+{
+ if (atomic_get(&hc->hc_extra_chunks) > 0)
+ http_extra_flush_partial(hc);
+ atomic_add(&hc->hc_extra_insend, 1);
+}
+
+static inline void http_send_end(http_connection_t *hc)
+{
+ atomic_dec(&hc->hc_extra_insend, 1);
+ if (atomic_get(&hc->hc_extra_chunks) > 0)
+ http_extra_flush(hc);
+}
+
void http_send_header(http_connection_t *hc, int rc, const char *content,
int64_t contentlen, const char *encoding,
const char *location, int maxage, const char *range,
signal_status_t sig;
int sig_lock;
pthread_mutex_t lock;
- pthread_mutex_t *tcp_lock;
+ http_connection_t *hc;
uint8_t *table_data;
int table_data_len;
void (*no_data_cb)(void *opaque);
static int
satip_rtp_tcp_data(satip_rtp_session_t *rtp, uint8_t stream, uint8_t *data, size_t data_len)
{
- int r = 0;
-
assert(data_len <= 0xffff);
data[0] = '$';
data[1] = stream;
data[2] = (data_len - 4) >> 8;
data[3] = (data_len - 4) & 0xff;
- pthread_mutex_lock(rtp->tcp_lock);
- if (!tvh_write(rtp->fd_rtp, data, data_len))
- r = errno;
- pthread_mutex_unlock(rtp->tcp_lock);
- return r;
+ return http_extra_send_prealloc(rtp->hc, data, data_len);
}
static inline int
if (v->iov_len)
r = satip_rtp_tcp_data(rtp, 0, v->iov_base, v->iov_len);
- free(v->iov_base);
+ else
+ free(v->iov_base);
v->iov_base = NULL;
v->iov_len = 0;
return r;
*/
void *satip_rtp_queue(th_subscription_t *subs,
streaming_queue_t *sq,
- pthread_mutex_t *tcp_lock,
+ http_connection_t *hc,
struct sockaddr_storage *peer, int port,
int fd_rtp, int fd_rtcp,
int frontend, int source, dvb_mux_conf_t *dmc,
rtp->fd_rtcp = fd_rtcp;
rtp->subs = subs;
rtp->sq = sq;
- rtp->tcp_lock = tcp_lock;
+ rtp->hc = hc;
+ rtp->tcp_payload = RTP_TCP_PAYLOAD;
+ rtp->tcp_buffer_size = 16*1024*1024;
rtp->no_data_cb = no_data_cb;
rtp->no_data_opaque = no_data_opaque;
atomic_set(&rtp->allow_data, allow_data);
tvh_cond_signal(&sq->sq_cond, 0);
pthread_mutex_unlock(&sq->sq_mutex);
pthread_mutex_unlock(&satip_rtp_lock);
- if (rtp->port == RTSP_TCP_DATA)
- pthread_mutex_lock(rtp->tcp_lock);
pthread_join(rtp->tid, NULL);
if (rtp->port == RTSP_TCP_DATA) {
- pthread_mutex_unlock(rtp->tcp_lock);
+ http_extra_destroy(rtp->hc);
free(rtp->tcp_data.iov_base);
} else {
udp_multisend_free(&rtp->um);
tvhwarn(LS_SATIPS, "-/%s/%i: session closed (timeout)", rs->session, rs->stream);
pthread_mutex_unlock(&global_lock);
pthread_mutex_lock(&rtsp_lock);
- if (rs->rtp_peer_port == RTSP_TCP_DATA && rs->tcp_data)
- shutdown(rs->tcp_data->hc_fd, SHUT_RDWR);
rtsp_close_session(rs);
rtsp_free_session(rs);
pthread_mutex_unlock(&rtsp_lock);
rs->no_data = 0;
rs->rtp_handle =
satip_rtp_queue(rs->subs, &rs->prch.prch_sq,
- &hc->hc_fd_lock, hc->hc_peer, rs->rtp_peer_port,
+ hc, hc->hc_peer, rs->rtp_peer_port,
rs->udp_rtp ? rs->udp_rtp->fd : hc->hc_fd,
rs->udp_rtcp ? rs->udp_rtcp->fd : -1,
rs->findex, rs->src, &rs->dmc_tuned,
http_arg_set(&args, "Public", "OPTIONS,DESCRIBE,SETUP,PLAY,TEARDOWN");
if (hc->hc_session)
http_arg_set(&args, "Session", hc->hc_session);
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args);
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
http_arg_flush(&args);
return 0;
else
snprintf(buf, sizeof(buf), "rtsp://%s", rtsp_ip);
http_arg_set(&args, "Content-Base", buf);
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, HTTP_STATUS_OK, "application/sdp", q.hq_size,
NULL, NULL, 0, NULL, NULL, &args);
tcp_write_queue(hc->hc_fd, &q);
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
http_arg_flush(&args);
htsbuf_queue_flush(&q);
return 0;
pthread_mutex_unlock(&rtsp_lock);
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args);
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
goto end;
pthread_mutex_unlock(&rtsp_lock);
http_arg_init(&args);
http_arg_set(&args, "Session", session);
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, NULL);
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
http_arg_flush(&args);
}
return 0;
http_serve_requests(&hc);
- shutdown(fd, SHUT_RDWR);
-
rtsp_flush_requests(&hc);
close(fd);
snprintf(buf2, sizeof(buf2), "%d", srcs);
http_arg_set(&args, "X-SATIP-Sources", buf2);
}
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, 200, "text/xml", strlen(buf), 0, NULL, 10, 0, NULL, &args);
tvh_write(hc->hc_fd, buf, strlen(buf));
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
http_arg_flush(&args);
return 0;
void *satip_rtp_queue(th_subscription_t *subs,
streaming_queue_t *sq,
- pthread_mutex_t *tcp_lock,
+ http_connection_t *hc,
struct sockaddr_storage *peer, int port,
int fd_rtp, int fd_rtcp,
int frontend, int source,
if (!gzip && fb_gzipped(fp))
gzip = "gzip";
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, 200, content, size, gzip, NULL, 10, 0, NULL, NULL);
while (!fb_eof(fp)) {
ssize_t c = fb_read(fp, buf, sizeof(buf));
break;
}
}
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
fb_close(fp);
return ret;
pthread_mutex_unlock(&global_lock);
len = strlen(buf);
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, 200, "application/xspf+xml", len, 0, NULL, 10, 0, NULL, NULL);
tvh_write(hc->hc_fd, buf, len);
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
free(hostpath);
return 0;
pthread_mutex_unlock(&global_lock);
len = strlen(buf);
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, 200, MIME_M3U, len, 0, NULL, 10, 0, NULL, NULL);
tvh_write(hc->hc_fd, buf, len);
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
free(hostpath);
return 0;
}
}
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, range ? HTTP_STATUS_PARTIAL_CONTENT : HTTP_STATUS_OK,
content, content_len, NULL, NULL, 10,
range ? range_buf : NULL, disposition, NULL);
stats(hc, r, opaque);
}
}
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
close(fd);
return ret;
}
}
snprintf(buf, sizeof(buf), "tvh_locale={};tvh_locale_lang='';");
- pthread_mutex_lock(&hc->hc_fd_lock);
+ http_send_begin(hc);
http_send_header(hc, 200, "text/javascript; charset=UTF-8", strlen(buf), 0, NULL, 10, 0, NULL, NULL);
tvh_write(hc->hc_fd, buf, strlen(buf));
- pthread_mutex_unlock(&hc->hc_fd_lock);
+ http_send_end(hc);
return 0;
}
if (!strcmp(components[0], "theme.css")) {