]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Another fix to locking logic.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 24 Nov 2011 17:11:27 +0000 (20:11 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 24 Nov 2011 17:11:27 +0000 (20:11 +0300)
src/kvstorage.c
src/kvstorage.h
src/kvstorage_file.c
src/kvstorage_server.c

index 60d28a833d9f6dd858e468879a0f2665b04fb640..5ad076b3ed5dbfbb9503869f9b70e30237b40384 100644 (file)
@@ -78,31 +78,32 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache
 
 /** Internal insertion to the kv storage from backend */
 gboolean
-rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer key, guint keylen,
+rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, guint keylen,
                gpointer data, gsize len, gint flags, guint expire, struct rspamd_kv_element **pelt)
 {
        gint                                                            steps = 0;
-       struct rspamd_kv_element                        *elt = *pelt;
+       struct rspamd_kv_element                        *elt;
 
+       g_static_rw_lock_writer_lock (&storage->rwlock);
        /* Hard limit */
        if (storage->max_memory > 0) {
                if (len > storage->max_memory) {
                        msg_info ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
                                        len, storage->max_memory);
+                       g_static_rw_lock_writer_unlock (&storage->rwlock);
                        return FALSE;
                }
 
                /* Now check limits */
                while (storage->memory + len > storage->max_memory) {
                        if (storage->expire) {
-                               g_static_rw_lock_writer_lock (&storage->rwlock);
                                storage->expire->step_func (storage->expire, storage, time (NULL), steps);
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
                        }
                        if (++steps > MAX_EXPIRE_STEPS) {
+                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                                msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
                                return FALSE;
                        }
@@ -110,7 +111,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
        }
 
        /* Insert elt to the cache */
-       g_static_rw_lock_writer_lock (&storage->rwlock);
+
        elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
 
        if (elt == NULL) {
@@ -120,7 +121,10 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
        /* Copy data */
        elt->flags = flags;
        elt->expire = expire;
-       *pelt = elt;
+
+       if (pelt != NULL) {
+               *pelt = elt;
+       }
 
        /* Insert to the expire */
        if (storage->expire) {
@@ -145,24 +149,25 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
        glong                                                           longval;
 
        /* Hard limit */
+       g_static_rw_lock_writer_lock (&storage->rwlock);
        if (storage->max_memory > 0) {
                if (len + sizeof (struct rspamd_kv_element) + keylen >= storage->max_memory) {
                        msg_warn ("<%s>: trying to insert value of length %z while limit is %z", storage->name,
                                        len, storage->max_memory);
+                       g_static_rw_lock_writer_unlock (&storage->rwlock);
                        return FALSE;
                }
 
                /* Now check limits */
                while (storage->memory + len + keylen > storage->max_memory) {
                        if (storage->expire) {
-                               g_static_rw_lock_writer_lock (&storage->rwlock);
                                storage->expire->step_func (storage->expire, storage, time (NULL), steps);
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
                        }
                        if (++steps > MAX_EXPIRE_STEPS) {
+                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                                msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
                                return FALSE;
                        }
@@ -173,14 +178,13 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
                steps = 0;
                while (storage->elts > storage->max_elts) {
                        if (storage->expire) {
-                               g_static_rw_lock_writer_lock (&storage->rwlock);
                                storage->expire->step_func (storage->expire, storage, time (NULL), steps);
-                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
                        }
                        if (++steps > MAX_EXPIRE_STEPS) {
+                               g_static_rw_lock_writer_unlock (&storage->rwlock);
                                msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
                                return FALSE;
                        }
@@ -188,7 +192,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint
        }
 
        /* First try to search it in cache */
-       g_static_rw_lock_writer_lock (&storage->rwlock);
+
        elt = storage->cache->lookup_func (storage->cache, key, keylen);
        if (elt) {
                if (storage->expire) {
@@ -309,9 +313,11 @@ rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, gu
                if (belt) {
                        /* Put this element into cache */
                        if ((belt->flags & KV_ELT_INTEGER) != 0) {
-                               rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
+                               g_static_rw_lock_writer_unlock (&storage->rwlock);
+                               rspamd_kv_storage_insert_cache (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
                                        belt->size, belt->flags,
                                        belt->expire, &elt);
+                               g_static_rw_lock_writer_lock (&storage->rwlock);
                        }
                        if ((belt->flags & KV_ELT_DIRTY) == 0) {
                                g_free (belt);
@@ -346,21 +352,19 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint
        /* First try to look at cache */
        g_static_rw_lock_reader_lock (&storage->rwlock);
        elt = storage->cache->lookup_func (storage->cache, key, keylen);
-       g_static_rw_lock_reader_unlock (&storage->rwlock);
-
 
        /* Next look at the backend */
        if (elt == NULL && storage->backend) {
-               g_static_rw_lock_reader_lock (&storage->rwlock);
                belt = storage->backend->lookup_func (storage->backend, key, keylen);
-               g_static_rw_lock_reader_unlock (&storage->rwlock);
+
                if (belt) {
                        /* Put this element into cache */
-                       rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), keylen, ELT_DATA (belt),
-                                       belt->size, belt->flags,
-                                       belt->expire, &elt);
                        if ((belt->flags & KV_ELT_DIRTY) == 0) {
-                               g_free (belt);
+                               belt->flags |= KV_ELT_NEED_INSERT;
+                               return belt;
+                       }
+                       else {
+                               elt = belt;
                        }
                }
        }
@@ -372,6 +376,7 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, guint
                }
        }
 
+       /* RWlock is still locked */
        return elt;
 }
 
@@ -396,7 +401,14 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key, guint
                }
                storage->elts --;
                storage->memory -= elt->size;
+               if ((elt->flags & KV_ELT_DIRTY) != 0) {
+                       elt->flags |= KV_ELT_NEED_FREE;
+               }
+               else {
+                       g_slice_free1 (ELT_SIZE (elt), elt);
+               }
        }
+
        g_static_rw_lock_writer_unlock (&storage->rwlock);
 
        return elt;
@@ -437,7 +449,7 @@ rspamd_kv_storage_insert_array (struct rspamd_kv_storage *storage, gpointer key,
        es = arr_data;
        *es = elt_size;
        memcpy (arr_data, (gchar *)data + sizeof (guint), len);
-       if (!rspamd_kv_storage_insert_internal (storage, key, keylen, arr_data, len + sizeof (guint),
+       if (!rspamd_kv_storage_insert_cache (storage, key, keylen, arr_data, len + sizeof (guint),
                        flags, expire, &elt)) {
                g_slice_free1 (len + sizeof (guint), arr_data);
                return FALSE;
@@ -583,7 +595,12 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
                        storage->elts --;
                        TAILQ_REMOVE (&expire->head, elt, entry);
                        /* Free memory */
-                       g_slice_free1 (ELT_SIZE (elt), elt);
+                       if ((elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) {
+                               elt->flags |= KV_ELT_NEED_FREE;
+                       }
+                       else {
+                               g_slice_free1 (ELT_SIZE (elt), elt);
+                       }
                        res = TRUE;
                        /* Check other elements in this queue */
                        TAILQ_FOREACH_SAFE (elt, &expire->head, entry, temp) {
@@ -596,7 +613,7 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
                                storage->cache->steal_func (storage->cache, elt);
                                TAILQ_REMOVE (&expire->head, elt, entry);
                                /* Free memory */
-                               if ((elt->flags & KV_ELT_DIRTY) != 0) {
+                               if ((elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) {
                                        elt->flags |= KV_ELT_NEED_FREE;
                                }
                                else {
@@ -613,7 +630,7 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
                storage->cache->steal_func (storage->cache, oldest_elt);
                TAILQ_REMOVE (&expire->head, oldest_elt, entry);
                /* Free memory */
-               if ((oldest_elt->flags & KV_ELT_DIRTY) != 0) {
+               if ((oldest_elt->flags & (KV_ELT_DIRTY|KV_ELT_NEED_INSERT)) != 0) {
                        oldest_elt->flags |= KV_ELT_NEED_FREE;
                }
                else {
index f970a7086723f6d1561bf9f51e50a6bc27a9cca9..b990505835f6da7069cbbc2df36658aeb9e4ff64 100644 (file)
@@ -65,7 +65,8 @@ enum rspamd_kv_flags {
        KV_ELT_DIRTY = 1 << 2,
        KV_ELT_OUSTED = 1 << 3,
        KV_ELT_NEED_FREE = 1 << 4,
-       KV_ELT_INTEGER = 1 << 5
+       KV_ELT_INTEGER = 1 << 5,
+       KV_ELT_NEED_INSERT = 1 << 6
 };
 
 #define ELT_DATA(elt) (gchar *)(elt)->data + (elt)->keylen + 1
@@ -140,6 +141,10 @@ struct rspamd_kv_storage *rspamd_kv_storage_new (gint id, const gchar *name,
 /** Insert new element to the kv storage */
 gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint keylen, gpointer data, gsize len, gint flags, guint expire);
 
+/** Insert element only in cache */
+gboolean rspamd_kv_storage_insert_cache (struct rspamd_kv_storage *storage, gpointer key, guint keylen,
+               gpointer data, gsize len, gint flags, guint expire, struct rspamd_kv_element **pelt);
+
 /** Replace an element in the kv storage */
 gboolean rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, guint keylen, struct rspamd_kv_element *elt);
 
index c342b1fbfc2e1cd9d3a6be5f22bd021c987a9699..81521720cd8a4715afde91ed4e3eddd76ee996ae 100644 (file)
@@ -201,10 +201,15 @@ file_process_queue (struct rspamd_kv_backend *backend)
        cur = db->ops_queue->head;
        while (cur) {
                op = cur->data;
-               if (op->op == FILE_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) {
+               if (op->op == FILE_OP_DELETE || ((op->elt->flags & KV_ELT_NEED_FREE) != 0 &&
+                               (op->elt->flags & KV_ELT_NEED_INSERT) == 0)) {
                        /* Also clean memory */
                        g_slice_free1 (ELT_SIZE (op->elt), op->elt);
                }
+               else {
+                       /* Unset dirty flag */
+                       op->elt->flags &= ~KV_ELT_DIRTY;
+               }
                g_slice_free1 (sizeof (struct file_op), op);
                cur = g_list_next (cur);
        }
@@ -419,6 +424,8 @@ rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key, guint keyle
 
        close (fd);
 
+       elt->flags &= ~(KV_ELT_DIRTY|KV_ELT_NEED_FREE);
+
        return elt;
 }
 
@@ -426,39 +433,27 @@ static void
 rspamd_file_delete (struct rspamd_kv_backend *backend, gpointer key, guint keylen)
 {
        struct rspamd_file_backend                                      *db = (struct rspamd_file_backend *)backend;
-       struct file_op                                                          *op;
-       struct rspamd_kv_element                                        *elt;
+       gchar                                                                            filebuf[PATH_MAX];
        struct rspamd_kv_element                                         search_elt;
-
-       search_elt.keylen = keylen;
-       search_elt.p = key;
+       struct file_op                                                          *op;
 
        if (!db->initialized) {
                return;
        }
 
+       search_elt.keylen = keylen;
+       search_elt.p = key;
+       /* First search in ops queue */
        if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) {
                op->op = FILE_OP_DELETE;
                return;
        }
-
-       elt = rspamd_file_lookup (backend, key, keylen);
-       if (elt == NULL) {
+       /* Get filename */
+       if (!get_file_name (db, key, keylen, filebuf, sizeof (filebuf))) {
                return;
        }
-       op = g_slice_alloc (sizeof (struct file_op));
-       op->op = FILE_OP_DELETE;
-       op->elt = elt;
-       elt->flags |= KV_ELT_DIRTY;
-
-       g_queue_push_head (db->ops_queue, op);
-       g_hash_table_insert (db->ops_hash, elt, op);
-
-       if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
-               file_process_queue (backend);
-       }
 
-       return;
+       unlink (filebuf);
 }
 
 static void
index 5ef34cbedba80313bc2e46d52ba873e025c798c3..3e4cedcf9e0b9352ca1bdfa5ffcbaf0b48e22713 100644 (file)
@@ -508,11 +508,13 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                        }
                        if (elt->flags & KV_ELT_INTEGER) {
                                if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
+                                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
                                        return FALSE;
                                }
                        }
                        else {
-                               if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, FALSE)) {
+                               if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
+                                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
                                        return FALSE;
                                }
                        }
@@ -524,6 +526,9 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
                                res = rspamd_dispatcher_write (session->dispather, CRLF,
                                                sizeof (CRLF) - 1, FALSE, TRUE);
                        }
+                       if (!res) {
+                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                       }
 
                        return res;
                }
@@ -890,7 +895,22 @@ kvstorage_write_socket (void *arg)
        struct kvstorage_session                        *session = (struct kvstorage_session *) arg;
 
        if (session->elt) {
+
+               if ((session->elt->flags & KV_ELT_NEED_INSERT) != 0) {
+                       /* Insert to cache and free element */
+                       session->elt->flags &= ~KV_ELT_NEED_INSERT;
+                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                       rspamd_kv_storage_insert_cache (session->cf->storage, ELT_KEY (session->elt),
+                                                       session->elt->keylen, ELT_DATA (session->elt),
+                                                       session->elt->size, session->elt->flags,
+                                                       session->elt->expire, NULL);
+                       g_free (session->elt);
+                       session->elt = NULL;
+                       return TRUE;
+               }
+               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
                session->elt = NULL;
+
        }
 
        return TRUE;
@@ -972,6 +992,7 @@ kvstorage_thread (gpointer ud)
        sigprocmask (SIG_BLOCK, thr->signals, NULL);
        /* Init thread specific events */
        thr->ev_base = event_init ();
+
        event_set (&thr->bind_ev, thr->worker->cf->listen_sock, EV_READ | EV_PERSIST, thr_accept_socket, (void *)thr);
        event_base_set (thr->ev_base, &thr->bind_ev);
        event_add (&thr->bind_ev, NULL);