From: Thierry FOURNIER Date: Fri, 23 Feb 2018 17:24:10 +0000 (+0100) Subject: MINOR: spoa-server: Prepare responses X-Git-Tag: v2.0-dev3~29 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fbd3824868ebaa02b328c35da9dd4e8a03965ae3;p=thirdparty%2Fhaproxy.git MINOR: spoa-server: Prepare responses This patch adds SPOP responses managament. It provides SPOP encoding primitives. It also move the example function ip_reputation to this new behavior. --- diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c index e484315f65..53fc759bc5 100644 --- a/contrib/spoa_server/spoa.c +++ b/contrib/spoa_server/spoa.c @@ -156,26 +156,30 @@ static void check_ipv4_reputation(struct worker *w, struct in_addr *ipv4) { char str[INET_ADDRSTRLEN]; + unsigned int score; if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL) return; - w->ip_score = random() % 100; + score = random() % 100; + set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score); - DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score); + DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, score); } static void check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6) { char str[INET6_ADDRSTRLEN]; + unsigned int score; if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL) return; - w->ip_score = random() % 100; + score = random() % 100; + set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score); - DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score); + DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, score); } static int @@ -700,6 +704,159 @@ error: return -1; } +/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred, + * the number of written bytes otherwise. */ +static void prepare_agentack(struct worker *w) +{ + w->ack_len = 0; + + /* Frame type */ + w->ack[w->ack_len++] = SPOE_FRM_T_AGENT_ACK; + + /* No flags for now */ + memset(w->ack + w->ack_len, 0, 4); /* No flags */ + w->ack_len += 4; + + /* Set stream-id and frame-id for ACK frames */ + w->ack_len += encode_spoe_varint(w->stream_id, w->ack + w->ack_len); + w->ack_len += encode_spoe_varint(w->frame_id, w->ack + w->ack_len); +} + +static inline +int set_var_name(struct worker *w, const char *name, int name_len, unsigned char scope) +{ + w->ack[w->ack_len++] = SPOE_ACT_T_SET_VAR; /* Action type */ + w->ack[w->ack_len++] = 3; /* Number of args */ + w->ack[w->ack_len++] = scope; /* Arg 1: the scope */ + w->ack_len += encode_spoe_string(name, name_len, w->ack+w->ack_len); /* Arg 2: variable name */ + return 1; +} + +int set_var_null(struct worker *w, + const char *name, int name_len, + unsigned char scope) +{ + if (!set_var_name(w, name, name_len, scope)) + return 0; + w->ack[w->ack_len++] = SPOE_DATA_T_NULL; + return 1; +} + +int set_var_bool(struct worker *w, + const char *name, int name_len, + unsigned char scope, bool value) +{ + if (!set_var_name(w, name, name_len, scope)) + return 0; + w->ack[w->ack_len++] = SPOE_DATA_T_BOOL | (!!value << 4); + return 1; +} + +static inline +int set_var_int(struct worker *w, + const char *name, int name_len, + unsigned char scope, int type, uint64_t value) +{ + if (!set_var_name(w, name, name_len, scope)) + return 0; + w->ack[w->ack_len++] = SPOE_DATA_T_UINT32; + w->ack_len += encode_spoe_varint(value, w->ack+w->ack_len); /* Arg 3: variable value */ + return 1; +} + +int set_var_uint32(struct worker *w, + const char *name, int name_len, + unsigned char scope, uint32_t value) +{ + return set_var_int(w, name, name_len, scope, SPOE_DATA_T_UINT32, value); +} + +int set_var_int32(struct worker *w, + const char *name, int name_len, + unsigned char scope, int32_t value) +{ + return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value); +} + +int set_var_uint64(struct worker *w, + const char *name, int name_len, + unsigned char scope, uint64_t value) +{ + return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value); +} + +int set_var_int64(struct worker *w, + const char *name, int name_len, + unsigned char scope, int64_t value) +{ + return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value); +} + +int set_var_ipv4(struct worker *w, + const char *name, int name_len, + unsigned char scope, + struct in_addr *ipv4) +{ + if (!set_var_name(w, name, name_len, scope)) + return 0; + w->ack[w->ack_len++] = SPOE_DATA_T_IPV4; + memcpy(w->ack+w->ack_len, ipv4, 4); + w->ack_len += 4; + return 1; +} + +int set_var_ipv6(struct worker *w, + const char *name, int name_len, + unsigned char scope, + struct in6_addr *ipv6) +{ + if (!set_var_name(w, name, name_len, scope)) + return 0; + w->ack[w->ack_len++] = SPOE_DATA_T_IPV6; + memcpy(w->ack+w->ack_len, ipv6, 16); + w->ack_len += 16; + return 1; +} + +static inline +int set_var_buf(struct worker *w, + const char *name, int name_len, + unsigned char scope, int type, + const char *str, int str_len) +{ + if (!set_var_name(w, name, name_len, scope)) + return 0; + w->ack[w->ack_len++] = type; + w->ack_len += encode_spoe_string(str, str_len, w->ack+w->ack_len); + return 1; +} + +int set_var_string(struct worker *w, + const char *name, int name_len, + unsigned char scope, + const char *str, int strlen) +{ + return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_STR, str, strlen); +} + +int set_var_bin(struct worker *w, + const char *name, int name_len, + unsigned char scope, + const char *str, int strlen) +{ + return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_BIN, str, strlen); +} + +/* This function is a little bit ugly, + * TODO: improve the response without copying the bufer + */ +static int commit_agentack(struct worker *w) +{ + memcpy(w->buf, w->ack, w->ack_len); + w->len = w->ack_len; + return 1; +} + /* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error * occurred, 0 if the frame must be skipped, otherwise the number of read * bytes. */ @@ -737,6 +894,9 @@ handle_hanotify(struct worker *w) DEBUG("Notify frame received: stream-id=%u - frame-id=%u", w->stream_id, w->frame_id); + /* Prepara ack, if the processing fails tha ack will be cancelled */ + prepare_agentack(w); + /* Loop on messages */ while (idx < w->len) { char *str; @@ -840,39 +1000,6 @@ prepare_agenthello(struct worker *w) return idx; } -/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred, - * the number of written bytes otherwise. */ -static int -prepare_agentack(struct worker *w) -{ - int idx = 0; - - /* Frame type */ - w->buf[idx++] = SPOE_FRM_T_AGENT_ACK; - - /* No flags for now */ - memset(w->buf+idx, 0, 4); /* No flags */ - idx += 4; - - /* Set stream-id and frame-id for ACK frames */ - idx += encode_spoe_varint(w->stream_id, w->buf+idx); - idx += encode_spoe_varint(w->frame_id, w->buf+idx); - - /* Data */ - if (w->ip_score == -1) - goto out; - - w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */ - w->buf[idx++] = 3; /* Number of args */ - w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */ - idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */ - w->buf[idx++] = SPOE_DATA_T_UINT32; - idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */ -out: - w->len = idx; - return idx; -} - /* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error * occurred, the number of written bytes otherwise. */ static int @@ -957,7 +1084,7 @@ notify_ack_roundtip(int sock, struct worker *w) LOG("Failed to handle Haproxy NOTIFY frame"); goto error_or_quit; } - if (prepare_agentack(w) < 0) { + if (commit_agentack(w) < 0) { LOG("Failed to prepare Agent ACK frame"); goto error_or_quit; } @@ -1022,7 +1149,6 @@ spoa_worker(void *data) if (w.healthcheck == true) goto close; while (1) { - w.ip_score = -1; if (notify_ack_roundtip(csock, &w) < 0) break; } diff --git a/contrib/spoa_server/spoa.h b/contrib/spoa_server/spoa.h index ca8518122a..ee19f37c19 100644 --- a/contrib/spoa_server/spoa.h +++ b/contrib/spoa_server/spoa.h @@ -55,7 +55,8 @@ struct worker { unsigned int stream_id; unsigned int frame_id; bool healthcheck; - int ip_score; /* -1 if unset, else between 0 and 100 */ + char ack[MAX_FRAME_SIZE]; + unsigned int ack_len; }; struct chunk { @@ -106,6 +107,41 @@ extern pthread_key_t worker_id; void ps_register(struct ps *ps); void ps_register_message(struct ps *ps, const char *name, void *ref); +int set_var_null(struct worker *w, + const char *name, int name_len, + unsigned char scope); +int set_var_bool(struct worker *w, + const char *name, int name_len, + unsigned char scope, bool value); +int set_var_uint32(struct worker *w, + const char *name, int name_len, + unsigned char scope, uint32_t value); +int set_var_int32(struct worker *w, + const char *name, int name_len, + unsigned char scope, int32_t value); +int set_var_uint64(struct worker *w, + const char *name, int name_len, + unsigned char scope, uint64_t value); +int set_var_int64(struct worker *w, + const char *name, int name_len, + unsigned char scope, int64_t value); +int set_var_ipv4(struct worker *w, + const char *name, int name_len, + unsigned char scope, + struct in_addr *ipv4); +int set_var_ipv6(struct worker *w, + const char *name, int name_len, + unsigned char scope, + struct in6_addr *ipv6); +int set_var_string(struct worker *w, + const char *name, int name_len, + unsigned char scope, + const char *str, int strlen); +int set_var_bin(struct worker *w, + const char *name, int name_len, + unsigned char scope, + const char *str, int strlen); + #define LOG(fmt, args...) \ do { \ struct timeval now; \