'control' => 0,
'statfile' => '',
'deliver_to'=> '',
+ 'weight' => 1,
);
$main::VERSION = '@RSPAMD_VERSION@';
-c config file to parse
-s statfile to use for learn commands
-d define deliver-to header
-imap format: imap:user:<username>:password:<password>:host:<hostname>:mbox:<mboxname>
+-w define weight for fuzzy operations
+imap format: imap:user:<username>:password:[<password>]:host:<hostname>:mbox:<mboxname>
Version: @RSPAMD_VERSION@
EOD
};
sub do_ctrl_auth {
my ($sock) = @_;
+ my $res = 0; # authentication result: becomes 1 only if the reply starts with "password accepted"
syswrite $sock, "password $cfg{'password'}" . $CRLF;
if (defined (my $reply = <$sock>)) {
- my $end = <$sock>;
if ($reply =~ /^password accepted/) {
- return 1;
+ $res = 1;
}
}
+
+ # END: always read one more line after the reply attempt; if no line can be read, report failure
+ return 0 unless <$sock>;
- return 0;
+ return $res; # 1 when the password was accepted, 0 otherwise
}
sub do_control_command {
if (do_ctrl_auth ($sock)) {
my $len = length ($input);
print "Sending $len bytes...\n";
- syswrite $sock, $cfg{'command'} . " $len" . $CRLF;
+ syswrite $sock, $cfg{'command'} . " $len $cfg{'weight'}" . $CRLF;
syswrite $sock, $input . $CRLF;
if (defined (my $reply = <$sock>)) {
if ($reply =~ /^OK/) {
############################# Main part ###########################################
my %args;
-getopt('c:h:p:P:s:d:', \%args);
+getopt('c:h:p:P:s:d:w:', \%args);
my $cmd = shift;
my @path = shift;
my $do_parse_config = 1;
if (defined ($args{d})) {
$cfg{'deliver_to'} = $args{d};
}
+if (defined ($args{w})) {
+ $cfg{'weight'} = $args{w};
+}
if ($cmd =~ /(SYMBOLS|SCAN|PROCESS|CHECK|REPORT_IFSPAM|REPORT|URLS|EMAILS)/i) {
$cfg{'command'} = $1;
static struct event tev;
struct rspamd_fuzzy_node {
+ int32_t value; /* value stored with the hash: update_hash adds cmd->value to it, process_check_command returns it on a match */
fuzzy_hash_t h;
uint64_t time;
};
return TRUE;
}
-static gboolean
+static int
process_check_command (struct fuzzy_cmd *cmd)
{
GList *cur;
int prob = 0;
if (!bloom_check (bf, cmd->hash)) {
- return FALSE;
+ return 0;
}
memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
h = cur->data;
if ((prob = fuzzy_compare_hashes (&h->h, &s)) > LEV_LIMIT) {
msg_info ("fuzzy hash was found, probability %d%%", prob);
- return TRUE;
+ return h->value;
}
cur = g_list_next (cur);
}
msg_debug ("fuzzy hash was NOT found, prob is %d%%", prob);
+ return 0;
+}
+/*
+ * Add cmd->value to the stored value of the first node in the bucket
+ * hashes[cmd->blocksize % BUCKETS] whose hash, compared against cmd->hash
+ * with fuzzy_compare_hashes (), scores above LEV_LIMIT.
+ *
+ * Returns TRUE when such a node was found and updated, FALSE when the
+ * bucket holds no matching node.
+ */
+static gboolean
+update_hash (struct fuzzy_cmd *cmd)
+{
+ GList *cur;
+ struct rspamd_fuzzy_node *h;
+ fuzzy_hash_t s;
+ int prob = 0;
+
+ memcpy (s.hash_pipe, cmd->hash, sizeof (s.hash_pipe));
+ s.block_size = cmd->blocksize;
+ cur = hashes[cmd->blocksize % BUCKETS]->head;
+
+ /* XXX: too slow way (linear walk of the bucket, comparing every node until the first match) */
+ while (cur) {
+ h = cur->data;
+ if ((prob = fuzzy_compare_hashes (&h->h, &s)) > LEV_LIMIT) {
+ h->value += cmd->value; /* NOTE(review): no overflow guard on this addition - confirm the expected value range */
+ msg_info ("fuzzy hash was found, probability %d%%, set new value to %d", prob, h->value);
+ return TRUE;
+ }
+ cur = g_list_next (cur);
+ }
+
return FALSE;
}
struct rspamd_fuzzy_node *h;
if (bloom_check (bf, cmd->hash)) {
- return FALSE;
+ if (update_hash (cmd)) {
+ return TRUE;
+ }
}
h = g_malloc (sizeof (struct rspamd_fuzzy_node));
static void
process_fuzzy_command (struct fuzzy_session *session)
{
+ int r;
+ char buf[64];
+
switch (session->cmd.cmd) {
case FUZZY_CHECK:
- CMD_PROCESS (check);
+ if ((r = process_check_command (&session->cmd))) {
+ r = snprintf (buf, sizeof (buf), "OK %d" CRLF, r);
+ if (sendto (session->fd, buf, r, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ msg_err ("error while writing reply: %s", strerror (errno));
+ }
+ }
+ else {
+ if (sendto (session->fd, "ERR" CRLF, sizeof ("ERR" CRLF) - 1, 0, (struct sockaddr *)&session->sa, session->salen) == -1) {
+ msg_err ("error while writing reply: %s", strerror (errno));
+ }
+ }
break;
case FUZZY_WRITE:
CMD_PROCESS (write);
struct event ev;
fuzzy_hash_t *h;
int cmd;
+ int value;
int *saved;
struct timeval tv;
struct controller_session *session;
return fuzzy_check_module_config (cfg);
}
+/* Finalize IO */
static void
fuzzy_io_fin (void *ud)
{
}
}
+/* Call this whenever we got data from fuzzy storage */
static void
fuzzy_io_callback (int fd, short what, void *arg)
{
struct fuzzy_client_session *session = arg;
struct fuzzy_cmd cmd;
- char buf[sizeof ("ERR")];
+ char buf[62], *err_str;
+ int value;
if (what == EV_WRITE) {
/* Send command to storage */
}
}
else if (what == EV_READ) {
+ /* Got reply */
if (read (fd, buf, sizeof (buf)) == -1) {
goto err;
}
else if (buf[0] == 'O' && buf[1] == 'K') {
- insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, 1, NULL);
+ /* Now try to get value */
+ value = strtol (buf + 3, &err_str, 10);
+ *err_str = '\0';
+ insert_result (session->task, fuzzy_module_ctx->metric, fuzzy_module_ctx->symbol, value, g_list_prepend (NULL,
+ memory_pool_strdup (session->task->task_pool, buf + 3)));
}
goto ok;
}
{
struct fuzzy_learn_session *session = arg;
struct fuzzy_cmd cmd;
- char buf[sizeof ("ERR" CRLF)];
+ char buf[64];
+ int r;
if (what == EV_WRITE) {
/* Send command to storage */
cmd.blocksize = session->h->block_size;
memcpy (cmd.hash, session->h->hash_pipe, sizeof (cmd.hash));
cmd.cmd = session->cmd;
+ cmd.value = session->value;
if (write (fd, &cmd, sizeof (struct fuzzy_cmd)) == -1) {
goto err;
}
if (read (fd, buf, sizeof (buf)) == -1) {
goto err;
}
- goto ok;
+ else if (buf[0] == 'O' && buf[1] == 'K') {
+ r = snprintf (buf, sizeof (buf), "OK" CRLF);
+ rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
+ goto ok;
+ }
+ goto err;
}
return;
err:
msg_err ("got error in IO with server %s:%d, %d, %s", session->server->name, session->server->port, errno, strerror (errno));
+ r = snprintf (buf, sizeof (buf), "Error" CRLF);
+ rspamd_dispatcher_write (session->session->dispatcher, buf, r, FALSE, FALSE);
ok:
close (fd);
remove_normal_event (session->session->s, fuzzy_learn_fin, session);
}
+/* This callback is called when we check message via fuzzy hashes storage */
static void
fuzzy_symbol_callback (struct worker_task *task, void *unused)
{
cur = g_list_next (cur);
continue;
}
+ /* Get upstream */
selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
sizeof (struct storage_server), task->ts.tv_sec,
DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
msg_warn ("cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
}
else {
+ /* Create session for a socket */
session = memory_pool_alloc (task->task_pool, sizeof (struct fuzzy_client_session));
event_set (&session->ev, sock, EV_WRITE, fuzzy_io_callback, session);
session->tv.tv_sec = IO_TIMEOUT;
struct mime_text_part *part;
struct storage_server *selected;
GList *cur;
- int sock, r, cmd = 0, *saved;
+ int sock, r, cmd = 0, value = 0, *saved, *sargs;
char out_buf[BUFSIZ];
+ /* Extract arguments */
if (session->other_data) {
- cmd = GPOINTER_TO_SIZE (session->other_data);
+ sargs = session->other_data;
+ cmd = sargs[0];
+ value = sargs[1];
}
+
+ /* Prepare task */
task = construct_task (session->worker);
session->other_data = task;
session->state = STATE_WAIT;
-
+
+ /* Allocate message from string */
task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
task->msg->begin = in->begin;
task->msg->len = in->len;
+
saved = memory_pool_alloc0 (session->session_pool, sizeof (int));
r = process_message (task);
cur = g_list_next (cur);
continue;
}
+ /* Get upstream */
selected = (struct storage_server *)get_upstream_by_hash (fuzzy_module_ctx->servers, fuzzy_module_ctx->servers_num,
sizeof (struct storage_server), task->ts.tv_sec,
DEFAULT_UPSTREAM_ERROR_TIME, DEFAULT_UPSTREAM_DEAD_TIME, DEFAULT_UPSTREAM_MAXERRORS, part->fuzzy->hash_pipe, sizeof (part->fuzzy->hash_pipe));
if (selected) {
+ /* Create UDP socket */
if ((sock = make_udp_socket (&selected->addr, selected->port, FALSE, TRUE)) == -1) {
msg_warn ("cannot connect to %s, %d, %s", selected->name, errno, strerror (errno));
session->state = STATE_REPLY;
return;
}
else {
+ /* Socket is made, create session */
s = memory_pool_alloc (session->session_pool, sizeof (struct fuzzy_learn_session));
event_set (&s->ev, sock, EV_WRITE, fuzzy_learn_callback, s);
s->tv.tv_sec = IO_TIMEOUT;
s->session = session;
s->server = selected;
s->cmd = cmd;
+ s->value = value;
s->saved = saved;
event_add (&s->ev, &s->tv);
(*saved)++;
}
}
else {
+ /* Cannot write hash */
session->state = STATE_REPLY;
r = snprintf (out_buf, sizeof (out_buf), "cannot write fuzzy hash" CRLF);
rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
{
char *arg, out_buf[BUFSIZ], *err_str;
uint32_t size;
- int r;
+ int r, value, *sargs;
- arg = *args;
+ /* Process size */
+ arg = args[0];
if (!arg || *arg == '\0') {
msg_info ("empty content length");
r = snprintf (out_buf, sizeof (out_buf), "fuzzy command requires length as argument" CRLF);
session->state = STATE_REPLY;
return;
}
-
size = strtoul (arg, &err_str, 10);
if (err_str && *err_str != '\0') {
r = snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
session->state = STATE_REPLY;
return;
}
+ /* Process value */
+ arg = args[1];
+ if (!arg || *arg == '\0') {
+ msg_info ("empty value, assume it 1");
+ value = 1;
+ }
+ else {
+ value = strtol (arg, &err_str, 10);
+ }
session->state = STATE_OTHER;
rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
session->other_handler = fuzzy_process_handler;
- session->other_data = GSIZE_TO_POINTER (cmd);
+ /* Prepare args */
+ sargs = memory_pool_alloc (session->session_pool, sizeof (int) * 2);
+ sargs[0] = cmd;
+ sargs[1] = value;
+ session->other_data = sargs;
}
static void