htsp_read_loop(htsp_connection_t *htsp)
{
htsmsg_t *m = NULL, *reply;
- int r = 0, i;
+ int run = 1, r = 0, i, streaming = 0;
const char *method;
void *tcp_id = NULL;;
htsp->htsp_granted_access = access_get_by_addr(htsp->htsp_peer);
htsp->htsp_granted_access->aa_rights |= ACCESS_HTSP_INTERFACE;
- tcp_id = tcp_connection_launch(htsp->htsp_fd, htsp_server_status,
+ tcp_id = tcp_connection_launch(htsp->htsp_fd, streaming, htsp_server_status,
htsp->htsp_granted_access);
pthread_mutex_unlock(&global_lock);
/* Session main loop */
- while(tvheadend_is_running()) {
+ while(run && tvheadend_is_running()) {
readmsg:
reply = NULL;
pthread_mutex_lock(&global_lock);
if (htsp_authenticate(htsp, m)) {
tcp_connection_land(tcp_id);
- tcp_id = tcp_connection_launch(htsp->htsp_fd, htsp_server_status,
+ tcp_id = tcp_connection_launch(htsp->htsp_fd, streaming, htsp_server_status,
htsp->htsp_granted_access);
if (tcp_id == NULL) {
- htsmsg_destroy(m);
- pthread_mutex_unlock(&global_lock);
- return 1;
+ reply = htsmsg_create_map();
+ htsmsg_add_u32(reply, "noaccess", 1);
+ htsmsg_add_u32(reply, "connlimit", 1);
+ run = 0;
+ goto send_reply_with_unlock;
}
}
goto readmsg;
} else {
+ if (!strcmp(method, "subscribe") && !streaming) {
+ tcp_connection_land(tcp_id);
+ tcp_id = tcp_connection_launch(htsp->htsp_fd, 1, htsp_server_status,
+ htsp->htsp_granted_access);
+ if (tcp_id == NULL) {
+ reply = htsmsg_create_map();
+ htsmsg_add_u32(reply, "noaccess", 1);
+ htsmsg_add_u32(reply, "connlimit", 1);
+ goto send_reply_with_unlock;
+ }
+ streaming = 1;
+ }
reply = htsp_methods[i].fn(htsp, m);
}
break;
reply = htsp_error(htsp, N_("Invalid arguments"));
}
+send_reply_with_unlock:
pthread_mutex_unlock(&global_lock);
if(reply != NULL) /* Methods can do all the replying inline */
tcp_get_str_from_ip(peer, buf + strlen(buf), sizeof(buf) - strlen(buf));
aa.aa_representative = buf;
- tcp = tcp_connection_launch(fd, rtsp_stream_status, &aa);
+ tcp = tcp_connection_launch(fd, 1, rtsp_stream_status, &aa);
/* Note: global_lock held on entry */
pthread_mutex_unlock(&global_lock);
pthread_t tid;
uint32_t id;
int fd;
+ int streaming;
tcp_server_ops_t ops;
void *opaque;
char *representative;
*/
void *
tcp_connection_launch
- (int fd, void (*status) (void *opaque, htsmsg_t *m), access_t *aa)
+ (int fd, int streaming, void (*status) (void *opaque, htsmsg_t *m), access_t *aa)
{
tcp_server_launch_t *tsl, *res;
- uint32_t used, used2;
+ uint32_t sused, used2;
int64_t started = mclk();
int c1, c2;
try_again:
res = NULL;
- used = 0;
+ sused = 0;
LIST_FOREACH(tsl, &tcp_server_active, alink) {
if (tsl->fd == fd) {
res = tsl;
continue;
}
if (!strcmp(aa->aa_representative ?: "", tsl->representative ?: ""))
- used++;
+ if (tsl->streaming)
+ sused++;
}
if (res == NULL)
return NULL;
if (aa->aa_conn_limit || aa->aa_conn_limit_streaming) {
used2 = aa->aa_conn_limit ? dvr_usage_count(aa) : 0;
/* the rule is: allow if one condition is OK */
- c1 = aa->aa_conn_limit ? used + used2 >= aa->aa_conn_limit : -1;
- c2 = aa->aa_conn_limit_streaming ? used >= aa->aa_conn_limit_streaming : -1;
+ c1 = aa->aa_conn_limit ? sused + used2 >= aa->aa_conn_limit : -1;
+ c2 = aa->aa_conn_limit_streaming ? sused >= aa->aa_conn_limit_streaming : -1;
if (c1 && c2) {
if (started + sec2mono(5) < mclk()) {
"(limit %u, streaming limit %u, active streaming %u, DVR %u)",
aa->aa_username ?: "", aa->aa_representative ?: "",
aa->aa_conn_limit, aa->aa_conn_limit_streaming,
- used, used2);
+ sused, used2);
return NULL;
}
pthread_mutex_unlock(&global_lock);
res->representative = aa->aa_representative ? strdup(aa->aa_representative) : NULL;
res->status = status;
+ res->streaming = streaming;
LIST_INSERT_HEAD(&tcp_server_launches, res, link);
notify_reload("connections");
return res;
htsmsg_add_str(e, "peer", buf);
htsmsg_add_u32(e, "peer_port", ntohs(IP_PORT(tsl->peer)));
htsmsg_add_s64(e, "started", tsl->started);
+ htsmsg_add_u32(e, "streaming", tsl->streaming);
tsl->status(tsl->opaque, e);
htsmsg_add_msg(l, NULL, e);
}
struct access;
uint32_t tcp_connection_count(struct access *aa);
-void *tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m),
+void *tcp_connection_launch(int fd, int streaming,
+ void (*status) (void *opaque, htsmsg_t *m),
struct access *aa);
void tcp_connection_land(void *tcp_id);
void tcp_connection_cancel(uint32_t id);
type: 'date',
dateFormat: 'U', /* unix time */
sortType: Ext.data.SortTypes.asDate
- }
+ },
+ { name: 'streaming' }
],
url: 'api/status/connections',
autoLoad: true,
dataIndex: 'started',
renderer: renderDate,
sortable: true
+ }, {
+ width: 50,
+ id: 'streaming',
+ header: _("Streaming"),
+ dataIndex: 'streaming',
+ sortable: true
}, {
width: 50,
id: 'server',
static inline void *
http_stream_preop ( http_connection_t *hc )
{
- return tcp_connection_launch(hc->hc_fd, http_stream_status, hc->hc_access);
+ return tcp_connection_launch(hc->hc_fd, 1, http_stream_status, hc->hc_access);
}
static inline void