]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Remove async threads for now.
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 26 May 2015 08:38:47 +0000 (09:38 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Tue, 26 May 2015 08:38:47 +0000 (09:38 +0100)
They are anyway broken in the asynchronous world.

src/libserver/events.c
src/libserver/events.h

index 1b3e845ce2fcfbca136e901dc8b35f589c63a6f3..9af0a84a86790023b43d344710b15a96c498f525 100644 (file)
 #include "main.h"
 #include "events.h"
 
+struct rspamd_async_watcher {
+       event_watcher_t cb;
+       guint remain;
+       gpointer ud;
+};
+
+struct rspamd_async_event {
+       GQuark subsystem;
+       event_finalizer_t fin;
+       void *user_data;
+       struct rspamd_async_watcher *w;
+};
+
+struct rspamd_async_session {
+       session_finalizer_t fin;
+       event_finalizer_t restore;
+       event_finalizer_t cleanup;
+       GHashTable *events;
+       void *user_data;
+       rspamd_mempool_t *pool;
+       struct rspamd_async_watcher *cur_watcher;
+       guint flags;
+};
+
 static gboolean
 rspamd_event_equal (gconstpointer a, gconstpointer b)
 {
@@ -42,27 +66,15 @@ static guint
 rspamd_event_hash (gconstpointer a)
 {
        const struct rspamd_async_event *ev = a;
+       XXH64_state_t st;
 
-       return GPOINTER_TO_UINT (ev->user_data);
-}
+       XXH64_reset (&st, rspamd_hash_seed ());
+       XXH64_update (&st, ev->user_data, sizeof (gpointer));
+       XXH64_update (&st, ev->fin, sizeof (*ev->fin));
 
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
-static void
-event_mutex_free (gpointer data)
-{
-       GMutex *mtx = data;
-
-       g_mutex_free (mtx);
+       return XXH64_digest (&st);
 }
 
-static void
-event_cond_free (gpointer data)
-{
-       GCond *cond = data;
-
-       g_cond_free (cond);
-}
-#endif
 
 struct rspamd_async_session *
 new_async_session (rspamd_mempool_t * pool, session_finalizer_t fin,
@@ -70,36 +82,13 @@ new_async_session (rspamd_mempool_t * pool, session_finalizer_t fin,
 {
        struct rspamd_async_session *new;
 
-       new = rspamd_mempool_alloc (pool, sizeof (struct rspamd_async_session));
+       new = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_async_session));
        new->pool = pool;
        new->fin = fin;
        new->restore = restore;
        new->cleanup = cleanup;
        new->user_data = user_data;
-       new->wanna_die = FALSE;
        new->events = g_hash_table_new (rspamd_event_hash, rspamd_event_equal);
-#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
-       new->mtx = g_mutex_new ();
-       new->cond = g_cond_new ();
-       rspamd_mempool_add_destructor (pool,
-               (rspamd_mempool_destruct_t) event_mutex_free,
-               new->mtx);
-       rspamd_mempool_add_destructor (pool,
-               (rspamd_mempool_destruct_t) event_cond_free,
-               new->cond);
-#else
-       new->mtx = rspamd_mempool_alloc (pool, sizeof (GMutex));
-       g_mutex_init (new->mtx);
-       new->cond = rspamd_mempool_alloc (pool, sizeof (GCond));
-       g_cond_init (new->cond);
-       rspamd_mempool_add_destructor (pool,
-               (rspamd_mempool_destruct_t) g_mutex_clear,
-               new->mtx);
-       rspamd_mempool_add_destructor (pool,
-               (rspamd_mempool_destruct_t) g_cond_clear,
-               new->cond);
-#endif
-       new->threads = 0;
 
        rspamd_mempool_add_destructor (pool,
                (rspamd_mempool_destruct_t) g_hash_table_destroy,
@@ -121,21 +110,26 @@ register_async_event (struct rspamd_async_session *session,
                return;
        }
 
-       g_mutex_lock (session->mtx);
        new = rspamd_mempool_alloc (session->pool,
                        sizeof (struct rspamd_async_event));
        new->fin = fin;
        new->user_data = user_data;
        new->subsystem = subsystem;
 
+       if (session->cur_watcher) {
+               new->w = session->cur_watcher;
+               new->w->remain ++;
+       }
+       else {
+               new->w = NULL;
+       }
+
        g_hash_table_insert (session->events, new, new);
 
        msg_debug ("added event: %p, pending %d events, subsystem: %s",
                user_data,
                g_hash_table_size (session->events),
                g_quark_to_string (subsystem));
-
-       g_mutex_unlock (session->mtx);
 }
 
 void
@@ -150,7 +144,6 @@ remove_normal_event (struct rspamd_async_session *session,
                return;
        }
 
-       g_mutex_lock (session->mtx);
        /* Search for event */
        search_ev.fin = fin;
        search_ev.user_data = ud;
@@ -163,7 +156,6 @@ remove_normal_event (struct rspamd_async_session *session,
                /* Remove event */
                fin (ud);
        }
-       g_mutex_unlock (session->mtx);
 
        check_session_pending (session);
 }
@@ -192,22 +184,10 @@ destroy_session (struct rspamd_async_session *session)
                return FALSE;
        }
 
-       g_mutex_lock (session->mtx);
-       if (session->threads > 0) {
-               /* Wait for conditional variable to finish processing */
-               g_mutex_unlock (session->mtx);
-               g_cond_wait (session->cond, session->mtx);
-       }
-
-       session->wanna_die = TRUE;
-
        g_hash_table_foreach_remove (session->events,
                rspamd_session_destroy,
                session);
 
-       /* Mutex can be destroyed here */
-       g_mutex_unlock (session->mtx);
-
        if (session->cleanup != NULL) {
                session->cleanup (session->user_data);
        }
@@ -217,15 +197,8 @@ destroy_session (struct rspamd_async_session *session)
 gboolean
 check_session_pending (struct rspamd_async_session *session)
 {
-       g_mutex_lock (session->mtx);
-       if (session->wanna_die && g_hash_table_size (session->events) == 0) {
-               session->wanna_die = FALSE;
-               if (session->threads > 0) {
-                       /* Wait for conditional variable to finish processing */
-                       g_cond_wait (session->cond, session->mtx);
-               }
+       if (g_hash_table_size (session->events) == 0) {
                if (session->fin != NULL) {
-                       g_mutex_unlock (session->mtx);
                        if (!session->fin (session->user_data)) {
                                /* Session finished incompletely, perform restoration */
                                if (session->restore != NULL) {
@@ -239,37 +212,9 @@ check_session_pending (struct rspamd_async_session *session)
                                return FALSE;
                        }
                }
-               g_mutex_unlock (session->mtx);
+
                return FALSE;
        }
-       g_mutex_unlock (session->mtx);
-       return TRUE;
-}
-
 
-/**
- * Add new async thread to session
- * @param session session object
- */
-void
-register_async_thread (struct rspamd_async_session *session)
-{
-       g_atomic_int_inc (&session->threads);
-       msg_debug ("added thread: pending %d thread", session->threads);
-}
-
-/**
- * Remove async thread from session and check whether session can be terminated
- * @param session session object
- */
-void
-remove_async_thread (struct rspamd_async_session *session)
-{
-       if (g_atomic_int_dec_and_test (&session->threads)) {
-               /* Signal if there are any sessions waiting */
-               g_mutex_lock (session->mtx);
-               g_cond_signal (session->cond);
-               g_mutex_unlock (session->mtx);
-       }
-       msg_debug ("removed thread: pending %d thread", session->threads);
+       return TRUE;
 }
index e751c1054a312e3735f323ce45323eb881f2ae47..442834f98c601a26a8df731d86a153a663245c17 100644 (file)
 #include "mem_pool.h"
 
 struct rspamd_async_event;
+struct rspamd_async_session;
 
-typedef void (*event_finalizer_t)(void *user_data);
-typedef gboolean (*session_finalizer_t)(void *user_data);
-
-struct rspamd_async_event {
-       GQuark subsystem;
-       event_finalizer_t fin;
-       void *user_data;
-       guint ref;
-};
-
-struct rspamd_async_session {
-       session_finalizer_t fin;
-       event_finalizer_t restore;
-       event_finalizer_t cleanup;
-       GHashTable *events;
-       void *user_data;
-       rspamd_mempool_t *pool;
-       gboolean wanna_die;
-       guint threads;
-       GMutex *mtx;
-       GCond *cond;
-};
+typedef void (*event_finalizer_t)(gpointer ud);
+typedef void (*event_watcher_t)(guint remain, gboolean terminated, gpointer ud);
+typedef gboolean (*session_finalizer_t)(gpointer user_data);
 
 /**
  * Make new async session
@@ -64,7 +46,7 @@ struct rspamd_async_session {
  */
 struct rspamd_async_session * new_async_session (rspamd_mempool_t *pool,
        session_finalizer_t fin, event_finalizer_t restore,
-       event_finalizer_t cleanup, void *user_data);
+       event_finalizer_t cleanup, gpointer user_data);
 
 /**
  * Insert new event to the session
@@ -74,7 +56,7 @@ struct rspamd_async_session * new_async_session (rspamd_mempool_t *pool,
  * @param forced unused
  */
 void register_async_event (struct rspamd_async_session *session,
-       event_finalizer_t fin, void *user_data, GQuark subsystem);
+       event_finalizer_t fin, gpointer user_data, GQuark subsystem);
 
 /**
  * Remove normal event
@@ -84,7 +66,7 @@ void register_async_event (struct rspamd_async_session *session,
  */
 void remove_normal_event (struct rspamd_async_session *session,
        event_finalizer_t fin,
-       void *ud);
+       gpointer ud);
 
 /**
  * Must be called at the end of session, it calls fin functions for all non-forced callbacks
@@ -99,16 +81,4 @@ gboolean destroy_session (struct rspamd_async_session *session);
  */
 gboolean check_session_pending (struct rspamd_async_session *session);
 
-/**
- * Add new async thread to session
- * @param session session object
- */
-void register_async_thread (struct rspamd_async_session *session);
-
-/**
- * Remove async thread from session and check whether session can be terminated
- * @param session session object
- */
-void remove_async_thread (struct rspamd_async_session *session);
-
 #endif /* RSPAMD_EVENTS_H */