streaming_queue_t *sq;
streaming_message_t *sm;
const char *err = NULL;
+ uint64_t timeout;
profile_chain_init(&prch, NULL, NULL);
prch.prch_st = &prch.prch_sq.sq_st;
/* Wait */
run = 1;
pthread_mutex_lock(&sq->sq_mutex);
+ timeout = mclk() + mono2sec(30);
while(tvheadend_is_running() && run) {
+ if (timeout < mclk()) {
+ run = 0;
+ err = streaming_code2txt(SM_CODE_BAD_SOURCE);
+ break;
+ }
+
/* Wait for message */
while((sm = TAILQ_FIRST(&sq->sq_queue)) == NULL) {
tvh_cond_wait(&sq->sq_cond, &sq->sq_mutex);
streaming_queue_remove(sq, sm);
pthread_mutex_unlock(&sq->sq_mutex);
- if(sm->sm_type == SMT_PACKET) {
+ switch (sm->sm_type) {
+ case SMT_GRACE:
+ timeout += mono2sec(sm->sm_code);
+ break;
+ case SMT_PACKET:
run = 0;
err = NULL;
- } else if (sm->sm_type == SMT_SERVICE_STATUS) {
- int status = sm->sm_code;
-
- if(status & TSS_ERRORS) {
+ break;
+ case SMT_SERVICE_STATUS:
+ if(sm->sm_code & TSS_ERRORS) {
run = 0;
- err = service_tss2text(status);
+ err = service_tss2text(sm->sm_code);
}
- } else if (sm->sm_type == SMT_NOSTART) {
+ break;
+ case SMT_NOSTART:
run = 0;
err = streaming_code2txt(sm->sm_code);
+ break;
+ default:
+ break;
}
streaming_msg_free(sm);