]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream: make stream_new() allocate its own task
authorWilly Tarreau <w@1wt.eu>
Mon, 28 Aug 2017 14:22:54 +0000 (16:22 +0200)
committerWilly Tarreau <w@1wt.eu>
Wed, 30 Aug 2017 05:05:04 +0000 (07:05 +0200)
Currently a task is allocated in session_new() and serves two purposes :
  - either the handshake is complete and it is offered to the stream via
    the second arg of stream_new()

  - or the handshake is not complete and it's diverted to be used as a
    timeout handler for the embryonic session and repurposed once we land
    into conn_complete_session()

Furthermore, the task's process() function was taken from the listener's
handler in conn_complete_session() prior to being replaced by a call to
stream_new(). This will become a serious mess with the mux.

Since it's impossible to have a stream without a task, this patch removes
the second arg from stream_new() and make this function allocate its own
task. In session_accept_fd(), we now only allocate the task if needed for
the embryonic session and delete it later.

include/proto/stream.h
src/flt_spoe.c
src/hlua.c
src/peers.c
src/session.c
src/stream.c

index 5ff22916aef7c90e7c21927f43e261ff92c7856e..44fc8bea5505fd5d665357ca43df9a8fdf847d36 100644 (file)
@@ -35,7 +35,7 @@ extern struct list streams;
 
 extern struct data_cb sess_conn_cb;
 
-struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin);
+struct stream *stream_new(struct session *sess, enum obj_type *origin);
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_stream();
index 1a8bd2c0f1054c576b2bee59e8f4b9250f5e64e9..47aef57626b05f12ff07000f20cf9252d0c04fbe 100644 (file)
@@ -1901,7 +1901,6 @@ spoe_create_appctx(struct spoe_config *conf)
 {
        struct appctx      *appctx;
        struct session     *sess;
-       struct task        *task;
        struct stream      *strm;
 
        if ((appctx = appctx_new(&spoe_applet)) == NULL)
@@ -1937,12 +1936,9 @@ spoe_create_appctx(struct spoe_config *conf)
        if (!sess)
                goto out_free_spoe;
 
-       if ((task = task_new()) == NULL)
+       if ((strm = stream_new(sess, &appctx->obj_type)) == NULL)
                goto out_free_sess;
 
-       if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
-               goto out_free_task;
-
        stream_set_backend(strm, conf->agent->b.be);
 
        /* applet is waiting for data */
@@ -1960,12 +1956,10 @@ spoe_create_appctx(struct spoe_config *conf)
        LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
        conf->agent->applets_act++;
 
-       task_wakeup(task, TASK_WOKEN_INIT);
+       task_wakeup(strm->task, TASK_WOKEN_INIT);
        return appctx;
 
        /* Error unrolling */
- out_free_task:
-       task_free(task);
  out_free_sess:
        session_free(sess);
  out_free_spoe:
index 0f82425de22aad4f7d524649cbdfc9e1614127d7..594d880eda51ecd452bb39d074589ae6d5587bdb 100644 (file)
@@ -2297,7 +2297,6 @@ __LJMP static int hlua_socket_new(lua_State *L)
        struct appctx *appctx;
        struct session *sess;
        struct stream *strm;
-       struct task *task;
 
        /* Check stack size. */
        if (!lua_checkstack(L, 3)) {
@@ -2341,14 +2340,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
                goto out_fail_sess;
        }
 
-       task = task_new();
-       if (!task) {
-               hlua_pusherror(L, "socket: out of memory");
-               goto out_fail_task;
-       }
-       task->nice = 0;
-
-       strm = stream_new(sess, task, &appctx->obj_type);
+       strm = stream_new(sess, &appctx->obj_type);
        if (!strm) {
                hlua_pusherror(L, "socket: out of memory");
                goto out_fail_stream;
@@ -2372,13 +2364,11 @@ __LJMP static int hlua_socket_new(lua_State *L)
        jobs++;
        totalconn++;
 
-       task_wakeup(task, TASK_WOKEN_INIT);
+       task_wakeup(strm->task, TASK_WOKEN_INIT);
        /* Return yield waiting for connection. */
        return 1;
 
  out_fail_stream:
-       task_free(task);
- out_fail_task:
        session_free(sess);
  out_fail_sess:
        appctx_free(appctx);
index 249edf7a314c0907296a9afc7b1e18e5726df9b2..d03e72fef02cfcc438998637d974d92da18ead31 100644 (file)
@@ -1784,7 +1784,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        struct appctx *appctx;
        struct session *sess;
        struct stream *s;
-       struct task *t;
        struct connection *conn;
 
        peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
@@ -1804,15 +1803,9 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
                goto out_free_appctx;
        }
 
-       if ((t = task_new()) == NULL) {
-               Alert("out of memory in peer_session_create().\n");
-               goto out_free_sess;
-       }
-       t->nice = l->nice;
-
-       if ((s = stream_new(sess, t, &appctx->obj_type)) == NULL) {
+       if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
                Alert("Failed to initialize stream in peer_session_create().\n");
-               goto out_free_task;
+               goto out_free_sess;
        }
 
        /* The tasks below are normally what is supposed to be done by
@@ -1851,7 +1844,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        totalconn++;
 
        peer->appctx = appctx;
-       task_wakeup(t, TASK_WOKEN_INIT);
+       task_wakeup(s->task, TASK_WOKEN_INIT);
        return appctx;
 
        /* Error unrolling */
@@ -1859,8 +1852,6 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        LIST_DEL(&s->by_sess);
        LIST_DEL(&s->list);
        pool_free2(pool2_stream, s);
- out_free_task:
-       task_free(t);
  out_free_sess:
        session_free(sess);
  out_free_appctx:
index cc2a0b87a74ed7b5305191d94d54bea2e0206dd7..ea4e020a6b066d777bfe277d2d061cf73a063669 100644 (file)
@@ -109,7 +109,7 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
        struct connection *cli_conn;
        struct proxy *p = l->bind_conf->frontend;
        struct session *sess;
-       struct task *t;
+       struct stream *strm;
        int ret;
 
 
@@ -222,12 +222,6 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
        if (global.tune.client_rcvbuf)
                setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &global.tune.client_rcvbuf, sizeof(global.tune.client_rcvbuf));
 
-       if (unlikely((t = task_new()) == NULL))
-               goto out_free_sess;
-
-       t->context = sess;
-       t->nice = l->nice;
-
        /* OK, now either we have a pending handshake to execute with and
         * then we must return to the I/O layer, or we can proceed with the
         * end of the stream initialization. In case of handshake, we also
@@ -241,10 +235,18 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
         *          conn -- owner ---> task
         */
        if (cli_conn->flags & CO_FL_HANDSHAKE) {
+               struct task *t;
+
+               if (unlikely((t = task_new()) == NULL))
+                       goto out_free_sess;
+
                conn_set_owner(cli_conn, t);
                conn_set_xprt_done_cb(cli_conn, conn_complete_session);
+
+               t->context = sess;
+               t->nice    = l->nice;
                t->process = session_expire_embryonic;
-               t->expire = tick_add_ifset(now_ms, p->timeout.client);
+               t->expire  = tick_add_ifset(now_ms, p->timeout.client);
                task_queue(t);
                return 1;
        }
@@ -261,14 +263,12 @@ int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr
                goto out_free_sess;
 
        session_count_new(sess);
-       if (!stream_new(sess, t, &cli_conn->obj_type))
-               goto out_free_task;
+       if ((strm = stream_new(sess, &cli_conn->obj_type)) == NULL)
+               goto out_free_sess;
 
-       task_wakeup(t, TASK_WOKEN_INIT);
+       task_wakeup(strm->task, TASK_WOKEN_INIT);
        return 1;
 
- out_free_task:
-       task_free(t);
  out_free_sess:
        p->feconn--;
        session_free(sess);
@@ -412,6 +412,7 @@ static int conn_complete_session(struct connection *conn)
 {
        struct task *task = conn->owner;
        struct session *sess = task->context;
+       struct stream *strm;
 
        conn_clear_xprt_done_cb(conn);
 
@@ -430,11 +431,14 @@ static int conn_complete_session(struct connection *conn)
                goto fail;
 
        session_count_new(sess);
-       task->process = sess->listener->handler;
-       if (!stream_new(sess, task, &conn->obj_type))
+       if ((strm = stream_new(sess, &conn->obj_type)) == NULL)
                goto fail;
 
-       task_wakeup(task, TASK_WOKEN_INIT);
+       task_wakeup(strm->task, TASK_WOKEN_INIT);
+
+       /* the embryonic session's task is not needed anymore */
+       task_delete(task);
+       task_free(task);
        return 0;
 
  fail:
index 6f7a1be64a2a69e79c8d5b11729bdbe18da7671f..8527c297fc205a7c78ab5ce9930c373a7963d599 100644 (file)
@@ -67,20 +67,22 @@ static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
 
 /* This function is called from the session handler which detects the end of
  * handshake, in order to complete initialization of a valid stream. It must be
- * called with a session (which may be embryonic). It returns the pointer to
+ * called with a completley initialized session. It returns the pointer to
  * the newly created stream, or NULL in case of fatal error. The client-facing
- * end point is assigned to <origin>, which must be valid. The task's context
- * is set to the new stream, and its function is set to process_stream().
- * Target and analysers are null.
+ * end point is assigned to <origin>, which must be valid. The stream's task
+ * is configured with a nice value inherited from the listener's nice if any.
+ * The task's context is set to the new stream, and its function is set to
+ * process_stream(). Target and analysers are null.
  */
-struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin)
+struct stream *stream_new(struct session *sess, enum obj_type *origin)
 {
        struct stream *s;
+       struct task *t;
        struct connection *conn = objt_conn(origin);
        struct appctx *appctx   = objt_appctx(origin);
 
        if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
-               return s;
+               goto out_fail_alloc;
 
        /* minimum stream initialization required for an embryonic stream is
         * fairly low. We need very little to execute L4 ACLs, then we need a
@@ -145,11 +147,16 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
        s->flags |= SF_INITIALIZED;
        s->unique_id = NULL;
 
+       if ((t = task_new()) == NULL)
+               goto out_fail_alloc;
+
        s->task = t;
        s->pending_events = 0;
        t->process = process_stream;
        t->context = s;
        t->expire = TICK_ETERNITY;
+       if (sess->listener)
+               t->nice = sess->listener->nice;
 
        /* Note: initially, the stream's backend points to the frontend.
         * This changes later when switching rules are executed or
@@ -250,6 +257,8 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
        /* Error unrolling */
  out_fail_accept:
        flt_stream_release(s, 0);
+       task_free(t);
+ out_fail_alloc:
        LIST_DEL(&s->by_sess);
        LIST_DEL(&s->list);
        pool_free2(pool2_stream, s);