struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
struct stream_interface *sio = si_opposite(si);
+ struct task *task = si_task(si);
/* process consumer side */
if (channel_is_empty(oc)) {
!(oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW|CF_SHUTW))) &&
(sio->state != SI_ST_EST ||
(channel_is_empty(oc) && !oc->to_forward)))))) {
- task_wakeup(si_task(si), TASK_WOKEN_IO);
+ task_wakeup(task, TASK_WOKEN_IO);
+ }
+ else {
+ /* Update expiration date for the task and requeue it */
+ task->expire = tick_first((tick_is_expired(task->expire, now_ms) ? 0 : task->expire),
+ tick_first(tick_first(ic->rex, ic->wex),
+ tick_first(oc->rex, oc->wex)));
+ task_queue(task);
}
if (ic->flags & CF_READ_ACTIVITY)
ic->flags &= ~CF_READ_DONTWAIT;