}
}
+static void
+spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
+ struct spoe_context *ctx, int dir)
+{
+ if (agent->eps_max > 0)
+ update_freq_ctr(&agent->err_per_sec, 1);
+
+ if (agent->var_on_error) {
+ struct sample smp;
+
+ memset(&smp, 0, sizeof(smp));
+ smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+ smp.data.u.sint = ctx->status_code;
+ smp.data.type = SMP_T_BOOL;
+
+ spoe_set_var(ctx, "txn", agent->var_on_error,
+ strlen(agent->var_on_error), &smp);
+ }
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - failed to process messages: code=%u\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, s, ctx->status_code);
+ send_log(ctx->strm->be, LOG_WARNING,
+ "SPOE: [%s] failed to process messages: code=%u\n",
+ agent->id, ctx->status_code);
+
+ ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
+ ? SPOE_CTX_ST_READY
+ : SPOE_CTX_ST_NONE);
+}
+
/* Process a list of SPOE messages. First, this functions will process messages
* and send them to an agent in a NOTIFY frame. Then, it will wait a ACK frame
* to process corresponding actions. During all the processing, it returns 0
return ret;
error:
- if (agent->eps_max > 0)
- update_freq_ctr(&agent->err_per_sec, 1);
-
- if (agent->var_on_error) {
- struct sample smp;
-
- memset(&smp, 0, sizeof(smp));
- smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
- smp.data.u.sint = ctx->status_code;
- smp.data.type = SMP_T_BOOL;
-
- spoe_set_var(ctx, "txn", agent->var_on_error,
- strlen(agent->var_on_error), &smp);
- }
- SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - failed to process messages: code=%u\n",
- (int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, ctx->strm, ctx->status_code);
- send_log(ctx->strm->be, LOG_WARNING,
- "SPOE: [%s] failed to process messages: code=%u\n",
- agent->id, ctx->status_code);
-
- ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
- ? SPOE_CTX_ST_READY
- : SPOE_CTX_ST_NONE);
+ spoe_handle_processing_error(s, agent, ctx, dir);
ret = 1;
goto end;
return ret;
}
+/* Process a SPOE group, ie the list of messages attached to the group <grp>.
+ * See spoe_process_message for details. */
+static int
+spoe_process_group(struct stream *s, struct spoe_context *ctx,
+ struct spoe_group *group, int dir)
+{
+ int ret;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - ctx-state=%s - Process messages for group=%s\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
+ __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
+ group->id);
+
+ if (LIST_ISEMPTY(&group->messages))
+ return 1;
+
+ ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP);
+ return ret;
+}
+
/* Process a SPOE event, ie the list of messages attached to the event <ev>.
* See spoe_process_message for details. */
static int
int dir, ret;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
- " - ctx-state=%s - event=%s\n",
+ " - ctx-state=%s - Process messages for event=%s\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
__FUNCTION__, s, spoe_ctx_state_str[ctx->state],
return -1;
}
-/* Send a SPOE group. TODO */
+/* Send message of a SPOE group. This is the action_ptr callback of a rule
+ * associated to a "send-spoe-group" action.
+ *
+ * It returns ACT_RET_CONT is processing is finished without error, it returns
+ * ACT_RET_YIELD if the action is in progress. Otherwise it returns
+ * ACT_RET_ERR. */
static enum act_return
spoe_send_group(struct act_rule *rule, struct proxy *px,
struct session *sess, struct stream *s, int flags)
}
if (agent == NULL || group == NULL || ctx == NULL)
return ACT_RET_ERR;
+ if (ctx->state == SPOE_CTX_ST_NONE)
+ return ACT_RET_CONT;
+
+ switch (rule->from) {
+ case ACT_F_TCP_REQ_SES: dir = SMP_OPT_DIR_REQ; break;
+ case ACT_F_TCP_REQ_CNT: dir = SMP_OPT_DIR_REQ; break;
+ case ACT_F_TCP_RES_CNT: dir = SMP_OPT_DIR_RES; break;
+ case ACT_F_HTTP_REQ: dir = SMP_OPT_DIR_REQ; break;
+ case ACT_F_HTTP_RES: dir = SMP_OPT_DIR_RES; break;
+ default:
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - internal error while execute spoe-send-group\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, s);
+ send_log(px, LOG_ERR, "SPOE: [%s] internal error while execute spoe-send-group\n",
+ agent->id);
+ return ACT_RET_CONT;
+ }
- /* TODO */
- return ACT_RET_CONT;
+ ret = spoe_process_group(s, ctx, group, dir);
+ if (ret == 1)
+ return ACT_RET_CONT;
+ else if (ret == 0) {
+ if (flags & ACT_FLAG_FINAL) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - failed to process group '%s': interrupted by caller\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ agent->id, __FUNCTION__, s, group->id);
+ ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
+ spoe_handle_processing_error(s, agent, ctx, dir);
+ spoe_stop_processing(ctx);
+ return ACT_RET_CONT;
+ }
+ return ACT_RET_YIELD;
+ }
+ else
+ return ACT_RET_ERR;
}
/* Check an "send-spoe-group" action. Here, we'll try to find the real SPOE