/*
* Functions managing stream_interface structures
*
- * Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2009 Willy Tarreau <w@1wt.eu>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
buffer_auto_close(si->ob);
}
+/* default update function for scheduled tasks, not used for embedded tasks */
+void stream_int_update(struct stream_interface *si)
+{
+ DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
+ __FUNCTION__,
+ si, si->state, si->ib->flags, si->ob->flags);
+
+ if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
+ task_wakeup(si->owner, TASK_WOKEN_IO);
+}
+
+/* default update function for embedded tasks, to be used at the end of the i/o handler */
+void stream_int_update_embedded(struct stream_interface *si)
+{
+ DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
+ __FUNCTION__,
+ si, si->state, si->ib->flags, si->ob->flags);
+
+ if (si->state != SI_ST_EST)
+ return;
+
+ if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW))
+ si->shutw(si);
+
+ if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
+ si->flags |= SI_FL_WAIT_DATA;
+
+ if ((si->ib->flags & (BF_FULL|BF_SHUTR)) == BF_FULL)
+ si->flags |= SI_FL_WAIT_ROOM;
+
+ if (si->ob->flags & BF_WRITE_ACTIVITY || si->ib->flags & BF_READ_ACTIVITY) {
+ if (tick_isset(si->ib->rex))
+ si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
+ if (tick_isset(si->ob->wex))
+ si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
+ }
+
+ if (si->ob->flags & BF_WRITE_PARTIAL)
+ si->ob->prod->chk_rcv(si->ob->prod);
+
+ if (si->ib->flags & BF_READ_PARTIAL)
+ si->ib->cons->chk_snd(si->ib->cons);
+
+ /* Note that we're trying to wake up in two conditions here :
+ * - special event, which needs the holder task attention
+ * - status indicating that the applet can go on working. This
+ * is rather hard because we might be blocking on output and
+ * don't want to wake up on input and vice-versa. The idea is
+ * the to only rely the changes the chk_* might have performed.
+ */
+ if (/* check stream interface changes */
+ (si->flags & SI_FL_ERR) || si->state != SI_ST_EST || si->ib->cons->state != SI_ST_EST ||
+ /* check response buffer changes */
+ (si->ib->flags & (BF_READ_NULL|BF_READ_ERROR|BF_READ_DONTWAIT)) ||
+ ((si->ib->flags & BF_READ_ACTIVITY) && !si->ib->to_forward) ||
+ (!(si->ib->flags & BF_FULL) && (si->ib->flags & BF_WRITE_ACTIVITY) && si->ib->to_forward) ||
+ /* check request buffer changes */
+ (si->ob->flags & (BF_WRITE_ERROR)) ||
+ ((si->ob->flags & BF_WRITE_ACTIVITY) && (si->ob->flags & BF_OUT_EMPTY) && !si->ob->to_forward) ||
+ (si->ob->flags & BF_READ_ACTIVITY)) {
+ if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
+ task_wakeup(si->owner, TASK_WOKEN_IO);
+ }
+}
+
+/* default shutr function for scheduled tasks */
+void stream_int_shutr(struct stream_interface *si)
+{
+ DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
+ __FUNCTION__,
+ si, si->state, si->ib->flags, si->ob->flags);
+
+ si->ib->flags &= ~BF_SHUTR_NOW;
+ if (si->ib->flags & BF_SHUTR)
+ return;
+ si->ib->flags |= BF_SHUTR;
+ si->ib->rex = TICK_ETERNITY;
+ si->flags &= ~SI_FL_WAIT_ROOM;
+
+ if (si->state != SI_ST_EST && si->state != SI_ST_CON)
+ return;
+
+ if (si->ob->flags & BF_SHUTW) {
+ si->state = SI_ST_DIS;
+ si->exp = TICK_ETERNITY;
+ }
+
+ /* note that if the task exist, it must unregister itself once it runs */
+ if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
+ task_wakeup(si->owner, TASK_WOKEN_IO);
+}
+
+/* default shutw function for scheduled tasks */
+void stream_int_shutw(struct stream_interface *si)
+{
+ DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
+ __FUNCTION__,
+ si, si->state, si->ib->flags, si->ob->flags);
+
+ si->ob->flags &= ~BF_SHUTW_NOW;
+ if (si->ob->flags & BF_SHUTW)
+ return;
+ si->ob->flags |= BF_SHUTW;
+ si->ob->wex = TICK_ETERNITY;
+ si->flags &= ~SI_FL_WAIT_DATA;
+
+ switch (si->state) {
+ case SI_ST_EST:
+ if (!(si->ib->flags & BF_SHUTR))
+ break;
+
+ /* fall through */
+ case SI_ST_CON:
+ case SI_ST_CER:
+ si->state = SI_ST_DIS;
+ /* fall through */
+ default:
+ si->flags &= ~SI_FL_WAIT_ROOM;
+ si->ib->flags |= BF_SHUTR;
+ si->ib->rex = TICK_ETERNITY;
+ si->exp = TICK_ETERNITY;
+ }
+
+ /* note that if the task exist, it must unregister itself once it runs */
+ if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
+ task_wakeup(si->owner, TASK_WOKEN_IO);
+}
+
+/* default chk_rcv function for scheduled tasks */
+void stream_int_chk_rcv(struct stream_interface *si)
+{
+ struct buffer *ib = si->ib;
+
+ DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
+ __FUNCTION__,
+ si, si->state, si->ib->flags, si->ob->flags);
+
+ if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
+ return;
+
+ if (ib->flags & (BF_FULL|BF_HIJACK)) {
+ /* stop reading */
+ if ((ib->flags & (BF_FULL|BF_HIJACK)) == BF_FULL)
+ si->flags |= SI_FL_WAIT_ROOM;
+ }
+ else {
+ /* (re)start reading */
+ si->flags &= ~SI_FL_WAIT_ROOM;
+ if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
+ task_wakeup(si->owner, TASK_WOKEN_IO);
+ }
+}
+
+/* default chk_snd function for scheduled tasks */
+void stream_int_chk_snd(struct stream_interface *si)
+{
+ struct buffer *ob = si->ob;
+
+ DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
+ __FUNCTION__,
+ si, si->state, si->ib->flags, si->ob->flags);
+
+ if (unlikely(si->state != SI_ST_EST || (si->ob->flags & BF_SHUTW)))
+ return;
+
+ if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
+ (ob->flags & BF_OUT_EMPTY)) /* called with nothing to send ! */
+ return;
+
+ /* Otherwise there are remaining data to be sent in the buffer,
+ * so we tell the handler.
+ */
+ si->flags &= ~SI_FL_WAIT_DATA;
+ if (!tick_isset(ob->wex))
+ ob->wex = tick_add_ifset(now_ms, ob->wto);
+
+ if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
+ task_wakeup(si->owner, TASK_WOKEN_IO);
+}
+
+/* Register a function to handle a stream_interface as part of the stream
+ * interface's owner task, which is returned. The SI will wake it up everytime
+ * it is solicited. The task's processing function must call the specified
+ * function before returning. It must be deleted by the task handler using
+ * stream_int_unregister_handler(), possibly from withing the function itself.
+ */
+struct task *stream_int_register_handler(struct stream_interface *si,
+ void (*fct)(struct stream_interface *))
+{
+ DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner);
+
+ si->update = stream_int_update_embedded;
+ si->shutr = stream_int_shutr;
+ si->shutw = stream_int_shutw;
+ si->chk_rcv = stream_int_chk_rcv;
+ si->chk_snd = stream_int_chk_snd;
+ si->connect = NULL;
+ si->iohandler = fct;
+ si->flags |= SI_FL_WAIT_DATA;
+ return si->owner;
+}
+
+/* Register a function to handle a stream_interface as a standalone task. The
+ * new task itself is returned and is assigned as si->owner. The stream_interface
+ * pointer will be pointed to by the task's context. The handler can be detached
+ * by using stream_int_unregister_handler().
+ */
+struct task *stream_int_register_handler_task(struct stream_interface *si,
+ struct task *(*fct)(struct task *))
+{
+ struct task *t;
+
+ DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner);
+
+ si->update = stream_int_update;
+ si->shutr = stream_int_shutr;
+ si->shutw = stream_int_shutw;
+ si->chk_rcv = stream_int_chk_rcv;
+ si->chk_snd = stream_int_chk_snd;
+ si->connect = NULL;
+ si->iohandler = NULL; /* not used when running as an external task */
+ si->flags |= SI_FL_WAIT_DATA;
+
+ t = task_new();
+ si->owner = t;
+ if (!t)
+ return t;
+ t->process = fct;
+ t->context = si;
+ task_wakeup(si->owner, TASK_WOKEN_INIT);
+
+ return t;
+}
+
+/* Unregister a stream interface handler. This must be called by the handler task
+ * itself when it detects that it is in the SI_ST_DIS state. This function can
+ * both detach standalone handlers and embedded handlers.
+ */
+void stream_int_unregister_handler(struct stream_interface *si)
+{
+ if (!si->iohandler && si->owner) {
+ /* external handler : kill the task */
+ task_delete(si->owner);
+ task_free(si->owner);
+ }
+ si->iohandler = NULL;
+ si->owner = NULL;
+}
+
/*
* Local variables:
* c-indent-level: 8