#include "config.h"
#include "libutil/util.h"
#include "libutil/multipattern.h"
+#include "libutil/ref.h"
#include "libserver/cfg_file.h"
#include "libserver/cfg_rcl.h"
#include "libserver/worker_util.h"
struct rspamd_multipattern_pending *pending;
unsigned int count;
unsigned int idx;
+ gboolean compile_cb_called;
ev_timer deferred_timer; /* Timer for deferring next operation after error */
+ ref_entry_t ref;
};
+static void
+rspamd_hs_helper_mp_async_ctx_dtor(void *p)
+{
+ struct rspamd_hs_helper_mp_async_ctx *mpctx = p;
+
+ /* Stop the deferred timer if it's somehow still active */
+ if (ev_is_active(&mpctx->deferred_timer)) {
+ ev_timer_stop(mpctx->ctx->event_loop, &mpctx->deferred_timer);
+ }
+ rspamd_multipattern_clear_pending();
+ g_free(mpctx);
+}
+
static void
rspamd_hs_helper_mp_send_notification(struct hs_helper_ctx *ctx,
struct rspamd_worker *worker,
void *ud)
{
struct rspamd_hs_helper_mp_async_ctx *mpctx = ud;
- struct rspamd_multipattern_pending *entry = &mpctx->pending[mpctx->idx];
+ struct rspamd_multipattern_pending *entry;
(void) mp;
+
+ if (mpctx->compile_cb_called) {
+ REF_RELEASE(mpctx);
+ return;
+ }
+ mpctx->compile_cb_called = TRUE;
+
+ entry = &mpctx->pending[mpctx->idx];
rspamd_worker_set_busy(mpctx->worker, mpctx->ctx->event_loop, NULL);
if (!success) {
mpctx->deferred_timer.data = mpctx;
ev_timer_init(&mpctx->deferred_timer, rspamd_hs_helper_mp_deferred_next_cb, 0.0, 0.0);
ev_timer_start(mpctx->ctx->event_loop, &mpctx->deferred_timer);
+
+ REF_RELEASE(mpctx);
}
static void
struct rspamd_hs_helper_mp_async_ctx *mpctx = ud;
struct rspamd_multipattern_pending *entry = &mpctx->pending[mpctx->idx];
bool exists = (success && data == NULL && len == 1);
+ /*
+ * Save entry data before any operation that might trigger ev_run,
+ * as ev_run could process deferred timers that call
+ * rspamd_multipattern_clear_pending() and free the pending array.
+ */
+ struct rspamd_multipattern *mp = entry->mp;
+ const char *entry_name = entry->name;
(void) error;
if (exists) {
- msg_debug_hyperscan("multipattern cache already exists for '%s', skipping compilation", entry->name);
- rspamd_hs_helper_mp_send_notification(mpctx->ctx, mpctx->worker, entry->name);
+ msg_debug_hyperscan("multipattern cache already exists for '%s', skipping compilation", entry_name);
+ rspamd_hs_helper_mp_send_notification(mpctx->ctx, mpctx->worker, entry_name);
mpctx->idx++;
/*
* Defer the next operation to avoid nested Redis callbacks.
mpctx->deferred_timer.data = mpctx;
ev_timer_init(&mpctx->deferred_timer, rspamd_hs_helper_mp_deferred_next_cb, 0.0, 0.0);
ev_timer_start(mpctx->ctx->event_loop, &mpctx->deferred_timer);
+ REF_RELEASE(mpctx);
return;
}
rspamd_worker_set_busy(mpctx->worker, mpctx->ctx->event_loop, "compile multipattern");
/* Flush the busy notification before blocking on compilation */
ev_run(mpctx->ctx->event_loop, EVRUN_NOWAIT);
- rspamd_multipattern_compile_hs_to_cache_async(entry->mp, mpctx->ctx->hs_dir,
+ /* Use saved mp pointer - pending array may have been freed by ev_run */
+ mpctx->compile_cb_called = FALSE;
+ REF_RETAIN(mpctx);
+ rspamd_multipattern_compile_hs_to_cache_async(mp, mpctx->ctx->hs_dir,
mpctx->ctx->event_loop,
rspamd_hs_helper_mp_compiled_cb, mpctx);
}
char cache_key[rspamd_cryptobox_HASHBYTES * 2 + 1];
rspamd_snprintf(cache_key, sizeof(cache_key), "%*xs",
(int) sizeof(entry->hash) / 2, entry->hash);
+ REF_RETAIN(mpctx);
rspamd_hs_cache_lua_exists_async(cache_key, entry->name,
rspamd_hs_helper_mp_exists_cb, mpctx);
return;
}
done:
- /* Stop the deferred timer if it's somehow still active */
- if (ev_is_active(&mpctx->deferred_timer)) {
- ev_timer_stop(mpctx->ctx->event_loop, &mpctx->deferred_timer);
- }
- rspamd_multipattern_clear_pending();
- for (unsigned int i = 0; i < mpctx->count; i++) {
- /* names are freed by clear_pending */
- (void) i;
- }
- g_free(mpctx);
+ REF_RELEASE(mpctx);
}
static void
mpctx->pending = pending;
mpctx->count = count;
mpctx->idx = 0;
+ REF_INIT_RETAIN(mpctx, rspamd_hs_helper_mp_async_ctx_dtor);
rspamd_hs_helper_compile_pending_multipatterns_next(mpctx);
}
struct rspamd_regexp_map_pending *pending;
unsigned int count;
unsigned int idx;
+ gboolean compile_cb_called;
+ ref_entry_t ref;
};
static void rspamd_hs_helper_compile_pending_regexp_maps_next(struct rspamd_hs_helper_remap_async_ctx *rmctx);
+static void
+rspamd_hs_helper_remap_async_ctx_dtor(void *p)
+{
+ struct rspamd_hs_helper_remap_async_ctx *rmctx = p;
+ rspamd_regexp_map_clear_pending();
+ g_free(rmctx);
+}
+
static void
rspamd_hs_helper_remap_send_notification(struct hs_helper_ctx *ctx,
struct rspamd_worker *worker,
void *ud)
{
struct rspamd_hs_helper_remap_async_ctx *rmctx = ud;
- struct rspamd_regexp_map_pending *entry = &rmctx->pending[rmctx->idx];
+ struct rspamd_regexp_map_pending *entry;
(void) re_map;
+
+ if (rmctx->compile_cb_called) {
+ REF_RELEASE(rmctx);
+ return;
+ }
+ rmctx->compile_cb_called = TRUE;
+
+ entry = &rmctx->pending[rmctx->idx];
rspamd_worker_set_busy(rmctx->worker, rmctx->ctx->event_loop, NULL);
if (!success) {
rmctx->idx++;
rspamd_hs_helper_compile_pending_regexp_maps_next(rmctx);
+ REF_RELEASE(rmctx);
}
static void
struct rspamd_hs_helper_remap_async_ctx *rmctx = ud;
struct rspamd_regexp_map_pending *entry = &rmctx->pending[rmctx->idx];
bool exists = (success && data == NULL && len == 1);
+ /*
+ * Save entry data before any operation that might trigger ev_run,
+ * as ev_run could process deferred timers that call
+ * rspamd_regexp_map_clear_pending() and free the pending array.
+ */
+ struct rspamd_regexp_map_helper *re_map = entry->re_map;
+ const char *entry_name = entry->name;
(void) error;
if (exists) {
- msg_debug_hyperscan("regexp map cache already exists for '%s', skipping compilation", entry->name);
- rspamd_hs_helper_remap_send_notification(rmctx->ctx, rmctx->worker, entry->name);
+ msg_debug_hyperscan("regexp map cache already exists for '%s', skipping compilation", entry_name);
+ rspamd_hs_helper_remap_send_notification(rmctx->ctx, rmctx->worker, entry_name);
rmctx->idx++;
rspamd_hs_helper_compile_pending_regexp_maps_next(rmctx);
+ REF_RELEASE(rmctx);
return;
}
rspamd_worker_set_busy(rmctx->worker, rmctx->ctx->event_loop, "compile regexp map");
/* Flush the busy notification before blocking on compilation */
ev_run(rmctx->ctx->event_loop, EVRUN_NOWAIT);
- rspamd_regexp_map_compile_hs_to_cache_async(entry->re_map, rmctx->ctx->hs_dir,
+ /* Use saved re_map pointer - pending array may have been freed by ev_run */
+ rmctx->compile_cb_called = FALSE;
+ REF_RETAIN(rmctx);
+ rspamd_regexp_map_compile_hs_to_cache_async(re_map, rmctx->ctx->hs_dir,
rmctx->ctx->event_loop,
rspamd_hs_helper_remap_compiled_cb, rmctx);
}
char cache_key[rspamd_cryptobox_HASHBYTES * 2 + 1];
rspamd_snprintf(cache_key, sizeof(cache_key), "%*xs",
(int) sizeof(entry->hash) / 2, entry->hash);
+ REF_RETAIN(rmctx);
rspamd_hs_cache_lua_exists_async(cache_key, entry->name,
rspamd_hs_helper_remap_exists_cb, rmctx);
return;
}
done:
- rspamd_regexp_map_clear_pending();
- g_free(rmctx);
+ REF_RELEASE(rmctx);
}
static void
rmctx->pending = pending;
rmctx->count = count;
rmctx->idx = 0;
+ REF_INIT_RETAIN(rmctx, rspamd_hs_helper_remap_async_ctx_dtor);
rspamd_hs_helper_compile_pending_regexp_maps_next(rmctx);
}
gboolean silent;
unsigned int total;
struct rspamd_worker *worker;
+ struct ev_loop *event_loop;
+ ev_timer *timer;
void (*cb)(unsigned int ncompiled, GError *err, void *cbd);
/* Async state */
struct rspamd_re_class *current_class;
enum rspamd_re_cache_compile_state state;
+ ref_entry_t ref;
};
struct rspamd_re_cache_async_ctx {
struct ev_loop *loop;
ev_timer *w;
int n;
+ gboolean callback_processed;
};
static void
rspamd_re_cache_compile_err(EV_P_ ev_timer *w, GError *err,
struct rspamd_re_cache_hs_compile_cbdata *cbdata, bool is_fatal);
+static void
+rspamd_re_cache_hs_compile_cbdata_dtor(void *p)
+{
+ struct rspamd_re_cache_hs_compile_cbdata *cbdata = p;
+
+ if (cbdata->timer && ev_is_active(cbdata->timer)) {
+ ev_timer_stop(cbdata->event_loop, cbdata->timer);
+ }
+ rspamd_heap_destroy(re_compile_queue, &cbdata->compile_queue);
+ g_free(cbdata->timer);
+ g_free(cbdata);
+}
+
static void
rspamd_re_cache_exists_cb(gboolean success, const unsigned char *data, gsize len, const char *err, void *ud)
{
struct rspamd_re_cache_async_ctx *ctx = ud;
- struct rspamd_re_cache_hs_compile_cbdata *cbdata = ctx->cbdata;
+ struct rspamd_re_cache_hs_compile_cbdata *cbdata;
const gboolean lua_backend = rspamd_hs_cache_has_lua_backend();
char path[PATH_MAX];
+ if (ctx->callback_processed) {
+ return;
+ }
+ ctx->callback_processed = TRUE;
+ cbdata = ctx->cbdata;
+
+ if (cbdata->worker && cbdata->worker->state != rspamd_worker_state_running) {
+ g_free(ctx);
+ REF_RELEASE(cbdata);
+ return;
+ }
+
if (success && len > 0) {
/* Exists */
struct rspamd_re_class *re_class = cbdata->current_class;
cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_COMPILING;
}
- ev_timer_again(ctx->loop, ctx->w);
+ ev_timer_again(cbdata->event_loop, cbdata->timer);
g_free(ctx);
+ REF_RELEASE(cbdata);
}
static void
rspamd_re_cache_save_cb(gboolean success, const unsigned char *data, gsize len, const char *err, void *ud)
{
struct rspamd_re_cache_async_ctx *ctx = ud;
- struct rspamd_re_cache_hs_compile_cbdata *cbdata = ctx->cbdata;
+ struct rspamd_re_cache_hs_compile_cbdata *cbdata;
+
+ if (ctx->callback_processed) {
+ return;
+ }
+ ctx->callback_processed = TRUE;
+ cbdata = ctx->cbdata;
+
+ if (cbdata->worker && cbdata->worker->state != rspamd_worker_state_running) {
+ g_free(ctx);
+ REF_RELEASE(cbdata);
+ return;
+ }
if (!success) {
GError *gerr = g_error_new(rspamd_re_cache_quark(), EINVAL,
"backend save failed: %s", err ? err : "unknown error");
- rspamd_re_cache_compile_err(ctx->loop, ctx->w, gerr, cbdata, false);
+ rspamd_re_cache_compile_err(cbdata->event_loop, cbdata->timer, gerr, cbdata, false);
}
else {
struct rspamd_re_class *re_class = cbdata->current_class;
cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
cbdata->current_class = NULL;
- ev_timer_again(ctx->loop, ctx->w);
+ ev_timer_again(cbdata->event_loop, cbdata->timer);
g_free(ctx);
+ REF_RELEASE(cbdata);
}
static void
{
if (is_fatal) {
cbdata->cb(cbdata->total, err, cbdata->cbd);
- ev_timer_stop(EV_A_ w);
- g_free(w);
- g_free(cbdata);
+ REF_RELEASE(cbdata);
}
else {
msg_err("hyperscan compilation error: %s", err->message);
- /* Continue compilation */
cbdata->state = RSPAMD_RE_CACHE_COMPILE_STATE_INIT;
cbdata->current_class = NULL;
ev_timer_again(EV_A_ w);
/* Stop if worker is terminating */
if (cbdata->worker && cbdata->worker->state != rspamd_worker_state_running) {
- ev_timer_stop(EV_A_ w);
cbdata->cb(cbdata->total, NULL, cbdata->cbd);
- rspamd_heap_destroy(re_compile_queue, &cbdata->compile_queue);
- g_free(w);
- g_free(cbdata);
-
+ REF_RELEASE(cbdata);
return;
}
rspamd_heap_pop(re_compile_queue, &cbdata->compile_queue);
if (elt == NULL) {
/* All done */
- ev_timer_stop(EV_A_ w);
cbdata->cb(cbdata->total, NULL, cbdata->cbd);
- rspamd_heap_destroy(re_compile_queue, &cbdata->compile_queue);
- g_free(w);
- g_free(cbdata);
-
+ REF_RELEASE(cbdata);
return;
}
rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s",
rspamd_re_cache_type_to_string(re_class->type));
}
+ REF_RETAIN(cbdata);
rspamd_hs_cache_lua_exists_async(re_class->hash, entity_name, rspamd_re_cache_exists_cb, ctx);
- /* Don't stop timer here - the callback (rspamd_re_cache_exists_cb) handles
- * restarting the timer. For file backend the callback runs synchronously
- * within exists_async, so stopping here would undo the timer restart. */
return;
}
rspamd_snprintf(entity_name, sizeof(entity_name), "re_class:%s",
rspamd_re_cache_type_to_string(re_class->type));
}
+ REF_RETAIN(cbdata);
rspamd_hs_cache_lua_save_async(re_class->hash, entity_name, combined, total_len, rspamd_re_cache_save_cb, ctx);
g_free(combined);
CLEANUP_ALLOCATED(false);
g_free(hs_serialized);
- /* Don't stop timer here - the callback (rspamd_re_cache_save_cb) handles
- * restarting the timer. For file backend the callback runs synchronously
- * within save_async, so stopping here would undo the timer restart. */
return;
}
else {
cbdata->silent = silent;
cbdata->total = 0;
cbdata->worker = worker;
+ cbdata->event_loop = event_loop;
timer = g_malloc0(sizeof(*timer));
timer->data = (void *) cbdata;
+ cbdata->timer = timer;
+ REF_INIT_RETAIN(cbdata, rspamd_re_cache_hs_compile_cbdata_dtor);
ev_timer_init(timer, rspamd_re_cache_compile_timer_cb,
timer_interval, timer_interval);