#include "message.h"
#include "fuzzy.h"
#include "bloom.h"
+#include "map.h"
#include "fuzzy_storage.h"
#ifdef WITH_JUDY
guint32 expire;
guint32 frequent_score;
guint32 max_mods;
+ radix_tree_t *update_ips;
+ gchar *update_map;
+ struct event_base *ev_base;
};
struct rspamd_fuzzy_node {
fuzzy_hash_t h;
};
+struct fuzzy_session {
+ struct rspamd_worker *worker;
+ struct fuzzy_cmd cmd;
+ gint fd;
+ u_char *pos;
+ socklen_t salen;
+ union {
+ struct sockaddr ss;
+ struct sockaddr_storage sa;
+ struct sockaddr_in s4;
+ struct sockaddr_in6 v6;
+ } client_addr;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+};
#ifndef HAVE_SA_SIGINFO
static void
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired ++;
server_stat->fuzzy_hashes --;
- g_free (node);
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
bloom_del (bf, node->h.hash_pipe);
server_stat->fuzzy_hashes_expired ++;
server_stat->fuzzy_hashes --;
- g_free (node);
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_node), node);
continue;
}
if (write (fd, node, sizeof (struct rspamd_fuzzy_node)) == -1) {
event_del (&worker->bind_ev);
close (worker->cf->listen_sock);
msg_info ("worker's shutdown is pending in %d sec", SOFT_SHUTDOWN_TIME);
- event_loopexit (&tv);
+ event_base_loopexit (ctx->ev_base, &tv);
mods = ctx->max_mods + 1;
sync_cache (worker);
return;
}
for (;;) {
- node = g_malloc (sizeof (struct rspamd_fuzzy_node));
+ node = g_slice_alloc (sizeof (struct rspamd_fuzzy_node));
if (version == 0) {
r = read (fd, &legacy_node, sizeof (legacy_node));
if (r != sizeof (legacy_node)) {
}
}
- h = g_malloc (sizeof (struct rspamd_fuzzy_node));
+ h = g_slice_alloc (sizeof (struct rspamd_fuzzy_node));
memcpy (&h->h.hash_pipe, &cmd->hash, sizeof (cmd->hash));
h->h.block_size = cmd->blocksize;
h->time = (guint64) time (NULL);
if (pvalue) {
data = *pvalue;
res = JudySLDel (&jtree, s->hash_pipe, PJE0);
- g_free (data);
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_node), data);
bloom_del (bf, s->hash_pipe);
msg_info ("fuzzy hash was successfully deleted");
server_stat->fuzzy_hashes --;
while (cur) {
h = cur->data;
if (fuzzy_compare_hashes (&h->h, s) > LEV_LIMIT) {
- g_free (h);
+ g_slice_free1 (sizeof (struct rspamd_fuzzy_node), h);
tmp = cur;
cur = g_list_next (cur);
g_queue_delete_link (hash, tmp);
return res;
}
+/**
+ * Checks the client's address for update commands permission
+ */
+static gboolean
+check_fuzzy_client (struct fuzzy_session *session)
+{
+ if (session->ctx->update_ips != NULL) {
+ /* XXX: cannot work with ipv6 addresses */
+ if (session->client_addr.ss.sa_family != AF_INET) {
+ return FALSE;
+ }
+ if (radix32tree_find (session->ctx->update_ips,
+ ntohl (session->client_addr.s4.sin_addr.s_addr)) == RADIX_NO_VALUE) {
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
#define CMD_PROCESS(x) \
do { \
if (process_##x##_command (&session->cmd, session->worker->ctx)) { \
- if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) { \
+ if (sendto (session->fd, "OK" CRLF, sizeof ("OK" CRLF) - 1, 0, &session->client_addr.ss, session->salen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
else { \
- if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) { \
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, &session->client_addr.ss, session->salen) == -1) { \
msg_err ("error while writing reply: %s", strerror (errno)); \
} \
} \
r = process_check_command (&session->cmd, &flag, session->worker->ctx);
if (r != 0) {
r = rspamd_snprintf (buf, sizeof (buf), "OK %d %d" CRLF, r, flag);
- if (sendto (session->fd, buf, r, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ if (sendto (session->fd, buf, r, 0,
+ &session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
else {
- if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0,
+ &session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
}
break;
case FUZZY_WRITE:
- CMD_PROCESS (write);
+ if (!check_fuzzy_client (session)) {
+ msg_info ("try to insert a hash from an untrusted address");
+ if (sendto (session->fd, "UNAUTH" CRLF, sizeof ("UNAUTH" CRLF) - 1, 0,
+ &session->client_addr.ss, session->salen) == -1) {
+ msg_err ("error while writing reply: %s", strerror (errno));
+ }
+ }
+ else {
+ CMD_PROCESS (write);
+ }
break;
case FUZZY_DEL:
- CMD_PROCESS (delete);
+ if (!check_fuzzy_client (session)) {
+ msg_info ("try to delete a hash from an untrusted address");
+ if (sendto (session->fd, "UNAUTH" CRLF, sizeof ("UNAUTH" CRLF) - 1, 0,
+ &session->client_addr.ss, session->salen) == -1) {
+ msg_err ("error while writing reply: %s", strerror (errno));
+ }
+ }
+ else {
+ CMD_PROCESS (delete);
+ }
break;
default:
- if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0,
+ &session->client_addr.ss, session->salen) == -1) {
msg_err ("error while writing reply: %s", strerror (errno));
}
break;
session.worker = worker;
session.fd = fd;
session.pos = (u_char *) & session.cmd;
- session.salen = sizeof (session.sa);
+ session.salen = sizeof (session.client_addr);
+ session.ctx = worker->ctx;
/* Got some data */
if (what == EV_READ) {
- if ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd), MSG_WAITALL, (struct sockaddr *)&session.sa, &session.salen)) == -1) {
+ while ((r = recvfrom (fd, session.pos, sizeof (struct fuzzy_cmd),
+ MSG_WAITALL, &session.client_addr.ss, &session.salen)) == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
msg_err ("got error while reading from socket: %d, %s", errno, strerror (errno));
return;
}
- else if (r == sizeof (struct fuzzy_cmd)) {
+ if (r == sizeof (struct fuzzy_cmd)) {
/* Assume that the whole command was read */
process_fuzzy_command (&session);
}
static void
sync_callback (gint fd, short what, void *arg)
{
- struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct rspamd_worker *worker = (struct rspamd_worker *)arg;
+ struct rspamd_fuzzy_storage_ctx *ctx;
+
+ ctx = worker->ctx;
/* Timer event */
evtimer_set (&tev, sync_callback, worker);
+ event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
tmv.tv_usec = 0;
sync_cache (worker);
}
+static gboolean
+parse_fuzzy_update_list (struct rspamd_fuzzy_storage_ctx *ctx)
+{
+ gchar **strvec, **cur;
+ struct in_addr ina;
+ guint32 mask;
+
+ strvec = g_strsplit_set (ctx->update_map, ",", 0);
+ cur = strvec;
+
+ while (*cur != NULL) {
+ /* XXX: handle only ipv4 addresses */
+ if (parse_ipmask_v4 (*cur, &ina, &mask)) {
+ if (ctx->update_ips == NULL) {
+ ctx->update_ips = radix_tree_create ();
+ }
+ radix32tree_add (ctx->update_ips, htonl (ina.s_addr), mask, 1);
+ }
+ }
+
+ return (ctx->update_ips != NULL);
+}
+
gpointer
init_fuzzy (void)
{
register_worker_opt (type, "max_mods", xml_handle_uint32, ctx,
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, max_mods));
register_worker_opt (type, "frequent_score", xml_handle_uint32, ctx,
- G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, frequent_score));
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, frequent_score));
register_worker_opt (type, "expire", xml_handle_seconds, ctx,
- G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, expire));
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, expire));
register_worker_opt (type, "use_judy", xml_handle_boolean, ctx,
- G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, use_judy));
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, use_judy));
+ register_worker_opt (type, "allow_update", xml_handle_string, ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, update_map));
return ctx;
}
void
start_fuzzy (struct rspamd_worker *worker)
{
- struct sigaction signals;
- struct event sev;
- gint retries = 0;
+ struct sigaction signals;
+ struct event sev;
+ gint retries = 0;
+ struct rspamd_fuzzy_storage_ctx *ctx = worker->ctx;
worker->srv->pid = getpid ();
- event_init ();
+ ctx->ev_base = event_init ();
server_stat = worker->srv->stat;
/* SIGUSR2 handler */
signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
signal_add (&worker->sig_ev_usr2, NULL);
/* SIGUSR1 handler */
signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker);
+ event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);
signal_set (&sev, SIGTERM, sigterm_handler, (void *)worker);
+ event_base_set (ctx->ev_base, &sev);
signal_add (&sev, NULL);
/* Listen event */
}
/* Timer event */
evtimer_set (&tev, sync_callback, worker);
+ event_base_set (ctx->ev_base, &tev);
/* Plan event with jitter */
tmv.tv_sec = SYNC_TIMEOUT + SYNC_TIMEOUT * g_random_double ();
tmv.tv_usec = 0;
evtimer_add (&tev, &tmv);
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_fuzzy_socket, (void *)worker);
+ event_base_set (ctx->ev_base, &worker->bind_ev);
event_add (&worker->bind_ev, NULL);
+ /* Create radix tree */
+ if (ctx->update_map != NULL) {
+ if (!add_map (worker->srv->cfg, ctx->update_map, "Allow fuzzy updates from specified addresses",
+ read_radix_list, fin_radix_list, (void **)&ctx->update_ips)) {
+ if (!parse_fuzzy_update_list (ctx)) {
+ msg_warn ("cannot load or parse ip list from '%s'", ctx->update_map);
+ }
+ }
+ }
+
+ /* Maps events */
+ start_map_watch (worker->srv->cfg, ctx->ev_base);
+
gperf_profiler_init (worker->srv->cfg, "fuzzy");
- event_loop (0);
+ event_base_loop (ctx->ev_base, 0);
+ close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
}