}
static void
-timeshift_packet_flush ( timeshift_t *ts, int64_t time )
+timeshift_packet_flush ( timeshift_t *ts, int64_t time, int deliver )
{
streaming_message_t *lowest, *sm;
struct streaming_message_queue *sq;
if (!lowest)
break;
TAILQ_REMOVE(sq, lowest, sm_link);
- timeshift_packet_deliver(ts, lowest);
+ if (deliver)
+ timeshift_packet_deliver(ts, lowest);
+ else
+ streaming_msg_free(lowest);
}
}
static void
-timeshift_packet( timeshift_t *ts, th_pkt_t *pkt )
+timeshift_packet( timeshift_t *ts, th_pkt_t *pkt, int deliver )
{
streaming_message_t *sm;
int64_t time;
time = ts_rescale(pkt->pkt_pts, 1000000);
if (time > ts->last_time) {
atomic_exchange_s64(&ts->last_time, time);
- timeshift_packet_flush(ts, time);
+ timeshift_packet_flush(ts, time, deliver);
}
sm->sm_time = time;
if (time + MAX_TIME_DELTA < ts->last_time) {
- timeshift_packet_deliver(ts, sm);
+ if (deliver)
+ timeshift_packet_deliver(ts, sm);
} else {
if (pkt->pkt_componentindex >= ts->backlog_max)
ts->backlog_max = pkt->pkt_componentindex + 1;
ts->smt_start = sm->sm_data;
atomic_add(&ts->smt_start->ss_refcount, 1);
if (ts->packet_mode) {
- timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA + 1000);
+ timeshift_packet_flush(ts, ts->last_time + MAX_TIME_DELTA + 1000, ts->dobuf);
if (ts->last_time)
ts->start_pts = ts->last_time + 1000;
}
if (ts->packet_mode) {
sm->sm_time = ts->last_time;
if (type == SMT_PACKET) {
- timeshift_packet(ts, pkt);
+ timeshift_packet(ts, pkt, 1);
goto msg_free;
}
} else {
streaming_target_deliver2(&ts->wr_queue.sq_st, sm);
} else {
if (type == SMT_PACKET) {
+ timeshift_packet(ts, pkt, 0);
tvhtrace("timeshift",
"ts %d pkt in - stream %d type %c pts %10"PRId64
" dts %10"PRId64" dur %10d len %6zu",