(time_t)session->timestamp);
if (elt) {
- gboolean ratelimited = FALSE;
+ gboolean ratelimited = FALSE, new_ratelimit = FALSE;
if (isnan (elt->cur)) {
/* Ratelimit exceeded, preserve it for the whole ttl */
rspamd_inet_address_to_string (masked),
session->ctx->leaky_bucket_burst);
elt->cur = NAN;
+ new_ratelimit = TRUE;
}
else {
elt->cur ++; /* Allow one more request */
}
}
- rspamd_inet_address_free (masked);
-
if (ratelimited) {
rspamd_fuzzy_maybe_call_blacklisted(session->ctx, session->addr, "ratelimit");
}
+ if (new_ratelimit) {
+ struct rspamd_srv_command srv_cmd;
+
+ srv_cmd.type = RSPAMD_SRV_FUZZY_BLOCKED;
+ srv_cmd.cmd.fuzzy_blocked.af = rspamd_inet_address_get_af(masked);
+
+ if (srv_cmd.cmd.fuzzy_blocked.af == AF_INET || srv_cmd.cmd.fuzzy_blocked.af == AF_INET6) {
+ socklen_t slen;
+ struct sockaddr *sa = rspamd_inet_address_get_sa(masked, &slen);
+
+ if (slen <= sizeof(srv_cmd.cmd.fuzzy_blocked.addr)) {
+ memcpy(&srv_cmd.cmd.fuzzy_blocked.addr, sa, slen);
+ msg_debug("propagating blocked address to other workers");
+ rspamd_srv_send_command(session->worker, session->ctx->event_loop, &srv_cmd, -1, NULL, NULL);
+ }
+ else {
+ msg_err("bad address length: %d, expected to be %d", (int)slen, (int)sizeof(srv_cmd.cmd.fuzzy_blocked.addr));
+ }
+ }
+ }
+
+ rspamd_inet_address_free (masked);
+
return !ratelimited;
}
else {
return TRUE;
}
+static gboolean
+rspamd_fuzzy_control_blocked (struct rspamd_main *rspamd_main,
+ struct rspamd_worker *worker, gint fd,
+ gint attached_fd,
+ struct rspamd_control_command *cmd,
+ gpointer ud)
+{
+ struct rspamd_fuzzy_storage_ctx *ctx = (struct rspamd_fuzzy_storage_ctx *)ud;
+ struct rspamd_control_reply rep;
+ struct rspamd_leaky_bucket_elt *elt;
+ ev_tstamp now = ev_now (ctx->event_loop);
+ rspamd_inet_addr_t *addr = NULL;
+
+ rep.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
+ rep.reply.fuzzy_blocked.status = 0;
+
+ if (cmd->cmd.fuzzy_blocked.af == AF_INET) {
+ addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
+ sizeof (struct sockaddr_in));
+ }
+ else if (cmd->cmd.fuzzy_blocked.af == AF_INET6) {
+ addr = rspamd_inet_address_from_sa(&cmd->cmd.fuzzy_blocked.addr.sa,
+ sizeof (struct sockaddr_in6));
+ }
+ else {
+ msg_err ("invalid address family: %d", cmd->cmd.fuzzy_blocked.af);
+ rep.reply.fuzzy_blocked.status = -1;
+ }
+
+ if (addr) {
+ elt = rspamd_lru_hash_lookup(ctx->ratelimit_buckets, addr,
+ (time_t) now);
+
+ if (elt) {
+ if (isnan (elt->cur)) {
+ /* Already ratelimited, ignore */
+ }
+ else {
+ elt->last = now;
+ elt->cur = NAN;
+
+ msg_info ("propagating ratelimiting %s, %.1f max elts",
+ rspamd_inet_address_to_string(addr),
+ ctx->leaky_bucket_burst);
+ rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
+ }
+
+ rspamd_inet_address_free(addr);
+
+ }
+ else {
+ /* New bucket */
+ elt = g_malloc(sizeof(*elt));
+ elt->addr = addr; /* transfer ownership */
+ elt->cur = NAN;
+ elt->last = now;
+
+ rspamd_lru_hash_insert(ctx->ratelimit_buckets,
+ addr,
+ elt,
+ (time_t)now,
+ ctx->leaky_bucket_ttl);
+ msg_info ("propagating ratelimiting %s, %.1f max elts",
+ rspamd_inet_address_to_string(addr),
+ ctx->leaky_bucket_burst);
+ rspamd_fuzzy_maybe_call_blacklisted(ctx, addr, "ratelimit");
+ }
+ }
+
+ if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) {
+ msg_err ("cannot write reply to the control socket: %s",
+ strerror (errno));
+ }
+
+ return TRUE;
+}
+
static gboolean
rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main,
struct rspamd_worker *worker, gint fd,
rspamd_fuzzy_storage_stat, ctx);
rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_SYNC,
rspamd_fuzzy_storage_sync, ctx);
+ rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_FUZZY_BLOCKED,
+ rspamd_fuzzy_control_blocked, ctx);
if (ctx->update_map != NULL) {
case RSPAMD_CONTROL_FUZZY_SYNC:
case RSPAMD_CONTROL_LOG_PIPE:
case RSPAMD_CONTROL_CHILD_CHANGE:
+ case RSPAMD_CONTROL_FUZZY_BLOCKED:
break;
case RSPAMD_CONTROL_RERESOLVE:
if (cd->worker->srv->cfg) {
case RSPAMD_SRV_HEALTH:
rspamd_fill_health_reply (srv, &rdata->rep);
break;
- case RSPAMD_NOTICE_HYPERSCAN_CACHE:
+ case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
#ifdef WITH_HYPERSCAN
rspamd_hyperscan_notice_known(cmd.cmd.hyperscan_cache_file.path);
#endif
rdata->rep.reply.hyperscan_cache_file.unused = 0;
break;
+ case RSPAMD_SRV_FUZZY_BLOCKED:
+ /* Broadcast command to all workers */
+ memset (&wcmd, 0, sizeof (wcmd));
+ wcmd.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
+ /* Ensure that memcpy is safe */
+ G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked));
+ memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked));
+ rspamd_control_broadcast_cmd (srv, &wcmd, rfd,
+ rspamd_control_ignore_io_handler, NULL, worker->pid);
+ break;
default:
msg_err ("unknown command type: %d", cmd.type);
break;
case RSPAMD_SRV_HEALTH:
reply = "health";
break;
- case RSPAMD_NOTICE_HYPERSCAN_CACHE:
+ case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
reply = "notice_hyperscan_cache";
break;
+ case RSPAMD_SRV_FUZZY_BLOCKED:
+ reply = "fuzzy_blocked";
+ break;
}
return reply;