/* SPOE filter id. Used to identify SPOE filters */
const char *spoe_filter_id = "SPOE filter";
-/* Set if the handle on SIGUSR1 is registered */
-static int sighandler_registered = 0;
-
/* The name of the SPOE engine, used during the parsing */
char *curengine = NULL;
struct flt_ops spoe_ops;
-static int spoe_queue_context(struct spoe_context *ctx);
static int spoe_acquire_buffer(struct buffer *buf, struct buffer_wait *buffer_wait);
static void spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait);
static struct appctx *spoe_create_appctx(struct spoe_config *conf);
{
struct spoe_message *msg, *msgback;
struct spoe_group *grp, *grpback;
- int i;
if (!agent)
return;
spoe_release_group(grp);
}
free(agent->engine_id);
- if (agent->rt) {
- for (i = 0; i < global.nbthread; ++i) {
- HA_SPIN_DESTROY(&agent->rt[i].lock);
- }
- }
- free(agent->rt);
free(agent);
}
* encoded bytes in the frame on success, 0 if an encoding error occurred and -1
* if a fatal error occurred. */
static int
-spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
- char *frame, size_t size)
+spoe_prepare_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
{
char *p, *end;
unsigned int stream_id, frame_id;
p = frame;
end = frame+size;
- stream_id = ctx->stream_id;
- frame_id = ctx->frame_id;
+ stream_id = SPOE_APPCTX(appctx)->spoe_ctx->stream_id;
+ frame_id = SPOE_APPCTX(appctx)->spoe_ctx->frame_id;
/* Set Frame type */
*p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
goto too_big;
/* Copy encoded messages, if possible */
- sz = b_data(&ctx->buffer);
+ sz = b_data(&SPOE_APPCTX(appctx)->spoe_ctx->buffer);
if (p + sz >= end)
goto too_big;
- memcpy(p, b_head(&ctx->buffer), sz);
+ memcpy(p, b_head(&SPOE_APPCTX(appctx)->spoe_ctx->buffer), sz);
p += sz;
return (p - frame);
/* Decode ACK frame sent by an agent. It returns the number of read bytes on
* success, 0 if the frame can be ignored and -1 if an error occurred. */
static int
-spoe_handle_agentack_frame(struct appctx *appctx, struct spoe_context **ctx,
- char *frame, size_t size)
+spoe_handle_agentack_frame(struct appctx *appctx, char *frame, size_t size)
{
char *p, *end;
uint64_t stream_id, frame_id;
p = frame;
end = frame + size;
- *ctx = NULL;
/* Check frame type */
if (*p++ != SPOE_FRM_T_AGENT_ACK) {
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
- return 0;
+ return -1;
}
if (size < 7 /* TYPE + METADATA */) {
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
- return 0;
+ return -1;
}
/* Retrieve flags */
/* Get the stream-id and the frame-id */
if (decode_varint(&p, end, &stream_id) == -1) {
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
- return 0;
+ return -1;
}
if (decode_varint(&p, end, &frame_id) == -1) {
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
- return 0;
+ return -1;
}
- list_for_each_entry((*ctx), &SPOE_APPCTX(appctx)->waiting_queue, list) {
- if ((*ctx)->stream_id == (unsigned int)stream_id &&
- (*ctx)->frame_id == (unsigned int)frame_id)
+ if (SPOE_APPCTX(appctx)->spoe_ctx->stream_id == (unsigned int)stream_id &&
+ SPOE_APPCTX(appctx)->spoe_ctx->frame_id == (unsigned int)frame_id)
goto found;
- }
/* No Stream found, ignore the frame */
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAMEID_NOTFOUND;
- if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
- /* Report an error if we are waiting the ack for another frame,
- * but not if there is no longer frame waiting for a ack
- * (timeout)
- */
- if (!LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue))
- return -1;
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- SPOE_APPCTX(appctx)->cur_fpa = 0;
- }
- return 0;
+ return -1;
found:
if (!spoe_acquire_buffer(&SPOE_APPCTX(appctx)->buffer,
&SPOE_APPCTX(appctx)->buffer_wait)) {
- *ctx = NULL;
return 1; /* Retry later */
}
b_set_data(&SPOE_APPCTX(appctx)->buffer, len);
p += len;
- /* Transfer the buffer ownership to the SPOE context */
- (*ctx)->buffer = SPOE_APPCTX(appctx)->buffer;
- SPOE_APPCTX(appctx)->buffer = BUF_NULL;
-
- (*ctx)->state = SPOE_CTX_ST_DONE;
-
end:
return (p - frame);
}
s->do_log = NULL;
s->scb->flags |= SC_FL_RCV_ONCE;
- HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
- LIST_APPEND(&agent->rt[tid].applets, &spoe_appctx->list);
- HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
- _HA_ATOMIC_INC(&agent->counters.applets);
-
appctx->st0 = SPOE_APPCTX_ST_CONNECT;
task_wakeup(spoe_appctx->task, TASK_WOKEN_INIT);
return 0;
spoe_release_appctx(struct appctx *appctx)
{
struct spoe_appctx *spoe_appctx = SPOE_APPCTX(appctx);
- struct spoe_agent *agent;
- struct spoe_context *ctx, *back;
if (spoe_appctx == NULL)
return;
- appctx->svcctx = NULL;
- agent = spoe_appctx->agent;
-
- /* Remove applet from the list of running applets */
- _HA_ATOMIC_DEC(&agent->counters.applets);
- HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
- if (!LIST_ISEMPTY(&spoe_appctx->list)) {
- LIST_DELETE(&spoe_appctx->list);
- LIST_INIT(&spoe_appctx->list);
- }
- HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
+ appctx->svcctx = NULL;
/* Shutdown the server connection, if needed */
if (appctx->st0 != SPOE_APPCTX_ST_END) {
- if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
- eb32_delete(&spoe_appctx->node);
- _HA_ATOMIC_DEC(&agent->counters.idles);
- agent->rt[tid].idles--;
- }
-
appctx->st0 = SPOE_APPCTX_ST_END;
if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
spoe_appctx->status_code = SPOE_FRM_ERR_IO;
/* Destroy the task attached to this applet */
task_destroy(spoe_appctx->task);
- /* Report an error to all streams in the appctx waiting queue */
- list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) {
- LIST_DELETE(&ctx->list);
- LIST_INIT(&ctx->list);
- _HA_ATOMIC_DEC(&agent->counters.nb_waiting);
- spoe_update_stat_time(&ctx->stats.wait_ts, &ctx->stats.t_waiting);
- ctx->spoe_appctx = NULL;
- ctx->state = SPOE_CTX_ST_ERROR;
- ctx->status_code = (spoe_appctx->status_code + 0x100);
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- }
-
- if (LIST_ISEMPTY(&agent->rt[tid].applets)) {
- /* It is the last running applet and the sending queue is not empty.
- * So try to start a new applet if HAproxy is not stopping.
- */
- if (!stopping && !LIST_ISEMPTY(&agent->rt[tid].sending_queue) && spoe_create_appctx(agent->spoe_conf))
- goto end;
-
- /* Otherwise, report an error to all streams in the sending queue.
- */
- list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
- LIST_DELETE(&ctx->list);
- LIST_INIT(&ctx->list);
- _HA_ATOMIC_DEC(&agent->counters.nb_sending);
- spoe_update_stat_time(&ctx->stats.queue_ts, &ctx->stats.t_queue);
- ctx->spoe_appctx = NULL;
- ctx->state = SPOE_CTX_ST_ERROR;
- ctx->status_code = (spoe_appctx->status_code + 0x100);
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- }
+ if (spoe_appctx->spoe_ctx) {
+ /* Report an error to stream */
+ spoe_appctx->spoe_ctx->spoe_appctx = NULL;
+ spoe_appctx->spoe_ctx->state = SPOE_CTX_ST_ERROR;
+ spoe_appctx->spoe_ctx->status_code = (spoe_appctx->status_code + 0x100);
+ task_wakeup(spoe_appctx->spoe_ctx->strm->task, TASK_WOKEN_MSG);
}
end:
spoe_release_buffer(&spoe_appctx->buffer,
&spoe_appctx->buffer_wait);
pool_free(pool_head_spoe_appctx, spoe_appctx);
-
- /* Update runtinme agent info */
- agent->rt[tid].frame_size = agent->max_frame_size;
- list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list)
- HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, spoe_appctx->max_frame_size);
}
static int
spoe_handle_connect_appctx(struct appctx *appctx)
{
- struct stconn *sc = appctx_sc(appctx);
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ struct stconn *sc = appctx_sc(appctx);
char *frame, *buf;
int ret;
goto exit;
}
- if (SPOE_APPCTX(appctx)->task->expire == TICK_ETERNITY)
- SPOE_APPCTX(appctx)->task->expire =
- tick_add_ifset(now_ms, agent->timeout.hello);
-
/* 4 bytes are reserved at the beginning of <buf> to store the frame
* length. */
buf = trash.area; frame = buf+4;
static int
spoe_handle_connecting_appctx(struct appctx *appctx)
{
- struct stconn *sc = appctx_sc(appctx);
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ struct stconn *sc = appctx_sc(appctx);
char *frame;
int ret;
goto stop;
default:
- _HA_ATOMIC_INC(&agent->counters.idles);
- agent->rt[tid].idles++;
appctx->st0 = SPOE_APPCTX_ST_IDLE;
- SPOE_APPCTX(appctx)->node.key = 0;
- eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
-
- /* Update runtinme agent info */
- HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
goto next;
}
if (trash.data)
co_skip(sc_oc(sc), trash.data);
- SPOE_APPCTX(appctx)->task->expire =
- tick_add_ifset(now_ms, agent->timeout.idle);
return 0;
stop:
return 1;
static int
-spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
+spoe_handle_sending_frame_appctx(struct appctx *appctx)
{
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- struct spoe_context *ctx = NULL;
char *frame, *buf;
int ret;
* length. */
buf = trash.area; frame = buf+4;
- if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) {
- *skip = 1;
- ret = 1;
+ if (!SPOE_APPCTX(appctx)->spoe_ctx) {
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ ret = -1;
goto end;
}
- else {
- ctx = LIST_NEXT(&agent->rt[tid].sending_queue, typeof(ctx), list);
- ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
- SPOE_APPCTX(appctx)->max_frame_size);
-
- }
-
+ ret = spoe_prepare_hanotify_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
if (ret > 1)
ret = spoe_send_frame(appctx, buf, ret);
switch (ret) {
case -1: /* error */
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- goto end;
-
case 0: /* ignore */
- spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
- LIST_DELETE(&ctx->list);
- LIST_INIT(&ctx->list);
- _HA_ATOMIC_DEC(&agent->counters.nb_sending);
- spoe_update_stat_time(&ctx->stats.queue_ts, &ctx->stats.t_queue);
- ctx->spoe_appctx = NULL;
- ctx->state = SPOE_CTX_ST_ERROR;
- ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- *skip = 1;
+ spoe_release_buffer(&SPOE_APPCTX(appctx)->spoe_ctx->buffer, &SPOE_APPCTX(appctx)->spoe_ctx->buffer_wait);
+ SPOE_APPCTX(appctx)->spoe_ctx->spoe_appctx = NULL;
+ SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_ERROR;
+ SPOE_APPCTX(appctx)->spoe_ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+ task_wakeup(SPOE_APPCTX(appctx)->spoe_ctx->strm->task, TASK_WOKEN_MSG);
+ SPOE_APPCTX(appctx)->spoe_ctx = NULL;
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
break;
case 1: /* retry */
- *skip = 1;
break;
default:
- spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
- LIST_DELETE(&ctx->list);
- LIST_INIT(&ctx->list);
- _HA_ATOMIC_DEC(&agent->counters.nb_sending);
- spoe_update_stat_time(&ctx->stats.queue_ts, &ctx->stats.t_queue);
- ctx->spoe_appctx = SPOE_APPCTX(appctx);
- goto frame_sent;
- }
- goto end;
-
- frame_sent:
- if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- LIST_APPEND(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
- }
- else {
- appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
- *skip = 1;
- LIST_APPEND(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
+ spoe_release_buffer(&SPOE_APPCTX(appctx)->spoe_ctx->buffer, &SPOE_APPCTX(appctx)->spoe_ctx->buffer_wait);
+ SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_WAITING_ACK;
+ appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
+ break;
}
- _HA_ATOMIC_INC(&agent->counters.nb_waiting);
- ctx->stats.wait_ts = now_ns;
- SPOE_APPCTX(appctx)->cur_fpa++;
-
- ctx->state = SPOE_CTX_ST_WAITING_ACK;
end:
return ret;
}
static int
-spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
+spoe_handle_receiving_frame_appctx(struct appctx *appctx)
{
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- struct spoe_context *ctx = NULL;
char *frame;
int ret;
goto end;
}
trash.data = ret + 4;
- ret = spoe_handle_agentack_frame(appctx, &ctx, frame, ret);
+ if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
+ ret = spoe_handle_agentack_frame(appctx, frame, ret);
+ else {
+ /* ignore frame by default except if we are waiting for a ack */
+ ret = 0;
+ }
}
switch (ret) {
case -1: /* error */
+ spoe_release_buffer(&SPOE_APPCTX(appctx)->spoe_ctx->buffer, &SPOE_APPCTX(appctx)->spoe_ctx->buffer_wait);
+ SPOE_APPCTX(appctx)->spoe_ctx->spoe_appctx = NULL;
+ SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_ERROR;
+ SPOE_APPCTX(appctx)->spoe_ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+ task_wakeup(SPOE_APPCTX(appctx)->spoe_ctx->strm->task, TASK_WOKEN_MSG);
+ SPOE_APPCTX(appctx)->spoe_ctx = NULL;
appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
break;
break;
case 1: /* retry */
- *skip = 1;
- break;
+ goto end;
default:
- LIST_DELETE(&ctx->list);
- LIST_INIT(&ctx->list);
- _HA_ATOMIC_DEC(&agent->counters.nb_waiting);
- spoe_update_stat_time(&ctx->stats.wait_ts, &ctx->stats.t_waiting);
- ctx->stats.response_ts = now_ns;
- if (ctx->spoe_appctx) {
- ctx->spoe_appctx->cur_fpa--;
- ctx->spoe_appctx = NULL;
- }
- if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
- appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
- task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
- break;
+ SPOE_APPCTX(appctx)->spoe_ctx->buffer = SPOE_APPCTX(appctx)->buffer;
+ SPOE_APPCTX(appctx)->buffer = BUF_NULL;
+ SPOE_APPCTX(appctx)->spoe_ctx->state = SPOE_CTX_ST_DONE;
+ SPOE_APPCTX(appctx)->spoe_ctx->spoe_appctx = NULL;
+ task_wakeup(SPOE_APPCTX(appctx)->spoe_ctx->strm->task, TASK_WOKEN_MSG);
+ SPOE_APPCTX(appctx)->spoe_ctx = NULL;
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ ret = -1;
}
/* Do not forget to remove processed frame from the output buffer */
static int
spoe_handle_processing_appctx(struct appctx *appctx)
{
- struct stconn *sc = appctx_sc(appctx);
- struct server *srv = objt_server(__sc_strm(sc)->target);
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- int ret, skip_sending = 0, skip_receiving = 0, active_s = 0, active_r = 0, close_asap = 0;
-
- if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
- SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT;
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- appctx->st1 = SPOE_APPCTX_ERR_NONE;
- goto next;
- }
-
- /* Close the applet ASAP because some sessions are waiting for a free
- * connection slot. It is only an issue in multithreaded mode.
- */
- close_asap = (global.nbthread > 1 &&
- (agent->b.be->queue.length ||
- (srv && (srv->queue.length || (srv->maxconn && srv->served >= srv_dynamic_maxconn(srv))))));
+ int ret;
/* receiving_frame loop */
- while (!skip_receiving) {
- ret = spoe_handle_receiving_frame_appctx(appctx, &skip_receiving);
+ while (1) {
+ ret = spoe_handle_receiving_frame_appctx(appctx);
switch (ret) {
case -1: /* error */
goto next;
-
case 0: /* ignore */
- active_r = 1;
break;
-
case 1: /* retry */
- break;
-
+ goto send;
default:
- active_r = 1;
break;
}
}
- /* Don"t try to send new frame we are waiting for at lease a ack, in
- * sync mode or if applet must be closed ASAP
- */
- if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK || (close_asap && SPOE_APPCTX(appctx)->cur_fpa))
- skip_sending = 1;
-
- /* send_frame loop */
- while (!skip_sending && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
- ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
- switch (ret) {
- case -1: /* error */
- goto next;
-
- case 0: /* ignore */
- if (SPOE_APPCTX(appctx)->node.key)
- SPOE_APPCTX(appctx)->node.key--;
- active_s++;
- break;
+ send:
+ if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
+ goto end;
- case 1: /* retry */
- break;
+ ret = spoe_handle_sending_frame_appctx(appctx);
+ switch (ret) {
+ case -1: /* error */
+ goto next;
- default:
- if (SPOE_APPCTX(appctx)->node.key)
- SPOE_APPCTX(appctx)->node.key--;
- active_s++;
- break;
- }
+ case 0: /* ignore */
+ break;
- /* if applet must be close ASAP, don't send more than a frame */
- if (close_asap)
+ case 1: /* retry */
break;
- }
- if (active_s || active_r) {
- update_freq_ctr(&agent->rt[tid].processing_per_sec, active_s);
- SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
- }
+ default:
- if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
- /* If applet must be closed, don't switch it in IDLE state and
- * close it when the last waiting frame is acknowledged.
- */
- if (close_asap) {
- if (SPOE_APPCTX(appctx)->cur_fpa)
- goto out;
- SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- appctx->st1 = SPOE_APPCTX_ERR_NONE;
- goto next;
- }
- _HA_ATOMIC_INC(&agent->counters.idles);
- agent->rt[tid].idles++;
- appctx->st0 = SPOE_APPCTX_ST_IDLE;
- eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
+ break;
}
-
- out:
+ end:
return 1;
-
next:
- SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
return 0;
}
static int
spoe_handle_disconnect_appctx(struct appctx *appctx)
{
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
char *frame, *buf;
int ret;
}
next:
- SPOE_APPCTX(appctx)->task->expire =
- tick_add_ifset(now_ms, agent->timeout.idle);
return 0;
stop:
return 1;
spoe_handle_appctx(struct appctx *appctx)
{
struct stconn *sc = appctx_sc(appctx);
- struct spoe_agent *agent;
if (SPOE_APPCTX(appctx) == NULL)
return;
}
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
- agent = SPOE_APPCTX(appctx)->agent;
+ if (!SPOE_APPCTX(appctx)->spoe_ctx) {
+ if (appctx->st0 == SPOE_APPCTX_ST_CONNECT)
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ else if (appctx->st0 < SPOE_APPCTX_ST_DISCONNECT)
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ }
switchstate:
switch (appctx->st0) {
goto switchstate;
case SPOE_APPCTX_ST_IDLE:
- _HA_ATOMIC_DEC(&agent->counters.idles);
- agent->rt[tid].idles--;
- eb32_delete(&SPOE_APPCTX(appctx)->node);
- if (stopping &&
- LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
- LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
- SPOE_APPCTX(appctx)->task->expire =
- tick_add_ifset(now_ms, agent->timeout.idle);
- appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
- goto switchstate;
- }
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
__fallthrough;
struct spoe_appctx *spoe_appctx;
struct appctx *appctx;
- /* Do not try to create a new applet if there is no server up for the
- * agent's backend. */
- if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
- goto out;
-
- /* Do not try to create a new applet if we have reached the maximum of
- * connection per seconds */
- if (agent->cps_max > 0) {
- if (!freq_ctr_remain(&agent->rt[tid].conn_per_sec, agent->cps_max, 0))
- goto out;
- }
-
spoe_appctx = pool_zalloc(pool_head_spoe_appctx);
if (spoe_appctx == NULL)
goto out_error;
spoe_appctx->flags = 0;
spoe_appctx->status_code = SPOE_FRM_ERR_NONE;
spoe_appctx->buffer = BUF_NULL;
- spoe_appctx->cur_fpa = 0;
- LIST_INIT(&spoe_appctx->list);
- LIST_INIT(&spoe_appctx->waiting_queue);
if ((appctx = appctx_new_here(&spoe_applet, NULL)) == NULL)
if (appctx_init(appctx) == -1)
goto out_free_appctx;
- /* Increase the per-process number of cumulated connections */
- if (agent->cps_max > 0)
- update_freq_ctr(&agent->rt[tid].conn_per_sec, 1);
-
appctx_wakeup(appctx);
return appctx;
return NULL;
}
-static int
-spoe_queue_context(struct spoe_context *ctx)
-{
- struct spoe_config *conf = FLT_CONF(ctx->filter);
- struct spoe_agent *agent = conf->agent;
- struct spoe_appctx *spoe_appctx;
-
- /* Check if we need to create a new SPOE applet or not. */
- if (!LIST_ISEMPTY(&agent->rt[tid].applets) &&
- (agent->rt[tid].processing < agent->rt[tid].idles ||
- agent->rt[tid].processing < read_freq_ctr(&agent->rt[tid].processing_per_sec)))
- goto end;
-
- spoe_create_appctx(conf);
-
- end:
- /* The only reason to return an error is when there is no applet */
- if (LIST_ISEMPTY(&agent->rt[tid].applets)) {
- ctx->status_code = SPOE_CTX_ERR_RES;
- return -1;
- }
-
- /* Add the SPOE context in the sending queue if the stream has no applet
- * already assigned and wakeup all idle applets. Otherwise, don't queue
- * it. */
- _HA_ATOMIC_INC(&agent->counters.nb_sending);
- spoe_update_stat_time(&ctx->stats.request_ts, &ctx->stats.t_request);
- ctx->stats.queue_ts = now_ns;
- if (ctx->spoe_appctx)
- return 1;
- LIST_APPEND(&agent->rt[tid].sending_queue, &ctx->list);
-
- /* Finally try to wakeup an IDLE applet. */
- if (!eb_is_empty(&agent->rt[tid].idle_applets)) {
- struct eb32_node *node;
-
- node = eb32_first(&agent->rt[tid].idle_applets);
- spoe_appctx = eb32_entry(node, struct spoe_appctx, node);
- if (node && spoe_appctx) {
- eb32_delete(&spoe_appctx->node);
- spoe_appctx->node.key++;
- eb32_insert(&agent->rt[tid].idle_applets, &spoe_appctx->node);
- spoe_wakeup_appctx(spoe_appctx->owner);
- }
- }
- return 1;
-}
-
/***************************************************************************
* Functions that encode SPOE messages
**************************************************************************/
char *p, *end;
p = b_head(&ctx->buffer);
- end = p + agent->rt[tid].frame_size - FRAME_HDR_SIZE;
+ end = p + agent->max_frame_size - FRAME_HDR_SIZE;
if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
list_for_each_entry(msg, messages, by_evt) {
if (ctx->stats.start_ts != 0) {
spoe_update_stat_time(&ctx->stats.start_ts, &ctx->stats.t_process);
ctx->stats.t_total += ctx->stats.t_process;
- ctx->stats.request_ts = 0;
- ctx->stats.queue_ts = 0;
- ctx->stats.wait_ts = 0;
- ctx->stats.response_ts = 0;
}
if (agent->var_t_process) {
spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
struct spoe_context *ctx, int dir)
{
- if (agent->eps_max > 0)
- update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
-
if (agent->var_on_error) {
struct sample smp;
if (ctx->flags & SPOE_CTX_FL_PROCESS)
return 0;
- agent->rt[tid].processing++;
ctx->stats.start_ts = now_ns;
- ctx->stats.request_ts = now_ns;
- ctx->stats.t_request = -1;
- ctx->stats.t_queue = -1;
- ctx->stats.t_waiting = -1;
- ctx->stats.t_response = -1;
ctx->stats.t_process = -1;
ctx->status_code = 0;
if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
return;
_HA_ATOMIC_INC(&agent->counters.nb_processed);
- if (sa)
- sa->cur_fpa--;
+ if (sa) {
+ sa->spoe_ctx = NULL;
+ spoe_wakeup_appctx(sa->owner);
+ }
/* Reset the flag to allow next processing */
- agent->rt[tid].processing--;
ctx->flags &= ~SPOE_CTX_FL_PROCESS;
/* Reset processing timer */
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
ctx->spoe_appctx = NULL;
-
- if (!LIST_ISEMPTY(&ctx->list)) {
- if (ctx->state == SPOE_CTX_ST_SENDING_MSGS)
- _HA_ATOMIC_DEC(&agent->counters.nb_sending);
- else
- _HA_ATOMIC_DEC(&agent->counters.nb_waiting);
-
- LIST_DELETE(&ctx->list);
- LIST_INIT(&ctx->list);
- }
}
/* Process a list of SPOE messages. First, this functions will process messages
}
if (ctx->state == SPOE_CTX_ST_READY) {
- if (agent->eps_max > 0) {
- if (!freq_ctr_remain(&agent->rt[tid].err_per_sec, agent->eps_max, 0))
- goto skip;
- }
-
if (!tick_isset(ctx->process_exp)) {
ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
if (dir == SMP_OPT_DIR_REQ)
}
if (ctx->state == SPOE_CTX_ST_ENCODING_MSGS) {
- if (ctx->stats.request_ts == 0)
- ctx->stats.request_ts = now_ns;
+ struct appctx *appctx;
+
if (!spoe_acquire_buffer(&ctx->buffer, &ctx->buffer_wait))
goto out;
ret = spoe_encode_messages(s, ctx, messages, dir, type);
goto end;
if (!ret)
goto skip;
- if (spoe_queue_context(ctx) < 0)
+ appctx = spoe_create_appctx(conf);
+ if (!appctx) {
+ ctx->status_code = SPOE_CTX_ERR_RES;
goto end;
+ }
+ ctx->spoe_appctx = SPOE_APPCTX(appctx);
+ ctx->spoe_appctx->spoe_ctx = ctx;
ctx->state = SPOE_CTX_ST_SENDING_MSGS;
}
ret = 1;
ctx->frame_id++;
ctx->state = SPOE_CTX_ST_READY;
- spoe_update_stat_time(&ctx->stats.response_ts, &ctx->stats.t_response);
goto end;
}
if (ret && ctx->stats.t_process != -1) {
if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
- "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
- agent->id, group->id, s->uniq_id, ctx->status_code,
- ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
- ctx->stats.t_response, ctx->stats.t_process,
- agent->counters.idles, agent->counters.applets,
- agent->counters.nb_sending, agent->counters.nb_waiting,
+ "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld %llu/%llu\n",
+ agent->id, group->id, s->uniq_id, ctx->status_code, ctx->stats.t_process,
agent->counters.nb_errors, agent->counters.nb_processed);
}
return ret;
if (ret && ctx->stats.t_process != -1) {
if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
- "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
- agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
- ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
- ctx->stats.t_response, ctx->stats.t_process,
- agent->counters.idles, agent->counters.applets,
- agent->counters.nb_sending, agent->counters.nb_waiting,
+ "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld %llu/%llu\n",
+ agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code, ctx->stats.t_process,
agent->counters.nb_errors, agent->counters.nb_processed);
}
return ret;
LIST_INIT(&ctx->buffer_wait.list);
ctx->buffer_wait.target = ctx;
ctx->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_context;
- LIST_INIT(&ctx->list);
ctx->stream_id = 0;
ctx->frame_id = 1;
ctx->process_exp = TICK_ETERNITY;
ctx->stats.start_ts = 0;
- ctx->stats.request_ts = 0;
- ctx->stats.queue_ts = 0;
- ctx->stats.wait_ts = 0;
- ctx->stats.response_ts= 0;
- ctx->stats.t_request = -1;
- ctx->stats.t_queue = -1;
- ctx->stats.t_waiting = -1;
- ctx->stats.t_response = -1;
ctx->stats.t_process = -1;
ctx->stats.t_total = 0;
ctx->flags &= ~SPOE_CTX_FL_PROCESS;
ctx->stats.start_ts = 0;
- ctx->stats.request_ts = 0;
- ctx->stats.queue_ts = 0;
- ctx->stats.wait_ts = 0;
- ctx->stats.response_ts= 0;
- ctx->stats.t_request = -1;
- ctx->stats.t_queue = -1;
- ctx->stats.t_waiting = -1;
- ctx->stats.t_response = -1;
ctx->stats.t_process = -1;
ctx->stats.t_total = 0;
}
/***************************************************************************
* Hooks that manage the filter lifecycle (init/check/deinit)
**************************************************************************/
-/* Signal handler: Do a soft stop, wakeup SPOE applet */
-static void
-spoe_sig_stop(struct sig_handler *sh)
-{
- struct proxy *p;
-
- p = proxies_list;
- while (p) {
- struct flt_conf *fconf;
-
- /* SPOE filter are not initialized for disabled proxoes. Move to
- * the next one
- */
- if (p->flags & PR_FL_DISABLED) {
- p = p->next;
- continue;
- }
-
- list_for_each_entry(fconf, &p->filter_configs, list) {
- struct spoe_config *conf;
- struct spoe_agent *agent;
- struct spoe_appctx *spoe_appctx;
- int i;
-
- if (fconf->id != spoe_filter_id)
- continue;
-
- conf = fconf->conf;
- agent = conf->agent;
-
- for (i = 0; i < global.nbthread; ++i) {
- HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
- list_for_each_entry(spoe_appctx, &agent->rt[i].applets, list)
- spoe_wakeup_appctx(spoe_appctx->owner);
- HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
- }
- }
- p = p->next;
- }
-}
-
-
/* Initialize the SPOE filter. Returns -1 on error, else 0. */
static int
spoe_init(struct proxy *px, struct flt_conf *fconf)
if (conf->agent->engine_id == NULL)
return -1;
-
- if (!sighandler_registered) {
- signal_register_fct(0, spoe_sig_stop, 0);
- sighandler_registered = 1;
- }
-
fconf->flags |= FLT_CFG_FL_HTX;
return 0;
}
struct flt_conf *f;
struct spoe_config *conf = fconf->conf;
struct proxy *target;
- int i;
/* Check all SPOE filters for proxy <px> to be sure all SPOE agent names
* are uniq */
return 1;
}
- if ((conf->agent->rt = calloc(global.nbthread, sizeof(*conf->agent->rt))) == NULL) {
- ha_alert("Proxy %s : out of memory initializing SPOE agent '%s' declared at %s:%d.\n",
- px->id, conf->agent->id, conf->agent->conf.file, conf->agent->conf.line);
- return 1;
- }
- for (i = 0; i < global.nbthread; ++i) {
- conf->agent->rt[i].frame_size = conf->agent->max_frame_size;
- conf->agent->rt[i].processing = 0;
- conf->agent->rt[i].idles = 0;
- LIST_INIT(&conf->agent->rt[i].applets);
- LIST_INIT(&conf->agent->rt[i].sending_queue);
- HA_SPIN_INIT(&conf->agent->rt[i].lock);
- }
-
if (postresolve_logger_list(NULL, &conf->agent_fe.loggers, "SPOE agent", conf->agent->id) & ERR_CODE)
return 1;
curagent->conf.file = strdup(file);
curagent->conf.line = linenum;
- curagent->timeout.hello = TICK_ETERNITY;
- curagent->timeout.idle = TICK_ETERNITY;
curagent->timeout.processing = TICK_ETERNITY;
curagent->var_pfx = NULL;
curagent->var_t_process = NULL;
curagent->var_t_total = NULL;
curagent->flags = SPOE_FL_PIPELINING;
- curagent->cps_max = 0;
- curagent->eps_max = 0;
curagent->max_frame_size = MAX_FRAME_SIZE;
- curagent->max_fpa = 20;
for (i = 0; i < SPOE_EV_EVENTS; ++i)
LIST_INIT(&curagent->events[i]);
}
if (alertif_too_many_args(2, file, linenum, args, &err_code))
goto out;
- if (strcmp(args[1], "hello") == 0)
- tv = &curagent->timeout.hello;
- else if (strcmp(args[1], "idle") == 0)
- tv = &curagent->timeout.idle;
+ if (strcmp(args[1], "hello") == 0) {
+ /* TODO: Add a warning or a diag ? Ignore it for now */
+ goto out;
+ }
+ else if (strcmp(args[1], "idle") == 0) {
+ /* TODO: Add a warning or a diag ? Ignore it for now */
+ goto out;
+ }
else if (strcmp(args[1], "processing") == 0)
tv = &curagent->timeout.processing;
else {
- ha_alert("parsing [%s:%d] : 'timeout' supports 'hello', 'idle' or 'processing' (got %s).\n",
+ ha_alert("parsing [%s:%d] : 'timeout' supports 'processing' (got %s).\n",
file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
- if (alertif_too_many_args(1, file, linenum, args, &err_code))
- goto out;
- curagent->cps_max = atol(args[1]);
+ /* TODO: Add a warning or a diag ? Ignore it for now */
}
else if (strcmp(args[0], "maxerrrate") == 0) {
if (!*args[1]) {
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
- if (alertif_too_many_args(1, file, linenum, args, &err_code))
- goto out;
- curagent->eps_max = atol(args[1]);
+ /* TODO: Add a warning or a diag ? Ignore it for now */
}
else if (strcmp(args[0], "max-frame-size") == 0) {
if (!*args[1]) {
}
if (alertif_too_many_args(1, file, linenum, args, &err_code))
goto out;
- curagent->max_fpa = atol(args[1]);
- if (curagent->max_fpa < 1) {
- ha_alert("parsing [%s:%d] : '%s' expects a positive integer argument.\n",
- file, linenum, args[0]);
- err_code |= ERR_ALERT | ERR_FATAL;
- goto out;
- }
+ /* TODO: Add a warning or a diag ? Ignore it for now */
}
else if (strcmp(args[0], "register-var-names") == 0) {
int cur_arg;
curagent->id, curagent->conf.file, curagent->conf.line);
goto error;
}
- if (curagent->timeout.hello == TICK_ETERNITY ||
- curagent->timeout.idle == TICK_ETERNITY ||
- curagent->timeout.processing == TICK_ETERNITY) {
- ha_warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
+ if (curagent->timeout.processing == TICK_ETERNITY) {
+ ha_warning("Proxy '%s': missing 'processing' timeout for SPOE agent '%s' declare at %s:%d.\n"
" | While not properly invalid, you will certainly encounter various problems\n"
- " | with such a configuration. To fix this, please ensure that all following\n"
- " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
+ " | with such a configuration. To fix this, please ensure it is set to a non-zero value.\n",
px->id, curagent->id, curagent->conf.file, curagent->conf.line);
}
if (curagent->var_pfx == NULL) {