/* Remove applet from the list of running applets */
SPOE_DEBUG_STMT(agent->rt[tid].applets_act--);
+ HA_ATOMIC_SUB(&agent->counters.applets, 1);
HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
if (!LIST_ISEMPTY(&spoe_appctx->list)) {
LIST_DEL(&spoe_appctx->list);
if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
eb32_delete(&spoe_appctx->node);
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+ HA_ATOMIC_SUB(&agent->counters.idles, 1);
}
appctx->st0 = SPOE_APPCTX_ST_END;
list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
+ HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
+ HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
+ HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
/* HELLO handshake is finished, set the idle timeout and
* add the applet in the list of running applets. */
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
+ HA_ATOMIC_ADD(&agent->counters.idles, 1);
appctx->st0 = SPOE_APPCTX_ST_IDLE;
SPOE_APPCTX(appctx)->node.key = 0;
eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
+ HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
ctx->spoe_appctx = NULL;
ctx->state = SPOE_CTX_ST_ERROR;
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
+ HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
ctx->spoe_appctx = SPOE_APPCTX(appctx);
if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
*skip = 1;
LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
}
+ HA_ATOMIC_ADD(&agent->counters.nb_waiting, 1);
ctx->stats.tv_wait = now;
SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
static int
spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
{
+ struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
struct spoe_context *ctx = NULL;
char *frame;
int ret;
default:
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
+ HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
ctx->stats.tv_response = now;
if (ctx->spoe_appctx) {
if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
+ HA_ATOMIC_ADD(&agent->counters.idles, 1);
appctx->st0 = SPOE_APPCTX_ST_IDLE;
eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
}
case SPOE_APPCTX_ST_IDLE:
SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+ HA_ATOMIC_SUB(&agent->counters.idles, 1);
eb32_delete(&SPOE_APPCTX(appctx)->node);
if (stopping &&
LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
SPOE_DEBUG_STMT(conf->agent->rt[tid].applets_act++);
+ HA_ATOMIC_ADD(&conf->agent->counters.applets, 1);
task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
task_wakeup(strm->task, TASK_WOKEN_INIT);
/* Add the SPOE context in the sending queue */
LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
+ HA_ATOMIC_ADD(&agent->counters.nb_sending, 1);
spoe_update_stat_time(&ctx->stats.tv_request, &ctx->stats.t_request);
ctx->stats.tv_queue = now;
/***************************************************************************
* Functions that process SPOE events
**************************************************************************/
+static void
+spoe_update_stats(struct stream *s, struct spoe_agent *agent,
+ struct spoe_context *ctx, int dir)
+{
+ if (!tv_iszero(&ctx->stats.tv_start)) {
+ spoe_update_stat_time(&ctx->stats.tv_start, &ctx->stats.t_process);
+ ctx->stats.t_total += ctx->stats.t_process;
+ tv_zero(&ctx->stats.tv_request);
+ tv_zero(&ctx->stats.tv_queue);
+ tv_zero(&ctx->stats.tv_wait);
+ tv_zero(&ctx->stats.tv_response);
+ }
+
+ if (agent->var_t_process) {
+ struct sample smp;
+
+ memset(&smp, 0, sizeof(smp));
+ smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+ smp.data.u.sint = ctx->stats.t_process;
+ smp.data.type = SMP_T_SINT;
+
+ spoe_set_var(ctx, "txn", agent->var_t_process,
+ strlen(agent->var_t_process), &smp);
+ }
+
+ if (agent->var_t_total) {
+ struct sample smp;
+
+ memset(&smp, 0, sizeof(smp));
+ smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+ smp.data.u.sint = ctx->stats.t_total;
+ smp.data.type = SMP_T_SINT;
+
+ spoe_set_var(ctx, "txn", agent->var_t_total,
+ strlen(agent->var_t_total), &smp);
+ }
+}
+
+static void
+spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
+ struct spoe_context *ctx, int dir)
+{
+ if (agent->eps_max > 0)
+ update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
+
+ if (agent->var_on_error) {
+ struct sample smp;
+
+ memset(&smp, 0, sizeof(smp));
+ smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+ smp.data.u.sint = ctx->status_code;
+ smp.data.type = SMP_T_BOOL;
+
+ spoe_set_var(ctx, "txn", agent->var_on_error,
+ strlen(agent->var_on_error), &smp);
+ }
+
+ ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
+ ? SPOE_CTX_ST_READY
+ : SPOE_CTX_ST_NONE);
+}
+
static inline int
spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir)
{
if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
return;
-
+ HA_ATOMIC_ADD(&agent->counters.nb_processed, 1);
if (sa) {
if (sa->frag_ctx.ctx == ctx) {
sa->frag_ctx.ctx = NULL;
ctx->frag_ctx.flags = 0;
if (!LIST_ISEMPTY(&ctx->list)) {
+ if (ctx->state == SPOE_CTX_ST_SENDING_MSGS)
+ HA_ATOMIC_ADD(&agent->counters.nb_sending, 1);
+ else
+ HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
+
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
}
}
-static void
-spoe_update_stats(struct stream *s, struct spoe_agent *agent,
- struct spoe_context *ctx, int dir)
-{
- if (!tv_iszero(&ctx->stats.tv_start)) {
- spoe_update_stat_time(&ctx->stats.tv_start, &ctx->stats.t_process);
- ctx->stats.t_total += ctx->stats.t_process;
- tv_zero(&ctx->stats.tv_request);
- tv_zero(&ctx->stats.tv_queue);
- tv_zero(&ctx->stats.tv_wait);
- tv_zero(&ctx->stats.tv_response);
- }
-
- if (agent->var_t_process) {
- struct sample smp;
-
- memset(&smp, 0, sizeof(smp));
- smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
- smp.data.u.sint = ctx->stats.t_process;
- smp.data.type = SMP_T_SINT;
-
- spoe_set_var(ctx, "txn", agent->var_t_process,
- strlen(agent->var_t_process), &smp);
- }
-
- if (agent->var_t_total) {
- struct sample smp;
-
- memset(&smp, 0, sizeof(smp));
- smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
- smp.data.u.sint = ctx->stats.t_total;
- smp.data.type = SMP_T_SINT;
-
- spoe_set_var(ctx, "txn", agent->var_t_total,
- strlen(agent->var_t_total), &smp);
- }
-}
-
-static void
-spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
- struct spoe_context *ctx, int dir)
-{
- if (agent->eps_max > 0)
- update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
-
- if (agent->var_on_error) {
- struct sample smp;
-
- memset(&smp, 0, sizeof(smp));
- smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
- smp.data.u.sint = ctx->status_code;
- smp.data.type = SMP_T_BOOL;
-
- spoe_set_var(ctx, "txn", agent->var_on_error,
- strlen(agent->var_on_error), &smp);
- }
-
- ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
- ? SPOE_CTX_ST_READY
- : SPOE_CTX_ST_NONE);
-}
-
/* Process a list of SPOE messages. First, this functions will process messages
* and send them to an agent in a NOTIFY frame. Then, it will wait a ACK frame
* to process corresponding actions. During all the processing, it returns 0
int ret = 1;
if (ctx->state == SPOE_CTX_ST_ERROR)
- goto error;
+ goto end;
if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s);
ctx->status_code = SPOE_CTX_ERR_TOUT;
- goto error;
+ goto end;
}
if (ctx->state == SPOE_CTX_ST_READY) {
goto out;
ret = spoe_encode_messages(s, ctx, messages, dir, type);
if (ret < 0)
- goto error;
+ goto end;
if (!ret)
goto skip;
if (spoe_queue_context(ctx) < 0)
- goto error;
+ goto end;
ctx->state = SPOE_CTX_ST_SENDING_MSGS;
}
out:
return ret;
- error:
- spoe_handle_processing_error(s, agent, ctx, dir);
- ret = 1;
- goto end;
-
skip:
tv_zero(&ctx->stats.tv_start);
ctx->state = SPOE_CTX_ST_READY;
- ret = 1;
+ spoe_stop_processing(agent, ctx);
+ return 1;
end:
spoe_update_stats(s, agent, ctx, dir);
spoe_stop_processing(agent, ctx);
+ if (ctx->status_code) {
+ HA_ATOMIC_ADD(&agent->counters.nb_errors, 1);
+ spoe_handle_processing_error(s, agent, ctx, dir);
+ ret = 1;
+ }
return ret;
}
ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP);
if (ret && ctx->stats.t_process != -1) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+ " - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, s, s->uniq_id, ctx->status_code,
+ __FUNCTION__, s, group->id, s->uniq_id, ctx->status_code,
ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
- ctx->stats.t_response, ctx->stats.t_process);
- send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
- "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
- agent->id, group->id, s->uniq_id, ctx->status_code,
- ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
- ctx->stats.t_response, ctx->stats.t_process);
+ ctx->stats.t_response, ctx->stats.t_process,
+ agent->counters.idles, agent->counters.applets,
+ agent->counters.nb_sending, agent->counters.nb_waiting,
+ agent->counters.nb_errors, agent->counters.nb_processed,
+ agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
+ if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
+ send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
+ "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
+ agent->id, group->id, s->uniq_id, ctx->status_code,
+ ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+ ctx->stats.t_response, ctx->stats.t_process,
+ agent->counters.idles, agent->counters.applets,
+ agent->counters.nb_sending, agent->counters.nb_waiting,
+ agent->counters.nb_errors, agent->counters.nb_processed);
}
return ret;
}
ret = spoe_process_messages(s, ctx, &(ctx->events[ev]), dir, SPOE_MSGS_BY_EVENT);
if (ret && ctx->stats.t_process != -1) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+ " - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, s, spoe_event_str[ev], s->uniq_id, ctx->status_code,
ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
- ctx->stats.t_response, ctx->stats.t_process);
- send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
- "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
- agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
- ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
- ctx->stats.t_response, ctx->stats.t_process);
+ ctx->stats.t_response, ctx->stats.t_process,
+ agent->counters.idles, agent->counters.applets,
+ agent->counters.nb_sending, agent->counters.nb_waiting,
+ agent->counters.nb_errors, agent->counters.nb_processed,
+ agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
+ if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
+ send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
+ "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
+ agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
+ ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+ ctx->stats.t_response, ctx->stats.t_process,
+ agent->counters.idles, agent->counters.applets,
+ agent->counters.nb_sending, agent->counters.nb_waiting,
+ agent->counters.nb_errors, agent->counters.nb_processed);
}
return ret;
}
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, group->id);
ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
- spoe_handle_processing_error(s, agent, ctx, dir);
spoe_stop_processing(agent, ctx);
+ spoe_handle_processing_error(s, agent, ctx, dir);
return ACT_RET_CONT;
}
return ACT_RET_YIELD;