enum rspamd_fuzzy_backend_type type;
gdouble expire;
gdouble sync;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
rspamd_fuzzy_periodic_cb periodic_cb;
void *periodic_ud;
const struct rspamd_fuzzy_backend_subr *subr;
void *subr_ud;
- struct event periodic_event;
+ ev_timer periodic_event;
};
static GQuark
}
bk = g_malloc0 (sizeof (*bk));
- bk->ev_base = ev_base;
+ bk->event_loop = ev_base;
bk->expire = expire;
bk->type = type;
bk->subr = &fuzzy_subrs[type];
}
static void
-rspamd_fuzzy_backend_periodic_cb (gint fd, short what, void *ud)
+rspamd_fuzzy_backend_periodic_cb (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_fuzzy_backend *bk = ud;
+ struct rspamd_fuzzy_backend *bk = (struct rspamd_fuzzy_backend *)w->data;
gdouble jittered;
- struct timeval tv;
jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0);
- double_to_tv (jittered, &tv);
- event_del (&bk->periodic_event);
+ w->repeat = jittered;
rspamd_fuzzy_backend_periodic_sync (bk);
- event_add (&bk->periodic_event, &tv);
+ ev_timer_again (EV_A_ w);
}
void
void *ud)
{
gdouble jittered;
- struct timeval tv;
g_assert (bk != NULL);
if (bk->subr->periodic) {
if (bk->sync > 0.0) {
- event_del (&bk->periodic_event);
+ ev_timer_stop (bk->event_loop, &bk->periodic_event);
}
if (cb) {
rspamd_fuzzy_backend_periodic_sync (bk);
bk->sync = timeout;
jittered = rspamd_time_jitter (timeout, timeout / 2.0);
- double_to_tv (jittered, &tv);
- event_set (&bk->periodic_event, -1, EV_TIMEOUT,
- rspamd_fuzzy_backend_periodic_cb, bk);
- event_base_set (bk->ev_base, &bk->periodic_event);
- event_add (&bk->periodic_event, &tv);
+
+ bk->periodic_event.data = bk;
+ ev_timer_init (&bk->periodic_event, rspamd_fuzzy_backend_periodic_cb,
+ jittered, 0.0);
+ ev_timer_start (bk->event_loop, &bk->periodic_event);
}
}
if (bk->sync > 0.0) {
rspamd_fuzzy_backend_periodic_sync (bk);
- event_del (&bk->periodic_event);
+ ev_timer_stop (bk->event_loop, &bk->periodic_event);
}
bk->subr->close (bk, bk->subr_ud);
struct ev_loop*
rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend)
{
- return backend->ev_base;
+ return backend->event_loop;
}
gdouble
event_set (&priv->ev, priv->fd, what, rspamd_milter_io_handler,
session);
- event_base_set (priv->ev_base, &priv->ev);
+ event_base_set (priv->event_loop, &priv->ev);
event_add (&priv->ev, priv->ptv);
}
priv->err_cb = error_cb;
priv->parser.state = st_len_1;
priv->parser.buf = rspamd_fstring_sized_new (RSPAMD_MILTER_MESSAGE_CHUNK + 5);
- priv->ev_base = ev_base;
+ priv->event_loop = ev_base;
priv->state = RSPAMD_MILTER_READ_MORE;
priv->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "milter");
priv->discard_on_reject = milter_ctx->discard_on_reject;
#include "contrib/libev/ev.h"
#include "khash.h"
#include "libutil/str_util.h"
+#include "libutil/libev_helper.h"
enum rspamd_milter_state {
st_len_1 = 0,
struct rspamd_milter_private {
struct rspamd_milter_parser parser;
- struct event ev;
- struct timeval tv;
+ struct rspamd_io_ev ev;
struct rspamd_milter_outbuf *out_chain;
struct timeval *ptv;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
rspamd_mempool_t *pool;
khash_t(milter_headers_hash_t) *headers;
gint cur_hdr;
struct rspamd_monitored_ctx {
struct rspamd_config *cfg;
struct rdns_resolver *resolver;
- struct ev_loop *ev_base;
+ struct ev_loop *event_loop;
GPtrArray *elts;
GHashTable *helts;
mon_change_cb change_cb;
enum rspamd_monitored_flags flags;
struct rspamd_monitored_ctx *ctx;
struct rspamd_monitored_methods proc;
- struct event periodic;
+ ev_timer periodic;
gchar tag[RSPAMD_MONITORED_TAG_LEN];
};
}
static void
-rspamd_monitored_periodic (gint fd, short what, gpointer ud)
+rspamd_monitored_periodic (EV_P_ ev_timer *w, int revents)
{
- struct rspamd_monitored *m = ud;
+ struct rspamd_monitored *m = (struct rspamd_monitored *)w->data;
struct timeval tv;
gdouble jittered;
gboolean ret = FALSE;
}
if (ret) {
- event_add (&m->periodic, &tv);
+ m->periodic.repeat = jittered;
+ ev_timer_again (EV_A_ &m->periodic);
}
}
guint i;
g_assert (ctx != NULL);
- ctx->ev_base = ev_base;
+ ctx->event_loop = ev_base;
ctx->resolver = resolver;
ctx->cfg = cfg;
ctx->initialized = TRUE;
struct ev_loop *
rspamd_monitored_ctx_get_ev_base (struct rspamd_monitored_ctx *ctx)
{
- return ctx->ev_base;
+ return ctx->event_loop;
}
g_ptr_array_add (ctx->elts, m);
- if (ctx->ev_base) {
+ if (ctx->event_loop) {
rspamd_monitored_start (m);
}
{
g_assert (m != NULL);
- if (rspamd_event_pending (&m->periodic, EV_TIMEOUT)) {
- event_del (&m->periodic);
- }
+ ev_timer_stop (m->ctx->event_loop, &m->periodic);
}
void
rspamd_monitored_start (struct rspamd_monitored *m)
{
- struct timeval tv;
gdouble jittered;
g_assert (m != NULL);
msg_debug_mon ("started monitored object %s", m->url);
jittered = rspamd_time_jitter (m->ctx->monitoring_interval * m->monitoring_mult,
0.0);
- double_to_tv (jittered, &tv);
- if (rspamd_event_pending (&m->periodic, EV_TIMEOUT)) {
- event_del (&m->periodic);
+ if (ev_is_active (&m->periodic)) {
+ ev_timer_stop (m->ctx->event_loop, &m->periodic);
}
- event_set (&m->periodic, -1, EV_TIMEOUT, rspamd_monitored_periodic, m);
- event_base_set (m->ctx->ev_base, &m->periodic);
- event_add (&m->periodic, &tv);
+ m->periodic.data = m;
+ ev_timer_init (&m->periodic, rspamd_monitored_periodic, jittered, 0.0);
+ ev_timer_start (m->ctx->event_loop, &m->periodic);
}
void
}
void
-rspamd_protocol_write_reply (struct rspamd_task *task)
+rspamd_protocol_write_reply (struct rspamd_task *task, ev_tstamp timeout)
{
struct rspamd_http_message *msg;
const gchar *ctype = "application/json";
msg->flags |= RSPAMD_HTTP_FLAG_SPAMC;
}
- msg->date = time (NULL);
+ msg->date = ev_time ();
msg_debug_protocol ("writing reply to client");
if (task->err != NULL) {
rspamd_http_connection_reset (task->http_conn);
rspamd_http_connection_write_message (task->http_conn, msg, NULL,
- ctype, task, &task->tv);
+ ctype, task, timeout);
task->processed_stages |= RSPAMD_TASK_STAGE_REPLIED;
}
* @param task task object
* @return 0 if we wrote reply and -1 if there was some error
*/
-void rspamd_protocol_write_reply (struct rspamd_task *task);
+void rspamd_protocol_write_reply (struct rspamd_task *task, ev_tstamp timeout);
/**
* Convert rspamd output to legacy protocol reply
rspamd_strlcpy (row->from_addr, "unknown", sizeof (row->from_addr));
}
- memcpy (&row->tv, &task->tv, sizeof (row->tv));
+ row->timestamp = task->task_timestamp;
/* Strings */
rspamd_strlcpy (row->message_id, task->message_id,
}
}
- row->scan_time = task->time_real_finish - task->time_real;
+ row->scan_time = task->time_real_finish - task->task_timestamp;
row->len = task->msg.len;
g_atomic_int_set (&row->completed, TRUE);
}
elt = ucl_object_lookup (cur, "time");
if (elt && ucl_object_type (elt) == UCL_FLOAT) {
- double_to_tv (ucl_object_todouble (elt), &row->tv);
+ row->timestamp = ucl_object_todouble (elt);
}
elt = ucl_object_lookup (cur, "id");
elt = ucl_object_typed_new (UCL_OBJECT);
- ucl_object_insert_key (elt, ucl_object_fromdouble (
- tv_to_double (&row->tv)), "time", 0, false);
+ ucl_object_insert_key (elt, ucl_object_fromdouble (row->timestamp),
+ "time", 0, false);
ucl_object_insert_key (elt, ucl_object_fromstring (row->message_id),
"id", 0, false);
ucl_object_insert_key (elt, ucl_object_fromstring (row->symbols),
struct rspamd_config;
struct roll_history_row {
- struct timeval tv;
+ ev_tstamp timestamp;
gchar message_id[HISTORY_MAX_ID];
gchar symbols[HISTORY_MAX_SYMBOLS];
gchar user[HISTORY_MAX_USER];
}
new_task->event_loop = ev_base;
-
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
- if (ev_base) {
- event_base_update_cache_time (ev_base);
- event_base_gettimeofday_cached (ev_base, &new_task->tv);
- new_task->time_real = tv_to_double (&new_task->tv);
- }
- else {
- gettimeofday (&new_task->tv, NULL);
- new_task->time_real = tv_to_double (&new_task->tv);
- }
-#else
- gettimeofday (&new_task->tv, NULL);
- new_task->time_real = tv_to_double (&new_task->tv);
-#endif
-
- new_task->time_virtual = rspamd_get_virtual_ticks ();
+ new_task->task_timestamp = ev_time ();
+ new_task->time_virtual = ev_now (ev_base);
new_task->time_real_finish = NAN;
new_task->time_virtual_finish = NAN;
static void
rspamd_task_reply (struct rspamd_task *task)
{
+ const ev_tstamp write_timeout = 2.0;
+
if (task->fin_callback) {
task->fin_callback (task, task->fin_arg);
}
else {
- rspamd_protocol_write_reply (task);
+ rspamd_protocol_write_reply (task, write_timeout);
}
}
var.begin = numbuf;
break;
case RSPAMD_LOG_TIME_REAL:
- var.begin = rspamd_log_check_time (task->time_real,
+ var.begin = rspamd_log_check_time (task->task_timestamp,
task->time_real_finish,
task->cfg->clock_res);
var.len = strlen (var.begin);
gboolean
rspamd_task_set_finish_time (struct rspamd_task *task)
{
- struct timeval tv;
-
if (isnan (task->time_real_finish)) {
-
-#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
- if (task->ev_base) {
- event_base_update_cache_time (task->ev_base);
- event_base_gettimeofday_cached (task->ev_base, &tv);
- task->time_real_finish = tv_to_double (&tv);
- }
- else {
- gettimeofday (&tv, NULL);
- task->time_real_finish = tv_to_double (&tv);
- }
-#else
- gettimeofday (&tv, NULL);
- task->time_real_finish = tv_to_double (&tv);
-#endif
- task->time_virtual_finish = rspamd_get_virtual_ticks ();
+ task->time_real_finish = ev_time ();
+ task->time_virtual_finish = ev_now (task->event_loop);
return TRUE;
}
struct rspamd_config *cfg; /**< pointer to config object */
GError *err;
rspamd_mempool_t *task_pool; /**< memory pool for task */
- double time_real;
double time_virtual;
double time_real_finish;
double time_virtual_finish;
- struct timeval tv;
+ ev_tstamp task_timestamp;
gboolean (*fin_callback)(struct rspamd_task *task, void *arg);
/**< callback for filters finalizing */
void *fin_arg; /**< argument for fin callback */
}
/* Get GMT date and store it to time_t */
if (type == DATE_CONNECT || type == DATE_CONNECT_STRING) {
- tim = (tv_to_msec (&task->tv)) / 1000.;
+ tim = task->task_timestamp;
if (!gmt) {
struct tm t;
{
LUA_TRACE_POINT;
struct rspamd_task *task = lua_check_task (L, 1);
+ struct timeval tv;
if (task != NULL) {
+ double_to_tv (task->task_timestamp, &tv);
lua_createtable (L, 0, 2);
lua_pushstring (L, "tv_sec");
- lua_pushinteger (L, (lua_Integer)task->tv.tv_sec);
+ lua_pushinteger (L, (lua_Integer)tv.tv_sec);
lua_settable (L, -3);
lua_pushstring (L, "tv_usec");
- lua_pushinteger (L, (lua_Integer)task->tv.tv_usec);
+ lua_pushinteger (L, (lua_Integer)tv.tv_usec);
lua_settable (L, -3);
}
else {
}
rspamd_task_set_finish_time (task);
- lua_pushnumber (L, task->time_real_finish - task->time_real);
+ lua_pushnumber (L, task->time_real_finish - task->task_timestamp);
lua_pushnumber (L, task->time_virtual_finish - task->time_virtual);
if (!set) {
* lru hash owns this object now
*/
rspamd_lru_hash_insert (dkim_module_ctx->dkim_hash,
- g_strdup (rspamd_dkim_get_dns_key (ctx)),
- key, res->task->tv.tv_sec, rspamd_dkim_key_get_ttl (key));
+ g_strdup (rspamd_dkim_get_dns_key (ctx)),
+ key, res->task->task_timestamp, rspamd_dkim_key_get_ttl (key));
/* Release key when task is processed */
rspamd_mempool_add_destructor (res->task->task_pool,
dkim_module_key_dtor, res->key);
key = rspamd_lru_hash_lookup (dkim_module_ctx->dkim_hash,
rspamd_dkim_get_dns_key (ctx),
- task->tv.tv_sec);
+ task->task_timestamp);
if (key != NULL) {
cur->key = rspamd_dkim_key_ref (key);
*/
rspamd_lru_hash_insert (dkim_module_ctx->dkim_hash,
g_strdup (rspamd_dkim_get_dns_key (ctx)),
- key, cbd->task->tv.tv_sec, rspamd_dkim_key_get_ttl (key));
+ key, cbd->task->task_timestamp, rspamd_dkim_key_get_ttl (key));
/* Release key when task is processed */
rspamd_mempool_add_destructor (cbd->task->task_pool,
dkim_module_key_dtor, cbd->key);
key = rspamd_lru_hash_lookup (dkim_module_ctx->dkim_hash,
rspamd_dkim_get_dns_key (ctx),
- task->tv.tv_sec);
+ task->task_timestamp);
if (key != NULL) {
cbd->key = rspamd_dkim_key_ref (key);
spf_record_ref (record);
if ((l = rspamd_lru_hash_lookup (spf_module_ctx->spf_hash,
- record->domain, task->tv.tv_sec)) == NULL) {
+ record->domain, task->task_timestamp)) == NULL) {
l = record;
if (record->ttl > 0 &&
rspamd_lru_hash_insert (spf_module_ctx->spf_hash,
record->domain, spf_record_ref (l),
- task->tv.tv_sec, record->ttl);
+ task->task_timestamp, record->ttl);
msg_info_task ("stored record for %s (0x%xuL) in LRU cache for %d seconds, "
"%d/%d elements in the cache",
if (domain) {
if ((l =
rspamd_lru_hash_lookup (spf_module_ctx->spf_hash, domain,
- task->tv.tv_sec)) != NULL) {
+ task->task_timestamp)) != NULL) {
spf_record_ref (l);
spf_check_list (l, task, TRUE);
spf_record_unref (l);
NULL,
"application/json",
task,
- &task->tv);
+ 1.0);
}
}