From: Vsevolod Stakhov Date: Mon, 7 Nov 2011 00:20:42 +0000 (+0300) Subject: * Implement sync command for manual synchronization with backend. X-Git-Tag: 0.4.5~22 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d651a97fe2551d53544d48487e3bcbbfad750eff;p=thirdparty%2Frspamd.git * Implement sync command for manual synchronization with backend. --- diff --git a/src/kvstorage.h b/src/kvstorage.h index fb58482745..e4ad31cc1a 100644 --- a/src/kvstorage.h +++ b/src/kvstorage.h @@ -47,6 +47,7 @@ typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer k typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt); typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key); typedef void (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key); +typedef gboolean (*backend_sync)(struct rspamd_kv_backend *backend); typedef void (*backend_destroy)(struct rspamd_kv_backend *backend); /* Callbacks for expire */ @@ -99,6 +100,7 @@ struct rspamd_kv_backend { backend_replace replace_func; /*< this callback is called when element is replaced */ backend_lookup lookup_func; /*< this callback is used for lookup of element */ backend_delete delete_func; /*< this callback is called when an element is deleted */ + backend_sync sync_func; /*< this callback is called when backend need to be synced */ backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ }; struct rspamd_kv_expire { diff --git a/src/kvstorage_bdb.c b/src/kvstorage_bdb.c index 87c343b223..280cb5ce96 100644 --- a/src/kvstorage_bdb.c +++ b/src/kvstorage_bdb.c @@ -45,6 +45,7 @@ struct rspamd_bdb_backend { backend_replace replace_func; /*< this callback is called when element is replaced */ backend_lookup lookup_func; /*< this callback is used for lookup of element */ backend_delete delete_func; /*< this callback is called when an element is deleted */ + backend_sync sync_func; /*< this callback is called when backend need to be synced */ backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ DB_ENV *envp; /*< db environment */ DB *dbp; /*< db pointer */ @@ -93,8 +94,9 @@ bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, struct bdb_op /* Process operations queue */ static gboolean -bdb_process_queue (struct rspamd_bdb_backend *db) +bdb_process_queue (struct rspamd_kv_backend *backend) { + struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; struct bdb_op *op; GList *cur; @@ -198,7 +200,7 @@ rspamd_bdb_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspam g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - return bdb_process_queue (db); + return bdb_process_queue (backend); } return TRUE; @@ -223,7 +225,7 @@ rspamd_bdb_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspa g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - return bdb_process_queue (db); + return bdb_process_queue (backend); } return TRUE; @@ -292,7 +294,7 @@ rspamd_bdb_delete (struct rspamd_kv_backend *backend, gpointer key) g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - bdb_process_queue (db); + bdb_process_queue (backend); } return; @@ -304,7 +306,7 @@ rspamd_bdb_destroy (struct rspamd_kv_backend *backend) struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; if (db->initialized) { - bdb_process_queue (db); + bdb_process_queue (backend); if (db->dbp != NULL) { db->dbp->close (db->dbp, 0); } @@ -353,6 +355,7 @@ rspamd_kv_bdb_new (const gchar *filename, guint sync_ops) new->lookup_func = rspamd_bdb_lookup; new->delete_func = rspamd_bdb_delete; new->replace_func = rspamd_bdb_replace; + new->sync_func = bdb_process_queue; new->destroy_func = rspamd_bdb_destroy; return (struct rspamd_kv_backend *)new; diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index c6190454c3..56f25ed976 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -199,16 +199,22 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) } } else if (p - c == 4) { - if (memcmp (c, "quit", 4) == 0) { + if (g_ascii_strncasecmp (c, "quit", 4) == 0) { session->command = KVSTORAGE_CMD_QUIT; state = 100; continue; } + if (g_ascii_strncasecmp (c, "sync", 4) == 0) { + session->command = KVSTORAGE_CMD_SYNC; + state = 100; + continue; + } } else if (p - c == 6) { - if (memcmp (c, "delete", 6) == 0) { + if (g_ascii_strncasecmp (c, "delete", 6) == 0) { session->command = KVSTORAGE_CMD_DELETE; } + else { return FALSE; } @@ -445,6 +451,41 @@ kvstorage_read_socket (f_str_t * in, void *arg) } } } + else if (session->command == KVSTORAGE_CMD_SYNC) { + if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_COMMON, + sizeof (ERROR_COMMON) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF, + sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE); + } + } + else { + if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF, + sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + } + } + else { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF, + sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF, + sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE); + } + } + } + g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); + } else if (session->command == KVSTORAGE_CMD_QUIT) { /* Quit session */ free_kvstorage_session (session); diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h index b513d33ab1..4f9e8c9518 100644 --- a/src/kvstorage_server.h +++ b/src/kvstorage_server.h @@ -65,6 +65,7 @@ struct kvstorage_session { KVSTORAGE_CMD_SET, KVSTORAGE_CMD_GET, KVSTORAGE_CMD_DELETE, + KVSTORAGE_CMD_SYNC, KVSTORAGE_CMD_QUIT } command; guint id; diff --git a/src/kvstorage_sqlite.c b/src/kvstorage_sqlite.c index bde3ea1c69..3fa4bab20e 100644 --- a/src/kvstorage_sqlite.c +++ b/src/kvstorage_sqlite.c @@ -51,6 +51,7 @@ struct rspamd_sqlite_backend { backend_replace replace_func; /*< this callback is called when element is replaced */ backend_lookup lookup_func; /*< this callback is used for lookup of element */ backend_delete delete_func; /*< this callback is called when an element is deleted */ + backend_sync sync_func; /*< this callback is called when backend need to be synced */ backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ sqlite3 *dbp; gchar *filename; @@ -100,8 +101,9 @@ sqlite_process_single_op (struct rspamd_sqlite_backend *db, struct sqlite_op *op /* Process operations queue */ static gboolean -sqlite_process_queue (struct rspamd_sqlite_backend *db) +sqlite_process_queue (struct rspamd_kv_backend *backend) { + struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; struct sqlite_op *op; GList *cur; @@ -261,7 +263,7 @@ rspamd_sqlite_insert (struct rspamd_kv_backend *backend, gpointer key, struct rs g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - return sqlite_process_queue (db); + return sqlite_process_queue (backend); } return TRUE; @@ -286,7 +288,7 @@ rspamd_sqlite_replace (struct rspamd_kv_backend *backend, gpointer key, struct r g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - return sqlite_process_queue (db); + return sqlite_process_queue (backend); } return TRUE; @@ -356,7 +358,7 @@ rspamd_sqlite_delete (struct rspamd_kv_backend *backend, gpointer key) g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op); if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { - sqlite_process_queue (db); + sqlite_process_queue (backend); } return; @@ -368,7 +370,7 @@ rspamd_sqlite_destroy (struct rspamd_kv_backend *backend) struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; if (db->initialized) { - sqlite_process_queue (db); + sqlite_process_queue (backend); if (db->get_stmt != NULL) { sqlite3_finalize (db->get_stmt); } @@ -422,6 +424,7 @@ rspamd_kv_sqlite_new (const gchar *filename, guint sync_ops) new->lookup_func = rspamd_sqlite_lookup; new->delete_func = rspamd_sqlite_delete; new->replace_func = rspamd_sqlite_replace; + new->sync_func = sqlite_process_queue; new->destroy_func = rspamd_sqlite_destroy; return (struct rspamd_kv_backend *)new;