#include "channels.h"
#include "subscriptions.h"
#include "streaming.h"
+#include "psi.h"
static void *http_server;
static void
http_stream_run(http_connection_t *hc, streaming_queue_t *sq)
{
+ streaming_message_t *sm;
+ int run = 1;
+ int start = 1;
+ int timeouts = 0;
+ pthread_mutex_lock(&sq->sq_mutex);
+
+ while(run) {
+ sm = TAILQ_FIRST(&sq->sq_queue);
+ if(sm == NULL) {
+ struct timespec ts;
+ struct timeval tp;
+
+ gettimeofday(&tp, NULL);
+ ts.tv_sec = tp.tv_sec + 1;
+ ts.tv_nsec = tp.tv_usec * 1000;
+
+ if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) {
+ int err = 0;
+ socklen_t errlen = sizeof(err);
+
+ timeouts++;
+
+ //Check socket status
+ getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen);
+
+ //Abort upon socket error, or after 5 seconds of silence
+ if(err || timeouts > 4){
+ run = 0;
+ }
+ }
+ continue;
+ }
+
+ timeouts = 0; //Reset timeout counter
+ TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
+
+ pthread_mutex_unlock(&sq->sq_mutex);
+
+ switch(sm->sm_type) {
+ case SMT_PACKET:
+ //printf("SMT_PACKET\n");
+ break;
+
+ case SMT_START:
+ if (start) {
+ struct streaming_start *ss = sm->sm_data;
+ uint8_t pat_ts[188];
+ uint8_t pmt_ts[188];
+ int pcrpid = ss->ss_pcr_pid;
+ int pmtpid = 0x0fff;
+
http_output_content(hc, "audio/mp2t");
+
+ //Send PAT
+ memset(pat_ts, 0xff, 188);
+ psi_build_pat(NULL, pat_ts+5, 183, pmtpid);
+ pat_ts[0] = 0x47;
+ pat_ts[1] = 0x40;
+ pat_ts[2] = 0x00;
+ pat_ts[3] = 0x10;
+ pat_ts[4] = 0x00;
+ run = (write(hc->hc_fd, pat_ts, 188) == 188);
+
+ //Send PMT
+ memset(pmt_ts, 0xff, 188);
+ psi_build_pmt(ss, pmt_ts+5, 183, pcrpid);
+ pmt_ts[0] = 0x47;
+ pmt_ts[1] = 0x40 | (pmtpid >> 8);
+ pmt_ts[2] = pmtpid;
+ pmt_ts[3] = 0x10;
+ pmt_ts[4] = 0x00;
+ run = (write(hc->hc_fd, pmt_ts, 188) == 188);
+
+ start = 0;
+ }
+ break;
+
+ case SMT_STOP:
+ run = 0;
+ break;
+
+ case SMT_TRANSPORT_STATUS:
+ //printf("SMT_TRANSPORT_STATUS\n");
+ break;
+
+ case SMT_NOSTART:
+ run = 0;
+ break;
+
+ case SMT_MPEGTS:
+ run = (write(hc->hc_fd, sm->sm_data, 188) == 188);
+ break;
+
+ case SMT_EXIT:
+ run = 0;
+ break;
+ }
+
+ streaming_msg_free(sm);
+ pthread_mutex_lock(&sq->sq_mutex);
+ }
+
+ pthread_mutex_unlock(&sq->sq_mutex);
}