sm.sm_data = &sigstat;
LIST_FOREACH(s, &lfe->mi_transports, s_active_link) {
pthread_mutex_lock(&s->s_stream_mutex);
- streaming_pad_deliver(&s->s_streaming_pad, &sm);
+ streaming_pad_deliver(&s->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&s->s_stream_mutex);
}
}
memset(&sm, 0, sizeof(sm));
sm.sm_type = SMT_MPEGTS;
sm.sm_data = pb;
- streaming_pad_deliver(&mmi->mmi_streaming_pad, &sm);
+ streaming_pad_deliver(&mmi->mmi_streaming_pad, streaming_msg_clone(&sm));
pktbuf_ref_dec(pb);
}
sm.sm_data = &sigstat;
LIST_FOREACH(svc, &lfe->mi_transports, s_active_link) {
pthread_mutex_lock(&svc->s_stream_mutex);
- streaming_pad_deliver(&svc->s_streaming_pad, &sm);
+ streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&svc->s_stream_mutex);
}
gtimer_arm_ms(&lfe->sf_monitor_timer, satip_frontend_signal_cb, lfe, 250);
sm.sm_type = SMT_MPEGTS;
sm.sm_data = pb;
- streaming_pad_deliver(&t->s_streaming_pad, &sm);
+ streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_clone(&sm));
pktbuf_ref_dec(pb);
LIST_FOREACH(svc, &hfe->mi_transports, s_active_link) {
pthread_mutex_lock(&svc->s_stream_mutex);
- streaming_pad_deliver(&svc->s_streaming_pad, &sm);
+ streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm));
pthread_mutex_unlock(&svc->s_stream_mutex);
}
}
th_pkt_t *pkt = pkt_alloc(sub, off, pts, pts);
pkt->pkt_componentindex = st->es_index;
- streaming_message_t *sm = streaming_msg_create_pkt(pkt);
- streaming_pad_deliver(&t->s_streaming_pad, sm);
- streaming_msg_free(sm);
+ streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_pkt(pkt));
/* Decrease our own reference to the packet */
pkt_ref_dec(pkt);
/* Forward packet */
pkt->pkt_componentindex = st->es_index;
- streaming_message_t *sm = streaming_msg_create_pkt(pkt);
-
- streaming_pad_deliver(&t->s_streaming_pad, sm);
- streaming_msg_free(sm);
+ streaming_pad_deliver(&t->s_streaming_pad, streaming_msg_create_pkt(pkt));
/* Decrease our own reference to the packet */
pkt_ref_dec(pkt);
void
service_set_streaming_status_flags_(service_t *t, int set)
{
- streaming_message_t *sm;
lock_assert(&t->s_stream_mutex);
if(set == t->s_streaming_status)
set & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "",
set & TSS_TIMEOUT ? "[Data timeout] " : "");
- sm = streaming_msg_create_code(SMT_SERVICE_STATUS,
- t->s_streaming_status);
- streaming_pad_deliver(&t->s_streaming_pad, sm);
- streaming_msg_free(sm);
+ streaming_pad_deliver(&t->s_streaming_pad,
+ streaming_msg_create_code(SMT_SERVICE_STATUS,
+ t->s_streaming_status));
pthread_cond_broadcast(&t->s_tss_cond);
}
void
service_restart(service_t *t, int had_components)
{
- streaming_message_t *sm;
pthread_mutex_lock(&t->s_stream_mutex);
if(had_components) {
- sm = streaming_msg_create_code(SMT_STOP, SM_CODE_SOURCE_RECONFIGURED);
- streaming_pad_deliver(&t->s_streaming_pad, sm);
- streaming_msg_free(sm);
+ streaming_pad_deliver(&t->s_streaming_pad,
+ streaming_msg_create_code(SMT_STOP,
+ SM_CODE_SOURCE_RECONFIGURED));
}
service_build_filter(t);
if(TAILQ_FIRST(&t->s_filt_components) != NULL) {
- sm = streaming_msg_create_data(SMT_START,
- service_build_stream_start(t));
- streaming_pad_deliver(&t->s_streaming_pad, sm);
- streaming_msg_free(sm);
+ streaming_pad_deliver(&t->s_streaming_pad,
+ streaming_msg_create_data(SMT_START,
+ service_build_stream_start(t)));
}
pthread_mutex_unlock(&t->s_stream_mutex);
void
streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm)
{
- if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
+ if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
streaming_msg_free(sm);
else
- st->st_cb(st->st_opaque, sm);
+ streaming_target_deliver(st, sm);
}
/**
void
streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm)
{
- streaming_target_t *st, *next;
+ streaming_target_t *st, *next, *run = NULL;
- for(st = LIST_FIRST(&sp->sp_targets);st; st = next) {
+ for (st = LIST_FIRST(&sp->sp_targets); st; st = next) {
next = LIST_NEXT(st, st_link);
assert(next != st);
- if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
+ if (st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
continue;
- st->st_cb(st->st_opaque, streaming_msg_clone(sm));
+ if (run)
+ streaming_target_deliver(run, streaming_msg_clone(sm));
+ run = st;
}
+ if (run)
+ streaming_target_deliver(run, sm);
+ else
+ streaming_msg_free(sm);
}
-
/**
*
*/
streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt);
-#define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm)))
+static inline void
+streaming_target_deliver(streaming_target_t *st, streaming_message_t *sm)
+ { st->st_cb(st->st_opaque, sm); }
void streaming_target_deliver2(streaming_target_t *st, streaming_message_t *sm);
// Link to service output
streaming_target_connect(&t->s_streaming_pad, &s->ths_input);
- sm = streaming_msg_create_code(SMT_GRACE, s->ths_postpone + t->s_grace_delay);
- streaming_pad_deliver(&t->s_streaming_pad, sm);
- streaming_msg_free(sm);
+ streaming_pad_deliver(&t->s_streaming_pad,
+ streaming_msg_create_code(SMT_GRACE,
+ s->ths_postpone +
+ t->s_grace_delay));
if(s->ths_start_message != NULL && t->s_streaming_status & TSS_PACKETS) {