]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: spoe: Use an ebtree to manage idle applets
authorChristopher Faulet <cfaulet@haproxy.com>
Wed, 24 Jan 2018 15:37:57 +0000 (16:37 +0100)
committerWilly Tarreau <w@1wt.eu>
Fri, 2 Feb 2018 15:00:32 +0000 (16:00 +0100)
Instead of using a list of applets with idle ones in front, we now use an
ebtree. Aapplets in the tree are idle by definition. And the key is the applet's
weight. When a new frame is queued, the first idle applet (with the lowest
weight) is woken up and its weight is increased by one. And when an applet sends
a frame to a SPOA, its weight is decremented by one.

This is empirical, but it should avoid to overuse a very few number of applets
and increase the balancing between idle applets.

include/types/spoe.h
src/flt_spoe.c

index 659dd273d1c9ebe52e5287657e98919cebc619fd..32d231146a7568239197ad4ab39095037786fe8f 100644 (file)
@@ -260,16 +260,18 @@ struct spoe_agent {
        /* running info */
        struct {
                unsigned int    frame_size;     /* current maximum frame size, only used to encode messages */
+#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
                unsigned int    applets_act;    /* # of applets alive at a time */
                unsigned int    applets_idle;   /* # of applets in the state SPOE_APPCTX_ST_IDLE */
-
+#endif
                unsigned int    processing;
                struct freq_ctr processing_per_sec;
 
                struct freq_ctr conn_per_sec;   /* connections per second */
                struct freq_ctr err_per_sec;    /* connetion errors per second */
 
-               struct list     applets;        /* List of available SPOE applets */
+               struct eb_root  idle_applets;   /* idle SPOE applets available to process data */
+               struct list     applets;        /* all SPOE applets for this agent */
                struct list     sending_queue;  /* Queue of streams waiting to send data */
                struct list     waiting_queue;  /* Queue of streams waiting for a ack, in async mode */
                __decl_hathreads(HA_SPINLOCK_T lock);
@@ -336,6 +338,7 @@ struct spoe_appctx {
        struct buffer_wait  buffer_wait;    /* position in the list of ressources waiting for a buffer */
        struct list         waiting_queue;  /* list of streams waiting for a ACK frame, in sync and pipelining mode */
        struct list         list;           /* next spoe appctx for the same agent */
+       struct eb32_node    node;           /* node used for applets tree */
        unsigned int        cur_fpa;
 
        struct {
index ae34c3b9006b2a10994172df0e97aa41657b67e9..5848fdc7b30c0c069f9f40512f0eec3ee9e28ae2 100644 (file)
 
 #if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
 #define SPOE_PRINTF(x...) fprintf(x)
+#define SPOE_DEBUG_STMT(statement) statement
 #else
 #define SPOE_PRINTF(x...)
+#define SPOE_DEBUG_STMT(statement)
 #endif
 
 /* Reserved 4 bytes to the frame size. So a frame and its size can be written
@@ -1212,16 +1214,20 @@ spoe_release_appctx(struct appctx *appctx)
                    __FUNCTION__, appctx);
 
        /* Remove applet from the list of running applets */
-       agent->rt[tid].applets_act--;
+       SPOE_DEBUG_STMT(agent->rt[tid].applets_act--);
+       HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
        if (!LIST_ISEMPTY(&spoe_appctx->list)) {
                LIST_DEL(&spoe_appctx->list);
                LIST_INIT(&spoe_appctx->list);
        }
+       HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 
        /* Shutdown the server connection, if needed */
        if (appctx->st0 != SPOE_APPCTX_ST_END) {
-               if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
-                       agent->rt[tid].applets_idle--;
+               if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
+                       eb32_delete(&spoe_appctx->node);
+                       SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+               }
 
                appctx->st0 = SPOE_APPCTX_ST_END;
                if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
@@ -1398,12 +1404,10 @@ spoe_handle_connecting_appctx(struct appctx *appctx)
                default:
                        /* HELLO handshake is finished, set the idle timeout and
                         * add the applet in the list of running applets. */
-                       agent->rt[tid].applets_idle++;
+                       SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
                        appctx->st0 = SPOE_APPCTX_ST_IDLE;
-                       HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-                       LIST_DEL(&SPOE_APPCTX(appctx)->list);
-                       LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
-                       HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
+                       SPOE_APPCTX(appctx)->node.key = 0;
+                       eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
 
                        /* Update runtinme agent info */
                        HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
@@ -1474,6 +1478,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
                        ctx->state = SPOE_CTX_ST_ERROR;
                        ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
                        task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+                       *skip = 1;
                        break;
 
                case 1: /* retry */
@@ -1491,15 +1496,14 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
                        if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
                            (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN))
                                goto no_frag_frame_sent;
-                       else {
-                               *skip = 1;
+                       else
                                goto frag_frame_sent;
-                       }
        }
        goto end;
 
   frag_frame_sent:
        appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY;
+       *skip = 1;
        SPOE_APPCTX(appctx)->frag_ctx.ctx    = ctx;
        SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
        SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
@@ -1518,6 +1522,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
        }
        else {
                appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
+               *skip = 1;
                LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
        }
        SPOE_APPCTX(appctx)->frag_ctx.ctx    = NULL;
@@ -1552,6 +1557,7 @@ spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
        if (ret > 1) {
                if (*frame == SPOE_FRM_T_AGENT_DISCON) {
                        appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+                       ret = -1;
                        goto end;
                }
                trash.len = ret + 4;
@@ -1616,13 +1622,12 @@ spoe_handle_processing_appctx(struct appctx *appctx)
                goto next;
        }
 
-
        SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-                   " - process: fpa=%u/%u - appctx-state=%s - flags=0x%08x\n",
+                   " - process: fpa=%u/%u - appctx-state=%s - weight=%u - flags=0x%08x\n",
                    (int)now.tv_sec, (int)now.tv_usec, agent->id,
                    __FUNCTION__, appctx, SPOE_APPCTX(appctx)->cur_fpa,
                    agent->max_fpa, spoe_appctx_state_str[appctx->st0],
-                   SPOE_APPCTX(appctx)->flags);
+                   SPOE_APPCTX(appctx)->node.key, SPOE_APPCTX(appctx)->flags);
 
        if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
                skip_sending = 1;
@@ -1655,6 +1660,8 @@ spoe_handle_processing_appctx(struct appctx *appctx)
                                goto next;
 
                        case 0: /* ignore */
+                               if (SPOE_APPCTX(appctx)->node.key)
+                                       SPOE_APPCTX(appctx)->node.key--;
                                active_s++;
                                break;
 
@@ -1662,23 +1669,22 @@ spoe_handle_processing_appctx(struct appctx *appctx)
                                break;
 
                        default:
+                               if (SPOE_APPCTX(appctx)->node.key)
+                                       SPOE_APPCTX(appctx)->node.key--;
                                active_s++;
                                break;
                }
        }
 
        if (active_s || active_r) {
-               HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-               LIST_DEL(&SPOE_APPCTX(appctx)->list);
-               LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
-               HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-
                update_freq_ctr(&agent->rt[tid].processing_per_sec, active_s);
                SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
        }
+
        if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
+               SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
                appctx->st0 = SPOE_APPCTX_ST_IDLE;
-               agent->rt[tid].applets_idle++;
+               eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
        }
        return 1;
 
@@ -1847,7 +1853,8 @@ spoe_handle_appctx(struct appctx *appctx)
                                appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
                                goto switchstate;
                        }
-                       agent->rt[tid].applets_idle--;
+                       SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+                       eb32_delete(&SPOE_APPCTX(appctx)->node);
                        appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
                        /* fall through */
 
@@ -1955,7 +1962,7 @@ spoe_create_appctx(struct spoe_config *conf)
        HA_SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
        LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
        HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
-       conf->agent->rt[tid].applets_act++;
+       SPOE_DEBUG_STMT(conf->agent->rt[tid].applets_act++);
 
        task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
        task_wakeup(strm->task, TASK_WOKEN_INIT);
@@ -1983,7 +1990,7 @@ spoe_queue_context(struct spoe_context *ctx)
        struct spoe_appctx *spoe_appctx;
 
        /* Check if we need to create a new SPOE applet or not. */
-       if (agent->rt[tid].applets_idle &&
+       if (!eb_is_empty(&agent->rt[tid].idle_applets) &&
            agent->rt[tid].processing < read_freq_ctr(&agent->rt[tid].processing_per_sec))
                goto end;
 
@@ -2048,17 +2055,17 @@ spoe_queue_context(struct spoe_context *ctx)
                    ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
                    agent->rt[tid].processing);
 
-       /* Finally try to wakeup the first IDLE applet found and move it at the
-        * end of the list. */
-       list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) {
-               appctx = spoe_appctx->owner;
-               if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
-                       spoe_wakeup_appctx(appctx);
-                       HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-                       LIST_DEL(&spoe_appctx->list);
-                       LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list);
-                       HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-                       break;
+       /* Finally try to wakeup an IDLE applet. */
+       if (!eb_is_empty(&agent->rt[tid].idle_applets)) {
+               struct eb32_node *node;
+
+               node = eb32_first(&agent->rt[tid].idle_applets);
+               spoe_appctx = eb32_entry(node, struct spoe_appctx, node);
+               if (node && spoe_appctx) {
+                       eb32_delete(&spoe_appctx->node);
+                       spoe_appctx->node.key++;
+                       eb32_insert(&agent->rt[tid].idle_applets, &spoe_appctx->node);
+                       spoe_wakeup_appctx(spoe_appctx->owner);
                }
        }
        return 1;
@@ -3169,7 +3176,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
                curagent->cps_max        = 0;
                curagent->eps_max        = 0;
                curagent->max_frame_size = MAX_FRAME_SIZE;
-               curagent->max_fpa        = 100;
+               curagent->max_fpa        = 20;
 
                for (i = 0; i < SPOE_EV_EVENTS; ++i)
                        LIST_INIT(&curagent->events[i]);
@@ -3183,8 +3190,8 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
                }
                for (i = 0; i < global.nbthread; ++i) {
                        curagent->rt[i].frame_size   = curagent->max_frame_size;
-                       curagent->rt[i].applets_act  = 0;
-                       curagent->rt[i].applets_idle = 0;
+                       SPOE_DEBUG_STMT(curagent->rt[i].applets_act  = 0);
+                       SPOE_DEBUG_STMT(curagent->rt[i].applets_idle = 0);
                        curagent->rt[i].processing   = 0;
                        LIST_INIT(&curagent->rt[i].applets);
                        LIST_INIT(&curagent->rt[i].sending_queue);